From 017907f0af8c44ae86d20c241cb9662e0ac8be2b Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Thu, 2 Sep 2010 15:06:42 +0000 Subject: [PATCH] dht changes, mostly making the driver do find peers adaptively... currently not seeing any crazy issues --- src/dht/dht.h | 2 +- src/dht/gnunet-dht-driver.c | 229 ++++++++++++++++++++++++++++++++--- src/dht/gnunet-service-dht.c | 38 +++++- 3 files changed, 246 insertions(+), 23 deletions(-) diff --git a/src/dht/dht.h b/src/dht/dht.h index 0f662f0b1..04931e1c3 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -31,7 +31,7 @@ #define DEBUG_DHT_ROUTING GNUNET_YES -#define DHT_BLOOM_SIZE 16 +#define DHT_BLOOM_SIZE 32 #define DHT_BLOOM_K 8 diff --git a/src/dht/gnunet-dht-driver.c b/src/dht/gnunet-dht-driver.c index 623a7b533..1cd281709 100644 --- a/src/dht/gnunet-dht-driver.c +++ b/src/dht/gnunet-dht-driver.c @@ -50,8 +50,8 @@ /* Timeout for waiting for puts to be sent to the service */ #define DEFAULT_PUT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 10) -/* Timeout for waiting for puts to be sent to the service */ -#define DEFAULT_FIND_PEER_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 90) +/* Time to allow a find peer request to take */ +#define DEFAULT_FIND_PEER_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 40) #define DEFAULT_SECONDS_PER_PEER_START GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 45) @@ -61,9 +61,15 @@ #define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2 +/* If more than this many peers are added, slow down sending */ +#define MAX_FIND_PEER_CUTOFF 2500 + +/* If less than this many peers are added, speed up sending */ +#define MIN_FIND_PEER_CUTOFF 500 + #define DEFAULT_MAX_OUTSTANDING_PUTS 10 -#define DEFAULT_MAX_OUTSTANDING_FIND_PEERS 5 +#define DEFAULT_MAX_OUTSTANDING_FIND_PEERS 64 #define DEFAULT_FIND_PEER_OFFSET GNUNET_TIME_relative_divide (DEFAULT_FIND_PEER_DELAY, DEFAULT_MAX_OUTSTANDING_FIND_PEERS) @@ -244,6 +250,8 @@ struct StatisticsIteratorContext */ struct TopologyIteratorContext { + unsigned int total_iterations; + unsigned int current_iteration; unsigned int total_connections; struct GNUNET_PeerIdentity *peer; GNUNET_SCHEDULER_Task cont; @@ -618,7 +626,7 @@ void log_topology_cb (void *cls, else { GNUNET_assert(dhtlog_handle != NULL); - fprintf(stderr, "topology iteration finished (%u connections), scheduling continuation\n", topo_ctx->total_connections); + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Topology iteration (%u/%u) finished (%u connections)\n", topo_ctx->current_iteration, topo_ctx->total_iterations, topo_ctx->total_connections); dhtlog_handle->update_topology(topo_ctx->total_connections); if (topo_ctx->cont != NULL) GNUNET_SCHEDULER_add_now (sched, topo_ctx->cont, topo_ctx->cls); @@ -1055,12 +1063,54 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) */ struct FindPeerContext { - struct GNUNET_DHT_Handle *dht_handle; + /** + * How long to send find peer requests, once the settle time + * is over don't send any more out! + * + * TODO: Add option for settle time and find peer sending time? + */ struct GNUNET_TIME_Absolute endtime; + + /** + * Number of connections in the current topology + * (after this round of find peer requests has ended). + */ unsigned int current_peers; + + /** + * Number of connections in the current topology + * (before this round of find peer requests started). + */ unsigned int previous_peers; + + /** + * Number of find peer requests we have currently + * outstanding. + */ unsigned int outstanding; + + /** + * Number of find peer requests to send in this round. + */ unsigned int total; + + /** + * Number of find peer requests sent last time around. + */ + unsigned int last_sent; + + /** + * Hashmap of peers in the current topology, value + * is a PeerCount, with the number of connections + * this peer has. + */ + struct GNUNET_CONTAINER_MultiHashMap *peer_hash; + + /** + * Min heap which orders values in the peer_hash for + * easy lookup. + */ + struct GNUNET_CONTAINER_Heap *peer_min_heap; }; static void @@ -1077,15 +1127,72 @@ static unsigned int connection_estimate(unsigned int peer_count, unsigned int bu i = num_peers; filled = 0; - while (i > bucket_size) + while (i >= bucket_size) { filled++; i = i/2; } + filled++; /* Add one filled bucket to account for one "half full" and some miscellaneous */ return filled * bucket_size * peer_count; } +struct PeerCount +{ + /** Node in the heap */ + struct GNUNET_CONTAINER_HeapNode *heap_node; + + /** Peer the count refers to */ + struct GNUNET_PeerIdentity peer_id; + + /** Count of connections this peer has */ + unsigned int count; +}; + + +/** + * Add a connection to the find_peer_context given. This may + * be complete overkill, but allows us to choose the peers with + * the least connections to initiate find peer requests from. + */ +static void add_new_connection(struct FindPeerContext *find_peer_context, + const struct GNUNET_PeerIdentity *first, + const struct GNUNET_PeerIdentity *second) +{ + struct PeerCount *first_count; + struct PeerCount *second_count; + + if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &first->hashPubKey)) + { + first_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &first->hashPubKey); + first_count->count++; + GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, first_count->heap_node, first_count->count); + } + else + { + first_count = GNUNET_malloc(sizeof(struct PeerCount)); + first_count->count = 1; + memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity)); + first_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, first_count->count); + GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &first->hashPubKey, first_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } + + if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &second->hashPubKey)) + { + second_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &second->hashPubKey); + second_count->count++; + GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, second_count->heap_node, second_count->count); + } + else + { + second_count = GNUNET_malloc(sizeof(struct PeerCount)); + second_count->count = 1; + memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity)); + second_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, second_count->count); + GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &second->hashPubKey, second_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } +} + /** * Callback for iterating over all the peer connections of a peer group. */ @@ -1099,17 +1206,18 @@ void count_peers_cb (void *cls, struct FindPeerContext *find_peer_context = cls; if ((first != NULL) && (second != NULL)) { + add_new_connection(find_peer_context, first, second); find_peer_context->current_peers++; } else { GNUNET_assert(dhtlog_handle != NULL); - fprintf(stderr, "peer count finished (%u connections), %u new peers, connection estimate %u\n", find_peer_context->current_peers, find_peer_context->current_peers - find_peer_context->previous_peers, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)); + /*GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections), %u new peers, connection estimate %u\n", find_peer_context->current_peers, find_peer_context->current_peers - find_peer_context->previous_peers, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));*/ if ((find_peer_context->current_peers - find_peer_context->previous_peers > FIND_PEER_THRESHOLD) && (find_peer_context->current_peers < connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) && (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)) { - GNUNET_SCHEDULER_add_now(sched, schedule_find_peer_requests, find_peer_context); + GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context); } else { @@ -1160,7 +1268,7 @@ handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc struct TestFindPeer *test_find_peer = cls; GNUNET_DHT_disconnect(test_find_peer->dht_handle); - GNUNET_SCHEDULER_add_delayed(sched, find_peer_delay, &decrement_find_peers, test_find_peer); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, test_find_peer); } static void @@ -1170,7 +1278,7 @@ send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers) { - GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_FIND_PEER_OFFSET, &send_find_peer_request, test_find_peer); + GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, &send_find_peer_request, test_find_peer); return; } @@ -1187,6 +1295,29 @@ send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t &handle_find_peer_sent, test_find_peer); } +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int remove_peer_count (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct FindPeerContext *find_peer_ctx = cls; + struct PeerCount *peer_count = value; + GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, peer_count->heap_node); + GNUNET_free(peer_count); + + return GNUNET_YES; +} + + /** * Set up a single find peer request for each peer in the topology. Do this * until the settle time is over, limited by the number of outstanding requests @@ -1197,18 +1328,82 @@ schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContex { struct FindPeerContext *find_peer_ctx = cls; struct TestFindPeer *test_find_peer; + struct PeerCount *peer_count; uint32_t i; uint32_t random; - for (i = 0; i < max_outstanding_find_peers; i++) + if (find_peer_ctx->previous_peers == 0) /* First time, go slowly */ + find_peer_ctx->total = 1; + else if (find_peer_ctx->current_peers - find_peer_ctx->previous_peers > MAX_FIND_PEER_CUTOFF) /* Found LOTS of peers, still go slowly */ + find_peer_ctx->total = find_peer_ctx->last_sent / 2; +#if USE_MIN + else if (find_peer_ctx->current_peers - find_peer_ctx->previous_peers < MIN_FIND_PEER_CUTOFF) + find_peer_ctx->total = find_peer_ctx->last_sent * 2; /* FIXME: always multiply by two (unless above max?) */ + else + find_peer_ctx->total = find_peer_ctx->last_sent; +#else + else + find_peer_ctx->total = find_peer_ctx->last_sent * 2; +#endif + + if (find_peer_ctx->total > max_outstanding_find_peers) + find_peer_ctx->total = max_outstanding_find_peers; + + find_peer_ctx->last_sent = find_peer_ctx->total; + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal %u connections)\n", find_peer_ctx->total, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)); + + find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, find_peer_ctx->total); + for (i = 0; i < find_peer_ctx->total; i++) { test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer)); - random = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers); - test_find_peer->daemon = GNUNET_TESTING_daemon_get(pg, random); + if (find_peer_ctx->previous_peers == 0) /* If we haven't sent any requests, yet choose random peers */ + { + /** + * Attempt to spread find peer requests across even sections of the peer address + * space. Choose basically 1 peer in every num_peers / max_outstanding_requests + * each time, then offset it by a randomish value. + * + * For instance, if num_peers is 100 and max_outstanding is 10, first chosen peer + * will be between 0 - 10, second between 10 - 20, etc. + */ + random = (num_peers / find_peer_ctx->total) * i; + random = random + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (num_peers / find_peer_ctx->total)); + if (random >= num_peers) + { + random = random - num_peers; + } + #if REAL_RANDOM + random = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers); + #endif + test_find_peer->daemon = GNUNET_TESTING_daemon_get(pg, random); + } + else /* If we have sent requests, choose peers with a low number of connections to send requests from */ + { + peer_count = GNUNET_CONTAINER_heap_remove_root(find_peer_ctx->peer_min_heap); + GNUNET_CONTAINER_multihashmap_remove(find_peer_ctx->peer_hash, &peer_count->peer_id.hashPubKey, peer_count); + test_find_peer->daemon = GNUNET_TESTING_daemon_get_by_id(pg, &peer_count->peer_id); + GNUNET_assert(test_find_peer->daemon != NULL); + } + test_find_peer->find_peer_context = find_peer_ctx; - find_peer_ctx->total++; - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(DEFAULT_FIND_PEER_OFFSET, i), &send_find_peer_request, test_find_peer); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(find_peer_offset, i), &send_find_peer_request, test_find_peer); + } + + if ((find_peer_ctx->peer_hash == NULL) && (find_peer_ctx->peer_min_heap == NULL)) + { + find_peer_ctx->peer_hash = GNUNET_CONTAINER_multihashmap_create(num_peers); + find_peer_ctx->peer_min_heap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); } + else + { + GNUNET_CONTAINER_multihashmap_iterate(find_peer_ctx->peer_hash, &remove_peer_count, find_peer_ctx); + GNUNET_CONTAINER_multihashmap_destroy(find_peer_ctx->peer_hash); + find_peer_ctx->peer_hash = GNUNET_CONTAINER_multihashmap_create(num_peers); + } + + GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(find_peer_ctx->peer_hash)); + GNUNET_assert(0 == GNUNET_CONTAINER_heap_get_size(find_peer_ctx->peer_min_heap)); + } /** @@ -1276,7 +1471,9 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t for (i = 1; i < max; i++) { topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext)); - fprintf(stderr, "scheduled topology iteration in %d minutes\n", i); + topo_ctx->current_iteration = i; + topo_ctx->total_iterations = max; + //fprintf(stderr, "scheduled topology iteration in %d minutes\n", i); GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, i), &capture_current_topology, topo_ctx); } topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext)); diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index f2379d72c..eb8a53ff4 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -104,7 +104,8 @@ /** * Default options for find peer requests sent by the dht service. */ -#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE +#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE +/*#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE*/ /** * How long at least to wait before sending another find peer request. @@ -832,6 +833,24 @@ static struct GNUNET_TIME_Relative get_average_send_delay() } #endif +/** + * Given the largest send delay, artificially decrease it + * so the next time around we may have a chance at sending + * again. + */ +static void decrease_max_send_delay(struct GNUNET_TIME_Relative max_time) +{ + unsigned int i; + for (i = 0; i < MAX_REPLY_TIMES; i++) + { + if (reply_times[i].value == max_time.value) + { + reply_times[i].value = reply_times[i].value / 2; + return; + } + } +} + /** * Find the maximum send time of the recently sent values. * @@ -1899,6 +1918,7 @@ static int route_result_message(void *cls, pos = record->head; while (pos != NULL) { +#if STRICT_FORWARDING if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) /* If we have already forwarded this peer id, don't do it again! */ { if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pos->find_peers_responded, &new_peer.hashPubKey)) @@ -1910,6 +1930,7 @@ static int route_result_message(void *cls, else GNUNET_CONTAINER_bloomfilter_add(pos->find_peers_responded, &new_peer.hashPubKey); } +#endif if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */ { @@ -2175,13 +2196,17 @@ handle_dht_find_peer (void *cls, } GNUNET_CONTAINER_bloomfilter_free(incoming_bloom); +#if RESTRICT_FIND_PEER + + /** + * Ignore any find peer requests from a peer we have seen very recently. + */ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent_find_peer_requests, &message_context->key)) /* We have recently responded to a find peer request for this peer! */ { increment_stats("# dht find peer requests ignored (recently seen!)"); return; } -#if RESTRICT_FIND_PEER /** * Use this check to only allow the peer to respond to find peer requests if * it would be beneficial to have the requesting peer in this peers routing @@ -2201,7 +2226,7 @@ handle_dht_find_peer (void *cls, recent_hash = GNUNET_malloc(sizeof(GNUNET_HashCode)); memcpy(recent_hash, &message_context->key, sizeof(GNUNET_HashCode)); GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests, &message_context->key, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 120), &remove_recent_find_peer, &recent_hash); + GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30), &remove_recent_find_peer, &recent_hash); /* Simplistic find_peer functionality, always return our hello */ hello_size = ntohs(my_hello->size); @@ -2416,7 +2441,7 @@ get_forward_count (unsigned int hop_count, size_t target_replication) target_value++; #endif - random_value = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, target_replication * (hop_count + 1) + diameter) + 1; + random_value = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_STRONG, target_replication * (hop_count + 1) + diameter) + 1; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "replication %u, at hop %d, will split with probability %f\n", target_replication, hop_count, target_replication / (double)((target_replication * (hop_count + 1) + diameter) + 1)); target_value = 1; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "random %u, target %u, max %u\n", random_value, target_replication, target_replication * (hop_count + 1) + diameter); @@ -3263,7 +3288,7 @@ handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client, switch (ntohs(dht_control_msg->command)) { case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending self seeking find peer request!\n"); + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending self seeking find peer request!\n"); GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL); break; case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET: @@ -3361,7 +3386,8 @@ handle_dht_p2p_route_request (void *cls, if (get_max_send_delay().value > MAX_REQUEST_TIME.value) { - fprintf(stderr, "Sending of previous requests has taken far too long, backing off!\n"); + fprintf(stderr, "Sending of previous replies took far too long, backing off!\n"); + decrease_max_send_delay(get_max_send_delay()); return GNUNET_YES; } -- 2.25.1