From 8111b29a93408e1befe322c87b468d6a86a8ae66 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 4 Mar 2011 12:39:07 +0000 Subject: [PATCH] fixes --- src/fs/gnunet-service-fs.h | 7 ++ src/fs/gnunet-service-fs_pr.c | 153 ++++++++++++++++++++-------------- src/fs/gnunet-service-fs_pr.h | 4 +- 3 files changed, 101 insertions(+), 63 deletions(-) diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index de83e1b10..d16b1a856 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -99,6 +99,13 @@ extern double GSF_current_priorities; extern unsigned int GSF_cover_query_count; +/** + * Our block context. + */ +extern struct GNUNET_BLOCK_Context *GSF_block_ctx; + + + #endif /* end of gnunet-service-fs.h */ diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index aca63ac94..3c291cfc9 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -24,6 +24,8 @@ * @author Christian Grothoff */ #include "platform.h" +#include "gnunet_load_lib.h" +#include "gnunet-service-fs_cp.h" #include "gnunet-service-fs_pr.h" @@ -110,6 +112,14 @@ static int active_to_migration; static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; +/** + * Maximum number of requests (from other peers, overall) that we're + * willing to have pending at any given point in time. Can be changed + * via the configuration file (32k is just the default). + */ +static unsigned long long max_pending_requests = (32 * 1024); + + /** * How many bytes should a bloomfilter be if we have already seen * entry_count responses? Note that BLOOMFILTER_K gives us the number @@ -157,8 +167,8 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr) size_t nsize; GNUNET_HashCode mhash; - nsize = compute_bloomfilter_size (pr->replies_seen_off); - if ( (bf != NULL) && + nsize = compute_bloomfilter_size (pr->replies_seen_count); + if ( (pr->bf != NULL) && (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) return GNUNET_NO; /* size not changed */ if (pr->bf != NULL) @@ -221,7 +231,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); pr->public_data.query = *query; - if (GNUNET_BLOCK_TYPE_SBLOCK == type) + if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) { GNUNET_assert (NULL != namespace); pr->public_data.namespace = *namespace; @@ -229,9 +239,9 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, if (NULL != target) { pr->public_data.target = *target; - pr->has_target = GNUNET_YES; + pr->public_data.has_target = GNUNET_YES; } - pr->public_data.anonymity_level = anonymity_data; + pr->public_data.anonymity_level = anonymity_level; pr->public_data.priority = priority; pr->public_data.original_priority = priority; pr->public_data.options = options; @@ -240,19 +250,19 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, pr->rh = rh; pr->rh_cls = rh_cls; if (ttl >= 0) - pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - (uint32_t) ttl)); + pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + (uint32_t) ttl)); else - pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - (uint32_t) (- ttl))); + pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + (uint32_t) (- ttl))); if (replies_seen_count > 0) { pr->replies_seen_size = replies_seen_count; pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); memcpy (pr->replies_seen, replies_seen, - replies_seen_count * sizeof (struct GNUNET_HashCode)); + replies_seen_count * sizeof (GNUNET_HashCode)); pr->replies_seen_count = replies_seen_count; } if (NULL != bf_data) @@ -275,7 +285,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, { pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr, - pr->ttl.abs_value); + pr->public_data.ttl.abs_value); /* make sure we don't track too many requests */ while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) { @@ -326,7 +336,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) return; /* integer overflow */ - if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) + if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) { /* we're responsible for the BF, full refresh */ if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) @@ -336,7 +346,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, memcpy (&pr->replies_seen[pr->replies_seen_count], replies_seen, sizeof (GNUNET_HashCode) * replies_seen_count); - pr->replies_seen_count += replies_seen; + pr->replies_seen_count += replies_seen_count; if (GNUNET_NO == refresh_bloomfilter (pr)) { /* bf not recalculated, simply extend it with new bits */ @@ -357,8 +367,8 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, any bloom-filter, so we need to create one on-the-fly */ pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); - pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count), - pr->mingle, + pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + compute_bloomfilter_size (replies_seen_count), BLOOMFILTER_K); } for (i=0;ireplies_seen_count;i++) @@ -388,16 +398,16 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, size_t buf_size, void *buf) { - struct PendingMessage *pm; char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; struct GetMessage *gm; GNUNET_HashCode *ext; size_t msize; unsigned int k; - int no_route; uint32_t bm; uint32_t prio; size_t bf_size; + struct GNUNET_TIME_Absolute now; + int64_t ttl; k = 0; bm = 0; @@ -406,12 +416,12 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, bm |= GET_MESSAGE_BIT_RETURN_TO; k++; } - if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) + if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) { bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; k++; } - if (GNUNET_YES == pr->has_target) + if (GNUNET_YES == pr->public_data.has_target) { bm |= GET_MESSAGE_BIT_TRANSMIT_TO; k++; @@ -424,7 +434,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, gm = (struct GetMessage*) lbuf; gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); gm->header.size = htons (msize); - gm->type = htonl (pr->type); + gm->type = htonl (pr->public_data.type); if (GNUNET_YES == do_route) prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, pr->public_data.priority + 1); @@ -432,18 +442,24 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, prio = 0; pr->public_data.priority -= prio; gm->priority = htonl (prio); - gm->ttl = htonl (pr->ttl); + now = GNUNET_TIME_absolute_get (); + ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value); + gm->ttl = htonl (ttl / 1000); gm->filter_mutator = htonl(pr->mingle); gm->hash_bitmap = htonl (bm); - gm->query = pr->query; + gm->query = pr->public_data.query; ext = (GNUNET_HashCode*) &gm[1]; - k = 0; + k = 0; if (GNUNET_YES != do_route) - GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); - if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) - memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); - if (GNUNET_YES == pr->has_target) - GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); + GNUNET_PEER_resolve (pr->cp->pid, + (struct GNUNET_PeerIdentity*) &ext[k++]); + if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) + memcpy (&ext[k++], + &pr->public_data.namespace, + sizeof (GNUNET_HashCode)); + if (GNUNET_YES == pr->public_data.has_target) + GNUNET_PEER_resolve (pr->public_data.target_pid, + (struct GNUNET_PeerIdentity*) &ext[k++]); if (pr->bf != NULL) GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, (char*) &ext[k], @@ -582,13 +598,10 @@ static void update_request_performance_data (struct ProcessReplyClosure *prq, struct GSF_PendingRequest *pr) { - unsigned int i; - struct GNUNET_TIME_Relative cur_delay; - if (prq->sender == NULL) return; GSF_peer_update_performance_ (prq->sender, - pr->start_time, + pr->public_data.start_time, prq->priority); } @@ -608,12 +621,6 @@ process_reply (void *cls, { struct ProcessReplyClosure *prq = cls; struct GSF_PendingRequest *pr = value; - struct PendingMessage *reply; - struct ClientResponseMessage *creply; - struct ClientList *cl; - struct PutMessage *pm; - struct ConnectedPeer *cp; - size_t msize; GNUNET_HashCode chash; #if DEBUG_FS @@ -622,16 +629,17 @@ process_reply (void *cls, (unsigned int) prq->type, GNUNET_h2s (key)); #endif - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies received and matched"), 1, GNUNET_NO); - prq->eval = GNUNET_BLOCK_evaluate (block_ctx, + prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx, prq->type, key, &pr->bf, pr->mingle, - pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, + &pr->public_data.namespace, + (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0, prq->data, prq->size); switch (prq->eval) @@ -642,15 +650,19 @@ process_reply (void *cls, case GNUNET_BLOCK_EVALUATION_OK_LAST: /* short cut: stop processing early, no BF-update, etc. */ update_request_performance_data (prq, pr); - GNUNET_LOAD_update (rt_entry_lifetime, - GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); + GNUNET_LOAD_update (GSF_rt_entry_lifetime, + GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value); /* pass on to other peers / local clients */ - pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO); + pr->rh (pr->rh_cls, + pr, + prq->expiration, + prq->data, prq->size, + GNUNET_NO); /* destroy request, we're done */ GSF_pending_request_cancel_ (pr); return GNUNET_YES; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# duplicate replies discarded (bloomfilter)"), 1, GNUNET_NO); @@ -686,18 +698,22 @@ process_reply (void *cls, "Found result for query `%s' in local datastore\n", GNUNET_h2s (key)); #endif - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# results found locally"), 1, GNUNET_NO); } prq->priority += pr->public_data.original_priority; - pr->public_data.remaining_priority = 0; + pr->public_data.priority = 0; pr->public_data.original_priority = 0; pr->public_data.results_found++; prq->request_found = GNUNET_YES; /* finally, pass on to other peer / local client */ - pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES); + pr->rh (pr->rh_cls, + pr, + prq->expiration, + prq->data, prq->size, + GNUNET_YES); return GNUNET_YES; } @@ -725,7 +741,7 @@ put_migration_continuation (void *cls, delay.rel_value); if (GNUNET_OK == success) return; - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# datastore 'put' failures"), 1, GNUNET_NO); @@ -750,7 +766,7 @@ test_put_load_too_high (uint32_t priority) ld = GNUNET_LOAD_get_load (datastore_put_load); if (ld < 2.0 * (1 + priority)) return GNUNET_NO; - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# storage requests dropped due to high load"), 1, GNUNET_NO); @@ -776,7 +792,7 @@ test_put_load_too_high (uint32_t priority) void GSF_handle_dht_reply_ (void *cls, struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, const struct GNUNET_PeerIdentity * const *get_path, const struct GNUNET_PeerIdentity * const *put_path, enum GNUNET_BLOCK_Type type, @@ -785,6 +801,7 @@ GSF_handle_dht_reply_ (void *cls, { struct GSF_PendingRequest *pr = cls; struct ProcessReplyClosure prq; + struct GNUNET_TIME_Absolute *start; memset (&prq, 0, sizeof (prq)); prq.data = data; @@ -803,10 +820,10 @@ GSF_handle_dht_reply_ (void *cls, #endif start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); *start = GNUNET_TIME_absolute_get (); - GNUNET_DATASTORE_put (dsh, - 0, &query, dsize, &put[1], + GNUNET_DATASTORE_put (GSF_dsh, + 0, key, size, data, type, prq.priority, 1 /* anonymity */, - expiration, + exp, 1 + prq.priority, MAX_DATASTORE_QUEUE, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, @@ -856,7 +873,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) return GNUNET_SYSERR; if (GNUNET_OK != - GNUNET_BLOCK_get_key (block_ctx, + GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, @@ -878,14 +895,14 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, prq.anonymity_level = 1; prq.finished = GNUNET_NO; prq.request_found = GNUNET_NO; - GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, + GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, &query, &process_reply, &prq); if (NULL != cp) { - GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); - GSF_get_peer_performance_data (cp)->trust += prq.priority; + GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); + GSF_get_peer_performance_data_ (cp)->trust += prq.priority; } if ( (GNUNET_YES == active_to_migration) && (GNUNET_NO == test_put_load_too_high (prq.priority)) ) @@ -898,7 +915,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, #endif start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); *start = GNUNET_TIME_absolute_get (); - GNUNET_DATASTORE_put (dsh, + GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, prq.priority, 1 /* anonymity */, expiration, @@ -918,7 +935,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (unsigned int) (60000 * putl * putl))); - GSF_block_peer_migration (cp, block_time); + GSF_block_peer_migration_ (cp, block_time); } return GNUNET_OK; } @@ -926,10 +943,22 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, /** * Setup the subsystem. + * + * @param cfg configuration to use */ void -GSF_pending_request_init_ () +GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg) { + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (cfg, + "fs", + "MAX_PENDING_REQUESTS", + &max_pending_requests)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Configuration fails to specify `%s', assuming default value."), + "MAX_PENDING_REQUESTS"); + } pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); } diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 4357565c7..4af4f36ee 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -323,9 +323,11 @@ GSF_handle_dht_reply_ (void *cls, /** * Setup the subsystem. + * + * @param cfg configuration to use */ void -GSF_pending_request_init_ (void); +GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg); /** -- 2.25.1