87cf97a401792c011df44bcf07f61f574a09d759
[oweals/gnunet.git] / src / dht / gnunet-service-dht-new.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief GNUnet DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO:
28  * - use OPTION_MULTIPLE instead of linked list for the forward_list.hashmap
29  * - use different 'struct DHT_MessageContext' for the different types of
30  *   messages (currently rather confusing, especially with things like
31  *   peer bloom filters occuring when processing replies).
32  */
33
34 #include "platform.h"
35 #include "gnunet_block_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_getopt_lib.h"
38 #include "gnunet_os_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_service_lib.h"
41 #include "gnunet_nse_service.h"
42 #include "gnunet_core_service.h"
43 #include "gnunet_signal_lib.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet_datacache_lib.h"
46 #include "gnunet_transport_service.h"
47 #include "gnunet_hello_lib.h"
48 #include "gnunet_dht_service.h"
49 #include "gnunet_statistics_service.h"
50 #include "dht.h"
51 #include <fenv.h>
52
53
54 /**
55  * Defines whether find peer requests send their HELLO's outgoing,
56  * or expect replies to contain hellos.
57  */
58 #define FIND_PEER_WITH_HELLO GNUNET_YES
59
60 #define DEFAULT_CORE_QUEUE_SIZE 32
61
62 /**
63  * Minimum number of peers we need for "good" routing,
64  * any less than this and we will allow messages to
65  * travel much further through the network!
66  */
67 #define MINIMUM_PEER_THRESHOLD 20
68
69 /**
70  * Number of requests we track at most (for routing replies).
71  */
72 #define DHT_MAX_RECENT (1024 * 16)
73
74 /**
75  * How long do we wait at most when queueing messages with core
76  * that we are sending on behalf of other peers.
77  */
78 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
79
80 /**
81  * Default importance for handling messages on behalf of other peers.
82  */
83 #define DHT_DEFAULT_P2P_IMPORTANCE 0
84
85 /**
86  * How long to keep recent requests around by default.
87  */
88 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
89
90 /**
91  * Default time to wait to send find peer messages sent by the dht service.
92  */
93 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
94
95 /**
96  * Default importance for find peer messages sent by the dht service.
97  */
98 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
99
100 /**
101  * Default replication parameter for find peer messages sent by the dht service.
102  */
103 #define DHT_DEFAULT_FIND_PEER_REPLICATION 4
104
105 /**
106  * How long at least to wait before sending another find peer request.
107  */
108 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
109
110 /**
111  * How long at most to wait before sending another find peer request.
112  */
113 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
114
115 /**
116  * How often to update our preference levels for peers in our routing tables.
117  */
118 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
119
120 /**
121  * How long at most on average will we allow a reply forward to take
122  * (before we quit sending out new requests)
123  */
124 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
125
126 /**
127  * How many time differences between requesting a core send and
128  * the actual callback to remember.
129  */
130 #define MAX_REPLY_TIMES 8
131
132
133
134
135 /**
136  * Context containing information about a DHT message received.
137  */
138 struct DHT_MessageContext
139 {
140   /**
141    * The client this request was received from.
142    * (NULL if received from another peer)
143    */
144   struct ClientList *client;
145
146   /**
147    * The peer this request was received from.
148    */
149   struct GNUNET_PeerIdentity peer;
150
151   /**
152    * Bloomfilter for this routing request.
153    */
154   struct GNUNET_CONTAINER_BloomFilter *bloom;
155
156   /**
157    * extended query (see gnunet_block_lib.h).
158    */
159   const void *xquery;
160
161   /**
162    * Bloomfilter to filter out duplicate replies.
163    */
164   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
165
166   /**
167    * The key this request was about
168    */
169   GNUNET_HashCode key;
170
171   /**
172    * How long should we wait to transmit this request?
173    */
174   struct GNUNET_TIME_Relative timeout;
175
176   /**
177    * The unique identifier of this request
178    */
179   uint64_t unique_id;
180
181   /**
182    * Number of bytes in xquery.
183    */
184   size_t xquery_size;
185
186   /**
187    * Mutator value for the reply_bf, see gnunet_block_lib.h
188    */
189   uint32_t reply_bf_mutator;
190
191   /**
192    * Desired replication level
193    */
194   uint32_t replication;
195
196   /**
197    * Network size estimate, either ours or the sum of
198    * those routed to thus far. =~ Log of number of peers
199    * chosen from for this request.
200    */
201   uint32_t network_size;
202
203   /**
204    * Any message options for this request
205    */
206   uint32_t msg_options;
207
208   /**
209    * How many hops has the message already traversed?
210    */
211   uint32_t hop_count;
212
213   /**
214    * How many peer identities are present in the path history?
215    */
216   uint32_t path_history_len;
217
218   /**
219    * Path history.
220    */
221   char *path_history;
222
223   /**
224    * How important is this message?
225    */
226   unsigned int importance;
227
228   /**
229    * Should we (still) forward the request on to other peers?
230    */
231   int do_forward;
232
233   /**
234    * Did we forward this message? (may need to remember it!)
235    */
236   int forwarded;
237
238   /**
239    * Are we the closest known peer to this key (out of our neighbors?)
240    */
241   int closest;
242 };
243
244
245 /**
246  * Record used for remembering what peers are waiting for what
247  * responses (based on search key).
248  */
249 struct DHTRouteSource
250 {
251   /**
252    * This is a DLL.
253    */
254   struct DHTRouteSource *next;
255
256   /**
257    * This is a DLL.
258    */
259   struct DHTRouteSource *prev;
260
261   /**
262    * UID of the request, 0 if from another peer.
263    */
264   uint64_t uid;
265
266   /**
267    * Source of the request.  Replies should be forwarded to
268    * this peer.
269    */
270   struct GNUNET_PeerIdentity source;
271
272   /**
273    * If this was a local request, remember the client; otherwise NULL.
274    */
275   struct ClientList *client;
276
277   /**
278    * Pointer to this nodes heap location (for removal)
279    */
280   struct GNUNET_CONTAINER_HeapNode *hnode;
281
282   /**
283    * Back pointer to the record storing this information.
284    */
285   struct DHTQueryRecord *record;
286
287   /**
288    * Task to remove this entry on timeout.
289    */
290   GNUNET_SCHEDULER_TaskIdentifier delete_task;
291
292   /**
293    * Bloomfilter of peers we have already sent back as
294    * replies to the initial request.  Allows us to not
295    * forward the same peer multiple times for a find peer
296    * request.
297    */
298   struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
299
300 };
301
302
303 /**
304  * Entry in the DHT routing table.
305  */
306 struct DHTQueryRecord
307 {
308   /**
309    * Head of DLL for result forwarding.
310    */
311   struct DHTRouteSource *head;
312
313   /**
314    * Tail of DLL for result forwarding.
315    */
316   struct DHTRouteSource *tail;
317
318   /**
319    * Key that the record concerns.
320    */
321   GNUNET_HashCode key;
322
323 };
324
325
326 /**
327  * Context used to calculate the number of find peer messages
328  * per X time units since our last scheduled find peer message
329  * was sent.  If we have seen too many messages, delay or don't
330  * send our own out.
331  */
332 struct FindPeerMessageContext
333 {
334   unsigned int count;
335
336   struct GNUNET_TIME_Absolute start;
337
338 };
339
340
341 struct RecentRequest
342 {
343   /**
344    * Position of this node in the min heap.
345    */
346   struct GNUNET_CONTAINER_HeapNode *heap_node;
347
348   /**
349    * Bloomfilter containing entries for peers
350    * we forwarded this request to.
351    */
352   struct GNUNET_CONTAINER_BloomFilter *bloom;
353
354   /**
355    * Timestamp of this request, for ordering
356    * the min heap.
357    */
358   struct GNUNET_TIME_Absolute timestamp;
359
360   /**
361    * Key of this request.
362    */
363   GNUNET_HashCode key;
364
365   /**
366    * Unique identifier for this request, 0 if from another peer.
367    */
368   uint64_t uid;
369
370   /**
371    * Task to remove this entry on timeout.
372    */
373   GNUNET_SCHEDULER_TaskIdentifier remove_task;
374 };
375
376
377 /**
378  * Recent requests by time inserted.
379  */
380 static struct GNUNET_CONTAINER_Heap *recent_heap;
381
382 /**
383  * Context to use to calculate find peer rates.
384  */
385 static struct FindPeerMessageContext find_peer_context;
386
387 /**
388  * How many peers have we added since we sent out our last
389  * find peer request?
390  */
391 static unsigned int newly_found_peers;
392
393 /**
394  * Handle for the statistics service.
395  */
396 struct GNUNET_STATISTICS_Handle *stats;
397
398 /**
399  * Handle to get our current HELLO.
400  */
401 static struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
402
403 /**
404  * The configuration the DHT service is running with
405  */
406 static const struct GNUNET_CONFIGURATION_Handle *cfg;
407
408 /**
409  * Handle to the core service
410  */
411 static struct GNUNET_CORE_Handle *coreAPI;
412
413 /**
414  * Handle to the transport service, for getting our hello
415  */
416 static struct GNUNET_TRANSPORT_Handle *transport_handle;
417
418 /**
419  * The identity of our peer.
420  */
421 static struct GNUNET_PeerIdentity my_identity;
422
423 /**
424  * Short id of the peer, for printing
425  */
426 static char *my_short_id;
427
428 /**
429  * Our HELLO
430  */
431 static struct GNUNET_MessageHeader *my_hello;
432
433 /**
434  * Task to run when we shut down, cleaning up all our trash
435  */
436 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
437
438 /**
439  * Recently seen find peer requests.
440  */
441 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
442
443 /**
444  * Reply times for requests, if we are busy, don't send any
445  * more requests!
446  */
447 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
448
449 /**
450  * Current counter for replies.
451  */
452 static unsigned int reply_counter;
453
454 /**
455  * Our handle to the BLOCK library.
456  */
457 static struct GNUNET_BLOCK_Context *block_context;
458
459
460
461 /** Declare here so retry_core_send is aware of it */
462 static size_t
463 core_transmit_notify (void *cls, size_t size, void *buf);
464
465
466
467 /**
468  * Given the largest send delay, artificially decrease it
469  * so the next time around we may have a chance at sending
470  * again.
471  */
472 static void
473 decrease_max_send_delay (struct GNUNET_TIME_Relative max_time)
474 {
475   unsigned int i;
476
477   for (i = 0; i < MAX_REPLY_TIMES; i++)
478   {
479     if (reply_times[i].rel_value == max_time.rel_value)
480     {
481       reply_times[i].rel_value = reply_times[i].rel_value / 2;
482       return;
483     }
484   }
485 }
486
487
488 /**
489  * Find the maximum send time of the recently sent values.
490  *
491  * @return the average time between asking core to send a message
492  *         and when the buffer for copying it is passed
493  */
494 static struct GNUNET_TIME_Relative
495 get_max_send_delay ()
496 {
497   unsigned int i;
498   struct GNUNET_TIME_Relative max_time;
499
500   max_time = GNUNET_TIME_relative_get_zero ();
501
502   for (i = 0; i < MAX_REPLY_TIMES; i++)
503   {
504     if (reply_times[i].rel_value > max_time.rel_value)
505       max_time.rel_value = reply_times[i].rel_value;
506   }
507 #if DEBUG_DHT
508   if (max_time.rel_value > MAX_REQUEST_TIME.rel_value)
509     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n",
510                 (unsigned long long) max_time.rel_value);
511 #endif
512   return max_time;
513 }
514
515
516 static void
517 increment_stats (const char *value)
518 {
519   if (stats == NULL)
520     return;
521   GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
522 }
523
524
525 static void
526 decrement_stats (const char *value)
527 {
528   if (stats == NULL)
529     return;
530   GNUNET_STATISTICS_update (stats, value, -1, GNUNET_NO);
531 }
532
533
534 /**
535  *  Try to send another message from our core send list
536  */
537 static void
538 try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
539 {
540   struct PeerInfo *peer = cls;
541   struct P2PPendingMessage *pending;
542   size_t ssize;
543
544   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
545
546   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
547     return;
548
549   if (peer->th != NULL)
550     return;                     /* Message send already in progress */
551
552   pending = peer->head;
553   if (pending != NULL)
554   {
555     ssize = ntohs (pending->msg->size);
556 #if DEBUG_DHT > 1
557     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n",
559                 my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id));
560 #endif
561     pending->scheduled = GNUNET_TIME_absolute_get ();
562     reply_counter++;
563     if (reply_counter >= MAX_REPLY_TIMES)
564       reply_counter = 0;
565     peer->th =
566         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
567                                            pending->importance,
568                                            pending->timeout, &peer->id, ssize,
569                                            &core_transmit_notify, peer);
570     if (peer->th == NULL)
571       increment_stats ("# notify transmit ready failed");
572   }
573 }
574
575
576 /**
577  * Function called to send a request out to another peer.
578  * Called both for locally initiated requests and those
579  * received from other peers.
580  *
581  * @param msg the encapsulated message
582  * @param peer the peer to forward the message to
583  * @param msg_ctx the context of the message (hop count, bloom, etc.)
584  */
585 static void
586 forward_result_message (const struct GNUNET_MessageHeader *msg,
587                         struct PeerInfo *peer,
588                         struct DHT_MessageContext *msg_ctx)
589 {
590   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
591   struct P2PPendingMessage *pending;
592   size_t msize;
593   size_t psize;
594   char *path_start;
595   char *path_offset;
596
597   increment_stats (STAT_RESULT_FORWARDS);
598   msize =
599       sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) +
600       (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len);
601   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
602   psize = sizeof (struct P2PPendingMessage) + msize;
603   pending = GNUNET_malloc (psize);
604   pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
605   pending->importance = DHT_SEND_PRIORITY;
606   pending->timeout = GNUNET_TIME_relative_get_forever ();
607   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg;
608   result_message->header.size = htons (msize);
609   result_message->header.type =
610       htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
611   result_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
612   if (msg_ctx->path_history_len > 0)
613   {
614     /* End of pending is where enc_msg starts */
615     path_start = (char *) &pending[1];
616     /* Offset by the size of the enc_msg */
617     path_start += ntohs (msg->size);
618     memcpy (path_start, msg_ctx->path_history,
619             msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
620   }
621   result_message->options = htonl (msg_ctx->msg_options);
622   result_message->hop_count = htonl (msg_ctx->hop_count + 1);
623   memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
624   /* Copy the enc_msg, then the path history as well! */
625   memcpy (&result_message[1], msg, ntohs (msg->size));
626   path_offset = (char *) &result_message[1];
627   path_offset += ntohs (msg->size);
628   /* If we have path history, copy it to the end of the whole thing */
629   if (msg_ctx->path_history_len > 0)
630     memcpy (path_offset, msg_ctx->path_history,
631             msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
632 #if DEBUG_DHT > 1
633   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634               "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
635               "DHT", msize, GNUNET_i2s (&peer->id));
636 #endif
637   peer->pending_count++;
638   increment_stats ("# pending messages scheduled");
639   GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
640                                      pending);
641   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
642     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
643 }
644
645
646 /**
647  * Called when core is ready to send a message we asked for
648  * out to the destination.
649  *
650  * @param cls closure (NULL)
651  * @param size number of bytes available in buf
652  * @param buf where the callee should write the message
653  * @return number of bytes written to buf
654  */
655 static size_t
656 core_transmit_notify (void *cls, size_t size, void *buf)
657 {
658   struct PeerInfo *peer = cls;
659   char *cbuf = buf;
660   struct P2PPendingMessage *pending;
661
662   size_t off;
663   size_t msize;
664
665   peer->th = NULL;
666   if (buf == NULL)
667   {
668     /* client disconnected */
669 #if DEBUG_DHT
670     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n",
671                 my_short_id, "DHT");
672 #endif
673     return 0;
674   }
675
676   if (peer->head == NULL)
677     return 0;
678
679   off = 0;
680   pending = peer->head;
681   while (NULL != pending &&
682          (size - off >= (msize = ntohs (pending->msg->size))))
683   {
684     memcpy (&cbuf[off], pending->msg, msize);
685     off += msize;
686     peer->pending_count--;
687     increment_stats ("# pending messages sent");
688     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
689     GNUNET_free (pending);
690     pending = peer->head;
691   }
692   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
693     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
694
695   return off;
696 }
697
698
699 /**
700  * Compute the distance between have and target as a 32-bit value.
701  * Differences in the lower bits must count stronger than differences
702  * in the higher bits.
703  *
704  * @return 0 if have==target, otherwise a number
705  *           that is larger as the distance between
706  *           the two hash codes increases
707  */
708 static unsigned int
709 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
710 {
711   unsigned int bucket;
712   unsigned int msb;
713   unsigned int lsb;
714   unsigned int i;
715
716   /* We have to represent the distance between two 2^9 (=512)-bit
717    * numbers as a 2^5 (=32)-bit number with "0" being used for the
718    * two numbers being identical; furthermore, we need to
719    * guarantee that a difference in the number of matching
720    * bits is always represented in the result.
721    *
722    * We use 2^32/2^9 numerical values to distinguish between
723    * hash codes that have the same LSB bit distance and
724    * use the highest 2^9 bits of the result to signify the
725    * number of (mis)matching LSB bits; if we have 0 matching
726    * and hence 512 mismatching LSB bits we return -1 (since
727    * 512 itself cannot be represented with 9 bits) */
728
729   /* first, calculate the most significant 9 bits of our
730    * result, aka the number of LSBs */
731   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
732   /* bucket is now a value between 0 and 512 */
733   if (bucket == 512)
734     return 0;                   /* perfect match */
735   if (bucket == 0)
736     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
737                                  * below, we'd end up with max+1 (overflow)) */
738
739   /* calculate the most significant bits of the final result */
740   msb = (512 - bucket) << (32 - 9);
741   /* calculate the 32-9 least significant bits of the final result by
742    * looking at the differences in the 32-9 bits following the
743    * mismatching bit at 'bucket' */
744   lsb = 0;
745   for (i = bucket + 1;
746        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
747   {
748     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
749         GNUNET_CRYPTO_hash_get_bit (have, i))
750       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
751                                                  * last bit set will be 31 -- if
752                                                  * i does not reach 512 first... */
753   }
754   return msb | lsb;
755 }
756
757
758 /**
759  * Return a number that is larger the closer the
760  * "have" GNUNET_hash code is to the "target".
761  *
762  * @return inverse distance metric, non-zero.
763  *         Must fudge the value if NO bits match.
764  */
765 static unsigned int
766 inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
767 {
768   if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
769     return 1;                   /* Never return 0! */
770   return ((unsigned int) -1) - distance (target, have);
771 }
772
773
774 /**
775  * Find which k-bucket this peer should go into,
776  * taking into account the size of the k-bucket
777  * array.  This means that if more bits match than
778  * there are currently buckets, lowest_bucket will
779  * be returned.
780  *
781  * @param hc GNUNET_HashCode we are finding the bucket for.
782  *
783  * @return the proper bucket index for this key,
784  *         or GNUNET_SYSERR on error (same hashcode)
785  */
786 static int
787 find_current_bucket (const GNUNET_HashCode * hc)
788 {
789   int actual_bucket;
790
791   actual_bucket = find_bucket (hc);
792   if (actual_bucket == GNUNET_SYSERR)   /* hc and our peer identity match! */
793     return lowest_bucket;
794   if (actual_bucket < lowest_bucket)    /* actual_bucket not yet used */
795     return lowest_bucket;
796   return actual_bucket;
797 }
798
799
800 /**
801  * Find a routing table entry from a peer identity
802  *
803  * @param peer the peer identity to look up
804  *
805  * @return the routing table entry, or NULL if not found
806  */
807 static struct PeerInfo *
808 find_peer_by_id (const struct GNUNET_PeerIdentity *peer)
809 {
810   int bucket;
811   struct PeerInfo *pos;
812
813   bucket = find_current_bucket (&peer->hashPubKey);
814
815   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
816     return NULL;
817
818   pos = k_buckets[bucket].head;
819   while (pos != NULL)
820   {
821     if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity)))
822       return pos;
823     pos = pos->next;
824   }
825   return NULL;                  /* No such peer. */
826 }
827
828 /* Forward declaration */
829 static void
830 update_core_preference (void *cls,
831                         const struct GNUNET_SCHEDULER_TaskContext *tc);
832
833
834 /**
835  * Function called with statistics about the given peer.
836  *
837  * @param cls closure
838  * @param peer identifies the peer
839  * @param bpm_out set to the current bandwidth limit (sending) for this peer
840  * @param amount set to the amount that was actually reserved or unreserved;
841  *               either the full requested amount or zero (no partial reservations)
842  * @param res_delay if the reservation could not be satisfied (amount was 0), how
843  *        long should the client wait until re-trying?
844  * @param preference current traffic preference for the given peer
845  */
846 static void
847 update_core_preference_finish (void *cls,
848                                const struct GNUNET_PeerIdentity *peer,
849                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
850                                int32_t amount,
851                                struct GNUNET_TIME_Relative res_delay,
852                                uint64_t preference)
853 {
854   struct PeerInfo *peer_info = cls;
855
856   peer_info->info_ctx = NULL;
857   GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
858                                 &update_core_preference, peer_info);
859 }
860
861 static void
862 update_core_preference (void *cls,
863                         const struct GNUNET_SCHEDULER_TaskContext *tc)
864 {
865   struct PeerInfo *peer = cls;
866   uint64_t preference;
867   unsigned int matching;
868
869   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
870   {
871     return;
872   }
873   matching =
874       GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
875                                         &peer->id.hashPubKey);
876   if (matching >= 64)
877   {
878 #if DEBUG_DHT
879     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
880                 "Peer identifier matches by %u bits, only shifting as much as we can!\n",
881                 matching);
882 #endif
883     matching = 63;
884   }
885   preference = 1LL << matching;
886   peer->info_ctx =
887       GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
888                                           GNUNET_TIME_UNIT_FOREVER_REL,
889                                           GNUNET_BANDWIDTH_VALUE_MAX, 0,
890                                           preference,
891                                           &update_core_preference_finish, peer);
892 }
893
894 /**
895  * Find the closest peer in our routing table to the
896  * given hashcode.
897  *
898  * @return The closest peer in our routing table to the
899  *         key, or NULL on error.
900  */
901 static struct PeerInfo *
902 find_closest_peer (const GNUNET_HashCode * hc)
903 {
904   struct PeerInfo *pos;
905   struct PeerInfo *current_closest;
906   unsigned int lowest_distance;
907   unsigned int temp_distance;
908   int bucket;
909   int count;
910
911   lowest_distance = -1;
912
913   if (k_buckets[lowest_bucket].peers_size == 0)
914     return NULL;
915
916   current_closest = NULL;
917   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
918   {
919     pos = k_buckets[bucket].head;
920     count = 0;
921     while ((pos != NULL) && (count < bucket_size))
922     {
923       temp_distance = distance (&pos->id.hashPubKey, hc);
924       if (temp_distance <= lowest_distance)
925       {
926         lowest_distance = temp_distance;
927         current_closest = pos;
928       }
929       pos = pos->next;
930       count++;
931     }
932   }
933   GNUNET_assert (current_closest != NULL);
934   return current_closest;
935 }
936
937
938 /**
939  * Function called to send a request out to another peer.
940  * Called both for locally initiated requests and those
941  * received from other peers.
942  *
943  * @param msg the encapsulated message
944  * @param peer the peer to forward the message to
945  * @param msg_ctx the context of the message (hop count, bloom, etc.)
946  */
947 static void
948 forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer,
949                  struct DHT_MessageContext *msg_ctx)
950 {
951   struct GNUNET_DHT_P2PRouteMessage *route_message;
952   struct P2PPendingMessage *pending;
953   size_t msize;
954   size_t psize;
955   char *route_path;
956
957   increment_stats (STAT_ROUTE_FORWARDS);
958   GNUNET_assert (peer != NULL);
959   if ((msg_ctx->closest != GNUNET_YES) &&
960       (peer == find_closest_peer (&msg_ctx->key)))
961     increment_stats (STAT_ROUTE_FORWARDS_CLOSEST);
962
963   msize =
964       sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) +
965       (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
966   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
967   psize = sizeof (struct P2PPendingMessage) + msize;
968   pending = GNUNET_malloc (psize);
969   pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
970   pending->importance = msg_ctx->importance;
971   pending->timeout = msg_ctx->timeout;
972   route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg;
973   route_message->header.size = htons (msize);
974   route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
975   route_message->options = htonl (msg_ctx->msg_options);
976   route_message->hop_count = htonl (msg_ctx->hop_count + 1);
977   route_message->network_size = htonl (msg_ctx->network_size);
978   route_message->desired_replication_level = htonl (msg_ctx->replication);
979   if (msg_ctx->bloom != NULL)
980     GNUNET_assert (GNUNET_OK ==
981                    GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
982                                                               route_message->
983                                                               bloomfilter,
984                                                               DHT_BLOOM_SIZE));
985   memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
986   memcpy (&route_message[1], msg, ntohs (msg->size));
987   if (GNUNET_DHT_RO_RECORD_ROUTE ==
988       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
989   {
990     route_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
991     /* Set pointer to start of enc_msg */
992     route_path = (char *) &route_message[1];
993     /* Offset to the end of the enc_msg */
994     route_path += ntohs (msg->size);
995     /* Copy the route_path after enc_msg */
996     memcpy (route_path, msg_ctx->path_history,
997             msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
998   }
999 #if DEBUG_DHT > 1
1000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001               "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1002               "DHT", msize, GNUNET_i2s (&peer->id));
1003 #endif
1004   peer->pending_count++;
1005   increment_stats ("# pending messages scheduled");
1006   GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1007                                      pending);
1008   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1009     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1010 }
1011
1012
1013
1014
1015 /**
1016  * Called when a reply needs to be sent to a client, as
1017  * a result it found to a GET or FIND PEER request.
1018  *
1019  * @param client the client to send the reply to
1020  * @param message the encapsulated message to send
1021  * @param msg_ctx the context of the received message
1022  */
1023 static void
1024 send_reply_to_client (struct ClientList *client,
1025                       const struct GNUNET_MessageHeader *message,
1026                       struct DHT_MessageContext *msg_ctx)
1027 {
1028   struct GNUNET_DHT_RouteResultMessage *reply;
1029   struct PendingMessage *pending_message;
1030   uint16_t msize;
1031   size_t tsize;
1032   char *reply_offset;
1033
1034 #if DEBUG_DHT
1035   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n",
1036               my_short_id, "DHT");
1037 #endif
1038   msize = ntohs (message->size);
1039   tsize =
1040       sizeof (struct GNUNET_DHT_RouteResultMessage) + msize +
1041       (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1042   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1043   {
1044     GNUNET_break_op (0);
1045     return;
1046   }
1047   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1048   pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1];
1049   reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1];
1050   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1051   reply->header.size = htons (tsize);
1052   reply->outgoing_path_length = htonl (msg_ctx->path_history_len);
1053   reply->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1054   memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1055   reply_offset = (char *) &reply[1];
1056   memcpy (&reply[1], message, msize);
1057   if (msg_ctx->path_history_len > 0)
1058   {
1059     reply_offset += msize;
1060     memcpy (reply_offset, msg_ctx->path_history,
1061             msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1062   }
1063   add_pending_message (client, pending_message);
1064 }
1065
1066 /**
1067  * Consider whether or not we would like to have this peer added to
1068  * our routing table.  Check whether bucket for this peer is full,
1069  * if so return negative; if not return positive.  Since peers are
1070  * only added on CORE level connect, this doesn't actually add the
1071  * peer to the routing table.
1072  *
1073  * @param peer the peer we are considering adding
1074  *
1075  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1076  *         already full)
1077  */
1078 static int
1079 consider_peer (struct GNUNET_PeerIdentity *peer)
1080 {
1081   int bucket;
1082
1083   if ((GNUNET_YES ==
1084        GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
1085                                                &peer->hashPubKey)) ||
1086       (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))))
1087     return GNUNET_NO;           /* We already know this peer (are connected even!) */
1088   bucket = find_current_bucket (&peer->hashPubKey);
1089
1090   if ((k_buckets[bucket].peers_size < bucket_size) ||
1091       ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1092     return GNUNET_YES;
1093
1094   return GNUNET_NO;
1095 }
1096
1097
1098 /**
1099  * Task used to remove forwarding entries, either
1100  * after timeout, when full, or on shutdown.
1101  *
1102  * @param cls the entry to remove
1103  * @param tc context, reason, etc.
1104  */
1105 static void
1106 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1107 {
1108   struct DHTRouteSource *source_info = cls;
1109   struct DHTQueryRecord *record;
1110
1111   source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode);
1112   record = source_info->record;
1113   GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1114
1115   if (record->head == NULL)     /* No more entries in DLL */
1116   {
1117     GNUNET_assert (GNUNET_YES ==
1118                    GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1119                                                          &record->key, record));
1120     GNUNET_free (record);
1121   }
1122   if (source_info->find_peers_responded != NULL)
1123     GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1124   GNUNET_free (source_info);
1125 }
1126
1127 /**
1128  * Main function that handles whether or not to route a result
1129  * message to other peers, or to send to our local client.
1130  *
1131  * @param msg the result message to be routed
1132  * @param msg_ctx context of the message we are routing
1133  *
1134  * @return the number of peers the message was routed to,
1135  *         GNUNET_SYSERR on failure
1136  */
1137 static int
1138 route_result_message (struct GNUNET_MessageHeader *msg,
1139                       struct DHT_MessageContext *msg_ctx)
1140 {
1141   struct GNUNET_PeerIdentity new_peer;
1142   struct DHTQueryRecord *record;
1143   struct DHTRouteSource *pos;
1144   struct PeerInfo *peer_info;
1145   const struct GNUNET_MessageHeader *hello_msg;
1146
1147 #if DEBUG_DHT > 1
1148   unsigned int i;
1149 #endif
1150
1151   increment_stats (STAT_RESULTS);
1152   /**
1153    * If a find peer result message is received and contains a valid
1154    * HELLO for another peer, offer it to the transport service.
1155    */
1156   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1157   {
1158     if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader))
1159       GNUNET_break_op (0);
1160
1161     hello_msg = &msg[1];
1162     if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1163         (GNUNET_SYSERR ==
1164          GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg,
1165                               &new_peer)))
1166     {
1167       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1168                   "%s:%s Received non-HELLO message type in find peer result message!\n",
1169                   my_short_id, "DHT");
1170       GNUNET_break_op (0);
1171       return GNUNET_NO;
1172     }
1173     else                        /* We have a valid hello, and peer id stored in new_peer */
1174     {
1175       find_peer_context.count++;
1176       increment_stats (STAT_FIND_PEER_REPLY);
1177       if (GNUNET_YES == consider_peer (&new_peer))
1178       {
1179         increment_stats (STAT_HELLOS_PROVIDED);
1180         GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL);
1181         GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL);
1182       }
1183     }
1184   }
1185
1186   record =
1187     GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1188
1189   if (record == NULL)           /* No record of this message! */
1190   {
1191 #if DEBUG_DHT
1192     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193                 "`%s:%s': Have no record of response key %s uid %llu\n",
1194                 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1195                 msg_ctx->unique_id);
1196 #endif
1197     return 0;
1198   }
1199
1200   pos = record->head;
1201   while (pos != NULL)
1202   {
1203     if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity)))  /* Local client (or DHT) initiated request! */
1204     {
1205 #if DEBUG_DHT
1206       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207                   "`%s:%s': Sending response key %s uid %llu to client\n",
1208                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1209                   msg_ctx->unique_id);
1210 #endif
1211       increment_stats (STAT_RESULTS_TO_CLIENT);
1212       if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
1213         increment_stats (STAT_GET_REPLY);
1214 #if DEBUG_DHT > 1
1215       for (i = 0; i < msg_ctx->path_history_len; i++)
1216       {
1217         char *path_offset;
1218
1219         path_offset =
1220             &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1221         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222                     "(before client) Key %s Found peer %d:%s\n",
1223                     GNUNET_h2s (&msg_ctx->key), i,
1224                     GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1225       }
1226 #endif
1227       send_reply_to_client (pos->client, msg, msg_ctx);
1228     }
1229     else                        /* Send to peer */
1230     {
1231       peer_info = find_peer_by_id (&pos->source);
1232       if (peer_info == NULL)    /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1233       {
1234         pos = pos->next;
1235         continue;
1236       }
1237 #if DEBUG_DHT
1238       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239                   "`%s:%s': Forwarding response key %s uid %llu to peer %s\n",
1240                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1241                   msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
1242 #endif
1243       forward_result_message (msg, peer_info, msg_ctx);
1244       /* Try removing forward entries after sending once, only allows ONE response per request */
1245       if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
1246       {
1247         GNUNET_SCHEDULER_cancel (pos->delete_task);
1248         pos->delete_task =
1249           GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);        
1250       }
1251     }
1252     pos = pos->next;
1253   }
1254   return 0;
1255 }
1256
1257
1258
1259
1260 /**
1261  * Main function that handles whether or not to route a message to other
1262  * peers.
1263  *
1264  * @param msg the message to be routed
1265  * @param msg_ctx the context containing all pertinent information about the message
1266  */
1267 static void
1268 route_message (const struct GNUNET_MessageHeader *msg,
1269                struct DHT_MessageContext *msg_ctx);
1270
1271
1272 /**
1273  * Server handler for all dht get requests, look for data,
1274  * if found, send response either to clients or other peers.
1275  *
1276  * @param msg the actual get message
1277  * @param msg_ctx struct containing pertinent information about the get request
1278  *
1279  * @return number of items found for GET request
1280  */
1281 static unsigned int
1282 handle_dht_get (const struct GNUNET_MessageHeader *msg,
1283                 struct DHT_MessageContext *msg_ctx)
1284 {
1285   const struct GNUNET_DHT_GetMessage *get_msg;
1286   uint16_t msize;
1287   uint16_t bf_size;
1288   unsigned int results;
1289   const char *end;
1290   enum GNUNET_BLOCK_Type type;
1291
1292   msize = ntohs (msg->size);
1293   if (msize < sizeof (struct GNUNET_DHT_GetMessage))
1294   {
1295     GNUNET_break (0);
1296     return 0;
1297   }
1298   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
1299   bf_size = ntohs (get_msg->bf_size);
1300   msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
1301   msg_ctx->reply_bf_mutator = get_msg->bf_mutator;
1302   if (msize !=
1303       sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
1304   {
1305     GNUNET_break_op (0);
1306     return 0;
1307   }
1308   end = (const char *) &get_msg[1];
1309   if (msg_ctx->xquery_size == 0)
1310   {
1311     msg_ctx->xquery = NULL;
1312   }
1313   else
1314   {
1315     msg_ctx->xquery = (const void *) end;
1316     end += msg_ctx->xquery_size;
1317   }
1318   if (bf_size == 0)
1319   {
1320     msg_ctx->reply_bf = NULL;
1321   }
1322   else
1323   {
1324     msg_ctx->reply_bf =
1325         GNUNET_CONTAINER_bloomfilter_init (end, bf_size,
1326                                            GNUNET_DHT_GET_BLOOMFILTER_K);
1327   }
1328   type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
1329 #if DEBUG_DHT
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
1332               my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key),
1333               msg_ctx->unique_id);
1334 #endif
1335   increment_stats (STAT_GETS);
1336   results = 0;
1337   msg_ctx->do_forward = GNUNET_YES;
1338 #if DEBUG_DHT
1339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340               "`%s:%s': Found %d results for `%s' request uid %llu\n",
1341               my_short_id, "DHT", results, "GET", msg_ctx->unique_id);
1342 #endif
1343   if (results >= 1)
1344   {
1345   }
1346   else
1347   {
1348     /* check query valid */
1349     if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID ==
1350         GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key,
1351                                &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator,
1352                                msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0))
1353     {
1354       GNUNET_break_op (0);
1355       msg_ctx->do_forward = GNUNET_NO;
1356     }
1357   }
1358
1359   if (msg_ctx->do_forward == GNUNET_YES)
1360     route_message (msg, msg_ctx);
1361   GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
1362   return results;
1363 }
1364
1365
1366 static void
1367 remove_recent_find_peer (void *cls,
1368                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1369 {
1370   GNUNET_HashCode *key = cls;
1371
1372   GNUNET_assert (GNUNET_YES ==
1373                  GNUNET_CONTAINER_multihashmap_remove
1374                  (recent_find_peer_requests, key, NULL));
1375   GNUNET_free (key);
1376 }
1377
1378
1379 /**
1380  * Server handler for initiating local dht find peer requests
1381  *
1382  * @param find_msg the actual find peer message
1383  * @param msg_ctx struct containing pertinent information about the request
1384  *
1385  */
1386 static void
1387 handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
1388                       struct DHT_MessageContext *msg_ctx)
1389 {
1390   struct GNUNET_MessageHeader *find_peer_result;
1391   struct GNUNET_DHT_FindPeerMessage *find_peer_message;
1392   struct DHT_MessageContext *new_msg_ctx;
1393   struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
1394   size_t hello_size;
1395   size_t tsize;
1396   GNUNET_HashCode *recent_hash;
1397   struct GNUNET_MessageHeader *other_hello;
1398   size_t other_hello_size;
1399   struct GNUNET_PeerIdentity peer_id;
1400
1401   find_peer_message = (struct GNUNET_DHT_FindPeerMessage *) find_msg;
1402   GNUNET_break_op (ntohs (find_msg->size) >=
1403                    (sizeof (struct GNUNET_DHT_FindPeerMessage)));
1404   if (ntohs (find_msg->size) < sizeof (struct GNUNET_DHT_FindPeerMessage))
1405     return;
1406   other_hello = NULL;
1407   other_hello_size = 0;
1408   if (ntohs (find_msg->size) > sizeof (struct GNUNET_DHT_FindPeerMessage))
1409   {
1410     other_hello_size =
1411         ntohs (find_msg->size) - sizeof (struct GNUNET_DHT_FindPeerMessage);
1412     other_hello = GNUNET_malloc (other_hello_size);
1413     memcpy (other_hello, &find_peer_message[1], other_hello_size);
1414     if ((GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) other_hello) == 0)
1415         || (GNUNET_OK !=
1416             GNUNET_HELLO_get_id ((struct GNUNET_HELLO_Message *) other_hello,
1417                                  &peer_id)))
1418     {
1419       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1420                   "Received invalid HELLO message in find peer request!\n");
1421       GNUNET_free (other_hello);
1422       return;
1423     }
1424 #if FIND_PEER_WITH_HELLO
1425     if (GNUNET_YES == consider_peer (&peer_id))
1426     {
1427       increment_stats (STAT_HELLOS_PROVIDED);
1428       GNUNET_TRANSPORT_offer_hello (transport_handle, other_hello, NULL, NULL);
1429       GNUNET_CORE_peer_request_connect (coreAPI, &peer_id, NULL, NULL);
1430       route_message (find_msg, msg_ctx);
1431       GNUNET_free (other_hello);
1432       return;
1433     }
1434     else                        /* We don't want this peer! */
1435     {
1436       route_message (find_msg, msg_ctx);
1437       GNUNET_free (other_hello);
1438       return;
1439     }
1440 #endif
1441   }
1442
1443 #if DEBUG_DHT
1444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1445               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
1446               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&msg_ctx->key),
1447               ntohs (find_msg->size), sizeof (struct GNUNET_MessageHeader));
1448 #endif
1449   if (my_hello == NULL)
1450   {
1451 #if DEBUG_DHT
1452     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453                 "`%s': Our HELLO is null, can't return.\n", "DHT");
1454 #endif
1455     GNUNET_free_non_null (other_hello);
1456     route_message (find_msg, msg_ctx);
1457     return;
1458   }
1459
1460   incoming_bloom =
1461       GNUNET_CONTAINER_bloomfilter_init (find_peer_message->bloomfilter,
1462                                          DHT_BLOOM_SIZE, DHT_BLOOM_K);
1463   if (GNUNET_YES ==
1464       GNUNET_CONTAINER_bloomfilter_test (incoming_bloom,
1465                                          &my_identity.hashPubKey))
1466   {
1467     increment_stats (STAT_BLOOM_FIND_PEER);
1468     GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
1469     GNUNET_free_non_null (other_hello);
1470     route_message (find_msg, msg_ctx);
1471     return;                     /* We match the bloomfilter, do not send a response to this peer (they likely already know us!) */
1472   }
1473   GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
1474
1475   /**
1476    * Ignore any find peer requests from a peer we have seen very recently.
1477    */
1478   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent_find_peer_requests, &msg_ctx->key))  /* We have recently responded to a find peer request for this peer! */
1479   {
1480     increment_stats ("# dht find peer requests ignored (recently seen!)");
1481     GNUNET_free_non_null (other_hello);
1482     return;
1483   }
1484
1485   /**
1486    * Use this check to only allow the peer to respond to find peer requests if
1487    * it would be beneficial to have the requesting peer in this peers routing
1488    * table.  Can be used to thwart peers flooding the network with find peer
1489    * requests that we don't care about.  However, if a new peer is joining
1490    * the network and has no other peers this is a problem (assume all buckets
1491    * full, no one will respond!).
1492    */
1493   memcpy (&peer_id.hashPubKey, &msg_ctx->key, sizeof (GNUNET_HashCode));
1494   if (GNUNET_NO == consider_peer (&peer_id))
1495   {
1496     increment_stats ("# dht find peer requests ignored (do not need!)");
1497     GNUNET_free_non_null (other_hello);
1498     route_message (find_msg, msg_ctx);
1499     return;
1500   }
1501
1502   recent_hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
1503   memcpy (recent_hash, &msg_ctx->key, sizeof (GNUNET_HashCode));
1504   if (GNUNET_SYSERR !=
1505       GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests,
1506                                          &msg_ctx->key, NULL,
1507                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1508   {
1509 #if DEBUG_DHT
1510     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511                 "Adding recent remove task for key `%s`!\n",
1512                 GNUNET_h2s (&msg_ctx->key));
1513 #endif
1514     /* Only add a task if there wasn't one for this key already! */
1515     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
1516                                   (GNUNET_TIME_UNIT_SECONDS, 30),
1517                                   &remove_recent_find_peer, recent_hash);
1518   }
1519   else
1520   {
1521     GNUNET_free (recent_hash);
1522 #if DEBUG_DHT
1523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524                 "Received duplicate find peer request too soon!\n");
1525 #endif
1526   }
1527
1528   /* Simplistic find_peer functionality, always return our hello */
1529   hello_size = ntohs (my_hello->size);
1530   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
1531
1532   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1533   {
1534     GNUNET_break_op (0);
1535     GNUNET_free_non_null (other_hello);
1536     return;
1537   }
1538
1539   find_peer_result = GNUNET_malloc (tsize);
1540   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
1541   find_peer_result->size = htons (tsize);
1542   memcpy (&find_peer_result[1], my_hello, hello_size);
1543 #if DEBUG_DHT
1544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1545               "`%s': Sending hello size %d to requesting peer.\n", "DHT",
1546               hello_size);
1547 #endif
1548
1549   new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
1550   memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1551   new_msg_ctx->peer = my_identity;
1552   new_msg_ctx->bloom =
1553       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1554   new_msg_ctx->hop_count = 0;
1555   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;     /* Make find peer requests a higher priority */
1556   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
1557   increment_stats (STAT_FIND_PEER_ANSWER);
1558   if (GNUNET_DHT_RO_RECORD_ROUTE ==
1559       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1560   {
1561     new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1562     new_msg_ctx->path_history_len = msg_ctx->path_history_len;
1563     /* Assign to previous msg_ctx path history, caller should free after our return */
1564     new_msg_ctx->path_history = msg_ctx->path_history;
1565   }
1566   route_result_message (find_peer_result, new_msg_ctx);
1567   GNUNET_free (new_msg_ctx);
1568   GNUNET_free_non_null (other_hello);
1569   GNUNET_free (find_peer_result);
1570   route_message (find_msg, msg_ctx);
1571 }
1572
1573
1574 /**
1575  * Server handler for initiating local dht put requests
1576  *
1577  * @param msg the actual put message
1578  * @param msg_ctx struct containing pertinent information about the request
1579  */
1580 static void
1581 handle_dht_put (const struct GNUNET_MessageHeader *msg,
1582                 struct DHT_MessageContext *msg_ctx)
1583 {
1584   const struct GNUNET_DHT_PutMessage *put_msg;
1585   struct DHTPutEntry *put_entry;
1586   unsigned int put_size;
1587   char *path_offset;
1588   enum GNUNET_BLOCK_Type put_type;
1589   size_t data_size;
1590   int ret;
1591   GNUNET_HashCode key;
1592   struct DHTQueryRecord *record;
1593
1594   GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage));
1595
1596   put_msg = (const struct GNUNET_DHT_PutMessage *) msg;
1597   put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
1598   data_size =
1599       ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
1600   ret =
1601       GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size,
1602                             &key);
1603   if (GNUNET_NO == ret)
1604   {
1605     /* invalid reply */
1606     GNUNET_break_op (0);
1607     return;
1608   }
1609   if ((GNUNET_YES == ret) &&
1610       (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode))))
1611   {
1612     /* invalid wrapper: key mismatch! */
1613     GNUNET_break_op (0);
1614     return;
1615   }
1616   /* ret == GNUNET_SYSERR means that there is no known relationship between
1617    * data and the key, so we cannot check it */
1618 #if DEBUG_DHT
1619   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1621               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1622               msg_ctx->unique_id);
1623 #endif
1624
1625   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
1626                                              &msg_ctx->key);
1627   if (NULL != record)
1628   {
1629     struct DHTRouteSource *pos;
1630     struct GNUNET_DHT_GetResultMessage *get_result;
1631     struct DHT_MessageContext new_msg_ctx;
1632     size_t get_size;
1633
1634     pos = record->head;
1635     while (pos != NULL)
1636     {
1637       /* TODO: do only for local started requests? or also for remote peers? */
1638       /* TODO: include this in statistics? under what? */
1639       /* TODO: reverse order of path_history? */
1640       if (NULL == pos->client)
1641       {
1642         pos = pos->next;
1643         continue;
1644       }
1645
1646       memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1647       if (GNUNET_DHT_RO_RECORD_ROUTE ==
1648           (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1649       {
1650         new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1651       }
1652
1653       get_size =
1654           sizeof (struct GNUNET_DHT_GetResultMessage) + data_size +
1655           (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1656       get_result = GNUNET_malloc (get_size);
1657       get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1658       get_result->header.size = htons (get_size);
1659       get_result->expiration = put_msg->expiration;
1660       get_result->type = put_msg->type;
1661       get_result->put_path_length = htons (msg_ctx->path_history_len);
1662
1663       /* Copy the actual data and the path_history to the end of the get result */
1664       memcpy (&get_result[1], &put_msg[1], data_size);
1665       path_offset = (char *) &get_result[1];
1666       path_offset += data_size;
1667       memcpy (path_offset, msg_ctx->path_history,
1668               msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1669       new_msg_ctx.peer = my_identity;
1670       new_msg_ctx.bloom = NULL;
1671       new_msg_ctx.hop_count = 0;
1672       /* Make result routing a higher priority */
1673       new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;
1674       new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1675       new_msg_ctx.unique_id = pos->uid;
1676       send_reply_to_client(pos->client, &get_result->header, &new_msg_ctx);
1677       GNUNET_free (get_result);
1678       pos = pos->next;
1679     }
1680   }
1681
1682   if (msg_ctx->closest != GNUNET_YES)
1683   {
1684     route_message (msg, msg_ctx);
1685     return;
1686   }
1687
1688 #if DEBUG_DHT
1689   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1691               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1692               msg_ctx->unique_id);
1693 #endif
1694
1695   increment_stats (STAT_PUTS_INSERTED);
1696
1697   route_message (msg, msg_ctx);
1698 }
1699
1700
1701 /**
1702  * To how many peers should we (on average)
1703  * forward the request to obtain the desired
1704  * target_replication count (on average).
1705  *
1706  * returns: target_replication / (est. hops) + (target_replication * hop_count)
1707  * where est. hops is typically 2 * the routing table depth
1708  *
1709  * @param hop_count number of hops the message has traversed
1710  * @param target_replication the number of total paths desired
1711  *
1712  * @return Some number of peers to forward the message to
1713  */
1714 static unsigned int
1715 get_forward_count (unsigned int hop_count, size_t target_replication)
1716 {
1717   uint32_t random_value;
1718   unsigned int forward_count;
1719   float target_value;
1720
1721   if (hop_count > log_of_network_size_estimate * 4.0)
1722   {
1723     /* forcefully terminate */
1724     return 0;
1725   }
1726
1727   if (hop_count > log_of_network_size_estimate * 2.0)
1728   {
1729     /* keep forwarding, but no more replication */
1730     return 1;
1731   }
1732
1733   target_value =
1734     1 + (target_replication - 1.0) / (log_of_network_size_estimate +
1735                                       ((float) (target_replication - 1.0) *
1736                                        hop_count));
1737   /* Set forward count to floor of target_value */
1738   forward_count = (unsigned int) target_value;
1739   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1740   target_value = target_value - forward_count;
1741   random_value =
1742     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
1743   if (random_value < (target_value * UINT32_MAX))
1744     forward_count++;
1745   return forward_count;
1746 }
1747
1748
1749 /**
1750  * Check whether my identity is closer than any known peers.
1751  * If a non-null bloomfilter is given, check if this is the closest
1752  * peer that hasn't already been routed to.
1753  *
1754  * @param target hash code to check closeness to
1755  * @param bloom bloomfilter, exclude these entries from the decision
1756  * @return GNUNET_YES if node location is closest,
1757  *         GNUNET_NO otherwise.
1758  */
1759 static int
1760 am_closest_peer (const GNUNET_HashCode * target,
1761                  struct GNUNET_CONTAINER_BloomFilter *bloom)
1762 {
1763   int bits;
1764   int other_bits;
1765   int bucket_num;
1766   int count;
1767   struct PeerInfo *pos;
1768   unsigned int my_distance;
1769
1770   if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode)))
1771     return GNUNET_YES;
1772
1773   bucket_num = find_current_bucket (target);
1774
1775   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target);
1776   my_distance = distance (&my_identity.hashPubKey, target);
1777   pos = k_buckets[bucket_num].head;
1778   count = 0;
1779   while ((pos != NULL) && (count < bucket_size))
1780   {
1781     if ((bloom != NULL) &&
1782         (GNUNET_YES ==
1783          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1784     {
1785       pos = pos->next;
1786       continue;                 /* Skip already checked entries */
1787     }
1788
1789     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target);
1790     if (other_bits > bits)
1791       return GNUNET_NO;
1792     else if (other_bits == bits)        /* We match the same number of bits */
1793     {
1794       if (distance (&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
1795         return GNUNET_NO;
1796     }
1797     pos = pos->next;
1798   }
1799
1800   /* No peers closer, we are the closest! */
1801   return GNUNET_YES;
1802 }
1803
1804
1805 /**
1806  * Select a peer from the routing table that would be a good routing
1807  * destination for sending a message for "target".  The resulting peer
1808  * must not be in the set of blocked peers.<p>
1809  *
1810  * Note that we should not ALWAYS select the closest peer to the
1811  * target, peers further away from the target should be chosen with
1812  * exponentially declining probability.
1813  *
1814  * @param target the key we are selecting a peer to route to
1815  * @param bloom a bloomfilter containing entries this request has seen already
1816  * @param hops how many hops has this message traversed thus far
1817  *
1818  * @return Peer to route to, or NULL on error
1819  */
1820 static struct PeerInfo *
1821 select_peer (const GNUNET_HashCode * target,
1822              struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
1823 {
1824   unsigned int bc;
1825   unsigned int count;
1826   unsigned int selected;
1827   struct PeerInfo *pos;
1828   unsigned int distance;
1829   unsigned int largest_distance;
1830   struct PeerInfo *chosen;
1831
1832   if (hops >= log_of_network_size_estimate)
1833   {
1834     /* greedy selection (closest peer that is not in bloomfilter) */
1835     largest_distance = 0;
1836     chosen = NULL;
1837     for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1838     {
1839       pos = k_buckets[bc].head;
1840       count = 0;
1841       while ((pos != NULL) && (count < bucket_size))
1842       {
1843         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
1844         if (GNUNET_NO ==
1845             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1846         {
1847           distance = inverse_distance (target, &pos->id.hashPubKey);
1848           if (distance > largest_distance)
1849           {
1850             chosen = pos;
1851             largest_distance = distance;
1852           }
1853         }
1854         count++;
1855         pos = pos->next;
1856       }
1857     }
1858     if ((largest_distance > 0) && (chosen != NULL))
1859     {
1860       GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey);
1861       return chosen;
1862     }
1863     return NULL;                /* no peer available or we are the closest */
1864   }
1865
1866
1867   /* select "random" peer */
1868   /* count number of peers that are available and not filtered */
1869   count = 0;
1870   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1871   {
1872     pos = k_buckets[bc].head;
1873     while ((pos != NULL) && (count < bucket_size))
1874     {
1875       if (GNUNET_YES ==
1876           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1877       {
1878         pos = pos->next;
1879         increment_stats ("# peer blocked from selection by Bloom filter");
1880         continue;               /* Ignore bloomfiltered peers */
1881       }
1882       count++;
1883       pos = pos->next;
1884     }
1885   }
1886   if (count == 0)               /* No peers to select from! */
1887   {
1888     increment_stats ("# failed to select peer");
1889     return NULL;
1890   }
1891   /* Now actually choose a peer */
1892   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1893   count = 0;
1894   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1895   {
1896     pos = k_buckets[bc].head;
1897     while ((pos != NULL) && (count < bucket_size))
1898     {
1899       if (GNUNET_YES ==
1900           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1901       {
1902         pos = pos->next;
1903         continue;               /* Ignore bloomfiltered peers */
1904       }
1905       if (0 == selected--)
1906         return pos;
1907       pos = pos->next;
1908     }
1909   }
1910   GNUNET_break (0);
1911   return NULL;
1912 }
1913
1914
1915 /**
1916  * Remember this routing request so that if a reply is
1917  * received we can either forward it to the correct peer
1918  * or return the result locally.
1919  *
1920  * @param msg_ctx Context of the route request
1921  *
1922  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1923  */
1924 static int
1925 cache_response (struct DHT_MessageContext *msg_ctx)
1926 {
1927   struct DHTQueryRecord *record;
1928   struct DHTRouteSource *source_info;
1929   struct DHTRouteSource *pos;
1930   struct GNUNET_TIME_Absolute now;
1931   unsigned int current_size;
1932
1933   current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
1934
1935   while (current_size >= MAX_OUTSTANDING_FORWARDS)
1936   {
1937     source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
1938     GNUNET_assert (source_info != NULL);
1939     record = source_info->record;
1940     GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1941     if (record->head == NULL)   /* No more entries in DLL */
1942     {
1943       GNUNET_assert (GNUNET_YES ==
1944                      GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1945                                                            &record->key,
1946                                                            record));
1947       GNUNET_free (record);
1948     }
1949     if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
1950     {
1951       GNUNET_SCHEDULER_cancel (source_info->delete_task);
1952       source_info->delete_task = GNUNET_SCHEDULER_NO_TASK;
1953     }
1954     if (source_info->find_peers_responded != NULL)
1955       GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1956     GNUNET_free (source_info);
1957     current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
1958   }
1959
1960   /** Non-local request and have too many outstanding forwards, discard! */
1961   if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
1962     return GNUNET_NO;
1963
1964   now = GNUNET_TIME_absolute_get ();
1965   record =
1966       GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1967   if (record != NULL)           /* Already know this request! */
1968   {
1969     pos = record->head;
1970     while (pos != NULL)
1971     {
1972       if (0 ==
1973           memcmp (&msg_ctx->peer, &pos->source,
1974                   sizeof (struct GNUNET_PeerIdentity)))
1975         break;                  /* Already have this peer in reply list! */
1976       pos = pos->next;
1977     }
1978     if ((pos != NULL) && (pos->client == msg_ctx->client))      /* Seen this already */
1979     {
1980       GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode,
1981                                          now.abs_value);
1982       return GNUNET_NO;
1983     }
1984   }
1985   else
1986   {
1987     record = GNUNET_malloc (sizeof (struct DHTQueryRecord));
1988     GNUNET_assert (GNUNET_OK ==
1989                    GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap,
1990                                                       &msg_ctx->key, record,
1991                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1992     memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1993   }
1994
1995   source_info = GNUNET_malloc (sizeof (struct DHTRouteSource));
1996   source_info->record = record;
1997   source_info->delete_task =
1998       GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry,
1999                                     source_info);
2000   source_info->find_peers_responded =
2001       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2002   source_info->source = msg_ctx->peer;
2003   GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail,
2004                                      source_info);
2005   if (msg_ctx->client != NULL)  /* For local request, set timeout so high it effectively never gets pushed out */
2006   {
2007     source_info->client = msg_ctx->client;
2008     now = GNUNET_TIME_absolute_get_forever ();
2009   }
2010   source_info->hnode =
2011       GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info,
2012                                     now.abs_value);
2013   source_info->uid = msg_ctx->unique_id;
2014 #if DEBUG_DHT > 1
2015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016               "`%s:%s': Created new forward source info for %s uid %llu\n",
2017               my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2018               msg_ctx->unique_id);
2019 #endif
2020   return GNUNET_YES;
2021 }
2022
2023
2024 /**
2025  * Main function that handles whether or not to route a message to other
2026  * peers.
2027  *
2028  * @param msg the message to be routed
2029  * @param msg_ctx the context containing all pertinent information about the message
2030  */
2031 static void
2032 route_message (const struct GNUNET_MessageHeader *msg,
2033                struct DHT_MessageContext *msg_ctx)
2034 {
2035   int i;
2036   struct PeerInfo *selected;
2037   unsigned int target_forward_count;
2038   unsigned int forward_count;
2039   struct RecentRequest *recent_req;
2040   char *stat_forward_count;
2041   char *temp_stat_str;
2042
2043   increment_stats (STAT_ROUTES);
2044   target_forward_count =
2045       get_forward_count (msg_ctx->hop_count, msg_ctx->replication);
2046   GNUNET_asprintf (&stat_forward_count, "# forward counts of %d",
2047                    target_forward_count);
2048   increment_stats (stat_forward_count);
2049   GNUNET_free (stat_forward_count);
2050   if (msg_ctx->bloom == NULL)
2051     msg_ctx->bloom =
2052         GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2053
2054   if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT)
2055   {
2056     recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2057     GNUNET_assert (recent_req != NULL);
2058     GNUNET_SCHEDULER_cancel (recent_req->remove_task);
2059     GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2060     GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2061     GNUNET_free (recent_req);
2062   }
2063
2064   recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
2065   recent_req->uid = msg_ctx->unique_id;
2066   memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
2067   recent_req->heap_node =
2068     GNUNET_CONTAINER_heap_insert (recent_heap, recent_req,
2069                                   GNUNET_TIME_absolute_get ().abs_value);
2070   recent_req->bloom =
2071     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2072
2073   forward_count = 0;
2074   for (i = 0; i < target_forward_count; i++)
2075   {
2076     selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
2077     if (selected == NULL)
2078       break;    
2079     forward_count++;
2080     if (GNUNET_CRYPTO_hash_matching_bits
2081         (&selected->id.hashPubKey,
2082          &msg_ctx->key) >=
2083         GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
2084                                           &msg_ctx->key))
2085       GNUNET_asprintf (&temp_stat_str,
2086                        "# requests routed to close(r) peer hop %u",
2087                        msg_ctx->hop_count);
2088     else
2089       GNUNET_asprintf (&temp_stat_str,
2090                        "# requests routed to less close peer hop %u",
2091                        msg_ctx->hop_count);
2092     if (temp_stat_str != NULL)
2093     {
2094       increment_stats (temp_stat_str);
2095       GNUNET_free (temp_stat_str);
2096     }
2097     GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
2098                                       &selected->id.hashPubKey);
2099     forward_message (msg, selected, msg_ctx);    
2100   }
2101
2102   if (msg_ctx->bloom != NULL)
2103   {
2104     GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom,
2105                                       DHT_BLOOM_SIZE);
2106     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2107     msg_ctx->bloom = NULL;
2108   }
2109 }
2110
2111
2112 /**
2113  * Main function that handles whether or not to route a message to other
2114  * peers.
2115  *
2116  * @param msg the message to be routed
2117  * @param msg_ctx the context containing all pertinent information about the message
2118  */
2119 static void
2120 demultiplex_message (const struct GNUNET_MessageHeader *msg,
2121                      struct DHT_MessageContext *msg_ctx)
2122 {
2123   /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */
2124   msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom);
2125
2126   switch (ntohs (msg->type))
2127   {
2128   case GNUNET_MESSAGE_TYPE_DHT_GET:    /* Add to hashmap of requests seen, search for data (always) */
2129     cache_response (msg_ctx);
2130     handle_dht_get (msg, msg_ctx);
2131     break;
2132   case GNUNET_MESSAGE_TYPE_DHT_PUT:    /* Check if closest, if so insert data. */
2133     increment_stats (STAT_PUTS);
2134     handle_dht_put (msg, msg_ctx);
2135     break;
2136   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:      /* Check if closest and not started by us, check options, add to requests seen */
2137     increment_stats (STAT_FIND_PEER);
2138     if (((msg_ctx->hop_count > 0) &&
2139          (0 !=
2140           memcmp (&msg_ctx->peer, &my_identity,
2141                   sizeof (struct GNUNET_PeerIdentity)))) ||
2142         (msg_ctx->client != NULL))
2143     {
2144       cache_response (msg_ctx);
2145       if ((msg_ctx->closest == GNUNET_YES) ||
2146           (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2147         handle_dht_find_peer (msg, msg_ctx);
2148     }
2149     else
2150       route_message (msg, msg_ctx);
2151     break;
2152   default:
2153     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2154                 "`%s': Message type (%d) not handled, forwarding anyway!\n",
2155                 "DHT", ntohs (msg->type));
2156     route_message (msg, msg_ctx);
2157   }
2158 }
2159
2160
2161 /**
2162  * Receive the HELLO from transport service,
2163  * free current and replace if necessary.
2164  *
2165  * @param cls NULL
2166  * @param message HELLO message of peer
2167  */
2168 static void
2169 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2170 {
2171 #if DEBUG_DHT
2172   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2173               "Received our `%s' from transport service\n", "HELLO");
2174 #endif
2175   GNUNET_assert (message != NULL);
2176   GNUNET_free_non_null (my_hello);
2177   my_hello = GNUNET_malloc (ntohs (message->size));
2178   memcpy (my_hello, message, ntohs (message->size));
2179 }
2180
2181
2182 /**
2183  * Task run during shutdown.
2184  *
2185  * @param cls unused
2186  * @param tc unused
2187  */
2188 static void
2189 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2190 {
2191   int bucket_count;
2192   struct PeerInfo *pos;
2193
2194   if (NULL != ghh)
2195   {
2196     GNUNET_TRANSPORT_get_hello_cancel (ghh);
2197     ghh = NULL;
2198   }
2199   if (transport_handle != NULL)
2200   {
2201     GNUNET_free_non_null (my_hello);
2202     GNUNET_TRANSPORT_disconnect (transport_handle);
2203     transport_handle = NULL;
2204   }
2205   GDS_NEIGHBOURS_done ();
2206   GDS_DATACACHE_done ();
2207   GDS_NSE_done ();
2208   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2209   {
2210     while (k_buckets[bucket_count].head != NULL)
2211     {
2212       pos = k_buckets[bucket_count].head;
2213 #if DEBUG_DHT
2214       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2215                   "%s:%s Removing peer %s from bucket %d!\n", my_short_id,
2216                   "DHT", GNUNET_i2s (&pos->id), bucket_count);
2217 #endif
2218       delete_peer (pos, bucket_count);
2219     }
2220   }
2221   if (stats != NULL)
2222   {
2223     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
2224     stats = NULL;
2225   }
2226   if (block_context != NULL)
2227   {
2228     GNUNET_BLOCK_context_destroy (block_context);
2229     block_context = NULL;
2230   }
2231 }
2232
2233
2234
2235 /**
2236  * Process dht requests.
2237  *
2238  * @param cls closure
2239  * @param server the initialized server
2240  * @param c configuration to use
2241  */
2242 static void
2243 run (void *cls, struct GNUNET_SERVER_Handle *server,
2244      const struct GNUNET_CONFIGURATION_Handle *c)
2245 {
2246   struct GNUNET_TIME_Relative next_send_time;
2247   unsigned long long temp_config_num;
2248
2249   cfg = c;
2250   GDS_DATACACHE_init ();
2251   coreAPI = GNUNET_CORE_connect (cfg,   /* Main configuration */
2252                                  DEFAULT_CORE_QUEUE_SIZE,       /* queue size */
2253                                  NULL,  /* Closure passed to DHT functions */
2254                                  &core_init,    /* Call core_init once connected */
2255                                  &handle_core_connect,  /* Handle connects */
2256                                  &handle_core_disconnect,       /* remove peers on disconnects */
2257                                  NULL,  /* Do we care about "status" updates? */
2258                                  NULL,  /* Don't want notified about all incoming messages */
2259                                  GNUNET_NO,     /* For header only inbound notification */
2260                                  NULL,  /* Don't want notified about all outbound messages */
2261                                  GNUNET_NO,     /* For header only outbound notification */
2262                                  core_handlers);        /* Register these handlers */
2263
2264   if (coreAPI == NULL)
2265     return;
2266   transport_handle =
2267       GNUNET_TRANSPORT_connect (cfg, NULL, NULL, NULL, NULL, NULL);
2268   if (transport_handle != NULL)
2269     ghh = GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
2270   else
2271     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2272                 "Failed to connect to transport service!\n");
2273   block_context = GNUNET_BLOCK_context_create (cfg);
2274   lowest_bucket = MAX_BUCKETS - 1;
2275   all_known_peers = GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2276   GNUNET_assert (all_known_peers != NULL);
2277
2278   if (GNUNET_OK ==
2279       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
2280                                              &temp_config_num))
2281   {
2282     bucket_size = (unsigned int) temp_config_num;
2283   }
2284
2285   stats = GNUNET_STATISTICS_create ("dht", cfg);
2286   next_send_time.rel_value =
2287     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
2288     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2289                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
2290                                2) -
2291                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
2292   find_peer_context.start = GNUNET_TIME_absolute_get ();
2293   GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
2294                                 &find_peer_context);  
2295
2296   /* Scheduled the task to clean up when shutdown is called */
2297   cleanup_task =
2298       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2299                                     &shutdown_task, NULL);
2300 }
2301
2302
2303 /**
2304  * The main function for the dht service.
2305  *
2306  * @param argc number of arguments from the command line
2307  * @param argv command line arguments
2308  * @return 0 ok, 1 on error
2309  */
2310 int
2311 main (int argc, char *const *argv)
2312 {
2313   int ret;
2314   struct RecentRequest *recent_req;
2315
2316   recent_heap =
2317     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2318   recent_find_peer_requests =
2319       GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2320   ret =
2321       (GNUNET_OK ==
2322        GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
2323                            NULL)) ? 0 : 1;
2324   while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0)
2325   {
2326     recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2327     GNUNET_assert (recent_req != NULL);
2328     GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2329     GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2330     GNUNET_free (recent_req);
2331   }
2332   GNUNET_CONTAINER_heap_destroy (recent_heap);
2333   recent_heap = NULL;
2334   GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
2335   recent_find_peer_requests = NULL;
2336   return ret;
2337 }
2338
2339 /* end of gnunet-service-dht.c */