* @author Christian Grothoff
*
* TODO:
- * - track per-peer request latency (using new load API)
- * - consider more precise latency estimation (per-peer & request) -- again load API?
- * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
* - introduce random latency in processing
* - more statistics
*/
/**
* Remove this request after transmission of the current response.
*/
- int16_t do_remove;
+ int8_t do_remove;
/**
* GNUNET_YES if we should not forward this request to other peers.
*/
- int16_t local_only;
+ int8_t local_only;
+
+ /**
+ * GNUNET_YES if we should only forward this request to other peers
+ * (and not answer it locally).
+ */
+ int8_t forward_only;
};
*/
static struct GNUNET_LOAD_Value *datastore_put_load;
+/**
+ * How long do requests typically stay in the routing table?
+ */
+static struct GNUNET_LOAD_Value *rt_entry_lifetime;
/**
* We've just now completed a datastore request. Update our
-1,
GNUNET_NO);
}
- /* might have already been removed from map in 'process_reply' (if
- there was a unique reply) or never inserted if it was a
- duplicate; hence ignore the return value here */
- (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
- &pr->query,
- pr);
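+ /* the request may already have been removed (e.g. in 'process_reply' after a
+ unique reply); only measure the routing table lifetime if it was still present */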
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ &pr->query,
+ pr))
+ {
+ GNUNET_LOAD_update (rt_entry_lifetime,
+ GNUNET_TIME_absolute_get_duration (pr->start_time).value);
+ }
if (pr->qe != NULL)
{
GNUNET_DATASTORE_cancel (pr->qe);
GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
query_request_map = NULL;
+ GNUNET_LOAD_value_free (rt_entry_lifetime);
+ rt_entry_lifetime = NULL;
GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
peer_request_map = NULL;
/**
- * Test if the load on this peer is too high
+ * Test if the DATABASE (GET) load on this peer is too high
* to even consider processing the query at
- * all.
+ * all.
*
- * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
+ * @param priority priority of the request (the request is processed normally if the load does not exceed this value)
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ * GNUNET_NO to process normally (load normal)
+ * GNUNET_SYSERR to process for free (load low)
*/
static int
-test_load_too_high ()
+test_get_load_too_high (uint32_t priority)
{
- return GNUNET_SYSERR; // FIXME
+ double ld;
+
+ ld = GNUNET_LOAD_get_load (datastore_get_load);
+ if (ld < 1)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests done for free (low load)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
+ if (ld <= priority)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests done for a price (normal load)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_NO;
+ }
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to high load"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Test if the DATABASE (PUT) load on this peer is too high
+ * to even consider processing the request at
+ * all.
+ *
+ * @param priority priority of the content (the PUT is allowed if the load does not exceed this value)
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ * GNUNET_NO to process normally (load normal or low)
+ */
+static int
+test_put_load_too_high (uint32_t priority)
+{
+ double ld;
+
+ if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
+ return GNUNET_NO; /* very fast */
+ ld = GNUNET_LOAD_get_load (datastore_put_load);
+ if ( (ld < 1) || (ld < priority) )
+ return GNUNET_NO;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# storage requests dropped due to high load"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_YES;
}
pr);
return; /* this target round failed */
}
- /* FIXME: if we are "quite" busy, we may still want to skip
- this round; need more load detection code! */
no_route = GNUNET_YES;
}
P2P_SUCCESS_LIST_SIZE);
for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
if (cp->last_p2p_replies[i] == pr->cp->pid)
- score += 1; /* likely successful based on hot path */
+ score += 1.0; /* likely successful based on hot path */
}
else
{
CS2P_SUCCESS_LIST_SIZE);
for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
- score += 1; /* likely successful based on hot path */
+ score += 1.0; /* likely successful based on hot path */
}
/* 3b) include latency */
if (cp->avg_delay.value < 4 * TTL_DECREMENT)
- score += 1; /* likely fast based on latency */
+ score += 1.0; /* likely fast based on latency */
/* 3c) include priorities */
if (cp->avg_priority <= pr->remaining_priority / 2.0)
- score += 1; /* likely successful based on priorities */
+ score += 1.0; /* likely successful based on priorities */
/* 3d) penalize for queue size */
score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
/* 3e) include peer proximity */
return; /* configured to not do P2P search */
/* (0) try DHT */
if ( (0 == pr->anonymity_level) &&
+ (GNUNET_YES != pr->forward_only) &&
(pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
(pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
{
}
/* (3) reserve reply bandwidth */
- cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
- &psc.target.hashPubKey);
- GNUNET_assert (NULL != cp);
- pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
- &psc.target,
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_BANDWIDTH_value_init (UINT32_MAX),
- DBLOCK_SIZE * 2,
- cp->inc_preference,
- &target_reservation_cb,
- pr);
- cp->inc_preference = 0;
+ if (GNUNET_NO == pr->forward_only)
+ {
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &psc.target.hashPubKey);
+ GNUNET_assert (NULL != cp);
+ pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
+ &psc.target,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+ DBLOCK_SIZE * 2,
+ cp->inc_preference,
+ &target_reservation_cb,
+ pr);
+ cp->inc_preference = 0;
+ }
+ else
+ {
+ /* force forwarding */
+ static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
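+ /* skip the CORE bandwidth reservation and invoke the continuation
+ directly with a zero reservation */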
+ target_reservation_cb (pr, &psc.target,
+ zerobw, zerobw, 0, 0.0);
+ }
}
GNUNET_NO);
if (prq->sender != NULL)
{
- /* FIXME: should we be more precise here and not use
- "start_time" but a peer-specific time stamp? */
cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
prq->sender->avg_delay.value
= (prq->sender->avg_delay.value *
GNUNET_CONTAINER_multihashmap_remove (query_request_map,
key,
pr));
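+ /* request is done; record how long it stayed in the routing table */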
+ GNUNET_LOAD_update (rt_entry_lifetime,
+ GNUNET_TIME_absolute_get_duration (pr->start_time).value);
break;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
GNUNET_STATISTICS_update (stats,
prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
prq.sender->trust += prq.priority;
}
- if (GNUNET_YES == active_migration)
- {
+ if ( (GNUNET_YES == active_migration) &&
+ (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+ {
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Replicating result for query `%s' with priority %u\n",
return;
}
if ( (pr->client_request_list == NULL) &&
- ( (GNUNET_YES == test_load_too_high()) ||
+ ( (GNUNET_YES == test_get_load_too_high (0)) ||
(pr->results_found > 5 + 2 * pr->priority) ) )
{
#if DEBUG_FS > 2
* @param cp the peer making the request
- * @return effective priority
+ * @return effective priority, or -1 to indicate the request should be dropped due to excess load
*/
-static uint32_t
+static int32_t
bound_priority (uint32_t prio_in,
struct ConnectedPeer *cp)
{
double rret;
int ld;
- ld = test_load_too_high ();
+ ld = test_get_load_too_high (0);
if (ld == GNUNET_SYSERR)
return 0; /* excess resources */
ret = change_host_trust (cp, prio_in);
current_priorities
= (current_priorities * (N-1) + rret)/N;
}
+ if ( (ld == GNUNET_YES) && (ret > 0) )
+ {
+ /* try with charging */
+ ld = test_get_load_too_high (ret);
+ }
+ if (ld == GNUNET_YES)
+ {
+ /* undo charge */
+ if (ret != 0)
+ change_host_trust (cp, -ret);
+ return -1; /* not enough resources */
+ }
#undef N
return ret;
}
uint32_t bm;
size_t bfsize;
uint32_t ttl_decrement;
+ int32_t priority;
enum GNUNET_BLOCK_Type type;
int have_ns;
- int ld;
msize = ntohs(message->size);
if (msize < sizeof (struct GetMessage))
/* note that we can really only check load here since otherwise
peers could find out that we are overloaded by not being
disconnected after sending us a malformed query... */
-
- /* FIXME: query priority should play
- a major role here! */
- ld = test_load_too_high ();
- if (GNUNET_YES == ld)
+ priority = bound_priority (ntohl (gm->priority), cps);
+ if (priority < 0)
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_NO);
return GNUNET_OK;
}
- /* FIXME: if ld == GNUNET_NO, forward
- instead of indirecting! */
-
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for `%s' of type %u from peer `%4s' with flags %u\n",
pr->namespace = (GNUNET_HashCode*) &pr[1];
memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
}
+ if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
+ (GNUNET_LOAD_get_average (cp->transmission_delay) >
+ GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+ {
+ /* we lack the bandwidth to send to this peer, or transmitting would likely take
+ longer than the request stays in our routing table, so at best indirect the query */
+ priority = 0;
+ pr->forward_only = GNUNET_YES;
+ }
pr->type = type;
pr->mingle = ntohl (gm->filter_mutator);
if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
pr->anonymity_level = 1;
- pr->priority = bound_priority (ntohl (gm->priority), cps);
+ pr->priority = (uint32_t) priority;
pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
pr->query = gm->query;
/* decrement ttl (always) */
type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
(pr->priority + 1));
- pr->qe = GNUNET_DATASTORE_get (dsh,
- &gm->query,
- type,
- pr->priority + 1,
- MAX_DATASTORE_QUEUE,
- timeout,
- &process_local_reply,
- pr);
+ if (GNUNET_YES != pr->forward_only)
+ pr->qe = GNUNET_DATASTORE_get (dsh,
+ &gm->query,
+ type,
+ pr->priority + 1,
+ MAX_DATASTORE_QUEUE,
+ timeout,
+ &process_local_reply,
+ pr);
+ else
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests forwarded due to high load"),
+ 1,
+ GNUNET_NO);
- /* Are multiple results possible? If so, start processing remotely now! */
+ /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
switch (pr->type)
{
case GNUNET_BLOCK_TYPE_FS_DBLOCK:
case GNUNET_BLOCK_TYPE_FS_IBLOCK:
/* only one result, wait for datastore */
- break;
+ if (GNUNET_YES != pr->forward_only)
+ break;
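+ /* if forward-only, fall through and start forwarding right away */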
default:
if (pr->task == GNUNET_SCHEDULER_NO_TASK)
pr->task = GNUNET_SCHEDULER_add_now (sched,
}
connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
+ rt_entry_lifetime = GNUNET_LOAD_value_init ();
peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
core = GNUNET_CORE_connect (sched,
connected_peers = NULL;
GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
query_request_map = NULL;
+ GNUNET_LOAD_value_free (rt_entry_lifetime);
+ rt_entry_lifetime = NULL;
GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
requests_by_expiration_heap = NULL;
GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);