2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file dht/gnunet-service-dht.c
23 * @brief GNUnet DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
28 * - use OPTION_MULTIPLE instead of linked list for the forward_list.hashmap
29 * - use different 'struct DHT_MessageContext' for the different types of
30 * messages (currently rather confusing, especially with things like
31 * peer bloom filters occurring when processing replies).
35 #include "gnunet_block_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_getopt_lib.h"
38 #include "gnunet_os_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_service_lib.h"
41 #include "gnunet_nse_service.h"
42 #include "gnunet_core_service.h"
43 #include "gnunet_signal_lib.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet_datacache_lib.h"
46 #include "gnunet_transport_service.h"
47 #include "gnunet_hello_lib.h"
48 #include "gnunet_dht_service.h"
49 #include "gnunet_statistics_service.h"
55 * Defines whether find peer requests carry the sender's HELLO,
56 * or expect replies to contain HELLOs.
58 #define FIND_PEER_WITH_HELLO GNUNET_YES
60 #define DEFAULT_CORE_QUEUE_SIZE 32
63 * Minimum number of peers we need for "good" routing,
64 * any less than this and we will allow messages to
65 * travel much further through the network!
67 #define MINIMUM_PEER_THRESHOLD 20
70 * Number of requests we track at most (for routing replies).
72 #define DHT_MAX_RECENT (1024 * 16)
75 * How long do we wait at most when queueing messages with core
76 * that we are sending on behalf of other peers.
78 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
81 * Default importance for handling messages on behalf of other peers.
83 #define DHT_DEFAULT_P2P_IMPORTANCE 0
86 * How long to keep recent requests around by default.
88 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
91 * Default time to wait to send find peer messages sent by the dht service.
93 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
96 * Default importance for find peer messages sent by the dht service.
98 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
101 * Default replication parameter for find peer messages sent by the dht service.
103 #define DHT_DEFAULT_FIND_PEER_REPLICATION 4
106 * How long at least to wait before sending another find peer request.
108 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
111 * How long at most to wait before sending another find peer request.
113 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
116 * How often to update our preference levels for peers in our routing tables.
118 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
121 * How long at most on average will we allow a reply forward to take
122 * (before we quit sending out new requests)
124 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
127 * How many time differences between requesting a core send and
128 * the actual callback to remember.
130 #define MAX_REPLY_TIMES 8
136 * Context containing information about a DHT message received.
138 struct DHT_MessageContext
141 * The client this request was received from.
142 * (NULL if received from another peer)
144 struct ClientList *client;
147 * The peer this request was received from.
149 struct GNUNET_PeerIdentity peer;
152 * Bloomfilter for this routing request.
154 struct GNUNET_CONTAINER_BloomFilter *bloom;
157 * extended query (see gnunet_block_lib.h).
162 * Bloomfilter to filter out duplicate replies.
164 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
167 * The key this request was about
172 * How long should we wait to transmit this request?
174 struct GNUNET_TIME_Relative timeout;
177 * The unique identifier of this request
182 * Number of bytes in xquery.
187 * Mutator value for the reply_bf, see gnunet_block_lib.h
189 uint32_t reply_bf_mutator;
192 * Desired replication level
194 uint32_t replication;
197 * Network size estimate, either ours or the sum of
198 * those routed to thus far. =~ Log of number of peers
199 * chosen from for this request.
201 uint32_t network_size;
204 * Any message options for this request
206 uint32_t msg_options;
209 * How many hops has the message already traversed?
214 * How many peer identities are present in the path history?
216 uint32_t path_history_len;
224 * How important is this message?
226 unsigned int importance;
229 * Should we (still) forward the request on to other peers?
234 * Did we forward this message? (may need to remember it!)
239 * Are we the closest known peer to this key (out of our neighbors?)
246 * Record used for remembering what peers are waiting for what
247 * responses (based on search key).
249 struct DHTRouteSource
254 struct DHTRouteSource *next;
259 struct DHTRouteSource *prev;
262 * UID of the request, 0 if from another peer.
267 * Source of the request. Replies should be forwarded to
270 struct GNUNET_PeerIdentity source;
273 * If this was a local request, remember the client; otherwise NULL.
275 struct ClientList *client;
278 * Pointer to this node's heap location (for removal)
280 struct GNUNET_CONTAINER_HeapNode *hnode;
283 * Back pointer to the record storing this information.
285 struct DHTQueryRecord *record;
288 * Task to remove this entry on timeout.
290 GNUNET_SCHEDULER_TaskIdentifier delete_task;
293 * Bloomfilter of peers we have already sent back as
294 * replies to the initial request. Allows us to not
295 * forward the same peer multiple times for a find peer
298 struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
304 * Entry in the DHT routing table.
306 struct DHTQueryRecord
309 * Head of DLL for result forwarding.
311 struct DHTRouteSource *head;
314 * Tail of DLL for result forwarding.
316 struct DHTRouteSource *tail;
319 * Key that the record concerns.
327 * Context used to calculate the number of find peer messages
328 * per X time units since our last scheduled find peer message
329 * was sent. If we have seen too many messages, delay or don't
332 struct FindPeerMessageContext
336 struct GNUNET_TIME_Absolute start;
344 * Position of this node in the min heap.
346 struct GNUNET_CONTAINER_HeapNode *heap_node;
349 * Bloomfilter containing entries for peers
350 * we forwarded this request to.
352 struct GNUNET_CONTAINER_BloomFilter *bloom;
355 * Timestamp of this request, for ordering
358 struct GNUNET_TIME_Absolute timestamp;
361 * Key of this request.
366 * Unique identifier for this request, 0 if from another peer.
371 * Task to remove this entry on timeout.
373 GNUNET_SCHEDULER_TaskIdentifier remove_task;
378 * Recent requests by time inserted.
380 static struct GNUNET_CONTAINER_Heap *recent_heap;
383 * Context to use to calculate find peer rates.
385 static struct FindPeerMessageContext find_peer_context;
388 * How many peers have we added since we sent out our last
391 static unsigned int newly_found_peers;
394 * Handle for the statistics service.
396 struct GNUNET_STATISTICS_Handle *stats;
399 * Handle to get our current HELLO.
401 static struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
404 * The configuration the DHT service is running with
406 static const struct GNUNET_CONFIGURATION_Handle *cfg;
409 * Handle to the core service
411 static struct GNUNET_CORE_Handle *coreAPI;
414 * Handle to the transport service, for getting our hello
416 static struct GNUNET_TRANSPORT_Handle *transport_handle;
419 * The identity of our peer.
421 static struct GNUNET_PeerIdentity my_identity;
424 * Short id of the peer, for printing
426 static char *my_short_id;
431 static struct GNUNET_MessageHeader *my_hello;
434 * Task to run when we shut down, cleaning up all our trash
436 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
439 * Recently seen find peer requests.
441 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
444 * Reply times for requests, if we are busy, don't send any
447 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
450 * Current counter for replies.
452 static unsigned int reply_counter;
455 * Our handle to the BLOCK library.
457 static struct GNUNET_BLOCK_Context *block_context;
461 /** Declare here so try_core_send is aware of it */
463 core_transmit_notify (void *cls, size_t size, void *buf);
468 * Given the largest send delay, artificially decrease it
469 * so the next time around we may have a chance at sending
473 decrease_max_send_delay (struct GNUNET_TIME_Relative max_time)
477 for (i = 0; i < MAX_REPLY_TIMES; i++)
479 if (reply_times[i].rel_value == max_time.rel_value)
481 reply_times[i].rel_value = reply_times[i].rel_value / 2;
489 * Find the maximum send time of the recently sent values.
491 * @return the largest recorded time between asking core to send a message
492 * and when the buffer for copying it is passed
494 static struct GNUNET_TIME_Relative
495 get_max_send_delay ()
498 struct GNUNET_TIME_Relative max_time;
500 max_time = GNUNET_TIME_relative_get_zero ();
502 for (i = 0; i < MAX_REPLY_TIMES; i++)
504 if (reply_times[i].rel_value > max_time.rel_value)
505 max_time.rel_value = reply_times[i].rel_value;
508 if (max_time.rel_value > MAX_REQUEST_TIME.rel_value)
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n",
510 (unsigned long long) max_time.rel_value);
517 increment_stats (const char *value)
521 GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
526 decrement_stats (const char *value)
530 GNUNET_STATISTICS_update (stats, value, -1, GNUNET_NO);
535 * Try to send another message from our core send list
538 try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
540 struct PeerInfo *peer = cls;
541 struct P2PPendingMessage *pending;
544 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
546 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
549 if (peer->th != NULL)
550 return; /* Message send already in progress */
552 pending = peer->head;
555 ssize = ntohs (pending->msg->size);
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n",
559 my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id));
561 pending->scheduled = GNUNET_TIME_absolute_get ();
563 if (reply_counter >= MAX_REPLY_TIMES)
566 GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
568 pending->timeout, &peer->id, ssize,
569 &core_transmit_notify, peer);
570 if (peer->th == NULL)
571 increment_stats ("# notify transmit ready failed");
577 * Function called to forward a result message to another peer.
578 * Wraps the encapsulated message (plus any recorded path history)
579 * in a P2P route result message and queues it for that peer.
581 * @param msg the encapsulated message
582 * @param peer the peer to forward the message to
583 * @param msg_ctx the context of the message (hop count, bloom, etc.)
586 forward_result_message (const struct GNUNET_MessageHeader *msg,
587 struct PeerInfo *peer,
588 struct DHT_MessageContext *msg_ctx)
590 struct GNUNET_DHT_P2PRouteResultMessage *result_message;
591 struct P2PPendingMessage *pending;
597 increment_stats (STAT_RESULT_FORWARDS);
599 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) +
600 (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len);
601 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
602 psize = sizeof (struct P2PPendingMessage) + msize;
603 pending = GNUNET_malloc (psize);
604 pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
605 pending->importance = DHT_SEND_PRIORITY;
606 pending->timeout = GNUNET_TIME_relative_get_forever ();
607 result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg;
608 result_message->header.size = htons (msize);
609 result_message->header.type =
610 htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
611 result_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
612 if (msg_ctx->path_history_len > 0)
614 /* End of pending is where enc_msg starts */
615 path_start = (char *) &pending[1];
616 /* Offset by the size of the enc_msg */
617 path_start += ntohs (msg->size);
618 memcpy (path_start, msg_ctx->path_history,
619 msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
621 result_message->options = htonl (msg_ctx->msg_options);
622 result_message->hop_count = htonl (msg_ctx->hop_count + 1);
623 memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
624 /* Copy the enc_msg, then the path history as well! */
625 memcpy (&result_message[1], msg, ntohs (msg->size));
626 path_offset = (char *) &result_message[1];
627 path_offset += ntohs (msg->size);
628 /* If we have path history, copy it to the end of the whole thing */
629 if (msg_ctx->path_history_len > 0)
630 memcpy (path_offset, msg_ctx->path_history,
631 msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
633 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634 "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
635 "DHT", msize, GNUNET_i2s (&peer->id));
637 peer->pending_count++;
638 increment_stats ("# pending messages scheduled");
639 GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
641 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
642 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
647 * Called when core is ready to send a message we asked for
648 * out to the destination.
650 * @param cls closure (the 'struct PeerInfo' of the peer being sent to)
651 * @param size number of bytes available in buf
652 * @param buf where the callee should write the message
653 * @return number of bytes written to buf
656 core_transmit_notify (void *cls, size_t size, void *buf)
658 struct PeerInfo *peer = cls;
660 struct P2PPendingMessage *pending;
668 /* client disconnected */
670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n",
676 if (peer->head == NULL)
680 pending = peer->head;
681 while (NULL != pending &&
682 (size - off >= (msize = ntohs (pending->msg->size))))
684 memcpy (&cbuf[off], pending->msg, msize);
686 peer->pending_count--;
687 increment_stats ("# pending messages sent");
688 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
689 GNUNET_free (pending);
690 pending = peer->head;
692 if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
693 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
700 * Compute the distance between have and target as a 32-bit value.
701 * Differences in the lower bits must count stronger than differences
702 * in the higher bits.
704 * @return 0 if have==target, otherwise a number
705 * that is larger as the distance between
706 * the two hash codes increases
709 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
716 /* We have to represent the distance between two 2^9 (=512)-bit
717 * numbers as a 2^5 (=32)-bit number with "0" being used for the
718 * two numbers being identical; furthermore, we need to
719 * guarantee that a difference in the number of matching
720 * bits is always represented in the result.
722 * We use 2^32/2^9 numerical values to distinguish between
723 * hash codes that have the same LSB bit distance and
724 * use the highest 9 bits of the result to signify the
725 * number of (mis)matching LSB bits; if we have 0 matching
726 * and hence 512 mismatching LSB bits we return -1 (since
727 * 512 itself cannot be represented with 9 bits) */
729 /* first, calculate the most significant 9 bits of our
730 * result, aka the number of LSBs */
731 bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
732 /* bucket is now a value between 0 and 512 */
734 return 0; /* perfect match */
736 return (unsigned int) -1; /* LSB differs; use max (if we did the bit-shifting
737 * below, we'd end up with max+1 (overflow)) */
739 /* calculate the most significant bits of the final result */
740 msb = (512 - bucket) << (32 - 9);
741 /* calculate the 32-9 least significant bits of the final result by
742 * looking at the differences in the 32-9 bits following the
743 * mismatching bit at 'bucket' */
746 (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
748 if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
749 GNUNET_CRYPTO_hash_get_bit (have, i))
750 lsb |= (1 << (bucket + 32 - 9 - i)); /* first bit set will be 10,
751 * last bit set will be 31 -- if
752 * i does not reach 512 first... */
759 * Return a number that is larger the closer the
760 * "have" GNUNET_hash code is to the "target".
762 * @return inverse distance metric, non-zero.
763 * Must fudge the value if NO bits match.
766 inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
768 if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
769 return 1; /* Never return 0! */
770 return ((unsigned int) -1) - distance (target, have);
775 * Find which k-bucket this peer should go into,
776 * taking into account the size of the k-bucket
777 * array. This means that if more bits match than
778 * there are currently buckets, lowest_bucket will
781 * @param hc GNUNET_HashCode we are finding the bucket for.
783 * @return the proper bucket index for this key; lowest_bucket if the
784 * key equals our own identity or its bucket is not yet in use
787 find_current_bucket (const GNUNET_HashCode * hc)
791 actual_bucket = find_bucket (hc);
792 if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
793 return lowest_bucket;
794 if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
795 return lowest_bucket;
796 return actual_bucket;
801 * Find a routing table entry from a peer identity
803 * @param peer the peer identity to look up
805 * @return the routing table entry, or NULL if not found
807 static struct PeerInfo *
808 find_peer_by_id (const struct GNUNET_PeerIdentity *peer)
811 struct PeerInfo *pos;
813 bucket = find_current_bucket (&peer->hashPubKey);
815 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
818 pos = k_buckets[bucket].head;
821 if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity)))
825 return NULL; /* No such peer. */
828 /* Forward declaration */
830 update_core_preference (void *cls,
831 const struct GNUNET_SCHEDULER_TaskContext *tc);
835 * Function called with statistics about the given peer.
838 * @param peer identifies the peer
839 * @param bpm_out set to the current bandwidth limit (sending) for this peer
840 * @param amount set to the amount that was actually reserved or unreserved;
841 * either the full requested amount or zero (no partial reservations)
842 * @param res_delay if the reservation could not be satisfied (amount was 0), how
843 * long should the client wait until re-trying?
844 * @param preference current traffic preference for the given peer
847 update_core_preference_finish (void *cls,
848 const struct GNUNET_PeerIdentity *peer,
849 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
851 struct GNUNET_TIME_Relative res_delay,
854 struct PeerInfo *peer_info = cls;
856 peer_info->info_ctx = NULL;
857 GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
858 &update_core_preference, peer_info);
862 update_core_preference (void *cls,
863 const struct GNUNET_SCHEDULER_TaskContext *tc)
865 struct PeerInfo *peer = cls;
867 unsigned int matching;
869 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
874 GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
875 &peer->id.hashPubKey);
879 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
880 "Peer identifier matches by %u bits, only shifting as much as we can!\n",
885 preference = 1LL << matching;
887 GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
888 GNUNET_TIME_UNIT_FOREVER_REL,
889 GNUNET_BANDWIDTH_VALUE_MAX, 0,
891 &update_core_preference_finish, peer);
895 * Find the closest peer in our routing table to the
898 * @return The closest peer in our routing table to the
899 * key, or NULL on error.
901 static struct PeerInfo *
902 find_closest_peer (const GNUNET_HashCode * hc)
904 struct PeerInfo *pos;
905 struct PeerInfo *current_closest;
906 unsigned int lowest_distance;
907 unsigned int temp_distance;
911 lowest_distance = -1;
913 if (k_buckets[lowest_bucket].peers_size == 0)
916 current_closest = NULL;
917 for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
919 pos = k_buckets[bucket].head;
921 while ((pos != NULL) && (count < bucket_size))
923 temp_distance = distance (&pos->id.hashPubKey, hc);
924 if (temp_distance <= lowest_distance)
926 lowest_distance = temp_distance;
927 current_closest = pos;
933 GNUNET_assert (current_closest != NULL);
934 return current_closest;
939 * Function called to send a request out to another peer.
940 * Called both for locally initiated requests and those
941 * received from other peers.
943 * @param msg the encapsulated message
944 * @param peer the peer to forward the message to
945 * @param msg_ctx the context of the message (hop count, bloom, etc.)
948 forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer,
949 struct DHT_MessageContext *msg_ctx)
951 struct GNUNET_DHT_P2PRouteMessage *route_message;
952 struct P2PPendingMessage *pending;
957 increment_stats (STAT_ROUTE_FORWARDS);
958 GNUNET_assert (peer != NULL);
959 if ((msg_ctx->closest != GNUNET_YES) &&
960 (peer == find_closest_peer (&msg_ctx->key)))
961 increment_stats (STAT_ROUTE_FORWARDS_CLOSEST);
964 sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) +
965 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
966 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
967 psize = sizeof (struct P2PPendingMessage) + msize;
968 pending = GNUNET_malloc (psize);
969 pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
970 pending->importance = msg_ctx->importance;
971 pending->timeout = msg_ctx->timeout;
972 route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg;
973 route_message->header.size = htons (msize);
974 route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
975 route_message->options = htonl (msg_ctx->msg_options);
976 route_message->hop_count = htonl (msg_ctx->hop_count + 1);
977 route_message->network_size = htonl (msg_ctx->network_size);
978 route_message->desired_replication_level = htonl (msg_ctx->replication);
979 if (msg_ctx->bloom != NULL)
980 GNUNET_assert (GNUNET_OK ==
981 GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
985 memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
986 memcpy (&route_message[1], msg, ntohs (msg->size));
987 if (GNUNET_DHT_RO_RECORD_ROUTE ==
988 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
990 route_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
991 /* Set pointer to start of enc_msg */
992 route_path = (char *) &route_message[1];
993 /* Offset to the end of the enc_msg */
994 route_path += ntohs (msg->size);
995 /* Copy the route_path after enc_msg */
996 memcpy (route_path, msg_ctx->path_history,
997 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1000 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001 "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1002 "DHT", msize, GNUNET_i2s (&peer->id));
1004 peer->pending_count++;
1005 increment_stats ("# pending messages scheduled");
1006 GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1008 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1009 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1016 * Called when a reply needs to be sent to a client, as
1017 * the result of a GET or FIND PEER request.
1019 * @param client the client to send the reply to
1020 * @param message the encapsulated message to send
1021 * @param msg_ctx the context of the received message
1024 send_reply_to_client (struct ClientList *client,
1025 const struct GNUNET_MessageHeader *message,
1026 struct DHT_MessageContext *msg_ctx)
1028 struct GNUNET_DHT_RouteResultMessage *reply;
1029 struct PendingMessage *pending_message;
1035 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n",
1036 my_short_id, "DHT");
1038 msize = ntohs (message->size);
1040 sizeof (struct GNUNET_DHT_RouteResultMessage) + msize +
1041 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1042 if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1044 GNUNET_break_op (0);
1047 pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1048 pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1];
1049 reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1];
1050 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1051 reply->header.size = htons (tsize);
1052 reply->outgoing_path_length = htonl (msg_ctx->path_history_len);
1053 reply->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1054 memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1055 reply_offset = (char *) &reply[1];
1056 memcpy (&reply[1], message, msize);
1057 if (msg_ctx->path_history_len > 0)
1059 reply_offset += msize;
1060 memcpy (reply_offset, msg_ctx->path_history,
1061 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1063 add_pending_message (client, pending_message);
1067 * Consider whether or not we would like to have this peer added to
1068 * our routing table. Check whether bucket for this peer is full,
1069 * if so return negative; if not return positive. Since peers are
1070 * only added on CORE level connect, this doesn't actually add the
1071 * peer to the routing table.
1073 * @param peer the peer we are considering adding
1075 * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1079 consider_peer (struct GNUNET_PeerIdentity *peer)
1084 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
1085 &peer->hashPubKey)) ||
1086 (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))))
1087 return GNUNET_NO; /* We already know this peer (are connected even!) */
1088 bucket = find_current_bucket (&peer->hashPubKey);
1090 if ((k_buckets[bucket].peers_size < bucket_size) ||
1091 ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1099 * Task used to remove forwarding entries, either
1100 * after timeout, when full, or on shutdown.
1102 * @param cls the entry to remove
1103 * @param tc context, reason, etc.
1106 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1108 struct DHTRouteSource *source_info = cls;
1109 struct DHTQueryRecord *record;
1111 source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode);
1112 record = source_info->record;
1113 GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1115 if (record->head == NULL) /* No more entries in DLL */
1117 GNUNET_assert (GNUNET_YES ==
1118 GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1119 &record->key, record));
1120 GNUNET_free (record);
1122 if (source_info->find_peers_responded != NULL)
1123 GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1124 GNUNET_free (source_info);
1128 * Main function that handles whether or not to route a result
1129 * message to other peers, or to send to our local client.
1131 * @param msg the result message to be routed
1132 * @param msg_ctx context of the message we are routing
1134 * @return the number of peers the message was routed to,
1135 * GNUNET_SYSERR on failure
1138 route_result_message (struct GNUNET_MessageHeader *msg,
1139 struct DHT_MessageContext *msg_ctx)
1141 struct GNUNET_PeerIdentity new_peer;
1142 struct DHTQueryRecord *record;
1143 struct DHTRouteSource *pos;
1144 struct PeerInfo *peer_info;
1145 const struct GNUNET_MessageHeader *hello_msg;
1151 increment_stats (STAT_RESULTS);
1153 * If a find peer result message is received and contains a valid
1154 * HELLO for another peer, offer it to the transport service.
1156 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1158 if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader))
1159 GNUNET_break_op (0);
1161 hello_msg = &msg[1];
1162 if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1164 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg,
1167 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1168 "%s:%s Received non-HELLO message type in find peer result message!\n",
1169 my_short_id, "DHT");
1170 GNUNET_break_op (0);
1173 else /* We have a valid hello, and peer id stored in new_peer */
1175 find_peer_context.count++;
1176 increment_stats (STAT_FIND_PEER_REPLY);
1177 if (GNUNET_YES == consider_peer (&new_peer))
1179 increment_stats (STAT_HELLOS_PROVIDED);
1180 GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL);
1181 GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL);
1187 GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1189 if (record == NULL) /* No record of this message! */
1192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193 "`%s:%s': Have no record of response key %s uid %llu\n",
1194 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1195 msg_ctx->unique_id);
1203 if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
1206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207 "`%s:%s': Sending response key %s uid %llu to client\n",
1208 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1209 msg_ctx->unique_id);
1211 increment_stats (STAT_RESULTS_TO_CLIENT);
1212 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
1213 increment_stats (STAT_GET_REPLY);
1215 for (i = 0; i < msg_ctx->path_history_len; i++)
1220 &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "(before client) Key %s Found peer %d:%s\n",
1223 GNUNET_h2s (&msg_ctx->key), i,
1224 GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1227 send_reply_to_client (pos->client, msg, msg_ctx);
1229 else /* Send to peer */
1231 peer_info = find_peer_by_id (&pos->source);
1232 if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239 "`%s:%s': Forwarding response key %s uid %llu to peer %s\n",
1240 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1241 msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
1243 forward_result_message (msg, peer_info, msg_ctx);
1244 /* Try removing forward entries after sending once, only allows ONE response per request */
1245 if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
1247 GNUNET_SCHEDULER_cancel (pos->delete_task);
1249 GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);
1261 * Main function that handles whether or not to route a message to other
1264 * @param msg the message to be routed
1265 * @param msg_ctx the context containing all pertinent information about the message
1268 route_message (const struct GNUNET_MessageHeader *msg,
1269 struct DHT_MessageContext *msg_ctx);
/*
 * Handle a DHT GET request: parse the variable-size GET message, validate
 * the query with the block library and, unless the query is invalid, hand
 * the message to route_message().
 *
 * Message layout implied by the parsing below: a fixed
 * struct GNUNET_DHT_GetMessage, then xquery_size bytes of extended query,
 * then bf_size bytes used to initialise the reply bloomfilter.
 *
 * Side effects on msg_ctx: xquery, xquery_size, reply_bf, reply_bf_mutator
 * and do_forward are overwritten; reply_bf is released again before the
 * function ends.
 *
 * NOTE(review): `results` is declared and logged below, but no visible
 * line assigns it -- confirm it is initialised before the log statement.
 * NOTE(review): braces / return / preprocessor lines are not visible in
 * this copy of the file; exits described below are inferred.
 */
1273 * Server handler for all dht get requests, look for data,
1274 * if found, send response either to clients or other peers.
1276 * @param msg the actual get message
1277 * @param msg_ctx struct containing pertinent information about the get request
1279 * @return number of items found for GET request
1282 handle_dht_get (const struct GNUNET_MessageHeader *msg,
1283 struct DHT_MessageContext *msg_ctx)
1285 const struct GNUNET_DHT_GetMessage *get_msg;
1288 unsigned int results;
1290 enum GNUNET_BLOCK_Type type;
1292 msize = ntohs (msg->size);
/* Message is too short to hold even the fixed-size GET header. */
1293 if (msize < sizeof (struct GNUNET_DHT_GetMessage))
1298 get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
/* Length fields arrive in network byte order. */
1299 bf_size = ntohs (get_msg->bf_size);
1300 msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
1301 msg_ctx->reply_bf_mutator = get_msg->bf_mutator;
/* The total size is compared against header + bloomfilter + xquery (the
 * comparison operator sits on a line not shown); a mismatch is reported
 * as a protocol violation via GNUNET_break_op. */
1303 sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
1305 GNUNET_break_op (0);
/* `end` walks over the variable-size payload that follows the header. */
1308 end = (const char *) &get_msg[1];
1309 if (msg_ctx->xquery_size == 0)
1311 msg_ctx->xquery = NULL;
/* xquery points into the received message; it is not copied. */
1315 msg_ctx->xquery = (const void *) end;
1316 end += msg_ctx->xquery_size;
1320 msg_ctx->reply_bf = NULL;
/* Otherwise the reply bloomfilter is built from the bf_size bytes at `end`. */
1325 GNUNET_CONTAINER_bloomfilter_init (end, bf_size,
1326 GNUNET_DHT_GET_BLOOMFILTER_K);
1328 type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
1330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331 "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
1332 my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key),
1333 msg_ctx->unique_id);
1335 increment_stats (STAT_GETS);
/* Forward by default; cleared below only if the query is invalid. */
1337 msg_ctx->do_forward = GNUNET_YES;
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340 "`%s:%s': Found %d results for `%s' request uid %llu\n",
1341 my_short_id, "DHT", results, "GET", msg_ctx->unique_id);
1348 /* check query valid */
/* Evaluated with no reply data (NULL, 0): only the request is checked. */
1349 if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID ==
1350 GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key,
1351 &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator,
1352 msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0))
1354 GNUNET_break_op (0);
1355 msg_ctx->do_forward = GNUNET_NO;
1359 if (msg_ctx->do_forward == GNUNET_YES)
1360 route_message (msg, msg_ctx);
/* reply_bf was either set to NULL or created from the message above. */
1361 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
/*
 * Scheduler task: remove a key from recent_find_peer_requests.
 * handle_dht_find_peer() schedules it 30 seconds after it records a key,
 * and ignores requests for keys that are still present in that map.
 *
 * cls is the heap-allocated GNUNET_HashCode passed when the task was
 * scheduled; tc is not used in the visible lines.  The map entry is
 * removed with a NULL value, matching the NULL value it was stored with.
 * The assertion fails if the key is no longer in the map.
 *
 * NOTE(review): no visible line frees `key` -- confirm it is released
 * after the removal, otherwise every scheduled task leaks one hash code.
 */
1367 remove_recent_find_peer (void *cls,
1368 const struct GNUNET_SCHEDULER_TaskContext *tc)
1370 GNUNET_HashCode *key = cls;
1372 GNUNET_assert (GNUNET_YES ==
1373 GNUNET_CONTAINER_multihashmap_remove
1374 (recent_find_peer_requests, key, NULL));
/*
 * Handle a DHT FIND PEER request.
 *
 * Visible flow:
 *  1. Messages shorter than struct GNUNET_DHT_FindPeerMessage are flagged
 *     with GNUNET_break_op and rejected.
 *  2. Bytes following the fixed header are copied into other_hello and
 *     checked with GNUNET_HELLO_size / GNUNET_HELLO_get_id; an invalid
 *     HELLO is logged as a warning and the copy is freed.
 *  3. With FIND_PEER_WITH_HELLO enabled, consider_peer() decides whether
 *     the HELLO is offered to transport and a core connection requested;
 *     both branches forward the request with route_message().
 *  4. The request is only forwarded (no reply) when our own HELLO is not
 *     yet known, when our identity matches the request's bloomfilter, or
 *     when consider_peer() says we do not need the requesting peer.
 *  5. Requests for a key present in recent_find_peer_requests are counted
 *     as ignored; new keys are recorded and a 30 second removal task is
 *     scheduled.
 *  6. Otherwise a FIND_PEER_RESULT carrying our HELLO is built and sent
 *     through route_result_message() using a copy of msg_ctx, and the
 *     original request is forwarded with route_message().
 *
 * NOTE(review): braces / return / else / #endif lines are not visible in
 * this copy of the file, so which branches return early is inferred --
 * confirm against the complete source.
 */
1380 * Server handler for initiating local dht find peer requests
1382 * @param find_msg the actual find peer message
1383 * @param msg_ctx struct containing pertinent information about the request
1387 handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
1388 struct DHT_MessageContext *msg_ctx)
1390 struct GNUNET_MessageHeader *find_peer_result;
1391 struct GNUNET_DHT_FindPeerMessage *find_peer_message;
1392 struct DHT_MessageContext *new_msg_ctx;
1393 struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
1396 GNUNET_HashCode *recent_hash;
1397 struct GNUNET_MessageHeader *other_hello;
1398 size_t other_hello_size;
1399 struct GNUNET_PeerIdentity peer_id;
1401 find_peer_message = (struct GNUNET_DHT_FindPeerMessage *) find_msg;
/* Report undersized messages as a protocol violation, then bail out. */
1402 GNUNET_break_op (ntohs (find_msg->size) >=
1403 (sizeof (struct GNUNET_DHT_FindPeerMessage)));
1404 if (ntohs (find_msg->size) < sizeof (struct GNUNET_DHT_FindPeerMessage))
1407 other_hello_size = 0;
/* Anything beyond the fixed header is treated as the sender's HELLO. */
1408 if (ntohs (find_msg->size) > sizeof (struct GNUNET_DHT_FindPeerMessage))
1411 ntohs (find_msg->size) - sizeof (struct GNUNET_DHT_FindPeerMessage);
1412 other_hello = GNUNET_malloc (other_hello_size);
1413 memcpy (other_hello, &find_peer_message[1], other_hello_size);
/* Validate the copied HELLO; GNUNET_HELLO_get_id presumably fills peer_id
 * (its output argument is on a line not shown). */
1414 if ((GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) other_hello) == 0)
1416 GNUNET_HELLO_get_id ((struct GNUNET_HELLO_Message *) other_hello,
1419 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1420 "Received invalid HELLO message in find peer request!\n");
1421 GNUNET_free (other_hello);
1424 #if FIND_PEER_WITH_HELLO
/* The request carried a HELLO: learn the peer if it is useful to us. */
1425 if (GNUNET_YES == consider_peer (&peer_id))
1427 increment_stats (STAT_HELLOS_PROVIDED);
1428 GNUNET_TRANSPORT_offer_hello (transport_handle, other_hello, NULL, NULL);
1429 GNUNET_CORE_peer_request_connect (coreAPI, &peer_id, NULL, NULL);
1430 route_message (find_msg, msg_ctx);
1431 GNUNET_free (other_hello);
1434 else /* We don't want this peer! */
1436 route_message (find_msg, msg_ctx);
1437 GNUNET_free (other_hello);
/* NOTE(review): the last argument is a sizeof expression (type size_t) but
 * is formatted with %d, and it reports sizeof (struct GNUNET_MessageHeader)
 * although the size check above uses struct GNUNET_DHT_FindPeerMessage. */
1444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1445 "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
1446 my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&msg_ctx->key),
1447 ntohs (find_msg->size), sizeof (struct GNUNET_MessageHeader));
/* Without our own HELLO there is nothing to answer with; only forward. */
1449 if (my_hello == NULL)
1452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453 "`%s': Our HELLO is null, can't return.\n", "DHT");
1455 GNUNET_free_non_null (other_hello);
1456 route_message (find_msg, msg_ctx);
/* Rebuild the bloomfilter shipped inside the request and test whether our
 * own identity is already contained in it. */
1461 GNUNET_CONTAINER_bloomfilter_init (find_peer_message->bloomfilter,
1462 DHT_BLOOM_SIZE, DHT_BLOOM_K);
1464 GNUNET_CONTAINER_bloomfilter_test (incoming_bloom,
1465 &my_identity.hashPubKey))
1467 increment_stats (STAT_BLOOM_FIND_PEER);
1468 GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
1469 GNUNET_free_non_null (other_hello);
1470 route_message (find_msg, msg_ctx);
1471 return; /* We match the bloomfilter, do not send a response to this peer (they likely already know us!) */
1473 GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
1476 * Ignore any find peer requests from a peer we have seen very recently.
1478 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent_find_peer_requests, &msg_ctx->key)) /* We have recently responded to a find peer request for this peer! */
1480 increment_stats ("# dht find peer requests ignored (recently seen!)");
1481 GNUNET_free_non_null (other_hello);
1486 * Use this check to only allow the peer to respond to find peer requests if
1487 * it would be beneficial to have the requesting peer in this peers routing
1488 * table. Can be used to thwart peers flooding the network with find peer
1489 * requests that we don't care about. However, if a new peer is joining
1490 * the network and has no other peers this is a problem (assume all buckets
1491 * full, no one will respond!).
/* The request key is interpreted as the requesting peer's identity hash. */
1493 memcpy (&peer_id.hashPubKey, &msg_ctx->key, sizeof (GNUNET_HashCode));
1494 if (GNUNET_NO == consider_peer (&peer_id))
1496 increment_stats ("# dht find peer requests ignored (do not need!)");
1497 GNUNET_free_non_null (other_hello);
1498 route_message (find_msg, msg_ctx);
/* Heap copy of the key: it is handed to the delayed removal task as its
 * closure.  The map itself stores the key with a NULL value. */
1502 recent_hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
1503 memcpy (recent_hash, &msg_ctx->key, sizeof (GNUNET_HashCode));
1504 if (GNUNET_SYSERR !=
1505 GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests,
1506 &msg_ctx->key, NULL,
1507 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511 "Adding recent remove task for key `%s`!\n",
1512 GNUNET_h2s (&msg_ctx->key));
1514 /* Only add a task if there wasn't one for this key already! */
1515 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
1516 (GNUNET_TIME_UNIT_SECONDS, 30),
1517 &remove_recent_find_peer, recent_hash);
/* Key was already present: the copy is not needed. */
1521 GNUNET_free (recent_hash);
1523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524 "Received duplicate find peer request too soon!\n");
1528 /* Simplistic find_peer functionality, always return our hello */
1529 hello_size = ntohs (my_hello->size);
1530 tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
/* The reply (header + our HELLO) must stay below the server message limit. */
1532 if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1534 GNUNET_break_op (0);
1535 GNUNET_free_non_null (other_hello);
1539 find_peer_result = GNUNET_malloc (tsize);
1540 find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
1541 find_peer_result->size = htons (tsize);
1542 memcpy (&find_peer_result[1], my_hello, hello_size);
1544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1545 "`%s': Sending hello size %d to requesting peer.\n", "DHT",
/* The reply gets its own context: a copy of the request context with us
 * as originating peer, a fresh bloomfilter and a reset hop count.
 * NOTE(review): no visible line frees new_msg_ctx->bloom before
 * new_msg_ctx itself is freed -- confirm who releases it. */
1549 new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
1550 memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1551 new_msg_ctx->peer = my_identity;
1552 new_msg_ctx->bloom =
1553 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1554 new_msg_ctx->hop_count = 0;
1555 new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make find peer requests a higher priority */
1556 new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
1557 increment_stats (STAT_FIND_PEER_ANSWER);
/* Carry the recorded path over only if the request asked for it. */
1558 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1559 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1561 new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1562 new_msg_ctx->path_history_len = msg_ctx->path_history_len;
1563 /* Assign to previous msg_ctx path history, caller should free after our return */
1564 new_msg_ctx->path_history = msg_ctx->path_history;
1566 route_result_message (find_peer_result, new_msg_ctx);
1567 GNUNET_free (new_msg_ctx);
1568 GNUNET_free_non_null (other_hello);
1569 GNUNET_free (find_peer_result);
/* After answering, the original request is still forwarded. */
1570 route_message (find_msg, msg_ctx);
/*
 * Handle a DHT PUT request.
 *
 * Visible flow:
 *  1. The payload after struct GNUNET_DHT_PutMessage is checked with
 *     GNUNET_BLOCK_get_key(); a GNUNET_NO result, or a derived key that
 *     differs from msg_ctx->key, is flagged with GNUNET_break_op.
 *  2. forward_list.hashmap is consulted for a pending request record; for
 *     entries that have a local client, a GET_RESULT message containing the
 *     PUT data followed by the path history is built and handed to
 *     send_reply_to_client().
 *  3. If msg_ctx->closest is not GNUNET_YES the message is forwarded with
 *     route_message(); otherwise STAT_PUTS_INSERTED is incremented and the
 *     message is forwarded as well.
 *
 * NOTE(review): a too-short message trips GNUNET_assert below, whereas
 * handle_dht_get and handle_dht_find_peer reject short messages without
 * asserting -- confirm the size is validated before this handler runs.
 * NOTE(review): braces / return / loop-header lines are not visible in
 * this copy of the file; the exits and the iteration over record entries
 * are inferred.
 */
1575 * Server handler for initiating local dht put requests
1577 * @param msg the actual put message
1578 * @param msg_ctx struct containing pertinent information about the request
1581 handle_dht_put (const struct GNUNET_MessageHeader *msg,
1582 struct DHT_MessageContext *msg_ctx)
1584 const struct GNUNET_DHT_PutMessage *put_msg;
1585 struct DHTPutEntry *put_entry;
1586 unsigned int put_size;
1588 enum GNUNET_BLOCK_Type put_type;
1591 GNUNET_HashCode key;
1592 struct DHTQueryRecord *record;
1594 GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage));
1596 put_msg = (const struct GNUNET_DHT_PutMessage *) msg;
1597 put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
/* Size of the block data that follows the fixed PUT header. */
1599 ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
/* Ask the block library which key this block type/data maps to. */
1601 GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size,
1603 if (GNUNET_NO == ret)
1606 GNUNET_break_op (0);
1609 if ((GNUNET_YES == ret) &&
1610 (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode))))
1612 /* invalid wrapper: key mismatch! */
1613 GNUNET_break_op (0);
1616 /* ret == GNUNET_SYSERR means that there is no known relationship between
1617 * data and the key, so we cannot check it */
1619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1621 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1622 msg_ctx->unique_id);
/* Look for a pending request record in the forward list (the key argument
 * is on a line not shown). */
1625 record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
1629 struct DHTRouteSource *pos;
1630 struct GNUNET_DHT_GetResultMessage *get_result;
1631 struct DHT_MessageContext new_msg_ctx;
1637 /* TODO: do only for local started requests? or also for remote peers? */
1638 /* TODO: include this in statistics? under what? */
1639 /* TODO: reverse order of path_history? */
/* Entries without a local client are presumably skipped here. */
1640 if (NULL == pos->client)
1646 memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1647 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1648 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1650 new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
/* Result size = result header + block data + recorded path.
 * NOTE(review): the sum is passed through htons() below with no visible
 * upper-bound check -- confirm it cannot exceed 16 bits. */
1654 sizeof (struct GNUNET_DHT_GetResultMessage) + data_size +
1655 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1656 get_result = GNUNET_malloc (get_size);
1657 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1658 get_result->header.size = htons (get_size);
/* expiration and type are copied as-is, i.e. without byte-order conversion. */
1659 get_result->expiration = put_msg->expiration;
1660 get_result->type = put_msg->type;
1661 get_result->put_path_length = htons (msg_ctx->path_history_len);
1663 /* Copy the actual data and the path_history to the end of the get result */
1664 memcpy (&get_result[1], &put_msg[1], data_size);
1665 path_offset = (char *) &get_result[1];
1666 path_offset += data_size;
1667 memcpy (path_offset, msg_ctx->path_history,
1668 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1669 new_msg_ctx.peer = my_identity;
1670 new_msg_ctx.bloom = NULL;
1671 new_msg_ctx.hop_count = 0;
1672 /* Make result routing a higher priority */
1673 new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;
1674 new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
/* Reply under the uid the waiting request was registered with. */
1675 new_msg_ctx.unique_id = pos->uid;
1676 send_reply_to_client(pos->client, &get_result->header, &new_msg_ctx);
1677 GNUNET_free (get_result);
/* Not the closest peer: only pass the PUT on. */
1682 if (msg_ctx->closest != GNUNET_YES)
1684 route_message (msg, msg_ctx);
1689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1691 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1692 msg_ctx->unique_id);
1695 increment_stats (STAT_PUTS_INSERTED);
1697 route_message (msg, msg_ctx);
/*
 * Decide to how many peers a message should be forwarded at this hop.
 *
 * Two hop-count thresholds, both relative to
 * log_of_network_size_estimate, are checked first: above 4x the estimate
 * ("forcefully terminate") and above 2x the estimate ("keep forwarding,
 * but no more replication").  Otherwise a fractional target is computed as
 *   1 + (target_replication - 1) /
 *       (log_of_network_size_estimate + (target_replication - 1) * X)
 * where X is an operand on a line not shown.  The integer part becomes the
 * forward count; the fractional part is compared against a random 32-bit
 * value, presumably to round up with matching probability (the statement
 * executed on success is not visible).
 *
 * NOTE(review): the older "returns:" formula in the text below does not
 * match the expression in the code; trust the code.
 * NOTE(review): the values returned by the two threshold branches are on
 * lines not shown.
 */
1702 * To how many peers should we (on average)
1703 * forward the request to obtain the desired
1704 * target_replication count (on average).
1706 * returns: target_replication / (est. hops) + (target_replication * hop_count)
1707 * where est. hops is typically 2 * the routing table depth
1709 * @param hop_count number of hops the message has traversed
1710 * @param target_replication the number of total paths desired
1712 * @return Some number of peers to forward the message to
1715 get_forward_count (unsigned int hop_count, size_t target_replication)
1717 uint32_t random_value;
1718 unsigned int forward_count;
1721 if (hop_count > log_of_network_size_estimate * 4.0)
1723 /* forcefully terminate */
1727 if (hop_count > log_of_network_size_estimate * 2.0)
1729 /* keep forwarding, but no more replication */
1734 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
1735 ((float) (target_replication - 1.0) *
1737 /* Set forward count to floor of target_value */
1738 forward_count = (unsigned int) target_value;
1739 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1740 target_value = target_value - forward_count;
/* Scale the fraction to the 32-bit range and compare with a random draw. */
1742 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
1743 if (random_value < (target_value * UINT32_MAX))
1745 return forward_count;
/*
 * Decide whether this peer is closer to `target` than the peers it knows.
 *
 * Only one bucket is examined: the one returned by
 * find_current_bucket (target).  At most bucket_size entries of that
 * bucket's list are visited.  A peer counts as closer when its identity
 * shares more leading bits with the target than ours does, or shares the
 * same number of bits and has a smaller distance() to the target.
 * Peers contained in `bloom` (when non-NULL) are skipped.
 *
 * NOTE(review): return statements and the list-advance lines are not
 * visible in this copy of the file; per the existing documentation the
 * result is GNUNET_YES when no closer peer is found, GNUNET_NO otherwise.
 */
1750 * Check whether my identity is closer than any known peers.
1751 * If a non-null bloomfilter is given, check if this is the closest
1752 * peer that hasn't already been routed to.
1754 * @param target hash code to check closeness to
1755 * @param bloom bloomfilter, exclude these entries from the decision
1756 * @return GNUNET_YES if node location is closest,
1757 * GNUNET_NO otherwise.
1760 am_closest_peer (const GNUNET_HashCode * target,
1761 struct GNUNET_CONTAINER_BloomFilter *bloom)
1767 struct PeerInfo *pos;
1768 unsigned int my_distance;
/* The target is exactly our own identity hash. */
1770 if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode)))
1773 bucket_num = find_current_bucket (target);
/* Our own closeness, measured two ways: matching prefix bits and distance. */
1775 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target);
1776 my_distance = distance (&my_identity.hashPubKey, target);
1777 pos = k_buckets[bucket_num].head;
1779 while ((pos != NULL) && (count < bucket_size))
1781 if ((bloom != NULL) &&
1783 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1786 continue; /* Skip already checked entries */
1789 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target);
/* More matching bits than us: that peer is closer. */
1790 if (other_bits > bits)
1792 else if (other_bits == bits) /* We match the same number of bits */
1794 if (distance (&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
1800 /* No peers closer, we are the closest! */
/*
 * Pick the next peer to route a message for `target` to.
 *
 * Two strategies, chosen by comparing `hops` with
 * log_of_network_size_estimate:
 *  - hops >= estimate: greedy.  All buckets from lowest_bucket up to
 *    MAX_BUCKETS are scanned and the peer with the largest
 *    inverse_distance() to the target is remembered; bloomfilter
 *    membership is tested for each peer.  A chosen peer is added to
 *    `bloom`; if none qualifies, NULL is returned.
 *  - otherwise: random.  A first pass counts peers, skipping those in the
 *    bloomfilter (each skip is counted in a statistic); a random index
 *    below that count is drawn with weak randomness; a second pass walks
 *    the same peers and counts the index down to zero.
 *
 * Unlike the greedy branch, no visible line of the random branch adds the
 * selected peer to `bloom`.
 *
 * NOTE(review): return statements, counters and list-advance lines are
 * not visible in this copy of the file; the description of what each
 * branch returns is inferred from the comments and the documented
 * contract.
 */
1806 * Select a peer from the routing table that would be a good routing
1807 * destination for sending a message for "target". The resulting peer
1808 * must not be in the set of blocked peers.<p>
1810 * Note that we should not ALWAYS select the closest peer to the
1811 * target, peers further away from the target should be chosen with
1812 * exponentially declining probability.
1814 * @param target the key we are selecting a peer to route to
1815 * @param bloom a bloomfilter containing entries this request has seen already
1816 * @param hops how many hops has this message traversed thus far
1818 * @return Peer to route to, or NULL on error
1820 static struct PeerInfo *
1821 select_peer (const GNUNET_HashCode * target,
1822 struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
1826 unsigned int selected;
1827 struct PeerInfo *pos;
1828 unsigned int distance;
1829 unsigned int largest_distance;
1830 struct PeerInfo *chosen;
1832 if (hops >= log_of_network_size_estimate)
1834 /* greedy selection (closest peer that is not in bloomfilter) */
1835 largest_distance = 0;
1837 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1839 pos = k_buckets[bc].head;
1841 while ((pos != NULL) && (count < bucket_size))
1843 /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
1845 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
/* A larger inverse distance is treated as the better candidate. */
1847 distance = inverse_distance (target, &pos->id.hashPubKey);
1848 if (distance > largest_distance)
1851 largest_distance = distance;
1858 if ((largest_distance > 0) && (chosen != NULL))
/* Mark the chosen peer so the same request does not pick it again. */
1860 GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey);
1863 return NULL; /* no peer available or we are the closest */
1867 /* select "random" peer */
1868 /* count number of peers that are available and not filtered */
1870 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1872 pos = k_buckets[bc].head;
1873 while ((pos != NULL) && (count < bucket_size))
1876 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1879 increment_stats ("# peer blocked from selection by Bloom filter");
1880 continue; /* Ignore bloomfiltered peers */
1886 if (count == 0) /* No peers to select from! */
1888 increment_stats ("# failed to select peer");
1891 /* Now actually choose a peer */
1892 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
/* Second pass over the same peers, in the same order, to find the
 * `selected`-th unfiltered one. */
1894 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1896 pos = k_buckets[bc].head;
1897 while ((pos != NULL) && (count < bucket_size))
1900 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1903 continue; /* Ignore bloomfiltered peers */
1905 if (0 == selected--)
/*
 * Record where a request came from so a later reply can be sent back.
 *
 * Data structures touched (all visible below):
 *  - forward_list.hashmap maps a request key to a DHTQueryRecord;
 *  - each record holds a doubly-linked list of DHTRouteSource entries
 *    (one per requesting peer/client);
 *  - forward_list.minHeap orders the DHTRouteSource entries for eviction.
 *
 * Visible flow:
 *  1. While the hashmap holds MAX_OUTSTANDING_FORWARDS or more keys, the
 *     heap root is evicted: unlinked from its record, its delete task
 *     cancelled, its bloomfilter and memory freed; a record left without
 *     entries is removed from the hashmap and freed.
 *  2. If a record for the key exists and already contains an entry for the
 *     same peer and client, that entry's heap cost is updated.
 *  3. Otherwise a record is created if needed, and a new DHTRouteSource is
 *     appended to it with a delete task scheduled after
 *     DHT_FORWARD_TIMEOUT.  For local clients the heap cost is the
 *     "forever" time so the entry is effectively never evicted.
 *
 * NOTE(review): return statements are not visible in this copy of the
 * file; the GNUNET_YES / GNUNET_NO result is described only by the
 * existing documentation.  The visible callers in demultiplex_message
 * ignore the return value.
 */
1916 * Remember this routing request so that if a reply is
1917 * received we can either forward it to the correct peer
1918 * or return the result locally.
1920 * @param msg_ctx Context of the route request
1922 * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1925 cache_response (struct DHT_MessageContext *msg_ctx)
1927 struct DHTQueryRecord *record;
1928 struct DHTRouteSource *source_info;
1929 struct DHTRouteSource *pos;
1930 struct GNUNET_TIME_Absolute now;
1931 unsigned int current_size;
1933 current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
/* Evict heap-root entries until the table is below its limit. */
1935 while (current_size >= MAX_OUTSTANDING_FORWARDS)
1937 source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
1938 GNUNET_assert (source_info != NULL);
1939 record = source_info->record;
1940 GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1941 if (record->head == NULL) /* No more entries in DLL */
1943 GNUNET_assert (GNUNET_YES ==
1944 GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1947 GNUNET_free (record);
/* The evicted entry must not be touched again by its timeout task. */
1949 if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
1951 GNUNET_SCHEDULER_cancel (source_info->delete_task);
1952 source_info->delete_task = GNUNET_SCHEDULER_NO_TASK;
1954 if (source_info->find_peers_responded != NULL)
1955 GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1956 GNUNET_free (source_info);
1957 current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
/* NOTE(review): judging by the visible lines, the loop above only ends
 * once current_size is below the limit, so this condition looks
 * unreachable unless a missing line leaves the loop early -- confirm. */
1960 /** Non-local request and have too many outstanding forwards, discard! */
1961 if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
1964 now = GNUNET_TIME_absolute_get ();
1966 GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1967 if (record != NULL) /* Already know this request! */
/* Search the record's entries for one from the same source peer. */
1973 memcmp (&msg_ctx->peer, &pos->source,
1974 sizeof (struct GNUNET_PeerIdentity)))
1975 break; /* Already have this peer in reply list! */
1978 if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
1980 GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode,
/* First request for this key: create and register a new record. */
1987 record = GNUNET_malloc (sizeof (struct DHTQueryRecord));
1988 GNUNET_assert (GNUNET_OK ==
1989 GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap,
1990 &msg_ctx->key, record,
1991 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1992 memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1995 source_info = GNUNET_malloc (sizeof (struct DHTRouteSource));
1996 source_info->record = record;
1997 source_info->delete_task =
1998 GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry,
2000 source_info->find_peers_responded =
2001 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2002 source_info->source = msg_ctx->peer;
/* Append at the tail of the record's source list. */
2003 GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail,
2005 if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
2007 source_info->client = msg_ctx->client;
2008 now = GNUNET_TIME_absolute_get_forever ();
2010 source_info->hnode =
2011 GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info,
2013 source_info->uid = msg_ctx->unique_id;
2015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016 "`%s:%s': Created new forward source info for %s uid %llu\n",
2017 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2018 msg_ctx->unique_id);
/*
 * Forward a request to a number of peers chosen from the routing table.
 *
 * Visible flow:
 *  1. get_forward_count() yields the target number of peers; the value is
 *     also recorded in a per-count statistic.
 *  2. msg_ctx->bloom is created if the caller did not supply one.
 *  3. A RecentRequest entry (uid, key, own bloomfilter) is added to
 *     recent_heap, keyed by the current time; if the heap already holds
 *     DHT_MAX_RECENT entries, the entry returned by heap_peek is cancelled
 *     and freed first.
 *  4. Up to target_forward_count times: select_peer() picks a peer, a
 *     closeness statistic is updated, the peer is added to msg_ctx->bloom
 *     and the message is sent with forward_message().
 *  5. msg_ctx->bloom is merged into the RecentRequest's bloomfilter, then
 *     freed and reset to NULL.
 *
 * Side effect callers must know: on return msg_ctx->bloom is NULL.
 *
 * NOTE(review): braces / break / else lines are not visible in this copy
 * of the file; the loop exit on a NULL selection is inferred.
 */
2025 * Main function that handles whether or not to route a message to other
2028 * @param msg the message to be routed
2029 * @param msg_ctx the context containing all pertinent information about the message
2032 route_message (const struct GNUNET_MessageHeader *msg,
2033 struct DHT_MessageContext *msg_ctx)
2036 struct PeerInfo *selected;
2037 unsigned int target_forward_count;
2038 unsigned int forward_count;
2039 struct RecentRequest *recent_req;
2040 char *stat_forward_count;
2041 char *temp_stat_str;
2043 increment_stats (STAT_ROUTES);
2044 target_forward_count =
2045 get_forward_count (msg_ctx->hop_count, msg_ctx->replication);
/* One statistics counter per distinct forward-count value. */
2046 GNUNET_asprintf (&stat_forward_count, "# forward counts of %d",
2047 target_forward_count);
2048 increment_stats (stat_forward_count);
2049 GNUNET_free (stat_forward_count);
2050 if (msg_ctx->bloom == NULL)
2052 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
/* Keep the recent-request heap bounded: drop one entry before adding. */
2054 if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT)
2056 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2057 GNUNET_assert (recent_req != NULL);
2058 GNUNET_SCHEDULER_cancel (recent_req->remove_task);
2059 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2060 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2061 GNUNET_free (recent_req);
2064 recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
2065 recent_req->uid = msg_ctx->unique_id;
2066 memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
/* Heap cost is the insertion time. */
2067 recent_req->heap_node =
2068 GNUNET_CONTAINER_heap_insert (recent_heap, recent_req,
2069 GNUNET_TIME_absolute_get ().abs_value);
2071 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2074 for (i = 0; i < target_forward_count; i++)
2076 selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
2077 if (selected == NULL)
/* Statistics only: compare the selected peer's matching bits with ours. */
2080 if (GNUNET_CRYPTO_hash_matching_bits
2081 (&selected->id.hashPubKey,
2083 GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
2085 GNUNET_asprintf (&temp_stat_str,
2086 "# requests routed to close(r) peer hop %u",
2087 msg_ctx->hop_count);
2089 GNUNET_asprintf (&temp_stat_str,
2090 "# requests routed to less close peer hop %u",
2091 msg_ctx->hop_count);
2092 if (temp_stat_str != NULL)
2094 increment_stats (temp_stat_str);
2095 GNUNET_free (temp_stat_str);
/* Record the peer in the bloomfilter before sending to it. */
2097 GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
2098 &selected->id.hashPubKey);
2099 forward_message (msg, selected, msg_ctx);
2102 if (msg_ctx->bloom != NULL)
/* Remember which peers this request went to, then drop the working copy. */
2104 GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom,
2106 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2107 msg_ctx->bloom = NULL;
/*
 * Dispatch an incoming DHT message to the handler for its type.
 *
 * msg_ctx->closest is first set from am_closest_peer(), then:
 *  - GET: the request is remembered with cache_response() and handled by
 *    handle_dht_get();
 *  - PUT: STAT_PUTS is incremented and handle_dht_put() is called;
 *  - FIND_PEER: STAT_FIND_PEER is incremented; under a condition on
 *    hop_count, the source peer versus our own identity (the comparison's
 *    first line is not shown), and the presence of a client, the request
 *    is remembered with cache_response().  handle_dht_find_peer() is
 *    called when we are the closest peer or msg_options equals
 *    GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE; a route_message() call is also
 *    visible, presumably for the other case;
 *  - any other type: a warning is logged and the message is forwarded
 *    with route_message().
 *
 * NOTE(review): the header text below is identical to the one on
 * route_message and does not describe this function.
 * NOTE(review): msg_options is compared with `==` here, while
 * handle_dht_find_peer and handle_dht_put test GNUNET_DHT_RO_RECORD_ROUTE
 * with a bitmask; if several options can be set at once this equality
 * test will not match -- confirm intent.
 * NOTE(review): break / else lines are not visible in this copy.
 */
2113 * Main function that handles whether or not to route a message to other
2116 * @param msg the message to be routed
2117 * @param msg_ctx the context containing all pertinent information about the message
2120 demultiplex_message (const struct GNUNET_MessageHeader *msg,
2121 struct DHT_MessageContext *msg_ctx)
2123 /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */
2124 msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom);
2126 switch (ntohs (msg->type))
2128 case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
2129 cache_response (msg_ctx);
2130 handle_dht_get (msg, msg_ctx);
2132 case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. */
2133 increment_stats (STAT_PUTS);
2134 handle_dht_put (msg, msg_ctx);
2136 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
2137 increment_stats (STAT_FIND_PEER);
2138 if (((msg_ctx->hop_count > 0) &&
2140 memcmp (&msg_ctx->peer, &my_identity,
2141 sizeof (struct GNUNET_PeerIdentity)))) ||
2142 (msg_ctx->client != NULL))
2144 cache_response (msg_ctx);
2145 if ((msg_ctx->closest == GNUNET_YES) ||
2146 (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2147 handle_dht_find_peer (msg, msg_ctx);
2150 route_message (msg, msg_ctx);
2153 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2154 "`%s': Message type (%d) not handled, forwarding anyway!\n",
2155 "DHT", ntohs (msg->type));
2156 route_message (msg, msg_ctx);
/*
 * Callback registered in run() via GNUNET_TRANSPORT_get_hello(): keeps a
 * private copy of our own HELLO in the global my_hello.
 *
 * The previous copy (if any) is freed and replaced by a freshly allocated
 * copy of `message`, sized from the message's own header.  `message` must
 * not be NULL (asserted).  cls is not used in the visible lines.
 */
2162 * Receive the HELLO from transport service,
2163 * free current and replace if necessary.
2166 * @param message HELLO message of peer
2169 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2173 "Received our `%s' from transport service\n", "HELLO");
2175 GNUNET_assert (message != NULL);
2176 GNUNET_free_non_null (my_hello);
/* message->size is in network byte order and covers the whole message. */
2177 my_hello = GNUNET_malloc (ntohs (message->size));
2178 memcpy (my_hello, message, ntohs (message->size));
/*
 * Shutdown task scheduled by run(): releases the service's global
 * resources.
 *
 * Visible order: cancel the HELLO subscription (ghh); if connected to
 * transport, free my_hello and disconnect; shut down the neighbours and
 * datacache subsystems; delete every peer from every bucket between
 * lowest_bucket and MAX_BUCKETS; destroy the statistics handle; destroy
 * the block context.
 *
 * NOTE(review): my_hello is freed only under the visible
 * `transport_handle != NULL` check and no visible line resets it to
 * NULL -- confirm.
 * NOTE(review): guards around some calls (e.g. for ghh and stats) may sit
 * on lines not shown in this copy of the file.
 */
2183 * Task run during shutdown.
2189 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2192 struct PeerInfo *pos;
2196 GNUNET_TRANSPORT_get_hello_cancel (ghh);
2199 if (transport_handle != NULL)
2201 GNUNET_free_non_null (my_hello);
2202 GNUNET_TRANSPORT_disconnect (transport_handle);
2203 transport_handle = NULL;
2205 GDS_NEIGHBOURS_done ();
2206 GDS_DATACACHE_done ();
/* Empty the routing table bucket by bucket; delete_peer() is given the
 * current bucket head each time. */
2208 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2210 while (k_buckets[bucket_count].head != NULL)
2212 pos = k_buckets[bucket_count].head;
2214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2215 "%s:%s Removing peer %s from bucket %d!\n", my_short_id,
2216 "DHT", GNUNET_i2s (&pos->id), bucket_count);
2218 delete_peer (pos, bucket_count);
2223 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
2226 if (block_context != NULL)
2228 GNUNET_BLOCK_context_destroy (block_context);
2229 block_context = NULL;
/*
 * Service initialisation callback passed to GNUNET_SERVICE_run() in main().
 *
 * Visible steps: initialise the datacache subsystem; connect to core with
 * connect/disconnect callbacks and core_handlers; connect to transport
 * and subscribe to our own HELLO via process_hello (a warning is logged
 * if transport is unavailable); create the block context; initialise the
 * routing-table globals (lowest_bucket, all_known_peers, bucket_size from
 * the "DHT"/"bucket_size" configuration value); create the statistics
 * handle; schedule the first send_find_peer_message after a randomised
 * delay; schedule shutdown_task.
 *
 * NOTE(review): `cfg` is used throughout while the parameter is named
 * `c`; the assignment is presumably on a line not shown -- confirm.
 * NOTE(review): the handling of a NULL coreAPI and the assignment target
 * of GNUNET_TRANSPORT_connect are on lines not shown.
 */
2236 * Process dht requests.
2238 * @param cls closure
2239 * @param server the initialized server
2240 * @param c configuration to use
2243 run (void *cls, struct GNUNET_SERVER_Handle *server,
2244 const struct GNUNET_CONFIGURATION_Handle *c)
2246 struct GNUNET_TIME_Relative next_send_time;
2247 unsigned long long temp_config_num;
2250 GDS_DATACACHE_init ();
2251 coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */
2252 DEFAULT_CORE_QUEUE_SIZE, /* queue size */
2253 NULL, /* Closure passed to DHT functions */
2254 &core_init, /* Call core_init once connected */
2255 &handle_core_connect, /* Handle connects */
2256 &handle_core_disconnect, /* remove peers on disconnects */
2257 NULL, /* Do we care about "status" updates? */
2258 NULL, /* Don't want notified about all incoming messages */
2259 GNUNET_NO, /* For header only inbound notification */
2260 NULL, /* Don't want notified about all outbound messages */
2261 GNUNET_NO, /* For header only outbound notification */
2262 core_handlers); /* Register these handlers */
2264 if (coreAPI == NULL)
2267 GNUNET_TRANSPORT_connect (cfg, NULL, NULL, NULL, NULL, NULL);
2268 if (transport_handle != NULL)
2269 ghh = GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
2271 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2272 "Failed to connect to transport service!\n");
2273 block_context = GNUNET_BLOCK_context_create (cfg);
/* Start with the highest bucket index as the lowest bucket in use. */
2274 lowest_bucket = MAX_BUCKETS - 1;
2275 all_known_peers = GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2276 GNUNET_assert (all_known_peers != NULL);
2279 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
2282 bucket_size = (unsigned int) temp_config_num;
2285 stats = GNUNET_STATISTICS_create ("dht", cfg);
/* First find-peer delay: the minimum interval plus a random offset (the
 * divisor of the maximum interval is on a line not shown). */
2286 next_send_time.rel_value =
2287 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
2288 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2289 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
2291 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
2292 find_peer_context.start = GNUNET_TIME_absolute_get ();
2293 GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
2294 &find_peer_context);
2296 /* Scheduled the task to clean up when shutdown is called */
2298 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2299 &shutdown_task, NULL);
/*
 * Program entry point: creates the global containers that must exist
 * before the service starts (a min-heap, presumably assigned to
 * recent_heap on a line not shown, and recent_find_peer_requests), runs
 * the "dht" service with run() as its initialisation callback, and after
 * the service returns drains and destroys those containers.
 *
 * Each RecentRequest left in recent_heap is removed from the heap and
 * freed together with its bloomfilter.
 *
 * NOTE(review): the computation of the return value from
 * GNUNET_SERVICE_run's result is on lines not shown in this copy.
 */
2304 * The main function for the dht service.
2306 * @param argc number of arguments from the command line
2307 * @param argv command line arguments
2308 * @return 0 ok, 1 on error
2311 main (int argc, char *const *argv)
2314 struct RecentRequest *recent_req;
2317 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2318 recent_find_peer_requests =
2319 GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2322 GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
/* Release whatever recent-request entries are still queued. */
2324 while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0)
2326 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2327 GNUNET_assert (recent_req != NULL);
2328 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2329 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2330 GNUNET_free (recent_req);
2332 GNUNET_CONTAINER_heap_destroy (recent_heap);
2334 GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
2335 recent_find_peer_requests = NULL;
2339 /* end of gnunet-service-dht.c */