2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file dht/gnunet-service-dht_neighbours.c
23 * @brief GNUnet DHT service's bucket and neighbour management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-dht_datacache.h"
41 #include "gnunet-service-dht_routing.h"
45 * How many buckets will we allow total.
47 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
50 * What is the maximum number of peers in a given bucket.
52 #define DEFAULT_BUCKET_SIZE 4
55 * Size of the bloom filter the DHT uses to filter peers.
57 #define DHT_BLOOM_SIZE 128
66 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
68 struct GNUNET_MessageHeader header;
73 uint32_t options GNUNET_PACKED;
78 uint32_t type GNUNET_PACKED;
83 uint32_t hop_count GNUNET_PACKED;
86 * Replication level for this message
88 uint32_t desired_replication_level GNUNET_PACKED;
91 * Length of the PUT path that follows (if tracked).
93 uint32_t put_path_length GNUNET_PACKED;
96 * When does the content expire?
98 struct GNUNET_TIME_AbsoluteNBO expiration_time;
101 * Bloomfilter (for peer identities) to stop circular routes
103 char bloomfilter[DHT_BLOOM_SIZE];
106 * The key we are storing under.
110 /* put path (if tracked) */
120 struct PeerResultMessage
123 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
125 struct GNUNET_MessageHeader header;
130 uint32_t type GNUNET_PACKED;
133 * Length of the PUT path that follows (if tracked).
135 uint32_t put_path_length GNUNET_PACKED;
138 * Length of the GET path that follows (if tracked).
140 uint32_t get_path_length GNUNET_PACKED;
143 * When does the content expire?
145 struct GNUNET_TIME_AbsoluteNBO expiration_time;
148 * The key of the corresponding GET request.
152 /* put path (if tracked) */
154 /* get path (if tracked) */
164 struct PeerGetMessage
167 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
169 struct GNUNET_MessageHeader header;
174 uint32_t options GNUNET_PACKED;
177 * Desired content type.
179 uint32_t type GNUNET_PACKED;
184 uint32_t hop_count GNUNET_PACKED;
187 * Desired replication level for this request.
189 uint32_t desired_replication_level GNUNET_PACKED;
192 * Size of the extended query.
194 uint32_t xquery_size;
197 * Bloomfilter mutator.
202 * Bloomfilter (for peer identities) to stop circular routes
204 char bloomfilter[DHT_BLOOM_SIZE];
207 * The key we are looking for.
213 /* result bloomfilter */
219 * Linked list of messages to send to a particular other peer.
221 struct P2PPendingMessage
224 * Pointer to next item in the list
226 struct P2PPendingMessage *next;
229 * Pointer to previous item in the list
231 struct P2PPendingMessage *prev;
234 * Message importance level. FIXME: used? useful?
236 unsigned int importance;
239 * When does this message time out?
241 struct GNUNET_TIME_Absolute timeout;
244 * Actual message to be sent, allocated at the end of the struct:
245 * // msg = (cast) &pm[1];
246 * // memcpy (&pm[1], data, len);
248 const struct GNUNET_MessageHeader *msg;
254 * Entry for a peer in a bucket.
259 * Next peer entry (DLL)
261 struct PeerInfo *next;
264 * Prev peer entry (DLL)
266 struct PeerInfo *prev;
269 * Count of outstanding messages for peer. FIXME: NEEDED?
270 * FIXME: bound queue size!?
272 unsigned int pending_count;
275 * Head of pending messages to be sent to this peer.
277 struct P2PPendingMessage *head;
280 * Tail of pending messages to be sent to this peer.
282 struct P2PPendingMessage *tail;
285 * Core handle for sending messages to this peer.
287 struct GNUNET_CORE_TransmitHandle *th;
290 * Preference update context
292 struct GNUNET_CORE_InformationRequestContext *info_ctx;
295 * Task for scheduling message sends.
297 GNUNET_SCHEDULER_TaskIdentifier send_task;
300 * Task for scheduling preference updates
302 GNUNET_SCHEDULER_TaskIdentifier preference_task;
305 * What is the identity of the peer?
307 struct GNUNET_PeerIdentity id;
311 * What is the average latency for replies received?
313 struct GNUNET_TIME_Relative latency;
316 * Transport level distance to peer.
318 unsigned int distance;
325 * Peers are grouped into buckets.
332 struct PeerInfo *head;
337 struct PeerInfo *tail;
340 * Number of peers in the bucket.
342 unsigned int peers_size;
347 * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
349 static unsigned int closest_bucket;
352 * How many peers have we added since we sent out our last
355 static unsigned int newly_found_peers;
358 * The buckets. Array of size MAX_BUCKET_SIZE. Offset 0 means 0 bits matching.
360 static struct PeerBucket k_buckets[MAX_BUCKETS];
363 * Hash map of all known peers, for easy removal from k_buckets on disconnect.
365 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
368 * Maximum size for each bucket.
370 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
373 * Task that sends FIND PEER requests.
375 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
379 * Find the optimal bucket for this key.
381 * @param hc the hashcode to compare our identity to
382 * @return the proper bucket index, or GNUNET_SYSERR
383 * on error (same hashcode)
386 find_bucket (const GNUNET_HashCode * hc)
390 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
391 if (bits == MAX_BUCKETS)
393 /* How can all bits match? Got my own ID? */
395 return GNUNET_SYSERR;
397 return MAX_BUCKETS - bits - 1;
402 * Method called whenever a peer connects.
405 * @param peer peer identity this notification is about
406 * @param atsi performance data
409 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
410 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
412 struct PeerInfo *ret;
415 /* Check for connect to self message */
416 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
419 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
425 peer_bucket = find_bucket (&peer->hashPubKey);
426 GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
427 ret = GNUNET_malloc (sizeof (struct PeerInfo));
429 ret->latency = latency;
430 ret->distance = distance;
433 GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
434 k_buckets[peer_bucket].tail,
435 k_buckets[peer_bucket].tail, ret);
436 k_buckets[peer_bucket].peers_size++;
437 closest_bucket = GNUNET_MAX (closest_bucket,
439 if ( (peer_bucket > 0) &&
440 (k_buckets[peer_bucket].peers_size <= bucket_size) )
441 ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
443 GNUNET_assert (GNUNET_OK ==
444 GNUNET_CONTAINER_multihashmap_put (all_known_peers,
445 &peer->hashPubKey, ret,
446 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
447 increment_stats (STAT_PEERS_KNOWN);
452 * Method called whenever a peer disconnects.
455 * @param peer peer identity this notification is about
458 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
460 struct PeerInfo *to_remove;
462 struct P2PPendingMessage *pos;
463 struct P2PPendingMessage *next;
465 /* Check for disconnect from self message */
466 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
469 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
470 if (NULL == to_remove)
475 GNUNET_assert (GNUNET_YES ==
476 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
479 if (NULL != to_remove->info_ctx)
481 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
482 to_remove->info_ctx = NULL;
484 current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
485 GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
486 k_buckets[current_bucket].tail,
488 GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
489 k_buckets[current_bucket].peers_size--;
490 while ( (lowest_bucket > 0) &&
491 (k_buckets[lowest_bucket].peers_size == 0) )
494 if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
496 GNUNET_SCHEDULER_cancel (peer->send_task);
497 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
499 if (to_remove->th != NULL)
501 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
502 to_remove->th = NULL;
504 while (NULL != (pos = to_remove->head))
506 GNUNET_CONTAINER_DLL_remove (to_remove->head,
515 * Called when core is ready to send a message we asked for
516 * out to the destination.
518 * @param cls the 'struct PeerInfo' of the target peer
519 * @param size number of bytes available in buf
520 * @param buf where the callee should write the message
521 * @return number of bytes written to buf
524 core_transmit_notify (void *cls, size_t size, void *buf)
526 struct PeerInfo *peer = cls;
528 struct P2PPendingMessage *pending;
535 /* client disconnected */
538 if (peer->head == NULL)
540 /* no messages pending */
544 while ( (NULL != (pending = peer->head)) &&
545 (size - off >= (msize = ntohs (pending->msg->size))) )
547 memcpy (&cbuf[off], pending->msg, msize);
549 peer->pending_count--;
550 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
551 GNUNET_free (pending);
553 if (peer->head != NULL)
555 = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
557 pending->timeout, &peer->id, msize,
558 &core_transmit_notify, peer);
565 * Transmit all messages in the peer's message queue.
567 * @param peer message queue to process
570 process_peer_queue (struct PeerInfo *peer)
572 struct P2PPendingMessage *pending;
574 if (NULL != (pending = peer->head))
576 if (NULL != peer->th)
579 = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
581 pending->timeout, &peer->id,
582 ntohs (pending->msg->size),
583 &core_transmit_notify, peer);
588 * To how many peers should we (on average) forward the request to
589 * obtain the desired target_replication count (on average).
591 * @param hop_count number of hops the message has traversed
592 * @param target_replication the number of total paths desired
593 * @return Some number of peers to forward the message to
596 get_forward_count (uint32_t hop_count,
597 uint32_t target_replication)
599 uint32_t random_value;
600 uint32_t forward_count;
603 if (hop_count > log_of_network_size_estimate * 4.0)
605 /* forcefully terminate */
608 if (hop_count > log_of_network_size_estimate * 2.0)
610 /* Once we have reached our ideal number of hops, only forward to 1 peer */
613 /* bound by system-wide maximum */
614 target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
617 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
618 ((float) (target_replication - 1.0) *
620 /* Set forward count to floor of target_value */
621 forward_count = (uint32_t) target_value;
622 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
623 target_value = target_value - forward_count;
625 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
626 if (random_value < (target_value * UINT32_MAX))
628 return forward_count;
633 * Check whether my identity is closer than any known peers. If a
634 * non-null bloomfilter is given, check if this is the closest peer
635 * that hasn't already been routed to.
638 * @param key hash code to check closeness to
639 * @param bloom bloomfilter, exclude these entries from the decision
640 * @return GNUNET_YES if node location is closest,
641 * GNUNET_NO otherwise.
644 am_closest_peer (const GNUNET_HashCode *key,
645 const struct GNUNET_CONTAINER_BloomFilter *bloom)
651 struct PeerInfo *pos;
652 unsigned int my_distance;
654 if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
656 bucket_num = find_current_bucket (key);
657 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
658 my_distance = distance (&my_identity.hashPubKey, key);
659 pos = k_buckets[bucket_num].head;
661 while ((pos != NULL) && (count < bucket_size))
663 if ((bloom != NULL) &&
665 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
668 continue; /* Skip already checked entries */
670 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
671 if (other_bits > bits)
673 if (other_bits == bits) /* We match the same number of bits */
677 /* No peers closer, we are the closest! */
683 * Select a peer from the routing table that would be a good routing
684 * destination for sending a message for "key". The resulting peer
685 * must not be in the set of blocked peers.<p>
687 * Note that we should not ALWAYS select the closest peer to the
688 * target, peers further away from the target should be chosen with
689 * exponentially declining probability.
691 * FIXME: double-check that this is fine
694 * @param key the key we are selecting a peer to route to
695 * @param bloom a bloomfilter containing entries this request has seen already
696 * @param hops how many hops has this message traversed thus far
697 * @return Peer to route to, or NULL on error
699 static struct PeerInfo *
700 select_peer (const GNUNET_HashCode *key,
701 const struct GNUNET_CONTAINER_BloomFilter *bloom,
706 unsigned int selected;
707 struct PeerInfo *pos;
708 unsigned int distance;
709 unsigned int largest_distance;
710 struct PeerInfo *chosen;
712 if (hops >= log_of_network_size_estimate)
714 /* greedy selection (closest peer that is not in bloomfilter) */
715 largest_distance = 0;
717 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
719 pos = k_buckets[bc].head;
721 while ((pos != NULL) && (count < bucket_size))
723 /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
725 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
727 distance = inverse_distance (key, &pos->id.hashPubKey);
728 if (distance > largest_distance)
731 largest_distance = distance;
741 /* select "random" peer */
742 /* count number of peers that are available and not filtered */
744 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
746 pos = k_buckets[bc].head;
747 while ((pos != NULL) && (count < bucket_size))
750 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
753 continue; /* Ignore bloomfiltered peers */
759 if (count == 0) /* No peers to select from! */
763 /* Now actually choose a peer */
764 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
766 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
768 pos = k_buckets[bc].head;
769 while ((pos != NULL) && (count < bucket_size))
772 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
775 continue; /* Ignore bloomfiltered peers */
788 * Compute the set of peers that the given request should be
791 * @param key routing key
792 * @param bloom bloom filter excluding peers as targets, all selected
793 * peers will be added to the bloom filter
794 * @param hop_count number of hops the request has traversed so far
795 * @param target_replication desired number of replicas
796 * @param targets where to store an array of target peers (to be
797 * free'd by the caller)
798 * @return number of peers returned in 'targets'.
801 get_target_peers (const GNUNET_HashCode *key,
802 struct GNUNET_CONTAINER_BloomFilter *bloom,
804 uint32_t target_replication,
805 struct PeerInfo ***targets)
809 struct PeerInfo **rtargets;
810 struct PeerInfo *nxt;
812 ret = get_forward_count (hop_count, target_replication);
818 rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
822 nxt = select_peer (key, bloom, hop_count);
825 rtargets[off++] = nxt;
826 GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
830 GNUNET_free (rtargets);
840 * Perform a PUT operation. Forwards the given request to other
841 * peers. Does not store the data locally. Does not give the
842 * data to local clients. May do nothing if this is the only
843 * peer in the network (or if we are the closest peer in the
846 * @param type type of the block
847 * @param options routing options
848 * @param desired_replication_level desired replication count
849 * @param expiration_time when does the content expire
850 * @param hop_count how many hops has this message traversed so far
851 * @param bf Bloom filter of peers this PUT has already traversed
852 * @param key key for the content
853 * @param put_path_length number of entries in put_path
854 * @param put_path peers this request has traversed so far (if tracked)
855 * @param data payload to store
856 * @param data_size number of bytes in data
859 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
860 enum GNUNET_DHT_RouteOption options,
861 uint32_t desired_replication_level,
862 GNUNET_TIME_Absolute expiration_time,
864 struct GNUNET_CONTAINER_BloomFilter *bf,
865 const GNUNET_HashCode *key,
866 unsigned int put_path_length,
867 struct GNUNET_PeerIdentity *put_path,
871 unsigned int target_count;
873 struct PeerInfo **targets;
874 struct PeerInfo *target;
875 struct P2PPendingMessage *pending;
877 struct PeerPutMessage *ppm;
878 struct GNUNET_PeerIdentity *pp;
880 target_count = get_target_peers (key, bf, hop_count,
881 desired_replication_level,
883 if (0 == target_count)
885 msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
886 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
889 msize = data_size + sizeof (struct PeerPutMessage);
891 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
896 for (i=0;i<target_count;i++)
899 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
900 pending->importance = 0; /* FIXME */
901 pending->timeout = expiration_time;
902 ppm = (struct PeerPutMessage*) &pending[1];
903 pending->msg = &ppm->header;
904 ppm->header.size = htons (msize);
905 ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
906 ppm->options = htonl (options);
907 ppm->type = htonl (type);
908 ppm->hop_count = htonl (hop_count + 1);
909 ppm->desired_replication_level = htonl (desired_replication_level);
910 ppm->put_path_length = htonl (put_path_length);
911 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
912 GNUNET_assert (GNUNET_OK ==
913 GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
917 pp = (const struct GNUNET_PeerIdentity*) &ppm[1];
918 memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
919 memcpy (&pp[put_path_length], data, data_size);
920 GNUNET_CONTAINER_DLL_insert (target->head,
923 target->pending_count++;
924 process_peer_queue (target);
926 GNUNET_free (targets);
931 * Perform a GET operation. Forwards the given request to other
932 * peers. Does not lookup the key locally. May do nothing if this is
933 * the only peer in the network (or if we are the closest peer in the
936 * @param type type of the block
937 * @param options routing options
938 * @param desired_replication_level desired replication count
939 * @param hop_count how many hops did this request traverse so far?
940 * @param key key for the content
941 * @param xquery extended query
942 * @param xquery_size number of bytes in xquery
943 * @param reply_bf bloomfilter to filter duplicates
944 * @param reply_bf_mutator mutator for reply_bf
945 * @param peer_bf filter for peers not to select (again)
948 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
949 enum GNUNET_DHT_RouteOption options,
950 uint32_t desired_replication_level,
952 const GNUNET_HashCode *key,
955 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
956 uint32_t reply_bf_mutator,
957 const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
959 unsigned int target_count;
961 struct PeerInfo **targets;
962 struct PeerInfo *target;
963 struct P2PPendingMessage *pending;
965 struct PeerGetMessage *pgm;
967 size_t reply_bf_size;
969 target_count = get_target_peers (key, peer_bf, hop_count,
970 desired_replication_level,
972 if (0 == target_count)
974 reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
975 msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
976 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
981 /* forward request */
982 for (i=0;i<target_count;i++)
985 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
986 pending->importance = 0; /* FIXME */
987 pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
988 pgm = (struct PeerGetMessage*) &pending[1];
989 pending->msg = &pgm->header;
990 pgm->header.size = htons (msize);
991 pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
992 pgm->options = htonl (options);
993 pgm->type = htonl (type);
994 pgm->hop_count = htonl (hop_count + 1);
995 pgm->desired_replication_level = htonl (desired_replication_level);
996 pgm->xquery_size = htonl (xquery_size);
997 pgm->bf_mutator = reply_bf_mutator;
998 GNUNET_assert (GNUNET_OK ==
999 GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1003 xq = (const struct GNUNET_PeerIdentity*) &ppm[1];
1004 memcpy (xq, xquery, xquery_size);
1005 GNUNET_assert (GNUNET_OK ==
1006 GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1009 GNUNET_CONTAINER_DLL_insert (target->head,
1012 target->pending_count++;
1013 process_peer_queue (target);
1015 GNUNET_free (targets);
1020 * Handle a reply (route to origin). Only forwards the reply back to
1021 * the given peer. Does not do local caching or forwarding to local
1024 * @param target neighbour that should receive the block (if still connected)
1025 * @param type type of the block
1026 * @param expiration_time when does the content expire
1027 * @param key key for the content
1028 * @param put_path_length number of entries in put_path
1029 * @param put_path peers the original PUT traversed (if tracked)
1030 * @param get_path_length number of entries in put_path
1031 * @param get_path peers this reply has traversed so far (if tracked)
1032 * @param data payload of the reply
1033 * @param data_size number of bytes in data
1036 GDS_NEIGHBOURS_handle_reply (const GNUNET_PeerIdentity *target,
1037 enum GNUNET_BLOCK_Type type,
1038 GNUNET_TIME_Absolute expiration_time,
1039 const GNUNET_HashCode *key,
1040 unsigned int put_path_length,
1041 struct GNUNET_PeerIdentity *put_path,
1042 unsigned int get_path_length,
1043 struct GNUNET_PeerIdentity *get_path,
1047 struct PeerInfo *pi;
1048 struct P2PPendingMessage *pending;
1050 struct PeerResultMessage *prm;
1051 struct GNUNET_PeerIdentity *paths;
1053 msize = data_size + sizeof (struct PeerResultMessage) +
1054 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1055 if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1056 (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1057 (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1058 (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1063 pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1064 &target->hashPubKey);
1067 /* peer disconnected in the meantime, drop reply */
1070 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1071 pending->importance = 0; /* FIXME */
1072 pending->timeout = expiration_time;
1073 prm = (struct PeerResultMessage*) &pending[1];
1074 pending->msg = &prm->header;
1075 prm->header.size = htons (msize);
1076 prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1077 prm->type = htonl (type);
1078 prm->put_path_length = htonl (put_path_length);
1079 prm->get_path_length = htonl (get_path_length);
1080 prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1082 paths = (struct GNUNET_PeerIdentity) &prm[1];
1083 memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1084 memcpy (&paths[put_path_length],
1085 get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1086 memcpy (&paths[put_path_length + get_path_length],
1088 GNUNET_CONTAINER_DLL_insert (target->head,
1091 target->pending_count++;
1092 process_peer_queue (target);
1097 * Closure for 'add_known_to_bloom'.
1099 struct BloomConstructorContext
1102 * Bloom filter under construction.
1104 struct GNUNET_CONTAINER_BloomFilter *bloom;
1109 uint32_t bf_mutator;
1114 * Add each of the peers we already know to the bloom filter of
1115 * the request so that we don't get duplicate HELLOs.
1117 * @param cls the 'struct BloomConstructorContext'.
1118 * @param key peer identity to add to the bloom filter
1119 * @param value value the peer information (unused)
1120 * @return GNUNET_YES (we should continue to iterate)
1123 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1125 struct BloomConstructorContext *ctx = cls;
1128 GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1129 GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1135 * Task to send a find peer message for our own peer identifier
1136 * so that we can find the closest peers in the network to ourselves
1137 * and attempt to connect to them.
1139 * @param cls closure for this task
1140 * @param tc the context under which the task is running
1143 send_find_peer_message (void *cls,
1144 const struct GNUNET_SCHEDULER_TaskContext *tc)
1146 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
1147 struct DHT_MessageContext msg_ctx;
1148 struct GNUNET_TIME_Relative next_send_time;
1149 struct BloomConstructorContext bcc;
1151 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1152 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1154 if (newly_found_peers > bucket_size)
1156 /* If we are finding many peers already, no need to send out our request right now! */
1157 find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1158 &send_find_peer_message, NULL);
1159 newly_found_peers = 0;
1162 bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1164 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1165 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers,
1166 &add_known_to_bloom,
1168 // FIXME: pass priority!?
1169 GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1170 GNUNET_DHT_RO_FIND_PEER,
1171 16 /* FIXME: replication level? */,
1173 &my_identity.hashPubKey,
1175 bcc.bloom, bcc.bf_mutator, NULL);
1176 GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1177 /* schedule next round */
1178 newly_found_peers = 0;
1179 next_send_time.rel_value =
1180 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1181 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1182 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1183 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1184 &send_find_peer_message,
1190 * To be called on core init/fail.
1192 * @param cls service closure
1193 * @param server handle to the server for this service
1194 * @param identity the public identity of this peer
1195 * @param publicKey the public key of this peer
1198 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1199 const struct GNUNET_PeerIdentity *identity,
1200 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1202 GNUNET_assert (server != NULL);
1203 my_identity = *identity;
1204 next_send_time.rel_value =
1205 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1206 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1207 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1209 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1210 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1211 &send_find_peer_message,
1217 * Core handler for p2p put requests.
1219 * @param cls closure
1220 * @param peer sender of the request
1221 * @param message message
1222 * @param peer peer identity this notification is about
1223 * @param atsi performance data
1224 * @return GNUNET_OK to keep the connection open,
1225 * GNUNET_SYSERR to close it (signal serious error)
1228 handle_dht_p2p_put (void *cls,
1229 const struct GNUNET_PeerIdentity *peer,
1230 const struct GNUNET_MessageHeader *message,
1231 const struct GNUNET_TRANSPORT_ATS_Information
1234 const struct PeerPutMessage *put;
1235 const struct GNUNET_PeerIdentity *put_path;
1236 const void *payload;
1239 size_t payload_size;
1240 struct GNUNET_CONTAINER_BloomFilter *bf;
1241 GNUNET_HashCode test_key;
1243 msize = ntohs (message->size);
1244 if (msize < sizeof (struct PeerPutMessage))
1246 GNUNET_break_op (0);
1249 put = (const struct PeerPutMessage*) message;
1250 putlen = ntohl (put->put_path_length);
1251 if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1252 (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1254 GNUNET_break_op (0);
1257 put_path = (const struct GNUNET_PeerIdentity*) &put[1];
1258 payload = &put_path[putlen];
1259 payload_size = msize - (sizeof (struct PeerPutMessage) +
1260 putlen * sizeof (struct GNUNET_PeerIdentity));
1261 switch (GNUNET_BLOCK_get_key (block_context,
1263 payload, payload_size,
1267 if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode)))
1269 GNUNET_break_op (0);
1274 GNUNET_break_op (0);
1277 /* cannot verify, good luck */
1280 bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1284 struct GNUNET_PeerIdentity pp[putlen+1];
1286 /* extend 'put path' by sender */
1287 memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1288 pp[putlen] = *sender;
1290 /* give to local clients */
1291 GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1300 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1306 /* route to other peers */
1307 GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1308 ntohl (put->options),
1309 ntohl (put->desired_replication_level),
1310 GNUNET_TIME_absolute_ntoh (put->expiration_time),
1311 ntohl (put->hop_count),
1317 GNUNET_CONTAINER_bloomfilter_free (bf);
1323 * Core handler for p2p get requests.
1325 * @param cls closure
1326 * @param peer sender of the request
1327 * @param message message
1328 * @param peer peer identity this notification is about
1329 * @param atsi performance data
1330 * @return GNUNET_OK to keep the connection open,
1331 * GNUNET_SYSERR to close it (signal serious error)
1334 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1335 const struct GNUNET_MessageHeader *message,
1336 const struct GNUNET_TRANSPORT_ATS_Information
1339 struct PeerGetMessage *get;
1340 uint32_t xquery_size;
1341 size_t reply_bf_size;
1343 enum GNUNET_BLOCK_Type type;
1344 enum GNUNET_DHT_RouteOption options;
1345 enum GNUNET_BLOCK_EvaluationResult eval;
1346 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1347 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1350 /* parse and validate message */
1351 msize = ntohs (message->size);
1352 if (msize < sizeof (struct PeerGetMessage))
1354 GNUNET_break_op (0);
1357 get = (struct PeerGetMessage *) message;
1358 xquery_size = ntohl (get->xquery_size);
1359 if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1361 GNUNET_break_op (0);
1364 reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1365 type = ntohl (get->type);
1366 options = ntohl (get->options);
1367 xquery = (const char*) &get[1];
1369 if (reply_bf_size > 0)
1370 reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1372 GNUNET_DHT_GET_BLOOMFILTER_K);
1373 eval = GNUNET_BLOCK_evaluate (block_context,
1378 xquery, xquery_size,
1380 if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1382 /* request invalid or block type not supported */
1383 GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1384 if (NULL != reply_bf)
1385 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1389 GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
1393 /* remember request for routing replies */
1394 GDS_ROUTING_add (peer,
1397 xquery, xquery_size,
1398 reply_bf, get->reply_bf_mutator);
1399 /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1401 /* local lookup (this may update the reply_bf) */
1402 GDS_DATACACHE_handle_get (&get->key,
1404 xquery, xquery_size,
1406 get->reply_bf_mutator);
1407 /* FIXME: should track if the local lookup resulted in a
1408 definitive result and then NOT do P2P forwarding */
1410 /* P2P forwarding */
1411 GDS_NEIGHBOURS_handle_get (type,
1413 ntohl (get->desired_replication_level),
1414 ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1416 xquery, xquery_size,
1418 get->reply_bf_mutator,
1421 if (NULL != reply_bf)
1422 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1423 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
1429 * Core handler for p2p result messages.
1431 * @param cls closure
1432 * @param message message
1433 * @param peer peer identity this notification is about
1434 * @param atsi performance data
1435 * @return GNUNET_YES (do not cut p2p connection)
1438 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1439 const struct GNUNET_MessageHeader *message,
1440 const struct GNUNET_TRANSPORT_ATS_Information
1443 const struct PeerResultMessage *prm;
1444 const struct GNUNET_PeerIdentity *put_path;
1445 const struct GNUNET_PeerIdentity *get_path;
1447 uint32_t get_path_length;
1448 uint32_t put_path_length;
1451 enum GNUNET_BLOCK_Type type;
1453 /* parse and validate message */
1454 msize = ntohs (message->size);
1455 if (msize < sizeof (struct PeerResultMessage))
1457 GNUNET_break_op (0);
1460 prm = (struct PeerResultMessage *) message;
1461 put_path_length = ntohl (prm->put_path_length);
1462 get_path_length = ntohl (prm->get_path_length);
1463 if ( (msize < sizeof (struct PeerResultMessage) +
1464 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1465 (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1466 (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1468 GNUNET_break_op (0);
1471 put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1472 get_path = &put_path[put_path_length];
1473 type = ntohl (prm->type);
1474 data = (const void*) &get_path[get_path_length];
1475 data_size = msize - (sizeof (struct PeerResultMessage) +
1476 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1477 /* append 'peer' to 'get_path' */
1479 struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1481 memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1482 xget_path[get_path_length] = *peer;
1484 /* forward to local clients */
1485 GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration),
1487 get_path_length + 1,
1495 /* forward to other peers */
1496 GDS_ROUTING_process (type,
1497 GNUNET_TIME_absolute_ntoh (prm->expiration),
1501 get_path_length + 1,
1511 * Initialize neighbours subsystem.
1513 * @return GNUNET_OK on success, GNUNET_SYSERR on error
1516 GDS_NEIGHBOURS_init ()
1518 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1519 {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1520 {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1521 {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1524 unsigned long long temp_config_num;
1525 struct GNUNET_TIME_Relative next_send_time;
1528 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
1530 bucket_size = (unsigned int) temp_config_num;
1531 coreAPI = GNUNET_CORE_connect (GDS_cfg,
1535 &handle_core_connect,
1536 &handle_core_disconnect,
1537 NULL, /* Do we care about "status" updates? */
1541 if (coreAPI == NULL)
1542 return GNUNET_SYSERR;
1543 all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1546 next_send_time.rel_value =
1547 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1548 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1549 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1551 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1552 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1553 &send_find_peer_message,
1554 &find_peer_context);
1561 * Shutdown neighbours subsystem.
1564 GDS_NEIGHBOURS_done ()
1566 if (coreAPI == NULL)
1568 GNUNET_CORE_disconnect (coreAPI);
1570 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
1571 GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1572 all_known_peers = NULL;
1573 if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1575 GNUNET_SCHEDULER_cancel (find_peer_task);
1576 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1581 /* end of gnunet-service-dht_neighbours.c */
1586 /* Forward declaration */
1588 update_core_preference (void *cls,
1589 const struct GNUNET_SCHEDULER_TaskContext *tc);
1593 * Function called with statistics about the given peer.
1595 * @param cls closure
1596 * @param peer identifies the peer
1597 * @param bpm_out set to the current bandwidth limit (sending) for this peer
1598 * @param amount set to the amount that was actually reserved or unreserved;
1599 * either the full requested amount or zero (no partial reservations)
1600 * @param res_delay if the reservation could not be satisfied (amount was 0), how
1601 * long should the client wait until re-trying?
1602 * @param preference current traffic preference for the given peer
1605 update_core_preference_finish (void *cls,
1606 const struct GNUNET_PeerIdentity *peer,
1607 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1609 struct GNUNET_TIME_Relative res_delay,
1610 uint64_t preference)
1612 struct PeerInfo *peer_info = cls;
1614 peer_info->info_ctx = NULL;
1615 GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
1616 &update_core_preference, peer_info);
1621 update_core_preference (void *cls,
1622 const struct GNUNET_SCHEDULER_TaskContext *tc)
1624 struct PeerInfo *peer = cls;
1625 uint64_t preference;
1626 unsigned int matching;
1628 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1633 GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
1634 &peer->id.hashPubKey);
1638 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1639 "Peer identifier matches by %u bits, only shifting as much as we can!\n",
1644 preference = 1LL << matching;
1646 GNUNET_CORE_peer_change_preference (core_api, &peer->id,
1647 GNUNET_TIME_UNIT_FOREVER_REL,
1648 GNUNET_BANDWIDTH_VALUE_MAX, 0,
1650 &update_core_preference_finish, peer);