2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file dht/gnunet-service-dht_neighbours.c
23 * @brief GNUnet DHT service's bucket and neighbour management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-dht_datacache.h"
44 * How many buckets will we allow total.
46 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
49 * What is the maximum number of peers in a given bucket.
51 #define DEFAULT_BUCKET_SIZE 4
54 * Size of the bloom filter the DHT uses to filter peers.
56 #define DHT_BLOOM_SIZE 128
65 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
67 struct GNUNET_MessageHeader header;
72 uint32_t options GNUNET_PACKED;
77 uint32_t type GNUNET_PACKED;
82 uint32_t hop_count GNUNET_PACKED;
85 * Replication level for this message
87 uint32_t desired_replication_level GNUNET_PACKED;
90 * Length of the PUT path that follows (if tracked).
92 uint32_t put_path_length GNUNET_PACKED;
95 * When does the content expire?
97 struct GNUNET_TIME_AbsoluteNBO expiration_time;
100 * Bloomfilter (for peer identities) to stop circular routes
102 char bloomfilter[DHT_BLOOM_SIZE];
105 * The key we are storing under.
109 /* put path (if tracked) */
119 struct PeerResultMessage
122 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
124 struct GNUNET_MessageHeader header;
129 uint32_t type GNUNET_PACKED;
132 * Length of the PUT path that follows (if tracked).
134 uint32_t put_path_length GNUNET_PACKED;
137 * Length of the GET path that follows (if tracked).
139 uint32_t get_path_length GNUNET_PACKED;
142 * The key of the corresponding GET request.
146 /* put path (if tracked) */
148 /* get path (if tracked) */
158 struct PeerGetMessage
161 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
163 struct GNUNET_MessageHeader header;
168 uint32_t options GNUNET_PACKED;
171 * Desired content type.
173 uint32_t type GNUNET_PACKED;
178 uint32_t hop_count GNUNET_PACKED;
181 * Desired replication level for this request.
183 uint32_t desired_replication_level GNUNET_PACKED;
186 * Size of the extended query.
188 uint32_t xquery_size;
191 * Bloomfilter mutator.
196 * Bloomfilter (for peer identities) to stop circular routes
198 char bloomfilter[DHT_BLOOM_SIZE];
201 * The key we are looking for.
207 /* result bloomfilter */
213 * Linked list of messages to send to a particular other peer.
215 struct P2PPendingMessage
218 * Pointer to next item in the list
220 struct P2PPendingMessage *next;
223 * Pointer to previous item in the list
225 struct P2PPendingMessage *prev;
228 * Message importance level. FIXME: used? useful?
230 unsigned int importance;
233 * When does this message time out?
235 struct GNUNET_TIME_Absolute timeout;
238 * Actual message to be sent, allocated at the end of the struct:
239 * // msg = (cast) &pm[1];
240 * // memcpy (&pm[1], data, len);
242 const struct GNUNET_MessageHeader *msg;
248 * Entry for a peer in a bucket.
253 * Next peer entry (DLL)
255 struct PeerInfo *next;
258 * Prev peer entry (DLL)
260 struct PeerInfo *prev;
263 * Count of outstanding messages for peer. FIXME: NEEDED?
264 * FIXME: bound queue size!?
266 unsigned int pending_count;
269 * Head of pending messages to be sent to this peer.
271 struct P2PPendingMessage *head;
274 * Tail of pending messages to be sent to this peer.
276 struct P2PPendingMessage *tail;
279 * Core handle for sending messages to this peer.
281 struct GNUNET_CORE_TransmitHandle *th;
284 * Preference update context
286 struct GNUNET_CORE_InformationRequestContext *info_ctx;
289 * Task for scheduling message sends.
291 GNUNET_SCHEDULER_TaskIdentifier send_task;
294 * Task for scheduling preference updates
296 GNUNET_SCHEDULER_TaskIdentifier preference_task;
299 * What is the identity of the peer?
301 struct GNUNET_PeerIdentity id;
305 * What is the average latency for replies received?
307 struct GNUNET_TIME_Relative latency;
310 * Transport level distance to peer.
312 unsigned int distance;
319 * Peers are grouped into buckets.
326 struct PeerInfo *head;
331 struct PeerInfo *tail;
334 * Number of peers in the bucket.
336 unsigned int peers_size;
341 * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
343 static unsigned int closest_bucket;
346 * How many peers have we added since we sent out our last FIND PEER request?
349 static unsigned int newly_found_peers;
352 * The buckets. Array of size MAX_BUCKETS. Offset 0 means 0 bits matching.
354 static struct PeerBucket k_buckets[MAX_BUCKETS];
357 * Hash map of all known peers, for easy removal from k_buckets on disconnect.
359 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
362 * Maximum size for each bucket.
364 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
367 * Task that sends FIND PEER requests.
369 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
373 * Find the optimal bucket for this key.
375 * @param hc the hashcode to compare our identity to
376 * @return the proper bucket index, or GNUNET_SYSERR
377 * on error (same hashcode)
380 find_bucket (const GNUNET_HashCode * hc)
384 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
385 if (bits == MAX_BUCKETS)
387 /* How can all bits match? Got my own ID? */
389 return GNUNET_SYSERR;
391 return MAX_BUCKETS - bits - 1;
396 * Method called whenever a peer connects.
399 * @param peer peer identity this notification is about
400 * @param atsi performance data
403 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
404 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
406 struct PeerInfo *ret;
409 /* Check for connect to self message */
410 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
413 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
419 peer_bucket = find_bucket (&peer->hashPubKey);
420 GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
421 ret = GNUNET_malloc (sizeof (struct PeerInfo));
423 ret->latency = latency;
424 ret->distance = distance;
427 GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
428 k_buckets[peer_bucket].tail,
429 k_buckets[peer_bucket].tail, ret);
430 k_buckets[peer_bucket].peers_size++;
431 closest_bucket = GNUNET_MAX (closest_bucket,
433 if ( (peer_bucket > 0) &&
434 (k_buckets[peer_bucket].peers_size <= bucket_size) )
435 ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
437 GNUNET_assert (GNUNET_OK ==
438 GNUNET_CONTAINER_multihashmap_put (all_known_peers,
439 &peer->hashPubKey, ret,
440 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
441 increment_stats (STAT_PEERS_KNOWN);
446 * Method called whenever a peer disconnects.
449 * @param peer peer identity this notification is about
452 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
454 struct PeerInfo *to_remove;
456 struct P2PPendingMessage *pos;
457 struct P2PPendingMessage *next;
459 /* Check for disconnect from self message */
460 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
463 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
464 if (NULL == to_remove)
469 GNUNET_assert (GNUNET_YES ==
470 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
473 if (NULL != to_remove->info_ctx)
475 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
476 to_remove->info_ctx = NULL;
478 current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
479 GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
480 k_buckets[current_bucket].tail,
482 GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
483 k_buckets[current_bucket].peers_size--;
484 while ( (lowest_bucket > 0) &&
485 (k_buckets[lowest_bucket].peers_size == 0) )
488 if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
490 GNUNET_SCHEDULER_cancel (peer->send_task);
491 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
493 if (to_remove->th != NULL)
495 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
496 to_remove->th = NULL;
498 while (NULL != (pos = to_remove->head))
500 GNUNET_CONTAINER_DLL_remove (to_remove->head,
509 * Called when core is ready to send a message we asked for
510 * out to the destination.
512 * @param cls the 'struct PeerInfo' of the target peer
513 * @param size number of bytes available in buf
514 * @param buf where the callee should write the message
515 * @return number of bytes written to buf
518 core_transmit_notify (void *cls, size_t size, void *buf)
520 struct PeerInfo *peer = cls;
522 struct P2PPendingMessage *pending;
529 /* client disconnected */
532 if (peer->head == NULL)
534 /* no messages pending */
538 while ( (NULL != (pending = peer->head)) &&
539 (size - off >= (msize = ntohs (pending->msg->size))) )
541 memcpy (&cbuf[off], pending->msg, msize);
543 peer->pending_count--;
544 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
545 GNUNET_free (pending);
547 if (peer->head != NULL)
549 = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
551 pending->timeout, &peer->id, msize,
552 &core_transmit_notify, peer);
559 * Transmit all messages in the peer's message queue.
561 * @param peer message queue to process
564 process_peer_queue (struct PeerInfo *peer)
566 struct P2PPendingMessage *pending;
568 if (NULL != (pending = peer->head))
570 if (NULL != peer->th)
573 = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
575 pending->timeout, &peer->id,
576 ntohs (pending->msg->size),
577 &core_transmit_notify, peer);
582 * How many peers should we forward the request to (on average) in order to
583 * obtain the desired target_replication count?
585 * FIXME: double-check that this is fine
587 * @param hop_count number of hops the message has traversed
588 * @param target_replication the number of total paths desired
589 * @return Some number of peers to forward the message to
592 get_forward_count (uint32_t hop_count,
593 uint32_t target_replication)
595 uint32_t random_value;
596 uint32_t forward_count;
599 /* bound by system-wide maximum */
600 target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
602 if (hop_count > log_of_network_size_estimate * 2.0)
604 /* Once we have reached our ideal number of hops, only forward to 1 peer */
608 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
609 ((float) (target_replication - 1.0) *
611 /* Set forward count to floor of target_value */
612 forward_count = (uint32_t) target_value;
613 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
614 target_value = target_value - forward_count;
616 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
617 if (random_value < (target_value * UINT32_MAX))
619 return forward_count;
624 * Check whether my identity is closer than any known peers. If a
625 * non-null bloomfilter is given, check if this is the closest peer
626 * that hasn't already been routed to.
629 * @param key hash code to check closeness to
630 * @param bloom bloomfilter, exclude these entries from the decision
631 * @return GNUNET_YES if node location is closest,
632 * GNUNET_NO otherwise.
635 am_closest_peer (const GNUNET_HashCode *key,
636 const struct GNUNET_CONTAINER_BloomFilter *bloom)
642 struct PeerInfo *pos;
643 unsigned int my_distance;
645 if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
647 bucket_num = find_current_bucket (key);
648 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
649 my_distance = distance (&my_identity.hashPubKey, key);
650 pos = k_buckets[bucket_num].head;
652 while ((pos != NULL) && (count < bucket_size))
654 if ((bloom != NULL) &&
656 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
659 continue; /* Skip already checked entries */
661 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
662 if (other_bits > bits)
664 if (other_bits == bits) /* We match the same number of bits */
668 /* No peers closer, we are the closest! */
674 * Select a peer from the routing table that would be a good routing
675 * destination for sending a message for "key". The resulting peer
676 * must not be in the set of blocked peers.<p>
678 * Note that we should not ALWAYS select the closest peer to the
679 * target; peers further away from the target should be chosen with
680 * exponentially declining probability.
682 * FIXME: double-check that this is fine
685 * @param key the key we are selecting a peer to route to
686 * @param bloom a bloomfilter containing entries this request has seen already
687 * @param hops how many hops has this message traversed thus far
688 * @return Peer to route to, or NULL on error
690 static struct PeerInfo *
691 select_peer (const GNUNET_HashCode *key,
692 const struct GNUNET_CONTAINER_BloomFilter *bloom,
697 unsigned int selected;
698 struct PeerInfo *pos;
699 unsigned int distance;
700 unsigned int largest_distance;
701 struct PeerInfo *chosen;
703 if (hops >= log_of_network_size_estimate)
705 /* greedy selection (closest peer that is not in bloomfilter) */
706 largest_distance = 0;
708 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
710 pos = k_buckets[bc].head;
712 while ((pos != NULL) && (count < bucket_size))
714 /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
716 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
718 distance = inverse_distance (key, &pos->id.hashPubKey);
719 if (distance > largest_distance)
722 largest_distance = distance;
732 /* select "random" peer */
733 /* count number of peers that are available and not filtered */
735 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
737 pos = k_buckets[bc].head;
738 while ((pos != NULL) && (count < bucket_size))
741 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
744 continue; /* Ignore bloomfiltered peers */
750 if (count == 0) /* No peers to select from! */
754 /* Now actually choose a peer */
755 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
757 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
759 pos = k_buckets[bc].head;
760 while ((pos != NULL) && (count < bucket_size))
763 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
766 continue; /* Ignore bloomfiltered peers */
779 * Compute the set of peers that the given request should be forwarded to.
782 * @param key routing key
783 * @param bloom bloom filter excluding peers as targets, all selected
784 * peers will be added to the bloom filter
785 * @param hop_count number of hops the request has traversed so far
786 * @param target_replication desired number of replicas
787 * @param targets where to store an array of target peers (to be
788 * free'd by the caller)
789 * @return number of peers returned in 'targets'.
792 get_target_peers (const GNUNET_HashCode *key,
793 struct GNUNET_CONTAINER_BloomFilter *bloom,
795 uint32_t target_replication,
796 struct PeerInfo ***targets)
800 struct PeerInfo **rtargets;
801 struct PeerInfo *nxt;
803 ret = get_forward_count (hop_count, target_replication);
809 rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
813 nxt = select_peer (key, bloom, hop_count);
816 rtargets[off++] = nxt;
817 GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
821 GNUNET_free (rtargets);
831 * Perform a PUT operation. Forwards the given request to other
832 * peers. Does not store the data locally. Does not give the
833 * data to local clients. May do nothing if this is the only
834 * peer in the network (or if we are the closest peer in the network).
837 * @param type type of the block
838 * @param options routing options
839 * @param desired_replication_level desired replication count
840 * @param expiration_time when does the content expire
841 * @param hop_count how many hops has this message traversed so far
842 * @param bf Bloom filter of peers this PUT has already traversed
843 * @param key key for the content
844 * @param put_path_length number of entries in put_path
845 * @param put_path peers this request has traversed so far (if tracked)
846 * @param data payload to store
847 * @param data_size number of bytes in data
850 GDS_NEIGHBOURS_handle_put (uint32_t type,
852 uint32_t desired_replication_level,
853 GNUNET_TIME_Absolute expiration_time,
855 struct GNUNET_CONTAINER_BloomFilter *bf,
856 const GNUNET_HashCode *key,
857 unsigned int put_path_length,
858 struct GNUNET_PeerIdentity *put_path,
862 unsigned int target_count;
864 struct PeerInfo **targets;
865 struct PeerInfo *target;
866 struct P2PPendingMessage *pending;
868 struct PeerPutMessage *ppm;
869 struct GNUNET_PeerIdentity *pp;
871 target_count = get_target_peers (key, bf, hop_count,
872 desired_replication_level,
874 if (0 == target_count)
876 msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
877 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
880 msize = data_size + sizeof (struct PeerPutMessage);
882 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
887 for (i=0;i<target_count;i++)
890 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
891 pending->importance = 0; /* FIXME */
892 pending->timeout = expiration_time;
893 ppm = (struct PeerPutMessage*) &pending[1];
894 pending->msg = &ppm->header;
895 ppm->header.size = htons (msize);
896 ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
897 ppm->options = htonl (options);
898 ppm->type = htonl (type);
899 ppm->hop_count = htonl (hop_count + 1);
900 ppm->desired_replication_level = htonl (desired_replication_level);
901 ppm->put_path_length = htonl (put_path_length);
902 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
903 GNUNET_assert (GNUNET_OK ==
904 GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
908 pp = (const struct GNUNET_PeerIdentity*) &ppm[1];
909 memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
910 memcpy (&pp[put_path_length], data, data_size);
911 GNUNET_CONTAINER_DLL_insert (target->head,
914 target->pending_count++;
915 process_peer_queue (target);
917 GNUNET_free (targets);
922 * Perform a GET operation. Forwards the given request to other
923 * peers. Does not lookup the key locally. May do nothing if this is
924 * the only peer in the network (or if we are the closest peer in the network).
927 * @param type type of the block
928 * @param options routing options
929 * @param desired_replication_level desired replication count
930 * @param hop_count how many hops did this request traverse so far?
931 * @param key key for the content
932 * @param xquery extended query
933 * @param xquery_size number of bytes in xquery
934 * @param reply_bf bloomfilter to filter duplicates
935 * @param reply_bf_mutator mutator for reply_bf
936 * @param peer_bf filter for peers not to select (again)
939 GDS_NEIGHBOURS_handle_get (uint32_t type,
941 uint32_t desired_replication_level,
943 const GNUNET_HashCode *key,
946 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
947 uint32_t reply_bf_mutator,
948 const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
950 unsigned int target_count;
952 struct PeerInfo **targets;
953 struct PeerInfo *target;
954 struct P2PPendingMessage *pending;
956 struct PeerGetMessage *pgm;
958 size_t reply_bf_size;
960 target_count = get_target_peers (key, peer_bf, hop_count,
961 desired_replication_level,
963 if (0 == target_count)
965 reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
966 msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
967 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
972 for (i=0;i<target_count;i++)
975 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
976 pending->importance = 0; /* FIXME */
977 pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
978 pgm = (struct PeerGetMessage*) &pending[1];
979 pending->msg = &pgm->header;
980 pgm->header.size = htons (msize);
981 pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
982 pgm->options = htonl (options);
983 pgm->type = htonl (type);
984 pgm->hop_count = htonl (hop_count + 1);
985 pgm->desired_replication_level = htonl (desired_replication_level);
986 pgm->xquery_size = htonl (xquery_size);
987 pgm->bf_mutator = reply_bf_mutator;
988 GNUNET_assert (GNUNET_OK ==
989 GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
993 xq = (const struct GNUNET_PeerIdentity*) &ppm[1];
994 memcpy (xq, xquery, xquery_size);
995 GNUNET_assert (GNUNET_OK ==
996 GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
999 GNUNET_CONTAINER_DLL_insert (target->head,
1002 target->pending_count++;
1003 process_peer_queue (target);
1005 GNUNET_free (targets);
1010 * Handle a reply (route to origin). Only forwards the reply back to
1011 * other peers waiting for it. Does not do local caching or
1012 * forwarding to local clients.
1014 * @param type type of the block
1015 * @param options routing options
1016 * @param expiration_time when does the content expire
1017 * @param key key for the content
1018 * @param put_path_length number of entries in put_path
1019 * @param put_path peers the original PUT traversed (if tracked)
1020 * @param get_path_length number of entries in get_path
1021 * @param get_path peers this reply has traversed so far (if tracked)
1022 * @param data payload of the reply
1023 * @param data_size number of bytes in data
1026 GDS_NEIGHBOURS_handle_reply (uint32_t type,
1028 GNUNET_TIME_Absolute expiration_time,
1029 const GNUNET_HashCode *key,
1030 unsigned int put_path_length,
1031 struct GNUNET_PeerIdentity *put_path,
1032 unsigned int get_path_length,
1033 struct GNUNET_PeerIdentity *get_path,
1042 * Closure for 'add_known_to_bloom'.
1044 struct BloomConstructorContext
1047 * Bloom filter under construction.
1049 struct GNUNET_CONTAINER_BloomFilter *bloom;
1054 uint32_t bf_mutator;
1059 * Add each of the peers we already know to the bloom filter of
1060 * the request so that we don't get duplicate HELLOs.
1062 * @param cls the 'struct BloomConstructorContext'.
1063 * @param key peer identity to add to the bloom filter
1064 * @param value the peer information (unused)
1065 * @return GNUNET_YES (we should continue to iterate)
1068 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1070 struct BloomConstructorContext *ctx = cls;
1073 GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1074 GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1080 * Task to send a find peer message for our own peer identifier
1081 * so that we can find the closest peers in the network to ourselves
1082 * and attempt to connect to them.
1084 * @param cls closure for this task
1085 * @param tc the context under which the task is running
1088 send_find_peer_message (void *cls,
1089 const struct GNUNET_SCHEDULER_TaskContext *tc)
1091 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
1092 struct DHT_MessageContext msg_ctx;
1093 struct GNUNET_TIME_Relative next_send_time;
1094 struct BloomConstructorContext bcc;
1096 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1097 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1099 if (newly_found_peers > bucket_size)
1101 /* If we are finding many peers already, no need to send out our request right now! */
1102 find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1103 &send_find_peer_message, NULL);
1104 newly_found_peers = 0;
1107 bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1109 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1110 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers,
1111 &add_known_to_bloom,
1113 // FIXME: pass priority!?
1114 GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1115 GNUNET_DHT_RO_FIND_PEER,
1116 16 /* FIXME: replication level? */,
1118 &my_identity.hashPubKey,
1120 bcc.bloom, bcc.bf_mutator, NULL);
1121 GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1122 /* schedule next round */
1123 newly_found_peers = 0;
1124 next_send_time.rel_value =
1125 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1126 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1127 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1128 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1129 &send_find_peer_message,
1135 * To be called on core init/fail.
1137 * @param cls service closure
1138 * @param server handle to the server for this service
1139 * @param identity the public identity of this peer
1140 * @param publicKey the public key of this peer
1143 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1144 const struct GNUNET_PeerIdentity *identity,
1145 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1147 GNUNET_assert (server != NULL);
1148 my_identity = *identity;
1149 next_send_time.rel_value =
1150 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1151 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1152 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1154 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1155 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1156 &send_find_peer_message,
1162 * Core handler for p2p put requests.
1164 * @param cls closure
1165 * @param peer sender of the request (the peer identity this notification is about)
1166 * @param message message
1168 * @param atsi performance data
1169 * @return GNUNET_OK to keep the connection open,
1170 * GNUNET_SYSERR to close it (signal serious error)
1173 handle_dht_p2p_put (void *cls,
1174 const struct GNUNET_PeerIdentity *peer,
1175 const struct GNUNET_MessageHeader *message,
1176 const struct GNUNET_TRANSPORT_ATS_Information
1179 const struct PeerPutMessage *put;
1180 const struct GNUNET_PeerIdentity *put_path;
1181 const void *payload;
1184 size_t payload_size;
1185 struct GNUNET_CONTAINER_BloomFilter *bf;
1186 GNUNET_HashCode test_key;
1188 msize = ntohs (message->size);
1189 if (msize < sizeof (struct PeerPutMessage))
1191 GNUNET_break_op (0);
1194 put = (const struct PeerPutMessage*) message;
1195 putlen = ntohl (put->put_path_length);
1196 if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1197 (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1199 GNUNET_break_op (0);
1202 put_path = (const struct GNUNET_PeerIdentity*) &put[1];
1203 payload = &put_path[putlen];
1204 payload_size = msize - (sizeof (struct PeerPutMessage) +
1205 putlen * sizeof (struct GNUNET_PeerIdentity));
1206 switch (GNUNET_BLOCK_get_key (block_context,
1208 payload, payload_size,
1212 if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode)))
1214 GNUNET_break_op (0);
1219 GNUNET_break_op (0);
1222 /* cannot verify, good luck */
1225 bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1229 struct GNUNET_PeerIdentity pp[putlen+1];
1231 /* extend 'put path' by sender */
1232 memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1233 pp[putlen] = *sender;
1235 /* give to local clients */
1236 GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1245 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1251 /* route to other peers */
1252 GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1253 ntohl (put->options),
1254 ntohl (put->desired_replication_level),
1255 GNUNET_TIME_absolute_ntoh (put->expiration_time),
1256 ntohl (put->hop_count),
1262 GNUNET_CONTAINER_bloomfilter_free (bf);
1268 * Core handler for p2p get requests.
1270 * @param cls closure
1271 * @param peer sender of the request (the peer identity this notification is about)
1272 * @param message message
1274 * @param atsi performance data
1275 * @return GNUNET_OK to keep the connection open,
1276 * GNUNET_SYSERR to close it (signal serious error)
1279 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1280 const struct GNUNET_MessageHeader *message,
1281 const struct GNUNET_TRANSPORT_ATS_Information
1285 // 2) store in routing table
1286 // 3) check options (i.e. FIND PEER)
1287 // 4) local lookup (=> need eval result!)
1288 // 5) p2p forwarding
1291 struct GNUNET_DHT_P2PRouteMessage *incoming =
1292 (struct GNUNET_DHT_P2PRouteMessage *) message;
1293 struct GNUNET_MessageHeader *enc_msg =
1294 (struct GNUNET_MessageHeader *) &incoming[1];
1295 struct DHT_MessageContext *msg_ctx;
1300 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
1302 GNUNET_break_op (0);
1306 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
1308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309 "Sending of previous replies took too long, backing off!\n");
1310 increment_stats ("# route requests dropped due to high load");
1311 decrease_max_send_delay (get_max_send_delay ());
1314 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
1316 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
1318 GNUNET_assert (msg_ctx->bloom != NULL);
1319 msg_ctx->hop_count = ntohl (incoming->hop_count);
1320 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
1321 msg_ctx->replication = ntohl (incoming->desired_replication_level);
1322 msg_ctx->msg_options = ntohl (incoming->options);
1323 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1324 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1327 ntohl (incoming->outgoing_path_length) *
1328 sizeof (struct GNUNET_PeerIdentity);
1329 if (ntohs (message->size) !=
1330 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
1333 GNUNET_break_op (0);
1334 GNUNET_free (msg_ctx);
1337 route_path = (char *) &incoming[1];
1338 route_path = route_path + ntohs (enc_msg->size);
1339 msg_ctx->path_history =
1340 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
1341 memcpy (msg_ctx->path_history, route_path, path_size);
1342 memcpy (&msg_ctx->path_history[path_size], &my_identity,
1343 sizeof (struct GNUNET_PeerIdentity));
1344 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
1346 msg_ctx->network_size = ntohl (incoming->network_size);
1347 msg_ctx->peer = *peer;
1348 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
1349 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
1350 demultiplex_message (enc_msg, msg_ctx);
1351 if (msg_ctx->bloom != NULL)
1353 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
1354 msg_ctx->bloom = NULL;
1356 GNUNET_free (msg_ctx);
1362 * Core handler for p2p result messages.
1364 * @param cls closure
1365 * @param message message
1366 * @param peer peer identity this notification is about
1367 * @param atsi performance data
1371 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1372 const struct GNUNET_MessageHeader *message,
1373 const struct GNUNET_TRANSPORT_ATS_Information
1376 // 1) validate result format
1377 // 2) append 'peer' to put path
1378 // 3) forward to local clients
1380 const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
1381 (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
1382 struct GNUNET_MessageHeader *enc_msg =
1383 (struct GNUNET_MessageHeader *) &incoming[1];
1384 struct DHT_MessageContext msg_ctx;
1387 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
1389 GNUNET_break_op (0);
1393 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
1394 memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
1395 msg_ctx.msg_options = ntohl (incoming->options);
1396 msg_ctx.hop_count = ntohl (incoming->hop_count);
1397 msg_ctx.peer = *peer;
1398 msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
1399 msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1400 if ((GNUNET_DHT_RO_RECORD_ROUTE ==
1401 (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
1402 (ntohl (incoming->outgoing_path_length) > 0))
1404 if (ntohs (message->size) -
1405 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
1406 ntohs (enc_msg->size) !=
1407 ntohl (incoming->outgoing_path_length) *
1408 sizeof (struct GNUNET_PeerIdentity))
1410 GNUNET_break_op (0);
1413 msg_ctx.path_history = (char *) &incoming[1];
1414 msg_ctx.path_history += ntohs (enc_msg->size);
1415 msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
1417 route_result_message (enc_msg, &msg_ctx);
1423 * Initialize neighbours subsystem.
1426 GDS_NEIGHBOURS_init ()
1428 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1429 {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1430 {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1431 {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1434 unsigned long long temp_config_num;
1435 struct GNUNET_TIME_Relative next_send_time;
1438 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
1440 bucket_size = (unsigned int) temp_config_num;
1441 coreAPI = GNUNET_CORE_connect (GDS_cfg,
1442 DEFAULT_CORE_QUEUE_SIZE,
1445 &handle_core_connect,
1446 &handle_core_disconnect,
1447 NULL, /* Do we care about "status" updates? */
1451 if (coreAPI == NULL)
1452 return GNUNET_SYSERR;
1453 all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1459 * Shutdown neighbours subsystem.
1462 GDS_NEIGHBOURS_done ()
1464 GNUNET_assert (coreAPI != NULL);
1465 GNUNET_CORE_disconnect (coreAPI);
1467 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
1468 GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1469 all_known_peers = NULL;
1470 if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1472 GNUNET_SCHEDULER_cancel (find_peer_task);
1473 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1478 /* end of gnunet-service-dht_neighbours.c */