From c59802b4a2aa41f3293121742eaacd8df295302c Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Thu, 5 Aug 2010 15:45:37 +0000 Subject: [PATCH] routing table changes, not stable --- src/dht/gnunet-service-dht.c | 318 ++++++++++++++++++++++++++--------- 1 file changed, 241 insertions(+), 77 deletions(-) diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 1e2ba80a1..125bd7fff 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -41,11 +41,19 @@ #include "dhtlog.h" #include "dht.h" +#define PRINT_TABLES GNUNET_NO + +#define EXTRA_CHECKS GNUNET_YES /** * How many buckets will we allow total. */ #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8 +/** + * Should the DHT issue FIND_PEER requests to get better routing tables? + */ +#define DO_FIND_PEER GNUNET_YES + /** * What is the maximum number of peers in a given bucket. */ @@ -62,7 +70,7 @@ #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE -#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5) +#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1) /** * Real maximum number of hops, at which point we refuse @@ -472,6 +480,11 @@ static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */ */ static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */ +/** + * Hash map of all known peers, for easy removal from k_buckets on disconnect. + */ +static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers; + /** * Maximum size for each bucket. */ @@ -795,6 +808,65 @@ static int find_current_bucket(const GNUNET_HashCode *hc) return actual_bucket; } +/** + * Find a routing table entry from a peer identity + * + * @param peer the peer to look up + * + * @return the bucket number holding the peer, GNUNET_SYSERR if not found + */ +static int +find_bucket_by_peer(const struct PeerInfo *peer) +{ + int bucket; + struct PeerInfo *pos; + + for (bucket = lowest_bucket; bucket < MAX_BUCKETS - 1; bucket++) + { + pos = k_buckets[bucket].head; + while (pos != NULL) + { + if (peer == pos) + return bucket; + pos = pos->next; + } + } + + return GNUNET_SYSERR; /* No such peer. */ +} + +#if PRINT_TABLES +/** + * Print the complete routing table for this peer. + */ +static void +print_routing_table () +{ + int bucket; + struct PeerInfo *pos; + char char_buf[30000]; + int char_pos; + memset(char_buf, 0, sizeof(char_buf)); + char_pos = 0; + char_pos += sprintf(&char_buf[char_pos], "Printing routing table for peer %s\n", my_short_id); + //fprintf(stderr, "Printing routing table for peer %s\n", my_short_id); + for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++) + { + pos = k_buckets[bucket].head; + char_pos += sprintf(&char_buf[char_pos], "Bucket %d:\n", bucket); + //fprintf(stderr, "Bucket %d:\n", bucket); + while (pos != NULL) + { + //fprintf(stderr, "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey)); + char_pos += sprintf(&char_buf[char_pos], "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey)); + pos = pos->next; + } + } + fprintf(stderr, "%s", char_buf); + fflush(stderr); +} +#endif + /** * Find a routing table entry from a peer identity * @@ -831,11 +903,14 @@ find_peer_by_id(const struct GNUNET_PeerIdentity *peer) * the peer to * @param latency the core reported latency of this peer * @param distance the transport level distance to this peer + * + * @return the newly added PeerInfo */ -static void add_peer(const struct GNUNET_PeerIdentity *peer, - unsigned int bucket, - struct GNUNET_TIME_Relative latency, - unsigned int distance) +static struct PeerInfo * +add_peer(const struct GNUNET_PeerIdentity *peer, + unsigned int bucket, + struct GNUNET_TIME_Relative latency, + unsigned int distance) { struct PeerInfo *new_peer; GNUNET_assert(bucket < MAX_BUCKETS); @@ -850,6 +925,8 @@ static void add_peer(const struct GNUNET_PeerIdentity *peer, k_buckets[bucket].tail, new_peer); k_buckets[bucket].peers_size++; + + return new_peer; } /** @@ -870,6 +947,8 @@ static void remove_peer (struct PeerInfo *peer, k_buckets[bucket].tail, peer); k_buckets[bucket].peers_size--; + if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) && (lowest_bucket < MAX_BUCKETS - 1)) + lowest_bucket++; } /** @@ -884,6 +963,21 @@ static void delete_peer (struct PeerInfo *peer, { struct P2PPendingMessage *pos; struct P2PPendingMessage *next; + //fprintf(stderr, "BEFORE REMOVAL\n"); + //print_routing_table(); +#if EXTRA_CHECKS + struct PeerInfo *peer_pos; + + peer_pos = k_buckets[bucket].head; + while ((peer_pos != NULL) && (peer_pos != peer)) + peer_pos = peer_pos->next; + if (peer_pos == NULL) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Expected peer `%s' in bucket %d\n", my_short_id, "DHT", GNUNET_i2s(&peer->id), bucket); + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Lowest bucket: %d, find_current_bucket: %d, peer resides in bucket: %d\n", my_short_id, "DHT", lowest_bucket, find_current_bucket(&peer->id.hashPubKey), find_bucket_by_peer(peer)); + } + GNUNET_assert(peer_pos != NULL); +#endif remove_peer(peer, bucket); /* First remove the peer from its bucket */ if (peer->send_task != GNUNET_SCHEDULER_NO_TASK) @@ -898,9 +992,43 @@ static void delete_peer (struct PeerInfo *peer, GNUNET_free(pos); pos = next; } + + GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey)); + GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer)); GNUNET_free(peer); + //fprintf(stderr, "AFTER REMOVAL\n"); + //print_routing_table(); } + +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value PeerInfo of the peer to move to new lowest bucket + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int move_lowest_bucket (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct PeerInfo *peer = value; + int new_bucket; + + new_bucket = lowest_bucket - 1; + remove_peer(peer, lowest_bucket); + GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head, + k_buckets[new_bucket].tail, + k_buckets[new_bucket].tail, + peer); + k_buckets[new_bucket].peers_size++; + return GNUNET_YES; +} + + /** * The current lowest bucket is full, so change the lowest * bucket to the next lower down, and move any appropriate @@ -908,84 +1036,61 @@ static void delete_peer (struct PeerInfo *peer, */ static void enable_next_bucket() { - unsigned int new_bucket; - unsigned int to_remove; - int i; - struct PeerInfo *to_remove_list[bucket_size]; /* We either use CPU by making a list, or memory with array. Use memory. */ + struct GNUNET_CONTAINER_MultiHashMap *to_remove; struct PeerInfo *pos; GNUNET_assert(lowest_bucket > 0); - + to_remove = GNUNET_CONTAINER_multihashmap_create(bucket_size); pos = k_buckets[lowest_bucket].head; - memset(to_remove_list, 0, sizeof(to_remove_list)); - to_remove = 0; + +#if PRINT_TABLES + fprintf(stderr, "Printing RT before new bucket\n"); + print_routing_table(); +#endif /* Populate the array of peers which should be in the next lowest bucket */ - while (pos->next != NULL) + while (pos != NULL) { if (find_bucket(&pos->id.hashPubKey) < lowest_bucket) - { - to_remove_list[to_remove] = pos; - to_remove++; - } + GNUNET_CONTAINER_multihashmap_put(to_remove, &pos->id.hashPubKey, pos, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); pos = pos->next; } - new_bucket = lowest_bucket - 1; /* Remove peers from lowest bucket, insert into next lowest bucket */ - for (i = 0; i < bucket_size; i++) - { - if (to_remove_list[i] != NULL) - { - remove_peer(to_remove_list[i], lowest_bucket); - GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head, - k_buckets[new_bucket].tail, - k_buckets[new_bucket].tail, - to_remove_list[i]); - k_buckets[new_bucket].peers_size++; - } - else - break; - } - lowest_bucket = new_bucket; + GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL); + lowest_bucket = lowest_bucket - 1; +#if PRINT_TABLES + fprintf(stderr, "Printing RT after new bucket\n"); + print_routing_table(); +#endif } + + /** * Attempt to add a peer to our k-buckets. * * @param peer, the peer identity of the peer being added * - * @return GNUNET_YES if the peer was added, - * GNUNET_NO if not, - * GNUNET_SYSERR on err (peer is us!) + * @return NULL if the peer was not added, + * pointer to PeerInfo for new peer otherwise */ -static int try_add_peer(const struct GNUNET_PeerIdentity *peer, - unsigned int bucket, - struct GNUNET_TIME_Relative latency, - unsigned int distance) +static struct PeerInfo * +try_add_peer(const struct GNUNET_PeerIdentity *peer, + unsigned int bucket, + struct GNUNET_TIME_Relative latency, + unsigned int distance) { int peer_bucket; - + struct PeerInfo *new_peer; peer_bucket = find_current_bucket(&peer->hashPubKey); if (peer_bucket == GNUNET_SYSERR) - return GNUNET_SYSERR; + return NULL; GNUNET_assert(peer_bucket >= lowest_bucket); - if ((k_buckets[peer_bucket].peers_size) < bucket_size) - { - add_peer(peer, peer_bucket, latency, distance); - return GNUNET_YES; - } - else if ((peer_bucket == lowest_bucket) && (lowest_bucket > 0)) - { - enable_next_bucket(); - return try_add_peer(peer, bucket, latency, distance); /* Recurse, if proper bucket still full ping peers */ - } - else if ((k_buckets[peer_bucket].peers_size) == bucket_size) - { - /* TODO: implement ping_oldest_peer */ - //ping_oldest_peer(bucket, peer, latency, distance); /* Find oldest peer, ping it. If no response, remove and add new peer! */ - return GNUNET_NO; - } - GNUNET_break(0); - return GNUNET_NO; + new_peer = add_peer(peer, peer_bucket, latency, distance); + + if ((k_buckets[lowest_bucket].peers_size) >= bucket_size) + enable_next_bucket(); + + return new_peer; } @@ -1181,9 +1286,10 @@ static int route_result_message(void *cls, { if (GNUNET_YES == consider_peer(&new_peer)) { - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received HELLO message for another peer, offering to transport!\n", my_short_id, "DHT"); GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg); - GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, &new_peer, NULL, NULL); /* FIXME: Do we need this??? */ + /* GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, &new_peer, NULL, NULL); */ + /* peer_request_connect call causes service to segfault */ + /* FIXME: Do we need this (peer_request_connect call)??? */ } } @@ -1440,7 +1546,7 @@ handle_dht_find_peer (void *cls, ntohs (find_msg->size), sizeof (struct GNUNET_MessageHeader)); #endif - if ((my_hello == NULL) || (message_context->closest != GNUNET_YES)) + if (my_hello == NULL) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1474,6 +1580,14 @@ handle_dht_find_peer (void *cls, new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); new_msg_ctx->hop_count = 0; route_result_message(cls, find_peer_result, new_msg_ctx); +#if DEBUG_DHT_ROUTING + if ((debug_routes) && (dhtlog_handle != NULL)) + { + dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER, + message_context->hop_count, GNUNET_YES, &my_identity, + message_context->key); + } +#endif //send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id); GNUNET_free(find_peer_result); } @@ -1613,6 +1727,7 @@ find_closest_peer (const GNUNET_HashCode *hc) unsigned int lowest_distance; unsigned int temp_distance; int bucket; + int count; lowest_distance = -1; @@ -1623,7 +1738,8 @@ find_closest_peer (const GNUNET_HashCode *hc) for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++) { pos = k_buckets[bucket].head; - while (pos != NULL) + count = 0; + while ((pos != NULL) && (count < bucket_size)) { temp_distance = distance(&pos->id.hashPubKey, hc); if (temp_distance <= lowest_distance) @@ -1632,6 +1748,7 @@ find_closest_peer (const GNUNET_HashCode *hc) current_closest = pos; } pos = pos->next; + count++; } } GNUNET_assert(current_closest != NULL); @@ -1652,6 +1769,7 @@ am_closest_peer (const GNUNET_HashCode * target) int bits; int other_bits; int bucket_num; + int count; struct PeerInfo *pos; unsigned int my_distance; @@ -1663,13 +1781,15 @@ am_closest_peer (const GNUNET_HashCode * target) my_distance = distance(&my_identity.hashPubKey, target); pos = k_buckets[bucket_num].head; - while (pos != NULL) + count = 0; + while ((pos != NULL) && (count < bucket_size)) { other_bits = matching_bits(&pos->id.hashPubKey, target); if (other_bits > bits) return GNUNET_NO; else if (other_bits == bits) /* We match the same number of bits, do distance comparison */ { + /* FIXME: why not just return GNUNET_YES here? We are certainly close. */ if (distance(&pos->id.hashPubKey, target) < my_distance) return GNUNET_NO; } @@ -1724,6 +1844,7 @@ select_peer (const GNUNET_HashCode * target, { unsigned int distance; unsigned int bc; + unsigned int count; struct PeerInfo *pos; #if USE_KADEMLIA const struct PeerInfo *chosen; @@ -1769,7 +1890,8 @@ select_peer (const GNUNET_HashCode * target, for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) { pos = k_buckets[bc].head; - while (pos != NULL) + count = 0; + while ((pos != NULL) && (count < bucket_size)) { if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey); @@ -1779,6 +1901,7 @@ select_peer (const GNUNET_HashCode * target, my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey)); #endif pos = pos->next; + count++; } } if (total_distance == 0) @@ -1790,7 +1913,8 @@ select_peer (const GNUNET_HashCode * target, for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) { pos = k_buckets[bc].head; - while (pos != NULL) + count = 0; + while ((pos != NULL) && (count < bucket_size)) { if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) { @@ -1808,6 +1932,7 @@ select_peer (const GNUNET_HashCode * target, #endif } pos = pos->next; + count++; } } #if DEBUG_DHT @@ -2023,7 +2148,7 @@ static int route_message(void *cls, if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES)) forward_count = 0; break; - case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding?*/ + case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding to reduce complexity?*/ if (message_context->closest == GNUNET_YES) { #if DEBUG_DHT_ROUTING @@ -2050,12 +2175,24 @@ static int route_message(void *cls, #endif break; case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */ - if (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity))) + if (((message_context->hop_count > 0) && (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))) || (message_context->client != NULL)) { cache_response (cls, message_context); if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) handle_dht_find_peer (cls, msg, message_context); } +#if DEBUG_DHT_ROUTING + if (message_context->hop_count == 0) /* Locally initiated request */ + { + if ((debug_routes) && (dhtlog_handle != NULL)) + { + dhtlog_handle->insert_dhtkey(NULL, message_context->key); + dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER, + message_context->hop_count, GNUNET_NO, &my_identity, + message_context->key); + } + } +#endif break; default: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -2065,7 +2202,7 @@ static int route_message(void *cls, for (i = 0; i < forward_count; i++) { selected = select_peer(message_context->key, message_context->bloom); - + /* FIXME: either log to sql or log to stats or both when selected is NULL at this point! */ if (selected != NULL) { GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey); @@ -2160,7 +2297,7 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.key = &my_identity.hashPubKey; - message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1)); + message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1)); message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION); message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS); message_context.network_size = estimate_diameter(); @@ -2488,7 +2625,7 @@ void handle_core_connect (void *cls, struct GNUNET_TIME_Relative latency, uint32_t distance) { - int ret; + struct PeerInfo *ret; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2500,12 +2637,36 @@ void handle_core_connect (void *cls, find_current_bucket(&peer->hashPubKey), latency, distance); + if (ret != NULL) + GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == GNUNET_YES ? "PEER ADDED" : "NOT ADDED"); + "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED"); #endif } +/** + * Method called whenever a peer disconnects. + * + * @param cls closure + * @param peer peer identity this notification is about + */ +void handle_core_disconnect (void *cls, + const struct + GNUNET_PeerIdentity * peer) +{ + struct PeerInfo *to_remove; + int current_bucket; + + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: Received peer disconnect message for peer `%s' from %s\n", my_short_id, "DHT", GNUNET_i2s(peer), "CORE"); + + GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey)); + to_remove = GNUNET_CONTAINER_multihashmap_get(all_known_peers, &peer->hashPubKey); + GNUNET_assert(0 == memcmp(peer, &to_remove->id, sizeof(struct GNUNET_PeerIdentity))); + current_bucket = find_current_bucket(&to_remove->id.hashPubKey); + delete_peer(to_remove, current_bucket); +} + /** * Process dht requests. * @@ -2529,8 +2690,8 @@ run (void *cls, GNUNET_TIME_UNIT_FOREVER_REL, NULL, /* Closure passed to DHT functionas around? */ &core_init, /* Call core_init once connected */ - &handle_core_connect, /* Don't care about connects */ - NULL, /* FIXME: remove peers on disconnects */ + &handle_core_connect, /* Handle connects */ + &handle_core_disconnect, /* FIXME: remove peers on disconnects */ NULL, /* Do we care about "status" updates? */ NULL, /* Don't want notified about all incoming messages */ GNUNET_NO, /* For header only inbound notification */ @@ -2550,8 +2711,8 @@ run (void *cls, lowest_bucket = MAX_BUCKETS - 1; forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10); forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); - /* Scheduled the task to clean up when shutdown is called */ - + all_known_peers = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8); + GNUNET_assert(all_known_peers != NULL); if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging")) { debug_routes = GNUNET_YES; @@ -2589,10 +2750,13 @@ run (void *cls, } } +#if DO_FIND_PEER GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30), &send_find_peer_message, NULL); +#endif + /* Scheduled the task to clean up when shutdown is called */ cleanup_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); -- 2.25.1