}
-/**
- * Find the optimal bucket for this key, regardless
- * of the current number of buckets in use.
- *
- * @param hc the hashcode to compare our identity to
- *
- * @return the proper bucket index, or GNUNET_SYSERR
- * on error (same hashcode)
- */
-static int
-find_bucket (const GNUNET_HashCode * hc)
-{
- unsigned int bits;
-
- bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
- if (bits == MAX_BUCKETS)
- return GNUNET_SYSERR;
- return MAX_BUCKETS - bits - 1;
-}
-
-
/**
* Find which k-bucket this peer should go into,
* taking into account the size of the k-bucket
&update_core_preference_finish, peer);
}
-
-/**
- * Given a peer and its corresponding bucket,
- * remove it from that bucket. Does not free
- * the PeerInfo struct, nor cancel messages
- * or free messages waiting to be sent to this
- * peer!
- *
- * @param peer the peer to remove
- * @param bucket the bucket the peer belongs to
- */
-static void
-remove_peer (struct PeerInfo *peer, unsigned int bucket)
-{
- GNUNET_assert (k_buckets[bucket].peers_size > 0);
- GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail,
- peer);
- k_buckets[bucket].peers_size--;
- if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) &&
- (lowest_bucket < MAX_BUCKETS - 1))
- lowest_bucket++;
-}
-
-/**
- * Removes peer from a bucket, then frees associated
- * resources and frees peer.
- *
- * @param peer peer to be removed and freed
- * @param bucket which bucket this peer belongs to
- */
-static void
-delete_peer (struct PeerInfo *peer, unsigned int bucket)
-{
- struct P2PPendingMessage *pos;
- struct P2PPendingMessage *next;
-
- remove_peer (peer, bucket); /* First remove the peer from its bucket */
- if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (peer->send_task);
- if ((peer->th != NULL) && (coreAPI != NULL))
- GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
-
- pos = peer->head;
- while (pos != NULL) /* Remove any pending messages for this peer */
- {
- increment_stats
- ("# dht pending messages discarded (due to disconnect/shutdown)");
- next = pos->next;
- GNUNET_free (pos);
- pos = next;
- }
-
- GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
- (all_known_peers, &peer->id.hashPubKey));
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
- &peer->id.hashPubKey,
- peer));
- GNUNET_free (peer);
- decrement_stats (STAT_PEERS_KNOWN);
-}
-
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value PeerInfo of the peer to move to new lowest bucket
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
- */
-static int
-move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value)
-{
- struct PeerInfo *peer = value;
- int new_bucket;
-
- GNUNET_assert (lowest_bucket > 0);
- new_bucket = lowest_bucket - 1;
- remove_peer (peer, lowest_bucket);
- GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head,
- k_buckets[new_bucket].tail,
- k_buckets[new_bucket].tail, peer);
- k_buckets[new_bucket].peers_size++;
- return GNUNET_YES;
-}
-
-
-/**
- * The current lowest bucket is full, so change the lowest
- * bucket to the next lower down, and move any appropriate
- * entries in the current lowest bucket to the new bucket.
- */
-static void
-enable_next_bucket ()
-{
- struct GNUNET_CONTAINER_MultiHashMap *to_remove;
- struct PeerInfo *pos;
-
- GNUNET_assert (lowest_bucket > 0);
- to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size);
- pos = k_buckets[lowest_bucket].head;
-
- /* Populate the array of peers which should be in the next lowest bucket */
- while (pos != NULL)
- {
- if (find_bucket (&pos->id.hashPubKey) < lowest_bucket)
- GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
- pos = pos->next;
- }
-
- /* Remove peers from lowest bucket, insert into next lowest bucket */
- GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL);
- GNUNET_CONTAINER_multihashmap_destroy (to_remove);
- lowest_bucket = lowest_bucket - 1;
-}
-
-
/**
* Find the closest peer in our routing table to the
* given hashcode.
}
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
- */
-static int
-add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
-{
- struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
-
- GNUNET_CONTAINER_bloomfilter_add (bloom, key);
- return GNUNET_YES;
-}
-
-/**
- * Task to send a find peer message for our own peer identifier
- * so that we can find the closest peers in the network to ourselves
- * and attempt to connect to them.
- *
- * @param cls closure for this task
- * @param tc the context under which the task is running
- */
-static void
-send_find_peer_message (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
- struct DHT_MessageContext msg_ctx;
- struct GNUNET_TIME_Relative next_send_time;
- struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
-
- if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
- return;
-
- if (newly_found_peers > bucket_size) /* If we are finding peers already, no need to send out our request right now! */
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Have %d newly found peers since last find peer message sent!\n",
- newly_found_peers);
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
- &send_find_peer_message, NULL);
- newly_found_peers = 0;
- return;
- }
-
- increment_stats (STAT_FIND_PEER_START);
-#if FIND_PEER_WITH_HELLO
- find_peer_msg =
- GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) +
- GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *)
- my_hello));
- find_peer_msg->header.size =
- htons (sizeof (struct GNUNET_DHT_FindPeerMessage) +
- GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
- memcpy (&find_peer_msg[1], my_hello,
- GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
-#else
- find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
- find_peer_msg->header.size =
- htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
-#endif
- find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
- temp_bloom =
- GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
- GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
- temp_bloom);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
- find_peer_msg->
- bloomfilter,
- DHT_BLOOM_SIZE));
- GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
- memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
- memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
- msg_ctx.unique_id =
- GNUNET_ntohll (GNUNET_CRYPTO_random_u64
- (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
- msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
- msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
- msg_ctx.network_size = log_of_network_size_estimate;
- msg_ctx.peer = my_identity;
- msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
- msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
-
- demultiplex_message (&find_peer_msg->header, &msg_ctx);
- GNUNET_free (find_peer_msg);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id,
- "DHT", "FIND PEER");
- if (newly_found_peers < bucket_size)
- {
- next_send_time.rel_value =
- (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
- DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
- }
- else
- {
- next_send_time.rel_value =
- DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
- DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value -
- DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
- }
-
- GNUNET_assert (next_send_time.rel_value != 0);
- find_peer_context.count = 0;
- newly_found_peers = 0;
- find_peer_context.start = GNUNET_TIME_absolute_get ();
- GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
- NULL);
-}
-
-
-/**
- * Core handler for p2p route requests.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @param atsi performance data
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_TRANSPORT_ATS_Information
- *atsi)
-{
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s:%s': Received P2P request from peer %s\n", my_short_id,
- "DHT", GNUNET_i2s (peer));
-#endif
- struct GNUNET_DHT_P2PRouteMessage *incoming =
- (struct GNUNET_DHT_P2PRouteMessage *) message;
- struct GNUNET_MessageHeader *enc_msg =
- (struct GNUNET_MessageHeader *) &incoming[1];
- struct DHT_MessageContext *msg_ctx;
- char *route_path;
- int path_size;
-
- if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
- {
- GNUNET_break_op (0);
- return GNUNET_YES;
- }
-
- if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending of previous replies took too long, backing off!\n");
- increment_stats ("# route requests dropped due to high load");
- decrease_max_send_delay (get_max_send_delay ());
- return GNUNET_YES;
- }
- msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
- msg_ctx->bloom =
- GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
- DHT_BLOOM_K);
- GNUNET_assert (msg_ctx->bloom != NULL);
- msg_ctx->hop_count = ntohl (incoming->hop_count);
- memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
- msg_ctx->replication = ntohl (incoming->desired_replication_level);
- msg_ctx->msg_options = ntohl (incoming->options);
- if (GNUNET_DHT_RO_RECORD_ROUTE ==
- (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
- {
- path_size =
- ntohl (incoming->outgoing_path_length) *
- sizeof (struct GNUNET_PeerIdentity);
- if (ntohs (message->size) !=
- (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
- path_size))
- {
- GNUNET_break_op (0);
- GNUNET_free (msg_ctx);
- return GNUNET_YES;
- }
- route_path = (char *) &incoming[1];
- route_path = route_path + ntohs (enc_msg->size);
- msg_ctx->path_history =
- GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
- memcpy (msg_ctx->path_history, route_path, path_size);
- memcpy (&msg_ctx->path_history[path_size], &my_identity,
- sizeof (struct GNUNET_PeerIdentity));
- msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
- }
- msg_ctx->network_size = ntohl (incoming->network_size);
- msg_ctx->peer = *peer;
- msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
- msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
- demultiplex_message (enc_msg, msg_ctx);
- if (msg_ctx->bloom != NULL)
- {
- GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
- msg_ctx->bloom = NULL;
- }
- GNUNET_free (msg_ctx);
- return GNUNET_YES;
-}
-
-
-/**
- * Core handler for p2p route results.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @param atsi performance data
- *
- */
-static int
-handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_TRANSPORT_ATS_Information
- *atsi)
-{
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s:%s': Received request from peer %s\n", my_short_id, "DHT",
- GNUNET_i2s (peer));
-#endif
- const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
- (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
- struct GNUNET_MessageHeader *enc_msg =
- (struct GNUNET_MessageHeader *) &incoming[1];
- struct DHT_MessageContext msg_ctx;
-
- if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
- {
- GNUNET_break_op (0);
- return GNUNET_YES;
- }
-
- memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
- memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
- msg_ctx.msg_options = ntohl (incoming->options);
- msg_ctx.hop_count = ntohl (incoming->hop_count);
- msg_ctx.peer = *peer;
- msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
- msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
- if ((GNUNET_DHT_RO_RECORD_ROUTE ==
- (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
- (ntohl (incoming->outgoing_path_length) > 0))
- {
- if (ntohs (message->size) -
- sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
- ntohs (enc_msg->size) !=
- ntohl (incoming->outgoing_path_length) *
- sizeof (struct GNUNET_PeerIdentity))
- {
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n",
- ntohs (message->size), ntohs (enc_msg->size),
- ntohs (message->size) -
- sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
- ntohs (enc_msg->size),
- ntohl (incoming->outgoing_path_length) *
- sizeof (struct GNUNET_PeerIdentity));
-#endif
- GNUNET_break_op (0);
- return GNUNET_NO;
- }
- msg_ctx.path_history = (char *) &incoming[1];
- msg_ctx.path_history += ntohs (enc_msg->size);
- msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
- }
- route_result_message (enc_msg, &msg_ctx);
- return GNUNET_YES;
-}
-
-
/**
* Receive the HELLO from transport service,
* free current and replace if necessary.
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received our `%s' from transport service\n", "HELLO");
#endif
-
GNUNET_assert (message != NULL);
GNUNET_free_non_null (my_hello);
my_hello = GNUNET_malloc (ntohs (message->size));
GNUNET_TRANSPORT_disconnect (transport_handle);
transport_handle = NULL;
}
+ GDS_NEIGHBOURS_done ();
GDS_NSE_done ();
- if (coreAPI != NULL)
- {
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n",
- my_short_id, "DHT");
-#endif
- GNUNET_CORE_disconnect (coreAPI);
- coreAPI = NULL;
- }
for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
{
while (k_buckets[bucket_count].head != NULL)
GNUNET_BLOCK_context_destroy (block_context);
block_context = NULL;
}
- GNUNET_free_non_null (my_short_id);
- my_short_id = NULL;
}
-/**
- * To be called on core init/fail.
- *
- * @param cls service closure
- * @param server handle to the server for this service
- * @param identity the public identity of this peer
- * @param publicKey the public key of this peer
- */
-static void
-core_init (void *cls, struct GNUNET_CORE_Handle *server,
- const struct GNUNET_PeerIdentity *identity,
- const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
-{
-
- if (server == NULL)
- {
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n",
- "dht", GNUNET_i2s (identity));
-#endif
- GNUNET_SCHEDULER_cancel (cleanup_task);
- GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
- return;
- }
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Core connection initialized, I am peer: %s\n", "dht",
- GNUNET_i2s (identity));
-#endif
-
- /* Copy our identity so we can use it */
- memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
- if (my_short_id != NULL)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n",
- "DHT SERVICE");
- my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity));
-}
-
-
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
- {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
- {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
- {NULL, 0, 0}
-};
-
-
-
/**
* Process dht requests.
*/
#define DEFAULT_BUCKET_SIZE 4
+/**
+ * Size of the bloom filter the DHT uses to filter peers.
+ */
+#define DHT_BLOOM_SIZE 128
+
+
+/**
+ * P2P PUT message
+ */
+struct PeerPutMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Processing options
+   */
+  uint32_t options GNUNET_PACKED;
+
+  /**
+   * Content type.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Hop count
+   */
+  uint32_t hop_count GNUNET_PACKED;
+
+  /**
+   * Replication level for this message
+   */
+  uint32_t desired_replication_level GNUNET_PACKED;
+
+  /**
+   * Generic route path length for a message in the
+   * DHT that arrived at a peer and generated
+   * a reply. Copied to the end of this message.
+   */
+  uint32_t outgoing_path_length GNUNET_PACKED;
+
+  /**
+   * Bloomfilter (for peer identities) to stop circular routes
+   */
+  char bloomfilter[DHT_BLOOM_SIZE];
+
+  /**
+   * The key we are storing under.
+   */
+  GNUNET_HashCode key;
+
+  /* NOTE(review): GST_NEIGHBOURS_handle_put takes an expiration_time,
+     but this wire format carries no expiration field -- confirm whether
+     one must be added before the pending FIXMEs are resolved. */
+
+  /* put path (if tracked) */
+
+  /* Payload */
+
+};
+
+
+/**
+ * P2P GET message
+ */
+struct PeerGetMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Processing options
+   */
+  uint32_t options GNUNET_PACKED;
+
+  /**
+   * Desired content type.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Hop count
+   */
+  uint32_t hop_count GNUNET_PACKED;
+
+  /**
+   * Desired replication level for this request.
+   */
+  uint32_t desired_replication_level GNUNET_PACKED;
+
+  /**
+   * Size of the extended query.
+   */
+  uint32_t xquery_size GNUNET_PACKED;
+
+  /**
+   * Bloomfilter mutator.
+   */
+  uint32_t bf_mutator GNUNET_PACKED;
+
+  /**
+   * Bloomfilter (for peer identities) to stop circular routes
+   */
+  char bloomfilter[DHT_BLOOM_SIZE];
+
+  /**
+   * The key we are looking for.
+   */
+  GNUNET_HashCode key;
+
+  /* xquery */
+
+  /* result bloomfilter */
+
+};
+
/**
* Linked list of messages to send to a particular other peer.
/**
- * The lowest currently used bucket.
+ * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
*/
-static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
+static unsigned int closest_bucket;
/**
- * The buckets (Kademlia routing table, complete with growth).
- * Array of size MAX_BUCKET_SIZE.
+ * How many peers have we added since we sent out our last
+ * find peer request?
+ */
+static unsigned int newly_found_peers;
+
+/**
+ * The buckets. Array of size MAX_BUCKET_SIZE. Offset 0 means 0 bits matching.
*/
static struct PeerBucket k_buckets[MAX_BUCKETS];
*/
static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
+/**
+ * Task that sends FIND PEER requests.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
+
+
+/**
+ * Find the optimal bucket for this key.
+ *
+ * The index is computed as MAX_BUCKETS - matching_bits - 1, where
+ * matching_bits is the number of leading bits the key shares with
+ * our own identity's hash; the result is always in [0, MAX_BUCKETS-1].
+ *
+ * @param hc the hashcode to compare our identity to
+ * @return the proper bucket index, or GNUNET_SYSERR
+ *         on error (same hashcode)
+ */
+static int
+find_bucket (const GNUNET_HashCode * hc)
+{
+  unsigned int bits;
+
+  bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
+  if (bits == MAX_BUCKETS)
+  {
+    /* How can all bits match? Got my own ID? */
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return MAX_BUCKETS - bits - 1;
+}
/**
/* Check for connect to self message */
if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
-
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s:%s Receives core connect message for peer %s distance %d!\n",
- my_short_id, "dht", GNUNET_i2s (peer), distance);
-#endif
-
if (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
&peer->hashPubKey))
{
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s:%s Received %s message for peer %s, but already have peer in RT!",
- my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
-#endif
GNUNET_break (0);
return;
}
-
- peer_bucket = find_current_bucket (&peer->hashPubKey);
- GNUNET_assert (peer_bucket >= lowest_bucket);
- GNUNET_assert (peer_bucket < MAX_BUCKETS);
+ peer_bucket = find_bucket (&peer->hashPubKey);
+ GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
ret = GNUNET_malloc (sizeof (struct PeerInfo));
#if 0
ret->latency = latency;
k_buckets[peer_bucket].tail,
k_buckets[peer_bucket].tail, ret);
k_buckets[peer_bucket].peers_size++;
- if ((GNUNET_CRYPTO_hash_matching_bits
- (&my_identity.hashPubKey, &peer->hashPubKey) > 0) &&
- (k_buckets[peer_bucket].peers_size <= bucket_size))
- ret->preference_task =
- GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
- if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
- enable_next_bucket ();
+ closest_bucket = GNUNET_MAX (closest_bucket,
+ peer_bucket);
+ if ( (peer_bucket > 0) &&
+ (k_buckets[peer_bucket].peers_size <= bucket_size) )
+ ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
newly_found_peers++;
- GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (all_known_peers,
+ &peer->hashPubKey, ret,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
increment_stats (STAT_PEERS_KNOWN);
-
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
- ret == NULL ? "NOT ADDED" : "PEER ADDED");
-#endif
}
{
struct PeerInfo *to_remove;
int current_bucket;
+ struct P2PPendingMessage *pos;
+ struct P2PPendingMessage *next;
/* Check for disconnect from self message */
if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s:%s: Received peer disconnect message for peer `%s' from %s\n",
- my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
-#endif
-
- if (GNUNET_YES !=
- GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
- &peer->hashPubKey))
- {
- GNUNET_break (0);
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
- my_short_id, "DHT", GNUNET_i2s (peer));
-#endif
- return;
- }
- increment_stats (STAT_DISCONNECTS);
- GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
- (all_known_peers, &peer->hashPubKey));
to_remove =
GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
- GNUNET_assert (to_remove != NULL);
+ if (NULL == to_remove)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
+ &peer->hashPubKey,
+ to_remove));
if (NULL != to_remove->info_ctx)
{
GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
to_remove->info_ctx = NULL;
}
- GNUNET_assert (0 ==
- memcmp (peer, &to_remove->id,
- sizeof (struct GNUNET_PeerIdentity)));
current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
- delete_peer (to_remove, current_bucket);
+ GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
+ k_buckets[current_bucket].tail,
+ to_remove);
+ GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
+ k_buckets[current_bucket].peers_size--;
+ while ( (lowest_bucket > 0) &&
+ (k_buckets[lowest_bucket].peers_size == 0) )
+ lowest_bucket--;
+
+ if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (peer->send_task);
+ peer->send_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (to_remove->th != NULL)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
+ to_remove->th = NULL;
+ }
+ while (NULL != (pos = to_remove->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (to_remove->head,
+ to_remove->tail,
+ pos);
+ GNUNET_free (pos);
+ }
}
+/**
+ * Perform a PUT operation. // FIXME: document if this is only
+ * routing or also storage and/or even local client notification!
+ *
+ * NOTE(review): the shutdown hunk calls GDS_NEIGHBOURS_done(); confirm
+ * whether this subsystem's API prefix should be GDS_ or GST_ and make
+ * it consistent.
+ *
+ * @param type type of the block
+ * @param options routing options
+ * @param desired_replication_level desired replication count
+ * @param expiration_time when does the content expire
+ * @param key key for the content
+ * @param put_path_length number of entries in put_path
+ * @param put_path peers this request has traversed so far (if tracked)
+ * @param data payload to store
+ * @param data_size number of bytes in data
+ */
+void
+GST_NEIGHBOURS_handle_put (uint32_t type,
+                           uint32_t options,
+                           uint32_t desired_replication_level,
+                           struct GNUNET_TIME_Absolute expiration_time,
+                           const GNUNET_HashCode *key,
+                           unsigned int put_path_length,
+                           struct GNUNET_PeerIdentity *put_path,
+                           const void *data,
+                           size_t data_size)
+{
+  // FIXME
+}
+
 /**
- * Initialize neighbours subsystem.
+ * Perform a GET operation. // FIXME: document if this is only
+ * routing or also state-tracking and/or even local lookup!
+ *
+ * NOTE(review): stub -- routing logic still to be implemented (see
+ * the FIXME in the body).
+ *
+ * @param type type of the block
+ * @param options routing options
+ * @param desired_replication_level desired replication count
+ * @param key key for the content
+ * @param xquery extended query
+ * @param xquery_size number of bytes in xquery
+ * @param reply_bf bloomfilter to filter duplicates
+ * @param reply_bf_mutator mutator for reply_bf
+ * @param peer_bf filter for peers not to select (again)
+ */
 void
-GST_NEIGHBOURS_init ()
+GST_NEIGHBOURS_handle_get (uint32_t type,
+                           uint32_t options,
+                           uint32_t desired_replication_level,
+                           const GNUNET_HashCode *key,
+                           const void *xquery,
+                           size_t xquery_size,
+                           const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
+                           uint32_t reply_bf_mutator,
+                           const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
 {
+  // FIXME
 }
/**
- * Shutdown neighbours subsystem.
+ * Handle a reply (route to origin). FIXME: should this be here?
+ * (reply-routing table might be better done elsewhere).
+ *
+ * @param type type of the block
+ * @param options routing options
+ * @param expiration_time when does the content expire
+ * @param key key for the content
+ * @param put_path_length number of entries in put_path
+ * @param put_path peers the original PUT traversed (if tracked)
+ * @param get_path_length number of entries in put_path
+ * @param get_path peers this reply has traversed so far (if tracked)
+ * @param data payload of the reply
+ * @param data_size number of bytes in data
*/
void
-GST_NEIGHBOURS_done ()
+GST_NEIGHBOURS_handle_reply (uint32_t type,
+ uint32_t options,
+ GNUNET_TIME_Absolute expiration_time,
+ const GNUNET_HashCode *key,
+ unsigned int put_path_length,
+ struct GNUNET_PeerIdentity *put_path,
+ unsigned int get_path_length,
+ struct GNUNET_PeerIdentity *get_path,
+ const void *data,
+ size_t data_size)
+{
+ // FIXME
+}
+
+
+/**
+ * Add each of the peers we already know to the bloom filter of
+ * the request so that we don't get duplicate HELLOs.
+ *
+ * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
+ * @param key peer identity to add to the bloom filter
+ * @param value the peer information (unused)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
+
+  GNUNET_CONTAINER_bloomfilter_add (bloom, key);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Task to send a find peer message for our own peer identifier
+ * so that we can find the closest peers in the network to ourselves
+ * and attempt to connect to them.
+ *
+ * @param cls closure for this task (unused)
+ * @param tc the context under which the task is running
+ */
+static void
+send_find_peer_message (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
+  struct DHT_MessageContext msg_ctx;
+  struct GNUNET_TIME_Relative next_send_time;
+  struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
+
+  /* we are running now, so we are no longer scheduled */
+  find_peer_task = GNUNET_SCHEDULER_NO_TASK;
+  if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+    return;
+  if (newly_found_peers > bucket_size)
+  {
+    /* If we are finding many peers already, no need to send out our request right now! */
+    find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+                                                   &send_find_peer_message, NULL);
+    newly_found_peers = 0;
+    return;
+  }
+
+  // FIXME: build message...
+  find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg->header.size =
+      htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+  /* pack all peers we already know into the bloom filter so that
+     recipients do not send us their HELLOs again */
+  temp_bloom =
+      GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
+  GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
+                                         temp_bloom);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
+                                                            find_peer_msg->
+                                                            bloomfilter,
+                                                            DHT_BLOOM_SIZE));
+  GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
+
+  memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
+  memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
+  msg_ctx.unique_id =
+      GNUNET_ntohll (GNUNET_CRYPTO_random_u64
+                     (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
+  msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
+  msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
+  msg_ctx.network_size = log_of_network_size_estimate;
+  msg_ctx.peer = my_identity;
+  msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
+  msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
+  // FIXME: transmit message...
+  demultiplex_message (&find_peer_msg->header, &msg_ctx);
+  GNUNET_free (find_peer_msg);
+
+  /* schedule next round: random delay in [MAX/2, MAX];
+     NOTE(review): DHT_MINIMUM_FIND_PEER_INTERVAL is not used here,
+     unlike in core_init's initial scheduling -- confirm intended */
+  newly_found_peers = 0;
+  next_send_time.rel_value =
+      (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
+                                DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
+  find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
+                                                 &send_find_peer_message,
+                                                 NULL);
+}
+
+
+/**
+ * To be called on core init/fail.
+ *
+ * @param cls service closure (unused)
+ * @param server handle to the server for this service
+ * @param identity the public identity of this peer
+ * @param publicKey the public key of this peer (unused)
+ */
+static void
+core_init (void *cls, struct GNUNET_CORE_Handle *server,
+           const struct GNUNET_PeerIdentity *identity,
+           const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
 {
+  struct GNUNET_TIME_Relative next_send_time;
+
+  GNUNET_assert (server != NULL);
+  my_identity = *identity;
+  /* kick off the first FIND PEER round after a random delay in
+     [MIN, MAX/2] to desynchronize peers that start together */
+  next_send_time.rel_value =
+      DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
+                                (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
+                                 2) -
+                                DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
+  find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
+                                                 &send_find_peer_message,
+                                                 NULL);
 }
+/**
+ * Core handler for p2p get requests.
+ *
+ * NOTE(review): despite the name, this still parses the legacy
+ * 'struct GNUNET_DHT_P2PRouteMessage' wire format and not the new
+ * 'struct PeerGetMessage' defined above -- presumably to be converted
+ * as part of the pending FIXMEs; confirm before wiring it up.
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ * @param atsi performance data
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
+                    const struct GNUNET_MessageHeader *message,
+                    const struct GNUNET_TRANSPORT_ATS_Information
+                    *atsi)
+{
+  struct GNUNET_DHT_P2PRouteMessage *incoming =
+      (struct GNUNET_DHT_P2PRouteMessage *) message;
+  struct GNUNET_MessageHeader *enc_msg =
+      (struct GNUNET_MessageHeader *) &incoming[1];
+  struct DHT_MessageContext *msg_ctx;
+  char *route_path;
+  int path_size;
+
+  /* sanity-check the size of the encapsulated message */
+  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_YES;
+  }
+  /* load shedding: drop requests while our send queue is backed up */
+  if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending of previous replies took too long, backing off!\n");
+    increment_stats ("# route requests dropped due to high load");
+    decrease_max_send_delay (get_max_send_delay ());
+    return GNUNET_YES;
+  }
+  msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
+  msg_ctx->bloom =
+      GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
+                                         DHT_BLOOM_K);
+  GNUNET_assert (msg_ctx->bloom != NULL);
+  msg_ctx->hop_count = ntohl (incoming->hop_count);
+  memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
+  msg_ctx->replication = ntohl (incoming->desired_replication_level);
+  msg_ctx->msg_options = ntohl (incoming->options);
+  if (GNUNET_DHT_RO_RECORD_ROUTE ==
+      (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
+  {
+    path_size =
+        ntohl (incoming->outgoing_path_length) *
+        sizeof (struct GNUNET_PeerIdentity);
+    /* the recorded path must account exactly for the remaining bytes */
+    if (ntohs (message->size) !=
+        (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
+         path_size))
+    {
+      GNUNET_break_op (0);
+      GNUNET_free (msg_ctx);
+      return GNUNET_YES;
+    }
+    /* the path trails the encapsulated message; append ourselves to it */
+    route_path = (char *) &incoming[1];
+    route_path = route_path + ntohs (enc_msg->size);
+    msg_ctx->path_history =
+        GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
+    memcpy (msg_ctx->path_history, route_path, path_size);
+    memcpy (&msg_ctx->path_history[path_size], &my_identity,
+            sizeof (struct GNUNET_PeerIdentity));
+    msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
+  }
+  msg_ctx->network_size = ntohl (incoming->network_size);
+  msg_ctx->peer = *peer;
+  msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
+  msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
+  demultiplex_message (enc_msg, msg_ctx);
+  if (msg_ctx->bloom != NULL)
+  {
+    GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
+    msg_ctx->bloom = NULL;
+  }
+  /* NOTE(review): msg_ctx->path_history allocated above is not freed
+     here -- leak unless demultiplex_message takes ownership; verify */
+  GNUNET_free (msg_ctx);
+  return GNUNET_YES;
+}
+/**
+ * Core handler for p2p put requests.
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ * @param atsi performance data
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_TRANSPORT_ATS_Information
+ *atsi)
+{
+ struct GNUNET_DHT_P2PRouteMessage *incoming =
+ (struct GNUNET_DHT_P2PRouteMessage *) message;
+ struct GNUNET_MessageHeader *enc_msg =
+ (struct GNUNET_MessageHeader *) &incoming[1];
+ struct DHT_MessageContext *msg_ctx;
+ char *route_path;
+ int path_size;
+
+ if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_YES;
+ }
+
+ if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending of previous replies took too long, backing off!\n");
+ increment_stats ("# route requests dropped due to high load");
+ decrease_max_send_delay (get_max_send_delay ());
+ return GNUNET_YES;
+ }
+ msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
+ msg_ctx->bloom =
+ GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
+ DHT_BLOOM_K);
+ GNUNET_assert (msg_ctx->bloom != NULL);
+ msg_ctx->hop_count = ntohl (incoming->hop_count);
+ memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
+ msg_ctx->replication = ntohl (incoming->desired_replication_level);
+ msg_ctx->msg_options = ntohl (incoming->options);
+ if (GNUNET_DHT_RO_RECORD_ROUTE ==
+ (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
+ {
+ path_size =
+ ntohl (incoming->outgoing_path_length) *
+ sizeof (struct GNUNET_PeerIdentity);
+ if (ntohs (message->size) !=
+ (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
+ path_size))
+ {
+ GNUNET_break_op (0);
+ GNUNET_free (msg_ctx);
+ return GNUNET_YES;
+ }
+ route_path = (char *) &incoming[1];
+ route_path = route_path + ntohs (enc_msg->size);
+ msg_ctx->path_history =
+ GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
+ memcpy (msg_ctx->path_history, route_path, path_size);
+ memcpy (&msg_ctx->path_history[path_size], &my_identity,
+ sizeof (struct GNUNET_PeerIdentity));
+ msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
+ }
+ msg_ctx->network_size = ntohl (incoming->network_size);
+ msg_ctx->peer = *peer;
+ msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
+ msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
+ demultiplex_message (enc_msg, msg_ctx);
+ if (msg_ctx->bloom != NULL)
+ {
+ GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
+ msg_ctx->bloom = NULL;
+ }
+ GNUNET_free (msg_ctx);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Core handler for p2p route results.
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ * @param atsi performance data
+ *
+ */
+static int
+handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_TRANSPORT_ATS_Information
+ *atsi)
+{
+ const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
+ (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
+ struct GNUNET_MessageHeader *enc_msg =
+ (struct GNUNET_MessageHeader *) &incoming[1];
+ struct DHT_MessageContext msg_ctx;
+
+ if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_YES;
+ }
+
+ memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
+ memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
+ msg_ctx.msg_options = ntohl (incoming->options);
+ msg_ctx.hop_count = ntohl (incoming->hop_count);
+ msg_ctx.peer = *peer;
+ msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
+ msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
+ if ((GNUNET_DHT_RO_RECORD_ROUTE ==
+ (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
+ (ntohl (incoming->outgoing_path_length) > 0))
+ {
+ if (ntohs (message->size) -
+ sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
+ ntohs (enc_msg->size) !=
+ ntohl (incoming->outgoing_path_length) *
+ sizeof (struct GNUNET_PeerIdentity))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_NO;
+ }
+ msg_ctx.path_history = (char *) &incoming[1];
+ msg_ctx.path_history += ntohs (enc_msg->size);
+ msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
+ }
+ route_result_message (enc_msg, &msg_ctx);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Initialize neighbours subsystem.
+ */
+int
+GST_NEIGHBOURS_init ()
+{
+ static struct GNUNET_CORE_MessageHandler core_handlers[] = {
+ {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
+ {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
+ {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
+ {NULL, 0, 0}
+ };
+ unsigned long long temp_config_num;
+ struct GNUNET_TIME_Relative next_send_time;
+
+ if (GNUNET_OK ==
+ GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
+ &temp_config_num))
+ bucket_size = (unsigned int) temp_config_num;
+ coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */
+ DEFAULT_CORE_QUEUE_SIZE, /* queue size */
+ NULL, /* Closure passed to DHT functions */
+ &core_init, /* Call core_init once connected */
+ &handle_core_connect, /* Handle connects */
+ &handle_core_disconnect, /* remove peers on disconnects */
+ NULL, /* Do we care about "status" updates? */
+ NULL, /* Don't want notified about all incoming messages */
+ GNUNET_NO, /* For header only inbound notification */
+ NULL, /* Don't want notified about all outbound messages */
+ GNUNET_NO, /* For header only outbound notification */
+ core_handlers); /* Register these handlers */
+ if (coreAPI == NULL)
+ return GNUNET_SYSERR;
+ all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
+ return GNUNET_OK;
+}
+
+
/**
 * Shutdown neighbours subsystem: disconnect from core, release the
 * peer map and cancel the periodic FIND PEER task.
 *
 * Must only be called after a successful GST_NEIGHBOURS_init
 * (asserts that coreAPI is set).
 */
void
GST_NEIGHBOURS_done ()
{
  GNUNET_assert (coreAPI != NULL);
  /* Disconnecting from core triggers the disconnect callbacks, which are
   * expected to remove every entry from all_known_peers before we check
   * it below -- presumably handle_core_disconnect does this; the ordering
   * here (disconnect first, then destroy the map) relies on it. */
  GNUNET_CORE_disconnect (coreAPI);
  coreAPI = NULL;
  GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
  GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
  all_known_peers = NULL;
  /* Cancel the pending FIND PEER transmission task, if any. */
  if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
  {
    GNUNET_SCHEDULER_cancel (find_peer_task);
    find_peer_task = GNUNET_SCHEDULER_NO_TASK;
  }
}
+
/* end of gnunet-service-dht_neighbours.c */