X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_pr.c;h=c16f94464e5fd9f54d363de1ef005b366cd0a81d;hb=987e618a2240e7a8cf5c61e33af9bb413a118e7a;hp=7c113835617f277862ca20d5dcdd597078d7fbdd;hpb=b7981b63fe6630ca580768e555415d8c61c3e5c3;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 7c1138356..c16f94464 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -31,6 +31,25 @@ #include "gnunet-service-fs_pe.h" #include "gnunet-service-fs_pr.h" +/** + * Maximum size of the datastore queue for P2P operations. Needs to + * be large enough to queue MAX_QUEUE_PER_PEER operations for roughly + * the number of active (connected) peers. + */ +#define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER) + +/** + * Bandwidth value of a 0-priority content (must be fairly high + * compared to query since content is typically significantly larger + * -- and more valueable since it can take many queries to get one + * piece of content). + */ +#define CONTENT_BANDWIDTH_VALUE 800 + +/** + * Hard limit on the number of results we may get from the datastore per query. + */ +#define MAX_RESULTS (100 * 1024) /** * An active request. @@ -39,7 +58,7 @@ struct GSF_PendingRequest { /** * Public data for the request. - */ + */ struct GSF_PendingRequestData public_data; /** @@ -99,6 +118,12 @@ struct GSF_PendingRequest */ GNUNET_PEER_Id sender_pid; + /** + * Identity of the peer that we should never forward this query + * to since it originated this query (0 for none). + */ + GNUNET_PEER_Id origin_pid; + /** * Time we started the last datastore lookup. */ @@ -138,6 +163,11 @@ struct GSF_PendingRequest */ uint32_t mingle; + /** + * Do we have a first UID yet? + */ + unsigned int have_first_uid; + }; @@ -160,6 +190,12 @@ static struct GNUNET_LOAD_Value *datastore_put_load; static int active_to_migration; +/** + * Size of the datastore queue we assume for common requests. + * Determined based on the network quota. + */ +static unsigned int datastore_queue_size; + /** * Heap with the request that will expire next at the top. Contains * pointers of type "struct PendingRequest*"; these will *also* be @@ -179,35 +215,6 @@ static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; static unsigned long long max_pending_requests = (32 * 1024); -/** - * How many bytes should a bloomfilter be if we have already seen - * entry_count responses? Note that BLOOMFILTER_K gives us the number - * of bits set per entry. Furthermore, we should not re-size the - * filter too often (to keep it cheap). - * - * Since other peers will also add entries but not resize the filter, - * we should generally pick a slightly larger size than what the - * strict math would suggest. - * - * @return must be a power of two and smaller or equal to 2^15. - */ -static size_t -compute_bloomfilter_size (unsigned int entry_count) -{ - size_t size; - unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4; - uint16_t max = 1 << 15; - - if (entry_count > max) - return max; - size = 8; - while ((size < max) && (size < ideal)) - size *= 2; - if (size > max) - return max; - return size; -} - /** * Recalculate our bloom filter for filtering replies. This function @@ -217,39 +224,22 @@ compute_bloomfilter_size (unsigned int entry_count) * initiator (in which case we may resize to larger than mimimum size). * * @param pr request for which the BF is to be recomputed - * @return GNUNET_YES if a refresh actually happened */ -static int +static void refresh_bloomfilter (struct GSF_PendingRequest *pr) { - unsigned int i; - size_t nsize; - GNUNET_HashCode mhash; - - nsize = compute_bloomfilter_size (pr->replies_seen_count); - if ( (pr->bf != NULL) && - (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) - return GNUNET_NO; /* size not changed */ if (pr->bf != NULL) GNUNET_CONTAINER_bloomfilter_free (pr->bf); - pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - nsize, - BLOOMFILTER_K); - for (i=0;ireplies_seen_count;i++) - { - GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i], - pr->mingle, - &mhash); - GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); - } - return GNUNET_YES; + pr->mingle = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); + pr->bf = + GNUNET_BLOCK_construct_bloomfilter (pr->mingle, pr->replies_seen, + pr->replies_seen_count); } /** - * Create a new pending request. + * Create a new pending request. * * @param options request options * @param type type of the block that is being requested @@ -263,6 +253,7 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr) * @param priority maximum outgoing cummulative request priority to use * @param ttl current time-to-live for the request * @param sender_pid peer ID to use for the sender when forwarding, 0 for none + * @param origin_pid peer ID of origin of query (do not loop back) * @param replies_seen hash codes of known local replies * @param replies_seen_count size of the 'replies_seen' array * @param rh handle to call when we get a reply @@ -271,114 +262,118 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr) */ struct GSF_PendingRequest * GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, - enum GNUNET_BLOCK_Type type, - const GNUNET_HashCode *query, - const GNUNET_HashCode *namespace, - const struct GNUNET_PeerIdentity *target, - const char *bf_data, - size_t bf_size, - uint32_t mingle, - uint32_t anonymity_level, - uint32_t priority, - int32_t ttl, - GNUNET_PEER_Id sender_pid, - const GNUNET_HashCode *replies_seen, - unsigned int replies_seen_count, - GSF_PendingRequestReplyHandler rh, - void *rh_cls) + enum GNUNET_BLOCK_Type type, + const GNUNET_HashCode * query, + const GNUNET_HashCode * namespace, + const struct GNUNET_PeerIdentity *target, + const char *bf_data, size_t bf_size, + uint32_t mingle, uint32_t anonymity_level, + uint32_t priority, int32_t ttl, + GNUNET_PEER_Id sender_pid, + GNUNET_PEER_Id origin_pid, + const GNUNET_HashCode * replies_seen, + unsigned int replies_seen_count, + GSF_PendingRequestReplyHandler rh, void *rh_cls) { struct GSF_PendingRequest *pr; struct GSF_PendingRequest *dpr; - -#if DEBUG_FS > 1 + +#if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating request handle for `%s' of type %d\n", - GNUNET_h2s (query), - type); -#endif + "Creating request handle for `%s' of type %d\n", + GNUNET_h2s (query), type); +#endif + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Pending requests created"), 1, + GNUNET_NO); pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); - pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT64_MAX); + pr->local_result_offset = + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); pr->public_data.query = *query; if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) - { - GNUNET_assert (NULL != namespace); - pr->public_data.namespace = *namespace; - } + { + GNUNET_assert (NULL != namespace); + pr->public_data.namespace = *namespace; + } if (NULL != target) - { - pr->public_data.target = *target; - pr->public_data.has_target = GNUNET_YES; - } + { + pr->public_data.target = *target; + pr->public_data.has_target = GNUNET_YES; + } pr->public_data.anonymity_level = anonymity_level; pr->public_data.priority = priority; pr->public_data.original_priority = priority; pr->public_data.options = options; - pr->public_data.type = type; + pr->public_data.type = type; pr->public_data.start_time = GNUNET_TIME_absolute_get (); pr->sender_pid = sender_pid; + pr->origin_pid = origin_pid; pr->rh = rh; pr->rh_cls = rh_cls; + GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY))); if (ttl >= 0) - pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - (uint32_t) ttl)); + pr->public_data.ttl = + GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, + (uint32_t) ttl)); else - pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - (uint32_t) (- ttl))); + pr->public_data.ttl = + GNUNET_TIME_absolute_subtract (pr->public_data.start_time, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, + (uint32_t) (-ttl))); if (replies_seen_count > 0) + { + pr->replies_seen_size = replies_seen_count; + pr->replies_seen = + GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); + memcpy (pr->replies_seen, replies_seen, + replies_seen_count * sizeof (GNUNET_HashCode)); + pr->replies_seen_count = replies_seen_count; + } + if (NULL != bf_data) + { + pr->bf = + GNUNET_CONTAINER_bloomfilter_init (bf_data, bf_size, + GNUNET_CONSTANTS_BLOOMFILTER_K); + pr->mingle = mingle; + } + else if ((replies_seen_count > 0) && + (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))) + { + refresh_bloomfilter (pr); + } + GNUNET_CONTAINER_multihashmap_put (pr_map, query, pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES)) + { + pr->hnode = + GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr, + pr->public_data.ttl.abs_value); + /* make sure we don't track too many requests */ + while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > + max_pending_requests) { - pr->replies_seen_size = replies_seen_count; - pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); - memcpy (pr->replies_seen, - replies_seen, - replies_seen_count * sizeof (GNUNET_HashCode)); - pr->replies_seen_count = replies_seen_count; - } - if (NULL != bf_data) - { - pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data, - bf_size, - BLOOMFILTER_K); - pr->mingle = mingle; - } - else if ( (replies_seen_count > 0) && - (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) ) - { - GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr)); - } - GNUNET_CONTAINER_multihashmap_put (pr_map, - query, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - if (0 != (options & GSF_PRO_REQUEST_EXPIRES)) - { - pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, - pr, - pr->public_data.ttl.abs_value); - /* make sure we don't track too many requests */ - while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) - { - dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); - GNUNET_assert (dpr != NULL); - if (pr == dpr) - break; /* let the request live briefly... */ - dpr->rh (dpr->rh_cls, - GNUNET_BLOCK_EVALUATION_REQUEST_VALID, - dpr, - GNUNET_TIME_UNIT_FOREVER_ABS, - GNUNET_BLOCK_TYPE_ANY, - NULL, 0); - GSF_pending_request_cancel_ (dpr); - } + dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); + GNUNET_assert (dpr != NULL); + if (pr == dpr) + break; /* let the request live briefly... */ + if (NULL != dpr->rh) + dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr, + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_ABS, + GNUNET_BLOCK_TYPE_ANY, NULL, 0); + GSF_pending_request_cancel_ (dpr, GNUNET_YES); } + } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Pending requests active"), 1, + GNUNET_NO); return pr; } - /** * Obtain the public data associated with a pending request - * + * * @param pr pending request * @return associated public data */ @@ -389,6 +384,33 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr) } +/** + * Test if two pending requests are compatible (would generate + * the same query modulo filters and should thus be processed + * jointly). + * + * @param pra a pending request + * @param prb another pending request + * @return GNUNET_OK if the requests are compatible + */ +int +GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, + struct GSF_PendingRequest *prb) +{ + if ((pra->public_data.type != prb->public_data.type) || + (0 != + memcmp (&pra->public_data.query, &prb->public_data.query, + sizeof (GNUNET_HashCode))) || + ((pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && + (0 != + memcmp (&pra->public_data.namespace, &prb->public_data.namespace, + sizeof (GNUNET_HashCode))))) + return GNUNET_NO; + return GNUNET_OK; +} + + + /** * Update a given pending request with additional replies * that have been seen. @@ -399,57 +421,46 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr) */ void GSF_pending_request_update_ (struct GSF_PendingRequest *pr, - const GNUNET_HashCode *replies_seen, - unsigned int replies_seen_count) + const GNUNET_HashCode * replies_seen, + unsigned int replies_seen_count) { unsigned int i; GNUNET_HashCode mhash; if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) - return; /* integer overflow */ + return; /* integer overflow */ if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) + { + /* we're responsible for the BF, full refresh */ + if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) + GNUNET_array_grow (pr->replies_seen, pr->replies_seen_size, + replies_seen_count + pr->replies_seen_count); + memcpy (&pr->replies_seen[pr->replies_seen_count], replies_seen, + sizeof (GNUNET_HashCode) * replies_seen_count); + pr->replies_seen_count += replies_seen_count; + refresh_bloomfilter (pr); + } + else + { + if (NULL == pr->bf) { - /* we're responsible for the BF, full refresh */ - if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) - GNUNET_array_grow (pr->replies_seen, - pr->replies_seen_size, - replies_seen_count + pr->replies_seen_count); - memcpy (&pr->replies_seen[pr->replies_seen_count], - replies_seen, - sizeof (GNUNET_HashCode) * replies_seen_count); - pr->replies_seen_count += replies_seen_count; - if (GNUNET_NO == refresh_bloomfilter (pr)) - { - /* bf not recalculated, simply extend it with new bits */ - for (i=0;ireplies_seen_count;i++) - { - GNUNET_BLOCK_mingle_hash (&replies_seen[i], - pr->mingle, - &mhash); - GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); - } - } + /* we're not the initiator, but the initiator did not give us + * any bloom-filter, so we need to create one on-the-fly */ + pr->mingle = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); + pr->bf = + GNUNET_BLOCK_construct_bloomfilter (pr->mingle, replies_seen, + replies_seen_count); } - else + else { - if (NULL == pr->bf) - { - /* we're not the initiator, but the initiator did not give us - any bloom-filter, so we need to create one on-the-fly */ - pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - compute_bloomfilter_size (replies_seen_count), - BLOOMFILTER_K); - } - for (i=0;ireplies_seen_count;i++) - { - GNUNET_BLOCK_mingle_hash (&replies_seen[i], - pr->mingle, - &mhash); - GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); - } + for (i = 0; i < pr->replies_seen_count; i++) + { + GNUNET_BLOCK_mingle_hash (&replies_seen[i], pr->mingle, &mhash); + GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); + } } + } } @@ -464,8 +475,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, */ size_t GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, - size_t buf_size, - void *buf) + size_t buf_size, void *buf) { char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; struct GetMessage *gm; @@ -482,40 +492,45 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, #if DEBUG_FS if (buf_size > 0) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Building request message for `%s' of type %d\n", - GNUNET_h2s (&pr->public_data.query), - pr->public_data.type); -#endif + "Building request message for `%s' of type %d\n", + GNUNET_h2s (&pr->public_data.query), pr->public_data.type); +#endif k = 0; bm = 0; do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); - if (! do_route) - { - bm |= GET_MESSAGE_BIT_RETURN_TO; - k++; - } + if ((!do_route) && (pr->sender_pid == 0)) + { + GNUNET_break (0); + do_route = GNUNET_YES; + } + if (!do_route) + { + bm |= GET_MESSAGE_BIT_RETURN_TO; + k++; + } if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) - { - bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; - k++; - } + { + bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; + k++; + } if (GNUNET_YES == pr->public_data.has_target) - { - bm |= GET_MESSAGE_BIT_TRANSMIT_TO; - k++; - } + { + bm |= GET_MESSAGE_BIT_TRANSMIT_TO; + k++; + } bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); - msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode); + msize = sizeof (struct GetMessage) + bf_size + k * sizeof (GNUNET_HashCode); GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); if (buf_size < msize) - return msize; - gm = (struct GetMessage*) lbuf; + return msize; + gm = (struct GetMessage *) lbuf; gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); gm->header.size = htons (msize); gm->type = htonl (pr->public_data.type); if (do_route) - prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - pr->public_data.priority + 1); + prio = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + pr->public_data.priority + 1); else prio = 0; pr->public_data.priority -= prio; @@ -523,25 +538,23 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, now = GNUNET_TIME_absolute_get (); ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value); gm->ttl = htonl (ttl / 1000); - gm->filter_mutator = htonl(pr->mingle); + gm->filter_mutator = htonl (pr->mingle); gm->hash_bitmap = htonl (bm); gm->query = pr->public_data.query; - ext = (GNUNET_HashCode*) &gm[1]; - k = 0; - if (! do_route) - GNUNET_PEER_resolve (pr->sender_pid, - (struct GNUNET_PeerIdentity*) &ext[k++]); + ext = (GNUNET_HashCode *) & gm[1]; + k = 0; + if (!do_route) + GNUNET_PEER_resolve (pr->sender_pid, + (struct GNUNET_PeerIdentity *) &ext[k++]); if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) - memcpy (&ext[k++], - &pr->public_data.namespace, - sizeof (GNUNET_HashCode)); + memcpy (&ext[k++], &pr->public_data.namespace, sizeof (GNUNET_HashCode)); if (GNUNET_YES == pr->public_data.has_target) - GNUNET_PEER_resolve (pr->sender_pid, - (struct GNUNET_PeerIdentity*) &ext[k++]); + ext[k++] = pr->public_data.target.hashPubKey; if (pr->bf != NULL) - GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, - (char*) &ext[k], - bf_size); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, + (char *) &ext[k], + bf_size)); memcpy (buf, gm, msize); return msize; } @@ -555,39 +568,59 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, * @param value value in the hash map (pending request) * @return GNUNET_YES (we should continue to iterate) */ -static int -clean_request (void *cls, - const GNUNET_HashCode * key, - void *value) +static int +clean_request (void *cls, const GNUNET_HashCode * key, void *value) { struct GSF_PendingRequest *pr = value; GSF_LocalLookupContinuation cont; #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cleaning up pending request for `%s'.\n", - GNUNET_h2s (key)); -#endif + "Cleaning up pending request for `%s'.\n", GNUNET_h2s (key)); +#endif if (NULL != (cont = pr->llc_cont)) - { - pr->llc_cont = NULL; - cont (pr->llc_cont_cls, - pr, - pr->local_result); - } + { + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, pr, pr->local_result); + } GSF_plan_notify_request_done_ (pr); GNUNET_free_non_null (pr->replies_seen); if (NULL != pr->bf) + { GNUNET_CONTAINER_bloomfilter_free (pr->bf); + pr->bf = NULL; + } GNUNET_PEER_change_rc (pr->sender_pid, -1); + pr->sender_pid = 0; + GNUNET_PEER_change_rc (pr->origin_pid, -1); + pr->origin_pid = 0; if (NULL != pr->hnode) + { GNUNET_CONTAINER_heap_remove_node (pr->hnode); + pr->hnode = NULL; + } if (NULL != pr->qe) + { GNUNET_DATASTORE_cancel (pr->qe); + pr->qe = NULL; + } if (NULL != pr->gh) + { GNUNET_DHT_get_stop (pr->gh); + pr->gh = NULL; + } if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) + { GNUNET_SCHEDULER_cancel (pr->warn_task); + pr->warn_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (pr_map, + &pr->public_data.query, + pr)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Pending requests active"), -1, + GNUNET_NO); GNUNET_free (pr); return GNUNET_YES; } @@ -597,18 +630,46 @@ clean_request (void *cls, * Explicitly cancel a pending request. * * @param pr request to cancel + * @param full_cleanup fully purge the request */ void -GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) +GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup) { - if (NULL == pr_map) - return; /* already cleaned up! */ - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_remove (pr_map, - &pr->public_data.query, - pr)); + GSF_LocalLookupContinuation cont; + + if (NULL == pr_map) + return; /* already cleaned up! */ + if (GNUNET_YES != full_cleanup) + { + /* make request inactive (we're no longer interested in more results), + * but do NOT remove from our data-structures, we still need it there + * to prevent the request from looping */ + pr->rh = NULL; + if (NULL != (cont = pr->llc_cont)) + { + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, pr, pr->local_result); + } + GSF_plan_notify_request_done_ (pr); + if (NULL != pr->qe) + { + GNUNET_DATASTORE_cancel (pr->qe); + pr->qe = NULL; + } + if (NULL != pr->gh) + { + GNUNET_DHT_get_stop (pr->gh); + pr->gh = NULL; + } + if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) + { + GNUNET_SCHEDULER_cancel (pr->warn_task); + pr->warn_task = GNUNET_SCHEDULER_NO_TASK; + } + return; + } GNUNET_assert (GNUNET_YES == - clean_request (NULL, &pr->public_data.query, pr)); + clean_request (NULL, &pr->public_data.query, pr)); } @@ -619,12 +680,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) * @param cls closure for it */ void -GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, - void *cls) +GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls) { GNUNET_CONTAINER_multihashmap_iterate (pr_map, - (GNUNET_CONTAINER_HashMapIterator) it, - cls); + (GNUNET_CONTAINER_HashMapIterator) it, + cls); } @@ -691,15 +751,14 @@ struct ProcessReplyClosure */ static void update_request_performance_data (struct ProcessReplyClosure *prq, - struct GSF_PendingRequest *pr) + struct GSF_PendingRequest *pr) { if (prq->sender == NULL) - return; - GSF_peer_update_performance_ (prq->sender, - pr->public_data.start_time, - prq->priority); + return; + GSF_peer_update_performance_ (prq->sender, pr->public_data.start_time, + prq->priority); } - + /** * We have received a reply; handle it! @@ -710,109 +769,98 @@ update_request_performance_data (struct ProcessReplyClosure *prq, * @return GNUNET_YES (we should continue to iterate) */ static int -process_reply (void *cls, - const GNUNET_HashCode * key, - void *value) +process_reply (void *cls, const GNUNET_HashCode * key, void *value) { struct ProcessReplyClosure *prq = cls; struct GSF_PendingRequest *pr = value; GNUNET_HashCode chash; + struct GNUNET_TIME_Absolute last_transmission; + if (NULL == pr->rh) + return GNUNET_YES; #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Matched result (type %u) for query `%s' with pending request\n", - (unsigned int) prq->type, - GNUNET_h2s (key)); -#endif + "Matched result (type %u) for query `%s' with pending request\n", + (unsigned int) prq->type, GNUNET_h2s (key)); +#endif GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received and matched"), - 1, - GNUNET_NO); - prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx, - prq->type, - key, - &pr->bf, - pr->mingle, - &pr->public_data.namespace, - (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0, - prq->data, - prq->size); + gettext_noop ("# replies received and matched"), 1, + GNUNET_NO); + prq->eval = + GNUNET_BLOCK_evaluate (GSF_block_ctx, prq->type, key, &pr->bf, pr->mingle, + &pr->public_data.namespace, + (prq->type == + GNUNET_BLOCK_TYPE_FS_SBLOCK) ? + sizeof (GNUNET_HashCode) : 0, prq->data, + prq->size); switch (prq->eval) - { - case GNUNET_BLOCK_EVALUATION_OK_MORE: - update_request_performance_data (prq, pr); - break; - case GNUNET_BLOCK_EVALUATION_OK_LAST: - /* short cut: stop processing early, no BF-update, etc. */ - update_request_performance_data (prq, pr); - GNUNET_LOAD_update (GSF_rt_entry_lifetime, - GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value); - /* pass on to other peers / local clients */ - pr->rh (pr->rh_cls, - prq->eval, - pr, - prq->expiration, - prq->type, - prq->data, prq->size); - return GNUNET_YES; - case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# duplicate replies discarded (bloomfilter)"), - 1, - GNUNET_NO); + { + case GNUNET_BLOCK_EVALUATION_OK_MORE: + update_request_performance_data (prq, pr); + break; + case GNUNET_BLOCK_EVALUATION_OK_LAST: + /* short cut: stop processing early, no BF-update, etc. */ + update_request_performance_data (prq, pr); + GNUNET_LOAD_update (GSF_rt_entry_lifetime, + GNUNET_TIME_absolute_get_duration (pr-> + public_data.start_time).rel_value); + if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission)) + last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value; + /* pass on to other peers / local clients */ + pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration, + last_transmission, prq->type, prq->data, prq->size); + return GNUNET_YES; + case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# duplicate replies discarded (bloomfilter)"), + 1, GNUNET_NO); #if DEBUG_FS && 0 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Duplicate response `%s', discarding.\n", - GNUNET_h2s (&mhash)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate response `%s', discarding.\n", GNUNET_h2s (&mhash)); #endif - return GNUNET_YES; /* duplicate */ - case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: - return GNUNET_YES; /* wrong namespace */ - case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: - GNUNET_break (0); - return GNUNET_YES; - case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: - GNUNET_break (0); - return GNUNET_YES; - case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Unsupported block type %u\n"), - prq->type); - return GNUNET_NO; - } + return GNUNET_YES; /* duplicate */ + case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: + return GNUNET_YES; /* wrong namespace */ + case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: + GNUNET_break (0); + return GNUNET_YES; + case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: + GNUNET_break (0); + return GNUNET_YES; + case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Unsupported block type %u\n"), + prq->type); + return GNUNET_NO; + } /* update bloomfilter */ - GNUNET_CRYPTO_hash (prq->data, - prq->size, - &chash); + GNUNET_CRYPTO_hash (prq->data, prq->size, &chash); GSF_pending_request_update_ (pr, &chash, 1); if (NULL == prq->sender) - { + { #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found result for query `%s' in local datastore\n", - GNUNET_h2s (key)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found result for query `%s' in local datastore\n", + GNUNET_h2s (key)); #endif - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# results found locally"), - 1, - GNUNET_NO); - } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# results found locally"), 1, + GNUNET_NO); + } else - { - GSF_dht_lookup_ (pr); - } + { + GSF_dht_lookup_ (pr); + } prq->priority += pr->public_data.original_priority; pr->public_data.priority = 0; pr->public_data.original_priority = 0; pr->public_data.results_found++; prq->request_found = GNUNET_YES; /* finally, pass on to other peer / local client */ - pr->rh (pr->rh_cls, - prq->eval, - pr, - prq->expiration, - prq->type, - prq->data, prq->size); + if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission)) + last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value; + pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration, + last_transmission, prq->type, prq->data, prq->size); return GNUNET_YES; } @@ -847,61 +895,87 @@ struct PutMigrationContext * * @param cls closure * @param success GNUNET_SYSERR on failure + * @param min_expiration minimum expiration time required for content to be stored * @param msg NULL on success, otherwise an error message */ -static void -put_migration_continuation (void *cls, - int success, +static void +put_migration_continuation (void *cls, int success, + struct GNUNET_TIME_Absolute min_expiration, const char *msg) { struct PutMigrationContext *pmc = cls; - struct GNUNET_TIME_Relative delay; - struct GNUNET_TIME_Relative block_time; struct GSF_ConnectedPeer *cp; + struct GNUNET_TIME_Relative mig_pause; struct GSF_PeerPerformanceData *ppd; - - delay = GNUNET_TIME_absolute_get_duration (pmc->start); - cp = GSF_peer_get_ (&pmc->origin); - if ( (GNUNET_OK != success) && - (GNUNET_NO == pmc->requested) ) + + if (NULL != datastore_put_load) + { + if (GNUNET_SYSERR != success) { - /* block migration for a bit... */ - if (NULL != cp) - { - ppd = GSF_get_peer_performance_data_ (cp); - ppd->migration_duplication++; - block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - 5 * ppd->migration_duplication + - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5)); - GSF_block_peer_migration_ (cp, block_time); - } + GNUNET_LOAD_update (datastore_put_load, + GNUNET_TIME_absolute_get_duration (pmc->start).rel_value); } - else + else { - if (NULL != cp) - { - ppd = GSF_get_peer_performance_data_ (cp); - ppd->migration_duplication = 0; /* reset counter */ - } + /* on queue failure / timeout, increase the put load dramatically */ + GNUNET_LOAD_update (datastore_put_load, + GNUNET_TIME_UNIT_MINUTES.rel_value); } - GNUNET_free (pmc); - /* FIXME: should we really update the load value on failure? */ - GNUNET_LOAD_update (datastore_put_load, - delay.rel_value); + } + cp = GSF_peer_get_ (&pmc->origin); if (GNUNET_OK == success) + { + if (NULL != cp) + { + ppd = GSF_get_peer_performance_data_ (cp); + ppd->migration_delay.rel_value /= 2; + } + GNUNET_free (pmc); return; + } + if ( (GNUNET_NO == success) && + (GNUNET_NO == pmc->requested) && + (NULL != cp) ) + { + ppd = GSF_get_peer_performance_data_ (cp); + if (min_expiration.abs_value > 0) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking to stop migration for %llu ms because datastore is full\n", + (unsigned long long) GNUNET_TIME_absolute_get_remaining (min_expiration).rel_value); +#endif + GSF_block_peer_migration_ (cp, min_expiration); + } + else + { + ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS, + ppd->migration_delay); + ppd->migration_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, + ppd->migration_delay); + mig_pause.rel_value = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + ppd->migration_delay.rel_value); + ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicated content already exists locally, asking to stop migration for %llu ms\n", + (unsigned long long) mig_pause.rel_value); +#endif + GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (mig_pause)); + } + } + GNUNET_free (pmc); GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# datastore 'put' failures"), - 1, - GNUNET_NO); + gettext_noop ("# Datastore `PUT' failures"), 1, + GNUNET_NO); } /** * Test if the DATABASE (PUT) load on this peer is too high * to even consider processing the query at - * all. - * + * all. + * * @return GNUNET_YES if the load is too high to do anything (load high) * GNUNET_NO to process normally (load normal or low) */ @@ -910,15 +984,17 @@ test_put_load_too_high (uint32_t priority) { double ld; + if (NULL == datastore_put_load) + return GNUNET_NO; if (GNUNET_LOAD_get_average (datastore_put_load) < 50) - return GNUNET_NO; /* very fast */ + return GNUNET_NO; /* very fast */ ld = GNUNET_LOAD_get_load (datastore_put_load); if (ld < 2.0 * (1 + priority)) return GNUNET_NO; GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# storage requests dropped due to high load"), - 1, - GNUNET_NO); + gettext_noop + ("# storage requests dropped due to high load"), 1, + GNUNET_NO); return GNUNET_YES; } @@ -930,56 +1006,61 @@ test_put_load_too_high (uint32_t priority) * @param cls closure * @param exp when will this value expire * @param key key of the result - * @param get_path NULL-terminated array of pointers - * to the peers on reverse GET path (or NULL if not recorded) - * @param put_path NULL-terminated array of pointers - * to the peers on the PUT path (or NULL if not recorded) + * @param get_path peers on reply path (or NULL if not recorded) + * @param get_path_length number of entries in get_path + * @param put_path peers on the PUT path (or NULL if not recorded) + * @param put_path_length number of entries in get_path * @param type type of the result * @param size number of bytes in data * @param data pointer to the result data */ static void -handle_dht_reply (void *cls, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode *key, - const struct GNUNET_PeerIdentity * const *get_path, - const struct GNUNET_PeerIdentity * const *put_path, - enum GNUNET_BLOCK_Type type, - size_t size, - const void *data) +handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const struct GNUNET_PeerIdentity *get_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int put_path_length, enum GNUNET_BLOCK_Type type, + size_t size, const void *data) { struct GSF_PendingRequest *pr = cls; struct ProcessReplyClosure prq; struct PutMigrationContext *pmc; + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Replies received from DHT"), 1, + GNUNET_NO); memset (&prq, 0, sizeof (prq)); prq.data = data; prq.expiration = exp; - prq.size = size; + /* do not allow migrated content to live longer than 1 year */ + prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), + prq.expiration); + prq.size = size; prq.type = type; process_reply (&prq, key, pr); - if ( (GNUNET_YES == active_to_migration) && - (GNUNET_NO == test_put_load_too_high (prq.priority)) ) - { + if ((GNUNET_YES == active_to_migration) && + (GNUNET_NO == test_put_load_too_high (prq.priority))) + { #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Replicating result for query `%s' with priority %u\n", - GNUNET_h2s (key), - prq.priority); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicating result for query `%s' with priority %u\n", + GNUNET_h2s (key), prq.priority); #endif - pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); - pmc->start = GNUNET_TIME_absolute_get (); - pmc->requested = GNUNET_YES; - GNUNET_DATASTORE_put (GSF_dsh, - 0, key, size, data, - type, prq.priority, 1 /* anonymity */, - 0 /* replication */, - exp, - 1 + prq.priority, MAX_DATASTORE_QUEUE, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &put_migration_continuation, - pmc); + pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); + pmc->start = GNUNET_TIME_absolute_get (); + pmc->requested = GNUNET_YES; + if (NULL == + GNUNET_DATASTORE_put (GSF_dsh, 0, key, size, data, type, prq.priority, + 1 /* anonymity */ , + 0 /* replication */ , + exp, 1 + prq.priority, MAX_DATASTORE_QUEUE, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &put_migration_continuation, pmc)) + { + put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL); } + } } @@ -999,79 +1080,74 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr) if (0 != pr->public_data.anonymity_level) return; if (NULL != pr->gh) - { - GNUNET_DHT_get_stop (pr->gh); - pr->gh = NULL; - } + { + GNUNET_DHT_get_stop (pr->gh); + pr->gh = NULL; + } xquery = NULL; xquery_size = 0; if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) - { - xquery = buf; - memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode)); - xquery_size = sizeof (GNUNET_HashCode); - } + { + xquery = buf; + memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode)); + xquery_size = sizeof (GNUNET_HashCode); + } if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY)) - { - GNUNET_PEER_resolve (pr->sender_pid, - &pi); - memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); - xquery_size += sizeof (struct GNUNET_PeerIdentity); - } - pr->gh = GNUNET_DHT_get_start (GSF_dht, - GNUNET_TIME_UNIT_FOREVER_REL, - pr->public_data.type, - &pr->public_data.query, - DEFAULT_GET_REPLICATION, - GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, - pr->bf, - pr->mingle, - xquery, - xquery_size, - &handle_dht_reply, - pr); + { + GNUNET_assert (0 != pr->sender_pid); + GNUNET_PEER_resolve (pr->sender_pid, &pi); + memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); + xquery_size += sizeof (struct GNUNET_PeerIdentity); + } + pr->gh = + GNUNET_DHT_get_start (GSF_dht, GNUNET_TIME_UNIT_FOREVER_REL, + pr->public_data.type, &pr->public_data.query, + 5 /* DEFAULT_GET_REPLICATION */ , + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, + /* FIXME: can no longer pass pr->bf/pr->mingle... */ + xquery, xquery_size, &handle_dht_reply, pr); } /** * Task that issues a warning if the datastore lookup takes too long. - * + * * @param cls the 'struct GSF_PendingRequest' * @param tc task context */ static void -warn_delay_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GSF_PendingRequest *pr = cls; GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Datastore lookup already took %llu ms!\n"), - (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &warn_delay_task, - pr); + _("Datastore lookup already took %llu ms!\n"), + (unsigned long long) + GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task, + pr); } /** * Task that issues a warning if the datastore lookup takes too long. - * + * * @param cls the 'struct GSF_PendingRequest' * @param tc task context */ static void -odc_warn_delay_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GSF_PendingRequest *pr = cls; GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("On-demand lookup already took %llu ms!\n"), - (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &odc_warn_delay_task, - pr); + _("On-demand lookup already took %llu ms!\n"), + (unsigned long long) + GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &odc_warn_delay_task, pr); } @@ -1093,201 +1169,261 @@ odc_warn_delay_task (void *cls, * maybe 0 if no unique identifier is available */ static void -process_local_reply (void *cls, - const GNUNET_HashCode *key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, - uint64_t uid) +process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, uint64_t uid) { struct GSF_PendingRequest *pr = cls; GSF_LocalLookupContinuation cont; struct ProcessReplyClosure prq; GNUNET_HashCode query; unsigned int old_rf; - - pr->qe = NULL; + GNUNET_SCHEDULER_cancel (pr->warn_task); pr->warn_task = GNUNET_SCHEDULER_NO_TASK; - if (0 == pr->replies_seen_count) + if (NULL != pr->qe) + { + pr->qe = NULL; + if (NULL == key) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (no results)"), + 1, GNUNET_NO); + } + if (GNUNET_NO == pr->have_first_uid) { pr->first_uid = uid; + pr->have_first_uid = 1; } - else + else { - if (uid == pr->first_uid) - key = NULL; /* all replies seen! */ + if ((uid == pr->first_uid) && (key != NULL)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (seen all)"), + 1, GNUNET_NO); + key = NULL; /* all replies seen! */ + } + pr->have_first_uid++; + if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups aborted (more than MAX_RESULTS)"), + 1, GNUNET_NO); + key = NULL; /* all replies seen! */ + } } + } if (NULL == key) - { -#if DEBUG_FS > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No further local responses available.\n"); + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "No further local responses available.\n"); #endif - if (NULL != (cont = pr->llc_cont)) - { - pr->llc_cont = NULL; - cont (pr->llc_cont_cls, - pr, - pr->local_result); - } - return; - } + if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || + (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK)) + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# requested DBLOCK or IBLOCK not found"), 1, + GNUNET_NO); + goto check_error_and_continue; + } #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reply for `%s' of type %d with UID %llu from datastore.\n", - GNUNET_h2s (key), - type, - (unsigned long long) uid); + "Received reply for `%s' of type %d with UID %llu from datastore.\n", + GNUNET_h2s (key), type, (unsigned long long) uid); #endif if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - { -#if DEBUG_FS > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found ONDEMAND block, performing on-demand encoding\n"); + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found ONDEMAND block, performing on-demand encoding\n"); #endif + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# on-demand blocks matched requests"), 1, + GNUNET_NO); + pr->qe_start = GNUNET_TIME_absolute_get (); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &odc_warn_delay_task, pr); + if (GNUNET_OK == + GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, + anonymity, expiration, uid, + &process_local_reply, pr)) + { GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# on-demand blocks matched requests"), - 1, - GNUNET_NO); - pr->qe_start = GNUNET_TIME_absolute_get (); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &odc_warn_delay_task, - pr); - if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, - anonymity, expiration, uid, - &process_local_reply, - pr)) - { - GNUNET_SCHEDULER_cancel (pr->warn_task); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &warn_delay_task, - pr); - pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, - pr->local_result_offset - 1, - &pr->public_data.query, - pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK - ? GNUNET_BLOCK_TYPE_ANY - : pr->public_data.type, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* queue priority */, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* max queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_local_reply, - pr); - GNUNET_assert (NULL != pr->qe); - } - return; + gettext_noop + ("# on-demand lookups performed successfully"), + 1, GNUNET_NO); + return; /* we're done */ } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# on-demand lookups failed"), 1, + GNUNET_NO); + GNUNET_SCHEDULER_cancel (pr->warn_task); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &warn_delay_task, pr); + pr->qe = + GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1, + &pr->public_data.query, + pr->public_data.type == + GNUNET_BLOCK_TYPE_FS_DBLOCK ? + GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & + pr->public_data.options)) ? UINT_MAX : 1 + /* queue priority */ , + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & + pr->public_data.options)) ? UINT_MAX : + datastore_queue_size + /* max queue size */ , + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, pr); + if (NULL != pr->qe) + return; /* we're done */ + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (error queueing)"), + 1, GNUNET_NO); + goto check_error_and_continue; + } old_rf = pr->public_data.results_found; memset (&prq, 0, sizeof (prq)); prq.data = data; prq.expiration = expiration; - prq.size = size; - if (GNUNET_OK != - GNUNET_BLOCK_get_key (GSF_block_ctx, - type, - data, - size, - &query)) + prq.size = size; + if (GNUNET_OK != + GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query)) + { + GNUNET_break (0); + GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, + GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); + pr->qe_start = GNUNET_TIME_absolute_get (); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &warn_delay_task, pr); + pr->qe = + GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1, + &pr->public_data.query, + pr->public_data.type == + GNUNET_BLOCK_TYPE_FS_DBLOCK ? + GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & + pr->public_data.options)) ? UINT_MAX : 1 + /* queue priority */ , + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & + pr->public_data.options)) ? UINT_MAX : + datastore_queue_size + /* max queue size */ , + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, pr); + if (pr->qe == NULL) { - GNUNET_break (0); - GNUNET_DATASTORE_remove (GSF_dsh, - key, - size, data, - -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, - NULL, NULL); - pr->qe_start = GNUNET_TIME_absolute_get (); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &warn_delay_task, - pr); - pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, - pr->local_result_offset - 1, - &pr->public_data.query, - pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK - ? GNUNET_BLOCK_TYPE_ANY - : pr->public_data.type, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* queue priority */, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* max queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_local_reply, - pr); - GNUNET_assert (NULL != pr->qe); - return; + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (error queueing)"), + 1, GNUNET_NO); + goto check_error_and_continue; } + return; + } prq.type = type; - prq.priority = priority; + prq.priority = priority; prq.request_found = GNUNET_NO; prq.anonymity_level = anonymity; - if ( (old_rf == 0) && - (pr->public_data.results_found == 0) ) + if ((old_rf == 0) && (pr->public_data.results_found == 0)) GSF_update_datastore_delay_ (pr->public_data.start_time); process_reply (&prq, key, pr); pr->local_result = prq.eval; if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) - { - if (NULL != (cont = pr->llc_cont)) - { - pr->llc_cont = NULL; - cont (pr->llc_cont_cls, - pr, - pr->local_result); - } - return; - } - if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && - ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || - (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) ) - { + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (found last result)"), + 1, GNUNET_NO); + goto check_error_and_continue; + } + if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && + ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) || + (pr->public_data.results_found > 5 + 2 * pr->public_data.priority))) + { #if DEBUG_FS > 2 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Load too high, done with request\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n"); #endif - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# processing result set cut short due to load"), - 1, - GNUNET_NO); - if (NULL != (cont = pr->llc_cont)) - { - pr->llc_cont = NULL; - cont (pr->llc_cont_cls, - pr, - pr->local_result); - } - return; - } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (load too high)"), + 1, GNUNET_NO); + goto check_error_and_continue; + } pr->qe_start = GNUNET_TIME_absolute_get (); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &warn_delay_task, - pr); - pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, - pr->local_result_offset++, - &pr->public_data.query, - pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK - ? GNUNET_BLOCK_TYPE_ANY - : pr->public_data.type, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* queue priority */, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* max queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_local_reply, - pr); - GNUNET_assert (NULL != pr->qe); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task, + pr); + pr->qe = + GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++, + &pr->public_data.query, + pr->public_data.type == + GNUNET_BLOCK_TYPE_FS_DBLOCK ? + GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & pr-> + public_data.options)) ? UINT_MAX : 1 + /* queue priority */ , + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & pr-> + public_data.options)) ? UINT_MAX : + datastore_queue_size + /* max queue size */ , + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, pr); + /* check if we successfully queued another datastore request; + * if so, return, otherwise call our continuation (if we have + * any) */ +check_error_and_continue: + if (NULL != pr->qe) + return; + if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) + { + GNUNET_SCHEDULER_cancel (pr->warn_task); + pr->warn_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL == (cont = pr->llc_cont)) + return; /* no continuation */ + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, pr, pr->local_result); +} + + +/** + * Is the given target a legitimate peer for forwarding the given request? + * + * @param pr request + * @param target + * @return GNUNET_YES if this request could be forwarded to the given peer + */ +int +GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr, + const struct GNUNET_PeerIdentity *target) +{ + struct GNUNET_PeerIdentity pi; + + if (0 == pr->origin_pid) + return GNUNET_YES; + GNUNET_PEER_resolve (pr->origin_pid, &pi); + return (0 == + memcmp (&pi, target, + sizeof (struct GNUNET_PeerIdentity))) ? GNUNET_NO : + GNUNET_YES; } @@ -1300,32 +1436,47 @@ process_local_reply (void *cls, */ void GSF_local_lookup_ (struct GSF_PendingRequest *pr, - GSF_LocalLookupContinuation cont, - void *cont_cls) + GSF_LocalLookupContinuation cont, void *cont_cls) { GNUNET_assert (NULL == pr->gh); GNUNET_assert (NULL == pr->llc_cont); pr->llc_cont = cont; pr->llc_cont_cls = cont_cls; pr->qe_start = GNUNET_TIME_absolute_get (); - pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &warn_delay_task, - pr); - pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, - pr->local_result_offset++, - &pr->public_data.query, - pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK - ? GNUNET_BLOCK_TYPE_ANY - : pr->public_data.type, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* queue priority */, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* max queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_local_reply, - pr); + pr->warn_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task, + pr); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Datastore lookups initiated"), 1, + GNUNET_NO); + pr->qe = + GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++, + &pr->public_data.query, + pr->public_data.type == + GNUNET_BLOCK_TYPE_FS_DBLOCK ? + GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & pr-> + public_data.options)) ? UINT_MAX : 1 + /* queue priority */ , + (0 != + (GSF_PRO_PRIORITY_UNLIMITED & pr-> + public_data.options)) ? UINT_MAX : + datastore_queue_size + /* max queue size */ , + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, pr); + if (NULL != pr->qe) + return; + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# Datastore lookups concluded (error queueing)"), + 1, GNUNET_NO); + GNUNET_SCHEDULER_cancel (pr->warn_task); + pr->warn_task = GNUNET_SCHEDULER_NO_TASK; + pr->llc_cont = NULL; + if (NULL != cont) + cont (cont_cls, pr, pr->local_result); } @@ -1345,7 +1496,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, */ int GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *message) { const struct PutMessage *put; uint16_t msize; @@ -1354,34 +1505,36 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute expiration; GNUNET_HashCode query; struct ProcessReplyClosure prq; - struct GNUNET_TIME_Relative block_time; + struct GNUNET_TIME_Relative block_time; double putl; struct PutMigrationContext *pmc; msize = ntohs (message->size); if (msize < sizeof (struct PutMessage)) - { - GNUNET_break_op(0); - return GNUNET_SYSERR; - } - put = (const struct PutMessage*) message; + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + put = (const struct PutMessage *) message; dsize = msize - sizeof (struct PutMessage); type = ntohl (put->type); expiration = GNUNET_TIME_absolute_ntoh (put->expiration); + /* do not allow migrated content to live longer than 1 year */ + expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), + expiration); if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) return GNUNET_SYSERR; if (GNUNET_OK != - GNUNET_BLOCK_get_key (GSF_block_ctx, - type, - &put[1], - dsize, - &query)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# GAP PUT messages received"), 1, + GNUNET_NO); /* now, lookup 'query' */ - prq.data = (const void*) &put[1]; + prq.data = (const void *) &put[1]; if (NULL != cp) prq.sender = cp; else @@ -1390,64 +1543,74 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, prq.type = type; prq.expiration = expiration; prq.priority = 0; - prq.anonymity_level = 1; + prq.anonymity_level = UINT32_MAX; prq.request_found = GNUNET_NO; - GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, - &query, - &process_reply, - &prq); + GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, &query, &process_reply, + &prq); if (NULL != cp) - { - GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); - GSF_get_peer_performance_data_ (cp)->trust += prq.priority; - } - if ( (GNUNET_YES == active_to_migration) && - (GNUNET_NO == test_put_load_too_high (prq.priority)) ) - { + { + GSF_connected_peer_change_preference_ (cp, + CONTENT_BANDWIDTH_VALUE + + 1000 * prq.priority); + GSF_get_peer_performance_data_ (cp)->trust += prq.priority; + } + if ((GNUNET_YES == active_to_migration) && + (GNUNET_NO == test_put_load_too_high (prq.priority))) + { #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Replicating result for query `%s' with priority %u\n", - GNUNET_h2s (&query), - prq.priority); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicating result for query `%s' with priority %u\n", + GNUNET_h2s (&query), prq.priority); #endif - pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); - pmc->start = GNUNET_TIME_absolute_get (); - pmc->requested = prq.request_found; - GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, - &pmc->origin); - GNUNET_DATASTORE_put (GSF_dsh, - 0, &query, dsize, &put[1], - type, prq.priority, 1 /* anonymity */, - 0 /* replication */, - expiration, - 1 + prq.priority, MAX_DATASTORE_QUEUE, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &put_migration_continuation, - pmc); + pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); + pmc->start = GNUNET_TIME_absolute_get (); + pmc->requested = prq.request_found; + GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid); + GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, + &pmc->origin); + if (NULL == + GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, + prq.priority, 1 /* anonymity */ , + 0 /* replication */ , + expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &put_migration_continuation, pmc)) + { + put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL); } + } else - { + { #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Choosing not to keep content `%s' (%d/%d)\n", - GNUNET_h2s (&query), - active_to_migration, - test_put_load_too_high (prq.priority)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Choosing not to keep content `%s' (%d/%d)\n", + GNUNET_h2s (&query), active_to_migration, + test_put_load_too_high (prq.priority)); #endif - } + } putl = GNUNET_LOAD_get_load (datastore_put_load); - if ( (NULL != (cp = prq.sender)) && - (GNUNET_NO == prq.request_found) && - ( (GNUNET_YES != active_to_migration) || - (putl > 2.5 * (1 + prq.priority)) ) ) - { - if (GNUNET_YES != active_to_migration) - putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); - block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - (unsigned int) (60000 * putl * putl))); - GSF_block_peer_migration_ (cp, block_time); - } + if ((NULL != (cp = prq.sender)) && (GNUNET_NO == prq.request_found) && + ((GNUNET_YES != active_to_migration) || + (putl > 2.5 * (1 + prq.priority)))) + { + if (GNUNET_YES != active_to_migration) + putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); + block_time = + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + 5000 + + GNUNET_CRYPTO_random_u32 + (GNUNET_CRYPTO_QUALITY_WEAK, + (unsigned int) (60000 * putl * putl))); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking to stop migration for %llu ms because of load %f and events %d/%d\n", + (unsigned long long) block_time.rel_value, + putl, + active_to_migration, + (GNUNET_NO == prq.request_found)); +#endif + GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); + } return GNUNET_OK; } @@ -1458,22 +1621,39 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, void GSF_pending_request_init_ () { + unsigned long long bps; + if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (GSF_cfg, - "fs", - "MAX_PENDING_REQUESTS", - &max_pending_requests)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Configuration fails to specify `%s', assuming default value."), - "MAX_PENDING_REQUESTS"); - } - active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, - "FS", - "CONTENT_CACHING"); + GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs", + "MAX_PENDING_REQUESTS", + &max_pending_requests)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _ + ("Configuration fails to specify `%s', assuming default value."), + "MAX_PENDING_REQUESTS"); + } + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "ats", "WAN_QUOTA_OUT", + &bps)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _ + ("Configuration fails to specify `%s', assuming default value."), + "WAN_QUOTA_OUT"); + bps = 65536; + } + /* queue size should be #queries we can have pending and satisfy within + * a carry interval: */ + datastore_queue_size = + bps * GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S / DBLOCK_SIZE; + + active_to_migration = + GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING"); datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); - requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + requests_by_expiration_heap = + GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); } @@ -1483,9 +1663,7 @@ GSF_pending_request_init_ () void GSF_pending_request_done_ () { - GNUNET_CONTAINER_multihashmap_iterate (pr_map, - &clean_request, - NULL); + GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL); GNUNET_CONTAINER_multihashmap_destroy (pr_map); pr_map = NULL; GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);