#include "dhtlog.h"
#include "dht.h"
+#define PRINT_TABLES GNUNET_NO
+
+#define EXTRA_CHECKS GNUNET_YES
/**
* How many buckets will we allow total.
*/
#define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
+/**
+ * Should the DHT issue FIND_PEER requests to get better routing tables?
+ */
+#define DO_FIND_PEER GNUNET_YES
+
/**
* What is the maximum number of peers in a given bucket.
*/
#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
-#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
+#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
/**
* Real maximum number of hops, at which point we refuse
*/
static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
+/**
+ * Hash map of all known peers, for easy removal from k_buckets on disconnect.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
+
/**
* Maximum size for each bucket.
*/
return actual_bucket;
}
+/**
+ * Find a routing table entry from a peer identity
+ *
+ * @param peer the peer to look up
+ *
+ * @return the bucket number holding the peer, GNUNET_SYSERR if not found
+ */
+static int
+find_bucket_by_peer(const struct PeerInfo *peer)
+{
+ int bucket;
+ struct PeerInfo *pos;
+
+ for (bucket = lowest_bucket; bucket < MAX_BUCKETS - 1; bucket++)
+ {
+ pos = k_buckets[bucket].head;
+ while (pos != NULL)
+ {
+ if (peer == pos)
+ return bucket;
+ pos = pos->next;
+ }
+ }
+
+ return GNUNET_SYSERR; /* No such peer. */
+}
+
+#if PRINT_TABLES
+/**
+ * Print the complete routing table for this peer.
+ */
+static void
+print_routing_table ()
+{
+ int bucket;
+ struct PeerInfo *pos;
+ char char_buf[30000];
+ int char_pos;
+ memset(char_buf, 0, sizeof(char_buf));
+ char_pos = 0;
+ char_pos += sprintf(&char_buf[char_pos], "Printing routing table for peer %s\n", my_short_id);
+ //fprintf(stderr, "Printing routing table for peer %s\n", my_short_id);
+ for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
+ {
+ pos = k_buckets[bucket].head;
+ char_pos += sprintf(&char_buf[char_pos], "Bucket %d:\n", bucket);
+ //fprintf(stderr, "Bucket %d:\n", bucket);
+ while (pos != NULL)
+ {
+ //fprintf(stderr, "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
+ char_pos += sprintf(&char_buf[char_pos], "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
+ pos = pos->next;
+ }
+ }
+ fprintf(stderr, "%s", char_buf);
+ fflush(stderr);
+}
+#endif
+
/**
* Find a routing table entry from a peer identity
*
* the peer to
* @param latency the core reported latency of this peer
* @param distance the transport level distance to this peer
+ *
+ * @return the newly added PeerInfo
*/
-static void add_peer(const struct GNUNET_PeerIdentity *peer,
- unsigned int bucket,
- struct GNUNET_TIME_Relative latency,
- unsigned int distance)
+static struct PeerInfo *
+add_peer(const struct GNUNET_PeerIdentity *peer,
+ unsigned int bucket,
+ struct GNUNET_TIME_Relative latency,
+ unsigned int distance)
{
struct PeerInfo *new_peer;
GNUNET_assert(bucket < MAX_BUCKETS);
k_buckets[bucket].tail,
new_peer);
k_buckets[bucket].peers_size++;
+
+ return new_peer;
}
/**
k_buckets[bucket].tail,
peer);
k_buckets[bucket].peers_size--;
+ if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) && (lowest_bucket < MAX_BUCKETS - 1))
+ lowest_bucket++;
}
/**
{
struct P2PPendingMessage *pos;
struct P2PPendingMessage *next;
+ //fprintf(stderr, "BEFORE REMOVAL\n");
+ //print_routing_table();
+#if EXTRA_CHECKS
+ struct PeerInfo *peer_pos;
+
+ peer_pos = k_buckets[bucket].head;
+ while ((peer_pos != NULL) && (peer_pos != peer))
+ peer_pos = peer_pos->next;
+ if (peer_pos == NULL)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Expected peer `%s' in bucket %d\n", my_short_id, "DHT", GNUNET_i2s(&peer->id), bucket);
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Lowest bucket: %d, find_current_bucket: %d, peer resides in bucket: %d\n", my_short_id, "DHT", lowest_bucket, find_current_bucket(&peer->id.hashPubKey), find_bucket_by_peer(peer));
+ }
+ GNUNET_assert(peer_pos != NULL);
+#endif
remove_peer(peer, bucket); /* First remove the peer from its bucket */
if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
GNUNET_free(pos);
pos = next;
}
+
+ GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey));
+ GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer));
GNUNET_free(peer);
+ //fprintf(stderr, "AFTER REMOVAL\n");
+ //print_routing_table();
}
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value PeerInfo of the peer to move to new lowest bucket
+ * @return GNUNET_YES if we should continue to
+ * iterate,
+ * GNUNET_NO if not.
+ */
+static int move_lowest_bucket (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct PeerInfo *peer = value;
+ int new_bucket;
+
+ new_bucket = lowest_bucket - 1;
+ remove_peer(peer, lowest_bucket);
+ GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
+ k_buckets[new_bucket].tail,
+ k_buckets[new_bucket].tail,
+ peer);
+ k_buckets[new_bucket].peers_size++;
+ return GNUNET_YES;
+}
+
+
/**
* The current lowest bucket is full, so change the lowest
* bucket to the next lower down, and move any appropriate
*/
static void enable_next_bucket()
{
- unsigned int new_bucket;
- unsigned int to_remove;
- int i;
- struct PeerInfo *to_remove_list[bucket_size]; /* We either use CPU by making a list, or memory with array. Use memory. */
+ struct GNUNET_CONTAINER_MultiHashMap *to_remove;
struct PeerInfo *pos;
GNUNET_assert(lowest_bucket > 0);
-
+ to_remove = GNUNET_CONTAINER_multihashmap_create(bucket_size);
pos = k_buckets[lowest_bucket].head;
- memset(to_remove_list, 0, sizeof(to_remove_list));
- to_remove = 0;
+
+#if PRINT_TABLES
+ fprintf(stderr, "Printing RT before new bucket\n");
+ print_routing_table();
+#endif
/* Populate the array of peers which should be in the next lowest bucket */
- while (pos->next != NULL)
+ while (pos != NULL)
{
if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
- {
- to_remove_list[to_remove] = pos;
- to_remove++;
- }
+ GNUNET_CONTAINER_multihashmap_put(to_remove, &pos->id.hashPubKey, pos, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
pos = pos->next;
}
- new_bucket = lowest_bucket - 1;
/* Remove peers from lowest bucket, insert into next lowest bucket */
- for (i = 0; i < bucket_size; i++)
- {
- if (to_remove_list[i] != NULL)
- {
- remove_peer(to_remove_list[i], lowest_bucket);
- GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
- k_buckets[new_bucket].tail,
- k_buckets[new_bucket].tail,
- to_remove_list[i]);
- k_buckets[new_bucket].peers_size++;
- }
- else
- break;
- }
- lowest_bucket = new_bucket;
+ GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL);
+ lowest_bucket = lowest_bucket - 1;
+#if PRINT_TABLES
+ fprintf(stderr, "Printing RT after new bucket\n");
+ print_routing_table();
+#endif
}
+
+
/**
* Attempt to add a peer to our k-buckets.
*
* @param peer, the peer identity of the peer being added
*
- * @return GNUNET_YES if the peer was added,
- * GNUNET_NO if not,
- * GNUNET_SYSERR on err (peer is us!)
+ * @return NULL if the peer was not added,
+ * pointer to PeerInfo for new peer otherwise
*/
-static int try_add_peer(const struct GNUNET_PeerIdentity *peer,
- unsigned int bucket,
- struct GNUNET_TIME_Relative latency,
- unsigned int distance)
+static struct PeerInfo *
+try_add_peer(const struct GNUNET_PeerIdentity *peer,
+ unsigned int bucket,
+ struct GNUNET_TIME_Relative latency,
+ unsigned int distance)
{
int peer_bucket;
-
+ struct PeerInfo *new_peer;
peer_bucket = find_current_bucket(&peer->hashPubKey);
if (peer_bucket == GNUNET_SYSERR)
- return GNUNET_SYSERR;
+ return NULL;
GNUNET_assert(peer_bucket >= lowest_bucket);
- if ((k_buckets[peer_bucket].peers_size) < bucket_size)
- {
- add_peer(peer, peer_bucket, latency, distance);
- return GNUNET_YES;
- }
- else if ((peer_bucket == lowest_bucket) && (lowest_bucket > 0))
- {
- enable_next_bucket();
- return try_add_peer(peer, bucket, latency, distance); /* Recurse, if proper bucket still full ping peers */
- }
- else if ((k_buckets[peer_bucket].peers_size) == bucket_size)
- {
- /* TODO: implement ping_oldest_peer */
- //ping_oldest_peer(bucket, peer, latency, distance); /* Find oldest peer, ping it. If no response, remove and add new peer! */
- return GNUNET_NO;
- }
- GNUNET_break(0);
- return GNUNET_NO;
+ new_peer = add_peer(peer, peer_bucket, latency, distance);
+
+ if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
+ enable_next_bucket();
+
+ return new_peer;
}
{
if (GNUNET_YES == consider_peer(&new_peer))
{
- GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received HELLO message for another peer, offering to transport!\n", my_short_id, "DHT");
GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg);
- GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, &new_peer, NULL, NULL); /* FIXME: Do we need this??? */
+ /* GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, &new_peer, NULL, NULL); */
+ /* peer_request_connect call causes service to segfault */
+ /* FIXME: Do we need this (peer_request_connect call)??? */
}
}
ntohs (find_msg->size),
sizeof (struct GNUNET_MessageHeader));
#endif
- if ((my_hello == NULL) || (message_context->closest != GNUNET_YES))
+ if (my_hello == NULL)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
new_msg_ctx->hop_count = 0;
route_result_message(cls, find_peer_result, new_msg_ctx);
+#if DEBUG_DHT_ROUTING
+ if ((debug_routes) && (dhtlog_handle != NULL))
+ {
+ dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
+ message_context->hop_count, GNUNET_YES, &my_identity,
+ message_context->key);
+ }
+#endif
//send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
GNUNET_free(find_peer_result);
}
unsigned int lowest_distance;
unsigned int temp_distance;
int bucket;
+ int count;
lowest_distance = -1;
for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
{
pos = k_buckets[bucket].head;
- while (pos != NULL)
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
{
temp_distance = distance(&pos->id.hashPubKey, hc);
if (temp_distance <= lowest_distance)
current_closest = pos;
}
pos = pos->next;
+ count++;
}
}
GNUNET_assert(current_closest != NULL);
int bits;
int other_bits;
int bucket_num;
+ int count;
struct PeerInfo *pos;
unsigned int my_distance;
my_distance = distance(&my_identity.hashPubKey, target);
pos = k_buckets[bucket_num].head;
- while (pos != NULL)
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
{
other_bits = matching_bits(&pos->id.hashPubKey, target);
if (other_bits > bits)
return GNUNET_NO;
else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
{
+ /* FIXME: why not just return GNUNET_YES here? We are certainly close. */
if (distance(&pos->id.hashPubKey, target) < my_distance)
return GNUNET_NO;
}
{
unsigned int distance;
unsigned int bc;
+ unsigned int count;
struct PeerInfo *pos;
#if USE_KADEMLIA
const struct PeerInfo *chosen;
for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
{
pos = k_buckets[bc].head;
- while (pos != NULL)
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
{
if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey));
#endif
pos = pos->next;
+ count++;
}
}
if (total_distance == 0)
for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
{
pos = k_buckets[bc].head;
- while (pos != NULL)
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
{
if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
{
#endif
}
pos = pos->next;
+ count++;
}
}
#if DEBUG_DHT
if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
forward_count = 0;
break;
- case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding?*/
+ case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding to reduce complexity?*/
if (message_context->closest == GNUNET_YES)
{
#if DEBUG_DHT_ROUTING
#endif
break;
case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
- if (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))
+ if (((message_context->hop_count > 0) && (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))) || (message_context->client != NULL))
{
cache_response (cls, message_context);
if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
handle_dht_find_peer (cls, msg, message_context);
}
+#if DEBUG_DHT_ROUTING
+ if (message_context->hop_count == 0) /* Locally initiated request */
+ {
+ if ((debug_routes) && (dhtlog_handle != NULL))
+ {
+ dhtlog_handle->insert_dhtkey(NULL, message_context->key);
+ dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
+ message_context->hop_count, GNUNET_NO, &my_identity,
+ message_context->key);
+ }
+ }
+#endif
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
for (i = 0; i < forward_count; i++)
{
selected = select_peer(message_context->key, message_context->bloom);
-
+ /* FIXME: either log to sql or log to stats or both when selected is NULL at this point! */
if (selected != NULL)
{
GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
memset(&message_context, 0, sizeof(struct DHT_MessageContext));
message_context.key = &my_identity.hashPubKey;
- message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
+ message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS);
message_context.network_size = estimate_diameter();
struct GNUNET_TIME_Relative latency,
uint32_t distance)
{
- int ret;
+ struct PeerInfo *ret;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
find_current_bucket(&peer->hashPubKey),
latency,
distance);
+ if (ret != NULL)
+ GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == GNUNET_YES ? "PEER ADDED" : "NOT ADDED");
+ "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED");
#endif
}
+/**
+ * Method called whenever a peer disconnects.
+ *
+ * @param cls closure
+ * @param peer peer identity this notification is about
+ */
+void handle_core_disconnect (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer)
+{
+ struct PeerInfo *to_remove;
+ int current_bucket;
+
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: Received peer disconnect message for peer `%s' from %s\n", my_short_id, "DHT", GNUNET_i2s(peer), "CORE");
+
+ GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey));
+ to_remove = GNUNET_CONTAINER_multihashmap_get(all_known_peers, &peer->hashPubKey);
+ GNUNET_assert(0 == memcmp(peer, &to_remove->id, sizeof(struct GNUNET_PeerIdentity)));
+ current_bucket = find_current_bucket(&to_remove->id.hashPubKey);
+ delete_peer(to_remove, current_bucket);
+}
+
/**
* Process dht requests.
*
GNUNET_TIME_UNIT_FOREVER_REL,
NULL, /* Closure passed to DHT functionas around? */
&core_init, /* Call core_init once connected */
- &handle_core_connect, /* Don't care about connects */
- NULL, /* FIXME: remove peers on disconnects */
+ &handle_core_connect, /* Handle connects */
+ &handle_core_disconnect, /* FIXME: remove peers on disconnects */
NULL, /* Do we care about "status" updates? */
NULL, /* Don't want notified about all incoming messages */
GNUNET_NO, /* For header only inbound notification */
lowest_bucket = MAX_BUCKETS - 1;
forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
- /* Scheduled the task to clean up when shutdown is called */
-
+ all_known_peers = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
+ GNUNET_assert(all_known_peers != NULL);
if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
{
debug_routes = GNUNET_YES;
}
}
+#if DO_FIND_PEER
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30),
&send_find_peer_message, NULL);
+#endif
+ /* Scheduled the task to clean up when shutdown is called */
cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task, NULL);