don't try to connect to all peers, but only those advantageous for routing
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief main DHT service shell, building block for DHT implementations
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_getopt_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_service_lib.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_signal_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "dhtlog.h"
42 #include "dht.h"
43
44 /**
45  * How many buckets will we allow total.
46  */
47 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
48
49 /**
50  * What is the maximum number of peers in a given bucket.
51  */
52 #define DEFAULT_BUCKET_SIZE 8
53
54 /**
55  * Minimum number of peers we need for "good" routing,
56  * any less than this and we will allow messages to
57  * travel much further through the network!
58  */
59 #define MINIMUM_PEER_THRESHOLD 20
60
61 #define DHT_DEFAULT_FIND_PEER_REPLICATION 20
62
63 #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
64
65 #define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
66
67 /**
68  * Real maximum number of hops, at which point we refuse
69  * to forward the message.
70  */
71 #define MAX_HOPS 20
72
73 /**
74  * Linked list of messages to send to clients.
75  */
76 struct P2PPendingMessage
77 {
78   /**
79    * Pointer to next item in the list
80    */
81   struct P2PPendingMessage *next;
82
83   /**
84    * Pointer to previous item in the list
85    */
86   struct P2PPendingMessage *prev;
87
88   /**
89    * Message importance level.
90    */
91   unsigned int importance;
92
93   /**
94    * How long to wait before sending message.
95    */
96   struct GNUNET_TIME_Relative timeout;
97
98   /**
99    * Actual message to be sent; // avoid allocation
100    */
101   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
102
103 };
104
105 /**
106  * Per-peer information.
107  */
108 struct PeerInfo
109 {
110   /**
111    * Next peer entry (DLL)
112    */
113   struct PeerInfo *next;
114
115   /**
116    *  Prev peer entry (DLL)
117    */
118   struct PeerInfo *prev;
119
120   /**
121    * Head of pending messages to be sent to this peer.
122    */
123   struct P2PPendingMessage *head;
124
125   /**
126    * Tail of pending messages to be sent to this peer.
127    */
128   struct P2PPendingMessage *tail;
129
130   /**
131    * Core handle for sending messages to this peer.
132    */
133   struct GNUNET_CORE_TransmitHandle *th;
134
135   /**
136    * Task for scheduling message sends.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier send_task;
139
140   /**
141    * What is the average latency for replies received?
142    */
143   struct GNUNET_TIME_Relative latency;
144
145   /**
146    * Number of responses received
147    */
148   unsigned long long response_count;
149
150   /**
151    * Number of requests sent
152    */
153   unsigned long long request_count;
154
155   /**
156    * What is the identity of the peer?
157    */
158   struct GNUNET_PeerIdentity id;
159
160   /**
161    * Transport level distance to peer.
162    */
163   unsigned int distance;
164
165 };
166
167 /**
168  * Peers are grouped into buckets.
169  */
170 struct PeerBucket
171 {
172   /**
173    * Head of DLL
174    */
175   struct PeerInfo *head;
176
177   /**
178    * Tail of DLL
179    */
180   struct PeerInfo *tail;
181
182   /**
183    * Number of peers in the bucket.
184    */
185   unsigned int peers_size;
186 };
187
188 /**
189  * Linked list of messages to send to clients.
190  */
191 struct PendingMessage
192 {
193   /**
194    * Pointer to next item in the list
195    */
196   struct PendingMessage *next;
197
198   /**
199    * Pointer to previous item in the list
200    */
201   struct PendingMessage *prev;
202
203   /**
204    * Actual message to be sent; // avoid allocation
205    */
206   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
207
208 };
209
210 /**
211  * Struct containing information about a client,
212  * handle to connect to it, and any pending messages
213  * that need to be sent to it.
214  */
215 struct ClientList
216 {
217   /**
218    * Linked list of active clients
219    */
220   struct ClientList *next;
221
222   /**
223    * The handle to this client
224    */
225   struct GNUNET_SERVER_Client *client_handle;
226
227   /**
228    * Handle to the current transmission request, NULL
229    * if none pending.
230    */
231   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
232
233   /**
234    * Linked list of pending messages for this client
235    */
236   struct PendingMessage *pending_head;
237
238   /**
239    * Tail of linked list of pending messages for this client
240    */
241   struct PendingMessage *pending_tail;
242
243 };
244
245
246 /**
247  * Context containing information about a DHT message received.
248  */
249 struct DHT_MessageContext
250 {
251   /**
252    * The client this request was received from.
253    * (NULL if received from another peer)
254    */
255   struct ClientList *client;
256
257   /**
258    * The peer this request was received from.
259    * (NULL if received from local client)
260    */
261   const struct GNUNET_PeerIdentity *peer;
262
263   /**
264    * The key this request was about
265    */
266   const GNUNET_HashCode *key;
267
268   /**
269    * The unique identifier of this request
270    */
271   uint64_t unique_id;
272
273   /**
274    * Desired replication level
275    */
276   uint32_t replication;
277
278   /**
279    * Network size estimate, either ours or the sum of
280    * those routed to thus far. =~ Log of number of peers
281    * chosen from for this request.
282    */
283   uint32_t network_size;
284
285   /**
286    * Any message options for this request
287    */
288   uint32_t msg_options;
289
290   /**
291    * How many hops has the message already traversed?
292    */
293   uint32_t hop_count;
294
295   /**
296    * Bloomfilter for this routing request.
297    */
298   struct GNUNET_CONTAINER_BloomFilter *bloom;
299
300   /**
301    * Did we forward this message? (may need to remember it!)
302    */
303   int forwarded;
304
305   /**
306    * Are we the closest known peer to this key (out of our neighbors?)
307    */
308   int closest;
309 };
310
311 /**
312  * Record used for remembering what peers are waiting for what
313  * responses (based on search key).
314  */
315 struct DHTRouteSource
316 {
317
318   /**
319    * This is a DLL.
320    */
321   struct DHTRouteSource *next;
322
323   /**
324    * This is a DLL.
325    */
326   struct DHTRouteSource *prev;
327
328   /**
329    * Source of the request.  Replies should be forwarded to
330    * this peer.
331    */
332   struct GNUNET_PeerIdentity source;
333
334   /**
335    * If this was a local request, remember the client; otherwise NULL.
336    */
337   struct ClientList *client;
338
339   /**
340    * Pointer to this nodes heap location (for removal)
341    */
342   struct GNUNET_CONTAINER_HeapNode *hnode;
343
344   /**
345    * Back pointer to the record storing this information.
346    */
347   struct DHTQueryRecord *record;
348
349   /**
350    * Task to remove this entry on timeout.
351    */
352   GNUNET_SCHEDULER_TaskIdentifier delete_task;
353 };
354
355 /**
356  * Entry in the DHT routing table.
357  */
358 struct DHTQueryRecord
359 {
360   /**
361    * Head of DLL for result forwarding.
362    */
363   struct DHTRouteSource *head;
364
365   /**
366    * Tail of DLL for result forwarding.
367    */
368   struct DHTRouteSource *tail;
369
370   /**
371    * Key that the record concerns.
372    */
373   GNUNET_HashCode key;
374
375   /**
376    * GET message of this record (what we already forwarded?).
377    */
378   //DV_DHT_MESSAGE get; Try to get away with not saving this.
379
380   /**
381    * Bloomfilter of the peers we've replied to so far
382    */
383   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
384
385 };
386
387 /**
388  * DHT Routing results structure
389  */
390 struct DHTResults
391 {
392   /*
393    * Min heap for removal upon reaching limit
394    */
395   struct GNUNET_CONTAINER_Heap *minHeap;
396
397   /*
398    * Hashmap for fast key based lookup
399    */
400   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
401
402 };
403
404 /**
405  * Routing option to end routing when closest peer found.
406  */
407 static int stop_on_closest;
408
409 /**
410  * Routing option to end routing when data is found.
411  */
412 static int stop_on_found;
413
414 /**
415  * Container of active queries we should remember
416  */
417 static struct DHTResults forward_list;
418
419 /**
420  * Handle to the datacache service (for inserting/retrieving data)
421  */
422 static struct GNUNET_DATACACHE_Handle *datacache;
423
424 /**
425  * The main scheduler to use for the DHT service
426  */
427 static struct GNUNET_SCHEDULER_Handle *sched;
428
429 /**
430  * The configuration the DHT service is running with
431  */
432 static const struct GNUNET_CONFIGURATION_Handle *cfg;
433
434 /**
435  * Handle to the core service
436  */
437 static struct GNUNET_CORE_Handle *coreAPI;
438
439 /**
440  * Handle to the transport service, for getting our hello
441  */
442 static struct GNUNET_TRANSPORT_Handle *transport_handle;
443
444 /**
445  * The identity of our peer.
446  */
447 static struct GNUNET_PeerIdentity my_identity;
448
449 /**
450  * Short id of the peer, for printing
451  */
452 static char *my_short_id;
453
454 /**
455  * Our HELLO
456  */
457 static struct GNUNET_MessageHeader *my_hello;
458
459 /**
460  * Task to run when we shut down, cleaning up all our trash
461  */
462 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
463
464 /**
465  * The lowest currently used bucket.
466  */
467 static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
468
469 /**
470  * The buckets (Kademlia routing table, complete with growth).
471  * Array of size MAX_BUCKET_SIZE.
472  */
473 static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
474
475 /**
476  * Maximum size for each bucket.
477  */
478 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
479
480 /**
481  * List of active clients.
482  */
483 static struct ClientList *client_list;
484
485 /**
486  * Handle to the DHT logger.
487  */
488 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
489
490 /*
491  * Whether or not to send routing debugging information
492  * to the dht logging server
493  */
494 static unsigned int debug_routes;
495
496 /*
497  * Whether or not to send FULL route information to
498  * logging server
499  */
500 static unsigned int debug_routes_extended;
501
502 /**
503  * Forward declaration.
504  */
505 static size_t send_generic_reply (void *cls, size_t size, void *buf);
506
507 /* Declare here so retry_core_send is aware of it */
508 size_t core_transmit_notify (void *cls,
509                              size_t size, void *buf);
510
511 /**
512  *  Try to send another message from our core send list
513  */
514 static void
515 try_core_send (void *cls,
516                const struct GNUNET_SCHEDULER_TaskContext *tc)
517 {
518   struct PeerInfo *peer = cls;
519   struct P2PPendingMessage *pending;
520   size_t ssize;
521
522   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
523
524   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
525     return;
526
527   if (peer->th != NULL)
528     return; /* Message send already in progress */
529
530   pending = peer->head;
531   if (pending != NULL)
532     {
533       ssize = ntohs(pending->msg->size);
534 #if DEBUG_DHT > 1
535      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
536                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
537                 "DHT", ssize, GNUNET_i2s(&peer->id));
538 #endif
539       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
540                                                    pending->timeout, &peer->id,
541                                                    ssize, &core_transmit_notify, peer);
542     }
543 }
544
545 /**
546  * Function called to send a request out to another peer.
547  * Called both for locally initiated requests and those
548  * received from other peers.
549  *
550  * @param cls DHT service closure argument
551  * @param msg the encapsulated message
552  * @param peer the peer to forward the message to
553  * @param msg_ctx the context of the message (hop count, bloom, etc.)
554  */
555 static void forward_result_message (void *cls,
556                                     const struct GNUNET_MessageHeader *msg,
557                                     struct PeerInfo *peer,
558                                     struct DHT_MessageContext *msg_ctx)
559 {
560   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
561   struct P2PPendingMessage *pending;
562   size_t msize;
563   size_t psize;
564
565   msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
566   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
567   psize = sizeof(struct P2PPendingMessage) + msize;
568   pending = GNUNET_malloc(psize);
569   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
570   pending->importance = DHT_SEND_PRIORITY;
571   pending->timeout = GNUNET_TIME_relative_get_forever();
572   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
573   result_message->header.size = htons(msize);
574   result_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
575   result_message->options = htonl(msg_ctx->msg_options);
576   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
577   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
578   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
579   memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
580   memcpy(&result_message[1], msg, ntohs(msg->size));
581 #if DEBUG_DHT > 1
582   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
583 #endif
584   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
585   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
586     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
587 }
588 /**
589  * Called when core is ready to send a message we asked for
590  * out to the destination.
591  *
592  * @param cls closure (NULL)
593  * @param size number of bytes available in buf
594  * @param buf where the callee should write the message
595  * @return number of bytes written to buf
596  */
597 size_t core_transmit_notify (void *cls,
598                              size_t size, void *buf)
599 {
600   struct PeerInfo *peer = cls;
601   char *cbuf = buf;
602   struct P2PPendingMessage *pending;
603
604   size_t off;
605   size_t msize;
606
607   if (buf == NULL)
608     {
609       /* client disconnected */
610 #if DEBUG_DHT
611       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
612 #endif
613       return 0;
614     }
615
616   if (peer->head == NULL)
617     return 0;
618
619   peer->th = NULL;
620   off = 0;
621   pending = peer->head;
622   msize = ntohs(pending->msg->size);
623   if (msize <= size)
624     {
625       off = msize;
626       memcpy (cbuf, pending->msg, msize);
627       GNUNET_CONTAINER_DLL_remove (peer->head,
628                                    peer->tail,
629                                    pending);
630 #if DEBUG_DHT > 1
631       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Removing pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
632 #endif
633       GNUNET_free (pending);
634     }
635 #if SMART
636   while (NULL != pending &&
637           (size - off >= (msize = ntohs (pending->msg->size))))
638     {
639 #if DEBUG_DHT_ROUTING
640       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d\n", my_short_id, "dht service", msize, size);
641 #endif
642       memcpy (&cbuf[off], pending->msg, msize);
643       off += msize;
644       GNUNET_CONTAINER_DLL_remove (peer->head,
645                                    peer->tail,
646                                    pending);
647       GNUNET_free (pending);
648       pending = peer->head;
649     }
650 #endif
651   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
652     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
653 #if DEBUG_DHT > 1
654   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d, returning %d\n", my_short_id, "dht service", msize, size, off);
655 #endif
656   return off;
657 }
658
659 /**
660  * Determine how many low order bits match in two
661  * GNUNET_HashCodes.  i.e. - 010011 and 011111 share
662  * the first two lowest order bits, and therefore the
663  * return value is two (NOT XOR distance, nor how many
664  * bits match absolutely!).
665  *
666  * @param first the first hashcode
667  * @param second the hashcode to compare first to
668  *
669  * @return the number of bits that match
670  */
671 static unsigned int matching_bits(const GNUNET_HashCode *first, const GNUNET_HashCode *second)
672 {
673   unsigned int i;
674
675   for (i = 0; i < sizeof (GNUNET_HashCode) * 8; i++)
676     if (GNUNET_CRYPTO_hash_get_bit (first, i) != GNUNET_CRYPTO_hash_get_bit (second, i))
677       return i;
678   return sizeof (GNUNET_HashCode) * 8;
679 }
680
681 /**
682  * Compute the distance between have and target as a 32-bit value.
683  * Differences in the lower bits must count stronger than differences
684  * in the higher bits.
685  *
686  * @return 0 if have==target, otherwise a number
687  *           that is larger as the distance between
688  *           the two hash codes increases
689  */
690 static unsigned int
691 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
692 {
693   unsigned int bucket;
694   unsigned int msb;
695   unsigned int lsb;
696   unsigned int i;
697
698   /* We have to represent the distance between two 2^9 (=512)-bit
699      numbers as a 2^5 (=32)-bit number with "0" being used for the
700      two numbers being identical; furthermore, we need to
701      guarantee that a difference in the number of matching
702      bits is always represented in the result.
703
704      We use 2^32/2^9 numerical values to distinguish between
705      hash codes that have the same LSB bit distance and
706      use the highest 2^9 bits of the result to signify the
707      number of (mis)matching LSB bits; if we have 0 matching
708      and hence 512 mismatching LSB bits we return -1 (since
709      512 itself cannot be represented with 9 bits) */
710
711   /* first, calculate the most significant 9 bits of our
712      result, aka the number of LSBs */
713   bucket = matching_bits (target, have);
714   /* bucket is now a value between 0 and 512 */
715   if (bucket == 512)
716     return 0;                   /* perfect match */
717   if (bucket == 0)
718     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
719                                    below, we'd end up with max+1 (overflow)) */
720
721   /* calculate the most significant bits of the final result */
722   msb = (512 - bucket) << (32 - 9);
723   /* calculate the 32-9 least significant bits of the final result by
724      looking at the differences in the 32-9 bits following the
725      mismatching bit at 'bucket' */
726   lsb = 0;
727   for (i = bucket + 1;
728        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
729     {
730       if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
731         lsb |= (1 << (bucket + 32 - 9 - i));    /* first bit set will be 10,
732                                                    last bit set will be 31 -- if
733                                                    i does not reach 512 first... */
734     }
735   return msb | lsb;
736 }
737
738 /**
739  * Return a number that is larger the closer the
740  * "have" GNUNET_hash code is to the "target".
741  *
742  * @return inverse distance metric, non-zero.
743  *         Must fudge the value if NO bits match.
744  */
745 static unsigned int
746 inverse_distance (const GNUNET_HashCode * target,
747                   const GNUNET_HashCode * have)
748 {
749   if (matching_bits(target, have) == 0)
750     return 1; /* Never return 0! */
751   return ((unsigned int) -1) - distance (target, have);
752 }
753
754 /**
755  * Find the optimal bucket for this key, regardless
756  * of the current number of buckets in use.
757  *
758  * @param hc the hashcode to compare our identity to
759  *
760  * @return the proper bucket index, or GNUNET_SYSERR
761  *         on error (same hashcode)
762  */
763 static int find_bucket(const GNUNET_HashCode *hc)
764 {
765   unsigned int bits;
766
767   bits = matching_bits(&my_identity.hashPubKey, hc);
768   if (bits == MAX_BUCKETS)
769     return GNUNET_SYSERR;
770   return MAX_BUCKETS - bits - 1;
771 }
772
773 /**
774  * Find which k-bucket this peer should go into,
775  * taking into account the size of the k-bucket
776  * array.  This means that if more bits match than
777  * there are currently buckets, lowest_bucket will
778  * be returned.
779  *
780  * @param hc GNUNET_HashCode we are finding the bucket for.
781  *
782  * @return the proper bucket index for this key,
783  *         or GNUNET_SYSERR on error (same hashcode)
784  */
785 static int find_current_bucket(const GNUNET_HashCode *hc)
786 {
787   int actual_bucket;
788   actual_bucket = find_bucket(hc);
789
790   if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
791     return GNUNET_SYSERR;
792   else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
793     return lowest_bucket;
794   else
795     return actual_bucket;
796 }
797
798 /**
799  * Find a routing table entry from a peer identity
800  *
801  * @param peer the peer identity to look up
802  *
803  * @return the routing table entry, or NULL if not found
804  */
805 static struct PeerInfo *
806 find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
807 {
808   int bucket;
809   struct PeerInfo *pos;
810   bucket = find_current_bucket(&peer->hashPubKey);
811
812   if (bucket == GNUNET_SYSERR)
813     return NULL;
814
815   pos = k_buckets[bucket].head;
816   while (pos != NULL)
817     {
818       if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
819         return pos;
820       pos = pos->next;
821     }
822   return NULL; /* No such peer. */
823 }
824
825 /**
826  * Really add a peer to a bucket (only do assertions
827  * on size, etc.)
828  *
829  * @param peer GNUNET_PeerIdentity of the peer to add
830  * @param bucket the already figured out bucket to add
831  *        the peer to
832  * @param latency the core reported latency of this peer
833  * @param distance the transport level distance to this peer
834  */
835 static void add_peer(const struct GNUNET_PeerIdentity *peer,
836                      unsigned int bucket,
837                      struct GNUNET_TIME_Relative latency,
838                      unsigned int distance)
839 {
840   struct PeerInfo *new_peer;
841   GNUNET_assert(bucket < MAX_BUCKETS);
842   GNUNET_assert(peer != NULL);
843   new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
844   new_peer->latency = latency;
845   new_peer->distance = distance;
846   memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
847
848   GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
849                                     k_buckets[bucket].tail,
850                                     k_buckets[bucket].tail,
851                                     new_peer);
852   k_buckets[bucket].peers_size++;
853 }
854
855 /**
856  * Given a peer and its corresponding bucket,
857  * remove it from that bucket.  Does not free
858  * the PeerInfo struct, nor cancel messages
859  * or free messages waiting to be sent to this
860  * peer!
861  *
862  * @param peer the peer to remove
863  * @param bucket the bucket the peer belongs to
864  */
865 static void remove_peer (struct PeerInfo *peer,
866                          unsigned int bucket)
867 {
868   GNUNET_assert(k_buckets[bucket].peers_size > 0);
869   GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
870                               k_buckets[bucket].tail,
871                               peer);
872   k_buckets[bucket].peers_size--;
873 }
874
875 /**
876  * Removes peer from a bucket, then frees associated
877  * resources and frees peer.
878  *
879  * @param peer peer to be removed and freed
880  * @param bucket which bucket this peer belongs to
881  */
882 static void delete_peer (struct PeerInfo *peer,
883                          unsigned int bucket)
884 {
885   struct P2PPendingMessage *pos;
886   struct P2PPendingMessage *next;
887   remove_peer(peer, bucket); /* First remove the peer from its bucket */
888
889   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
890     GNUNET_SCHEDULER_cancel(sched, peer->send_task);
891   if (peer->th != NULL)
892     GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
893
894   pos = peer->head;
895   while (pos != NULL) /* Remove any pending messages for this peer */
896     {
897       next = pos->next;
898       GNUNET_free(pos);
899       pos = next;
900     }
901   GNUNET_free(peer);
902 }
903
904 /**
905  * The current lowest bucket is full, so change the lowest
906  * bucket to the next lower down, and move any appropriate
907  * entries in the current lowest bucket to the new bucket.
908  */
909 static void enable_next_bucket()
910 {
911   unsigned int new_bucket;
912   unsigned int to_remove;
913   int i;
914   struct PeerInfo *to_remove_list[bucket_size]; /* We either use CPU by making a list, or memory with array.  Use memory. */
915   struct PeerInfo *pos;
916   GNUNET_assert(lowest_bucket > 0);
917
918   pos = k_buckets[lowest_bucket].head;
919   memset(to_remove_list, 0, sizeof(to_remove_list));
920   to_remove = 0;
921   /* Populate the array of peers which should be in the next lowest bucket */
922   while (pos->next != NULL)
923     {
924       if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
925         {
926           to_remove_list[to_remove] = pos;
927           to_remove++;
928         }
929       pos = pos->next;
930     }
931   new_bucket = lowest_bucket - 1;
932
933   /* Remove peers from lowest bucket, insert into next lowest bucket */
934   for (i = 0; i < bucket_size; i++)
935     {
936       if (to_remove_list[i] != NULL)
937         {
938           remove_peer(to_remove_list[i], lowest_bucket);
939           GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
940                                             k_buckets[new_bucket].tail,
941                                             k_buckets[new_bucket].tail,
942                                             to_remove_list[i]);
943           k_buckets[new_bucket].peers_size++;
944         }
945       else
946         break;
947     }
948   lowest_bucket = new_bucket;
949 }
950 /**
951  * Attempt to add a peer to our k-buckets.
952  *
953  * @param peer, the peer identity of the peer being added
954  *
955  * @return GNUNET_YES if the peer was added,
956  *         GNUNET_NO if not,
957  *         GNUNET_SYSERR on err (peer is us!)
958  */
959 static int try_add_peer(const struct GNUNET_PeerIdentity *peer,
960                         unsigned int bucket,
961                         struct GNUNET_TIME_Relative latency,
962                         unsigned int distance)
963 {
964   int peer_bucket;
965
966   peer_bucket = find_current_bucket(&peer->hashPubKey);
967   if (peer_bucket == GNUNET_SYSERR)
968     return GNUNET_SYSERR;
969
970   GNUNET_assert(peer_bucket >= lowest_bucket);
971   if ((k_buckets[peer_bucket].peers_size) < bucket_size)
972     {
973       add_peer(peer, peer_bucket, latency, distance);
974       return GNUNET_YES;
975     }
976   else if ((peer_bucket == lowest_bucket) && (lowest_bucket > 0))
977     {
978       enable_next_bucket();
979       return try_add_peer(peer, bucket, latency, distance); /* Recurse, if proper bucket still full ping peers */
980     }
981   else if ((k_buckets[peer_bucket].peers_size) == bucket_size)
982     {
983       /* TODO: implement ping_oldest_peer */
984       //ping_oldest_peer(bucket, peer, latency, distance); /* Find oldest peer, ping it.  If no response, remove and add new peer! */
985       return GNUNET_NO;
986     }
987   GNUNET_break(0);
988   return GNUNET_NO;
989 }
990
991
992 /**
993  * Task run to check for messages that need to be sent to a client.
994  *
995  * @param client a ClientList, containing the client and any messages to be sent to it
996  */
997 static void
998 process_pending_messages (struct ClientList *client)
999
1000   if (client->pending_head == NULL) 
1001     return;    
1002   if (client->transmit_handle != NULL) 
1003     return;
1004   client->transmit_handle =
1005     GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1006                                          ntohs (client->pending_head->msg->
1007                                                 size),
1008                                          GNUNET_TIME_UNIT_FOREVER_REL,
1009                                          &send_generic_reply, client);
1010 }
1011
1012 /**
1013  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1014  * request.  A ClientList is passed as closure, take the head of the list
1015  * and copy it into buf, which has the result of sending the message to the
1016  * client.
1017  *
1018  * @param cls closure to this call
1019  * @param size maximum number of bytes available to send
1020  * @param buf where to copy the actual message to
1021  *
1022  * @return the number of bytes actually copied, 0 indicates failure
1023  */
1024 static size_t
1025 send_generic_reply (void *cls, size_t size, void *buf)
1026 {
1027   struct ClientList *client = cls;
1028   char *cbuf = buf;
1029   struct PendingMessage *reply;
1030   size_t off;
1031   size_t msize;
1032
1033   client->transmit_handle = NULL;
1034   if (buf == NULL)             
1035     {
1036       /* client disconnected */
1037       return 0;
1038     }
1039   off = 0;
1040   while ( (NULL != (reply = client->pending_head)) &&
1041           (size >= off + (msize = ntohs (reply->msg->size))))
1042     {
1043       GNUNET_CONTAINER_DLL_remove (client->pending_head,
1044                                    client->pending_tail,
1045                                    reply);
1046       memcpy (&cbuf[off], reply->msg, msize);
1047       GNUNET_free (reply);
1048       off += msize;
1049     }
1050   process_pending_messages (client);
1051   return off;
1052 }
1053
1054
1055 /**
1056  * Add a PendingMessage to the clients list of messages to be sent
1057  *
1058  * @param client the active client to send the message to
1059  * @param pending_message the actual message to send
1060  */
1061 static void
1062 add_pending_message (struct ClientList *client,
1063                      struct PendingMessage *pending_message)
1064 {
1065   GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
1066                                      client->pending_tail,
1067                                      client->pending_tail,
1068                                      pending_message);
1069   process_pending_messages (client);
1070 }
1071
1072
1073
1074
1075 /**
1076  * Called when a reply needs to be sent to a client, as
1077  * a result it found to a GET or FIND PEER request.
1078  *
1079  * @param client the client to send the reply to
1080  * @param message the encapsulated message to send
1081  * @param uid the unique identifier of this request
1082  */
1083 static void
1084 send_reply_to_client (struct ClientList *client,
1085                       const struct GNUNET_MessageHeader *message,
1086                       unsigned long long uid)
1087 {
1088   struct GNUNET_DHT_RouteResultMessage *reply;
1089   struct PendingMessage *pending_message;
1090   uint16_t msize;
1091   size_t tsize;
1092 #if DEBUG_DHT
1093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094               "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
1095 #endif
1096   msize = ntohs (message->size);
1097   tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
1098   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1099     {
1100       GNUNET_break_op (0);
1101       return;
1102     }
1103
1104   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1105   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
1106   reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
1107   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1108   reply->header.size = htons (tsize);
1109   reply->unique_id = GNUNET_htonll (uid);
1110   memcpy (&reply[1], message, msize);
1111
1112   add_pending_message (client, pending_message);
1113 }
1114
1115 /**
1116  * Consider whether or not we would like to have this peer added to
1117  * our routing table.  Check whether bucket for this peer is full,
1118  * if so return negative; if not return positive.  Since peers are
1119  * only added on CORE level connect, this doesn't actually add the
1120  * peer to the routing table.
1121  *
1122  * @param peer the peer we are considering adding
1123  *
1124  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1125  *         already full)
1126  *
1127  * FIXME: Think about making a context for this call so that we can
1128  *        ping the oldest peer in the current bucket and consider
1129  *        removing it in lieu of the new peer.
1130  */
1131 static int consider_peer (struct GNUNET_PeerIdentity *peer)
1132 {
1133   int bucket;
1134
1135   bucket = find_current_bucket(&peer->hashPubKey);
1136   if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1137     return GNUNET_YES;
1138
1139   return GNUNET_NO;
1140 }
1141
1142 /**
1143  * Main function that handles whether or not to route a result
1144  * message to other peers, or to send to our local client.
1145  *
1146  * @param msg the result message to be routed
1147  * @return the number of peers the message was routed to,
1148  *         GNUNET_SYSERR on failure
1149  */
1150 static int route_result_message(void *cls,
1151                                 struct GNUNET_MessageHeader *msg,
1152                                 struct DHT_MessageContext *message_context)
1153 {
1154   struct GNUNET_PeerIdentity new_peer;
1155   struct DHTQueryRecord *record;
1156   struct DHTRouteSource *pos;
1157   struct PeerInfo *peer_info;
1158   struct GNUNET_MessageHeader *hello_msg;
1159
1160   /**
1161    * If a find peer result message is received and contains a valid
1162    * HELLO for another peer, offer it to the transport service.
1163    *
1164    * FIXME: Check whether we need this peer (based on routing table
1165    * fullness) and only try to connect to it conditionally.  This should
1166    * reduce trying to connect to say (500) peers when the bucket size will
1167    * discard most of them.
1168    */
1169   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1170     {
1171       if (ntohs(msg->size) <= sizeof(struct GNUNET_MessageHeader))
1172         GNUNET_break_op(0);
1173
1174       hello_msg = &msg[1];
1175       if ((ntohs(hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) || (GNUNET_SYSERR == GNUNET_HELLO_get_id(hello_msg, &new_peer)))
1176       {
1177         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received non-HELLO message type in find peer result message!\n", my_short_id, "DHT");
1178         GNUNET_break_op(0);
1179       }
1180       else /* We have a valid hello, and peer id stored in new_peer */
1181       {
1182         if (GNUNET_YES == consider_peer(&new_peer))
1183         {
1184           GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received HELLO message for another peer, offering to transport!\n", my_short_id, "DHT");
1185           GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg);
1186           GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, &new_peer, NULL, NULL); /* FIXME: Do we need this??? */
1187         }
1188       }
1189
1190     }
1191   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, message_context->key);
1192   if (record == NULL) /* No record of this message! */
1193     {
1194 #if DEBUG_DHT
1195     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1196                 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
1197                 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1198 #endif
1199 #if DEBUG_DHT_ROUTING
1200
1201       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1202         {
1203           dhtlog_handle->insert_route (NULL,
1204                                        message_context->unique_id,
1205                                        DHTLOG_RESULT,
1206                                        message_context->hop_count,
1207                                        GNUNET_SYSERR,
1208                                        &my_identity,
1209                                        message_context->key,
1210                                        message_context->peer, NULL);
1211         }
1212 #endif
1213       if (message_context->bloom != NULL)
1214         {
1215           GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1216           message_context->bloom = NULL;
1217         }
1218       return 0;
1219     }
1220
1221   pos = record->head;
1222   while (pos != NULL)
1223     {
1224       if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
1225         {
1226 #if DEBUG_DHT
1227           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228                       "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
1229                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1230 #endif
1231 #if DEBUG_DHT_ROUTING
1232           if ((debug_routes_extended) && (dhtlog_handle != NULL))
1233             {
1234               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
1235                                            message_context->hop_count,
1236                                            GNUNET_YES, &my_identity, message_context->key,
1237                                            message_context->peer, NULL);
1238             }
1239 #endif
1240           send_reply_to_client(pos->client, msg, message_context->unique_id);
1241         }
1242       else /* Send to peer */
1243         {
1244           peer_info = find_peer_by_id(&pos->source);
1245           if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1246             {
1247               pos = pos->next;
1248               continue;
1249             }
1250
1251           if (message_context->bloom == NULL)
1252             message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1253           GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1254           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (message_context->bloom, &peer_info->id.hashPubKey))
1255             {
1256 #if DEBUG_DHT
1257               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                           "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
1259                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1260 #endif
1261 #if DEBUG_DHT_ROUTING
1262               if ((debug_routes_extended) && (dhtlog_handle != NULL))
1263                 {
1264                   dhtlog_handle->insert_route (NULL, message_context->unique_id,
1265                                                DHTLOG_RESULT,
1266                                                message_context->hop_count,
1267                                                GNUNET_NO, &my_identity, message_context->key,
1268                                                message_context->peer, &pos->source);
1269                 }
1270 #endif
1271               forward_result_message(cls, msg, peer_info, message_context);
1272             }
1273           else
1274             {
1275 #if DEBUG_DHT
1276               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277                           "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
1278                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1279 #endif
1280             }
1281         }
1282       pos = pos->next;
1283     }
1284   if (message_context->bloom != NULL)
1285     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1286   return 0;
1287 }
1288
1289 /**
1290  * Iterator for local get request results,
1291  *
1292  * @param cls closure for iterator, a DatacacheGetContext
1293  * @param exp when does this value expire?
1294  * @param key the key this data is stored under
1295  * @param size the size of the data identified by key
1296  * @param data the actual data
1297  * @param type the type of the data
1298  *
1299  * @return GNUNET_OK to continue iteration, anything else
1300  * to stop iteration.
1301  */
1302 static int
1303 datacache_get_iterator (void *cls,
1304                         struct GNUNET_TIME_Absolute exp,
1305                         const GNUNET_HashCode * key,
1306                         uint32_t size, const char *data, uint32_t type)
1307 {
1308   struct DHT_MessageContext *msg_ctx = cls;
1309   struct DHT_MessageContext *new_msg_ctx;
1310   struct GNUNET_DHT_GetResultMessage *get_result;
1311 #if DEBUG_DHT
1312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1313               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
1314 #endif
1315   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
1316   memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
1317   get_result =
1318     GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
1319   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1320   get_result->header.size =
1321     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
1322   get_result->expiration = GNUNET_TIME_absolute_hton(exp);
1323   get_result->type = htons (type);
1324   memcpy (&get_result[1], data, size);
1325   new_msg_ctx->peer = &my_identity;
1326   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1327   new_msg_ctx->hop_count = 0;
1328   route_result_message(cls, &get_result->header, new_msg_ctx);
1329   GNUNET_free(new_msg_ctx);
1330   //send_reply_to_client (datacache_get_ctx->client, &get_result->header,
1331   //                      datacache_get_ctx->unique_id);
1332   GNUNET_free (get_result);
1333   return GNUNET_OK;
1334 }
1335
1336
1337 /**
1338  * Server handler for all dht get requests, look for data,
1339  * if found, send response either to clients or other peers.
1340  *
1341  * @param cls closure for service
1342  * @param msg the actual get message
1343  * @param message_context struct containing pertinent information about the get request
1344  *
1345  * @return number of items found for GET request
1346  */
1347 static unsigned int
1348 handle_dht_get (void *cls, 
1349                 const struct GNUNET_MessageHeader *msg,
1350                 struct DHT_MessageContext *message_context)
1351 {
1352   const struct GNUNET_DHT_GetMessage *get_msg;
1353   uint16_t get_type;
1354   unsigned int results;
1355
1356   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
1357   if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
1358     {
1359       GNUNET_break (0);
1360       return 0;
1361     }
1362
1363   get_type = ntohs (get_msg->type);
1364 #if DEBUG_DHT
1365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366               "`%s:%s': Received `%s' request from client, message type %u, key %s, uid %llu\n", my_short_id,
1367               "DHT", "GET", get_type, GNUNET_h2s (message_context->key),
1368               message_context->unique_id);
1369 #endif
1370
1371   results = 0;
1372   if (datacache != NULL)
1373     results =
1374       GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
1375                             &datacache_get_iterator, message_context);
1376
1377   if (results >= 1)
1378     {
1379 #if DEBUG_DHT
1380       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381                   "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
1382                   results, "GET", message_context->unique_id);
1383 #endif
1384 #if DEBUG_DHT_ROUTING
1385       if ((debug_routes) && (dhtlog_handle != NULL))
1386         {
1387           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1388                                 message_context->hop_count, GNUNET_YES, &my_identity,
1389                                 message_context->key);
1390         }
1391
1392       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1393         {
1394           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1395                                        message_context->hop_count, GNUNET_YES,
1396                                        &my_identity, message_context->key, message_context->peer,
1397                                        NULL);
1398         }
1399 #endif
1400     }
1401
1402   if (message_context->hop_count == 0) /* Locally initiated request */
1403     {
1404 #if DEBUG_DHT_ROUTING
1405     if ((debug_routes) && (dhtlog_handle != NULL))
1406       {
1407         dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1408                                       message_context->hop_count, GNUNET_NO, &my_identity,
1409                                       message_context->key);
1410       }
1411 #endif
1412     }
1413
1414   return results;
1415 }
1416
1417
1418 /**
1419  * Server handler for initiating local dht find peer requests
1420  *
1421  * @param cls closure for service
1422  * @param find_msg the actual find peer message
1423  * @param message_context struct containing pertinent information about the request
1424  *
1425  */
1426 static void
1427 handle_dht_find_peer (void *cls, 
1428                       const struct GNUNET_MessageHeader *find_msg,
1429                       struct DHT_MessageContext *message_context)
1430 {
1431   struct GNUNET_MessageHeader *find_peer_result;
1432   struct DHT_MessageContext *new_msg_ctx;
1433   size_t hello_size;
1434   size_t tsize;
1435
1436 #if DEBUG_DHT
1437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
1439               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
1440               ntohs (find_msg->size),
1441               sizeof (struct GNUNET_MessageHeader));
1442 #endif
1443   if ((my_hello == NULL) || (message_context->closest != GNUNET_YES))
1444   {
1445 #if DEBUG_DHT
1446     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1447                 "`%s': Our HELLO is null, can't return.\n",
1448                 "DHT");
1449 #endif
1450     return;
1451   }
1452   /* Simplistic find_peer functionality, always return our hello */
1453   hello_size = ntohs(my_hello->size);
1454   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
1455
1456   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1457     {
1458       GNUNET_break_op (0);
1459       return;
1460     }
1461
1462   find_peer_result = GNUNET_malloc (tsize);
1463   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
1464   find_peer_result->size = htons (tsize);
1465   memcpy (&find_peer_result[1], my_hello, hello_size);
1466
1467   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1468               "`%s': Sending hello size %d to requesting peer.\n",
1469               "DHT", hello_size);
1470
1471   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
1472   memcpy(new_msg_ctx, message_context, sizeof(struct DHT_MessageContext));
1473   new_msg_ctx->peer = &my_identity;
1474   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1475   new_msg_ctx->hop_count = 0;
1476   route_result_message(cls, find_peer_result, new_msg_ctx);
1477   //send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
1478   GNUNET_free(find_peer_result);
1479 }
1480
1481
1482 /**
1483  * Server handler for initiating local dht put requests
1484  *
1485  * @param cls closure for service
1486  * @param msg the actual put message
1487  * @param message_context struct containing pertinent information about the request
1488  */
1489 static void
1490 handle_dht_put (void *cls,
1491                 const struct GNUNET_MessageHeader *msg,
1492                 struct DHT_MessageContext *message_context)
1493 {
1494   struct GNUNET_DHT_PutMessage *put_msg;
1495   size_t put_type;
1496   size_t data_size;
1497
1498   GNUNET_assert (ntohs (msg->size) >=
1499                  sizeof (struct GNUNET_DHT_PutMessage));
1500   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
1501   put_type = ntohs (put_msg->type);
1502   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
1503 #if DEBUG_DHT
1504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1506               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (message_context->key), message_context->unique_id);
1507 #endif
1508 #if DEBUG_DHT_ROUTING
1509
1510   if ((debug_routes) && (dhtlog_handle != NULL))
1511     {
1512       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
1513                                    message_context->hop_count, GNUNET_YES, &my_identity,
1514                                    message_context->key);
1515     }
1516 #endif
1517
1518   if (datacache != NULL)
1519     GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
1520                           (char *) &put_msg[1], put_type,
1521                           GNUNET_TIME_absolute_ntoh(put_msg->expiration));
1522   else
1523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524                 "`%s:%s': %s request received, but have no datacache!\n",
1525                 my_short_id, "DHT", "PUT");
1526 }
1527
1528 /**
1529  * Estimate the diameter of the network based
1530  * on how many buckets are currently in use.
1531  * Concept here is that the diameter of the network
1532  * is roughly the distance a message must travel in
1533  * order to reach its intended destination.  Since
1534  * at each hop we expect to get one bit closer, and
1535  * we have one bit per bucket, the number of buckets
1536  * in use should be the largest number of hops for
1537  * a sucessful message. (of course, this assumes we
1538  * know all peers in the network!)
1539  *
1540  * @return ballpark diameter figure
1541  */
1542 static unsigned int estimate_diameter()
1543 {
1544   return MAX_BUCKETS - lowest_bucket;
1545 }
1546
1547 /**
1548  * To how many peers should we (on average)
1549  * forward the request to obtain the desired
1550  * target_replication count (on average).
1551  *
1552  * Always 0, 1 or 2 (don't send, send once, split)
1553  */
1554 static unsigned int
1555 get_forward_count (unsigned int hop_count, size_t target_replication)
1556 {
1557   double target_count;
1558   unsigned int target_value;
1559   unsigned int diameter;
1560
1561   /* FIXME: the smaller we think the network is the more lenient we should be for
1562    * routing right?  The estimation below only works if we think we have reasonably
1563    * full routing tables, which for our RR topologies may not be the case!
1564    */
1565   diameter = estimate_diameter ();
1566   if ((hop_count > (diameter + 1) * 2) && (MINIMUM_PEER_THRESHOLD < estimate_diameter() * bucket_size))
1567     {
1568 #if DEBUG_DHT
1569       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570                   "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
1571                   "DHT", estimate_diameter(), lowest_bucket);
1572 #endif
1573       return 0;
1574     }
1575   else if (hop_count > MAX_HOPS)
1576     {
1577 #if DEBUG_DHT
1578       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1579                   "`%s:%s': Hop count too high (greater than max)\n", my_short_id,
1580                   "DHT");
1581 #endif
1582       return 0;
1583     }
1584   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
1585     target_replication / (target_replication * (hop_count + 1) + diameter);
1586   target_value = 0;
1587
1588 #if NONSENSE
1589   while (target_value < target_count)
1590     target_value++; /* target_value is ALWAYS 1 after this "loop" */
1591 #else
1592   target_value = 1;
1593 #endif
1594   if ((target_count + 1 - target_value) >
1595       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1596                                 RAND_MAX) / RAND_MAX)
1597     target_value++;
1598   return target_value;
1599 }
1600
1601 /**
1602  * Find the closest peer in our routing table to the
1603  * given hashcode.
1604  *
1605  * @return The closest peer in our routing table to the
1606  *         key, or NULL on error.
1607  */
1608 static struct PeerInfo *
1609 find_closest_peer (const GNUNET_HashCode *hc)
1610 {
1611   struct PeerInfo *pos;
1612   struct PeerInfo *current_closest;
1613   unsigned int lowest_distance;
1614   unsigned int temp_distance;
1615   int bucket;
1616
1617   lowest_distance = -1;
1618
1619   if (k_buckets[lowest_bucket].peers_size == 0)
1620     return NULL;
1621
1622   current_closest = NULL;
1623   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1624     {
1625       pos = k_buckets[bucket].head;
1626       while (pos != NULL)
1627         {
1628           temp_distance = distance(&pos->id.hashPubKey, hc);
1629           if (temp_distance <= lowest_distance)
1630             {
1631               lowest_distance = temp_distance;
1632               current_closest = pos;
1633             }
1634           pos = pos->next;
1635         }
1636     }
1637   GNUNET_assert(current_closest != NULL);
1638   return current_closest;
1639 }
1640
1641 /*
1642  * Check whether my identity is closer than any known peers.
1643  *
1644  * @param target hash code to check closeness to
1645  *
1646  * Return GNUNET_YES if node location is closest, GNUNET_NO
1647  * otherwise.
1648  */
1649 int
1650 am_closest_peer (const GNUNET_HashCode * target)
1651 {
1652   int bits;
1653   int other_bits;
1654   int bucket_num;
1655   struct PeerInfo *pos;
1656   unsigned int my_distance;
1657
1658   bucket_num = find_current_bucket(target);
1659   if (bucket_num == GNUNET_SYSERR) /* Same key! */
1660     return GNUNET_YES;
1661
1662   bits = matching_bits(&my_identity.hashPubKey, target);
1663   my_distance = distance(&my_identity.hashPubKey, target);
1664
1665   pos = k_buckets[bucket_num].head;
1666   while (pos != NULL)
1667     {
1668       other_bits = matching_bits(&pos->id.hashPubKey, target);
1669       if (other_bits > bits)
1670         return GNUNET_NO;
1671       else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
1672         {
1673           if (distance(&pos->id.hashPubKey, target) < my_distance)
1674             return GNUNET_NO;
1675         }
1676       pos = pos->next;
1677     }
1678
1679 #if DEBUG_TABLE
1680   GNUNET_GE_LOG (coreAPI->ectx,
1681                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1682                  GNUNET_GE_BULK, "closest peer\n");
1683   printPeerBits (&closest);
1684   GNUNET_GE_LOG (coreAPI->ectx,
1685                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1686                  GNUNET_GE_BULK, "me\n");
1687   printPeerBits (coreAPI->my_identity);
1688   GNUNET_GE_LOG (coreAPI->ectx,
1689                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1690                  GNUNET_GE_BULK, "key\n");
1691   printKeyBits (target);
1692   GNUNET_GE_LOG (coreAPI->ectx,
1693                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1694                  GNUNET_GE_BULK,
1695                  "closest peer inverse distance is %u, mine is %u\n",
1696                  inverse_distance (target, &closest.hashPubKey),
1697                  inverse_distance (target,
1698                                    &coreAPI->my_identity->hashPubKey));
1699 #endif
1700
1701   /* No peers closer, we are the closest! */
1702   return GNUNET_YES;
1703
1704 }
1705
1706
1707 /**
1708  * Select a peer from the routing table that would be a good routing
1709  * destination for sending a message for "target".  The resulting peer
1710  * must not be in the set of blocked peers.<p>
1711  *
1712  * Note that we should not ALWAYS select the closest peer to the
1713  * target, peers further away from the target should be chosen with
1714  * exponentially declining probability.
1715  *
1716  * @param target the key we are selecting a peer to route to
1717  * @param bloom a bloomfilter containing entries this request has seen already
1718  *
1719  * @return Peer to route to, or NULL on error
1720  */
1721 static struct PeerInfo *
1722 select_peer (const GNUNET_HashCode * target,
1723              struct GNUNET_CONTAINER_BloomFilter *bloom)
1724 {
1725   unsigned int distance;
1726   unsigned int bc;
1727   struct PeerInfo *pos;
1728 #if USE_KADEMLIA
1729   const struct PeerInfo *chosen;
1730   unsigned long long largest_distance;
1731 #else
1732   unsigned long long total_distance;
1733   unsigned long long selected;
1734 #endif
1735
1736 #if USE_KADEMLIA
1737   largest_distance = 0;
1738   chosen = NULL;
1739   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1740     {
1741       pos = k_buckets[bc].head;
1742       while (pos != NULL)
1743         {
1744           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1745             {
1746               distance = inverse_distance (target, &pos->id.hashPubKey);
1747               if (distance > largest_distance)
1748                 {
1749                   chosen = pos;
1750                   largest_distance = distance;
1751                 }
1752             }
1753           pos = pos->next;
1754         }
1755     }
1756
1757   if ((largest_distance > 0) && (chosen != NULL))
1758     {
1759       GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
1760       return chosen;
1761     }
1762   else
1763     {
1764       return NULL;
1765     }
1766 #else
1767   /* GNUnet-style */
1768   total_distance = 0;
1769   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1770     {
1771       pos = k_buckets[bc].head;
1772       while (pos != NULL)
1773         {
1774           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1775             total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
1776 #if DEBUG_DHT > 1
1777           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778                       "`%s:%s': Total distance is %llu, distance from %s to %s is %u\n",
1779                       my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey));
1780 #endif
1781           pos = pos->next;
1782         }
1783     }
1784   if (total_distance == 0)
1785     {
1786       return NULL;
1787     }
1788
1789   selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
1790   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1791     {
1792       pos = k_buckets[bc].head;
1793       while (pos != NULL)
1794         {
1795           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1796             {
1797               distance = inverse_distance (target, &pos->id.hashPubKey);
1798               if (distance > selected)
1799                 return pos;
1800               selected -= distance;
1801             }
1802           else
1803             {
1804 #if DEBUG_DHT
1805               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1806                           "`%s:%s': peer %s matches bloomfilter.\n",
1807                           my_short_id, "DHT", GNUNET_i2s(&pos->id));
1808 #endif
1809             }
1810           pos = pos->next;
1811         }
1812     }
1813 #if DEBUG_DHT
1814     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1815                 "`%s:%s': peer %s matches bloomfilter.\n",
1816                 my_short_id, "DHT", GNUNET_i2s(&pos->id));
1817 #endif
1818   return NULL;
1819 #endif
1820 }
1821
1822 /**
1823  * Function called to send a request out to another peer.
1824  * Called both for locally initiated requests and those
1825  * received from other peers.
1826  *
1827  * @param cls DHT service closure argument
1828  * @param msg the encapsulated message
1829  * @param peer the peer to forward the message to
1830  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1831  */
1832 static void forward_message (void *cls,
1833                              const struct GNUNET_MessageHeader *msg,
1834                              struct PeerInfo *peer,
1835                              struct DHT_MessageContext *msg_ctx)
1836 {
1837   struct GNUNET_DHT_P2PRouteMessage *route_message;
1838   struct P2PPendingMessage *pending;
1839   size_t msize;
1840   size_t psize;
1841
1842   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1843   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1844   psize = sizeof(struct P2PPendingMessage) + msize;
1845   pending = GNUNET_malloc(psize);
1846   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1847   pending->importance = DHT_SEND_PRIORITY;
1848   pending->timeout = GNUNET_TIME_relative_get_forever();
1849   route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1850   route_message->header.size = htons(msize);
1851   route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1852   route_message->options = htonl(msg_ctx->msg_options);
1853   route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1854   route_message->network_size = htonl(msg_ctx->network_size);
1855   route_message->desired_replication_level = htonl(msg_ctx->replication);
1856   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1857   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1858   memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1859   memcpy(&route_message[1], msg, ntohs(msg->size));
1860 #if DEBUG_DHT > 1
1861   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1862 #endif
1863   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1864   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1865     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1866 }
1867
1868 /**
1869  * Task used to remove forwarding entries, either
1870  * after timeout, when full, or on shutdown.
1871  *
1872  * @param cls the entry to remove
1873  * @param tc context, reason, etc.
1874  */
1875 static void
1876 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1877 {
1878   struct DHTRouteSource *source_info = cls;
1879   struct DHTQueryRecord *record;
1880   source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
1881   record = source_info->record;
1882   GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1883
1884   if (record->head == NULL) /* No more entries in DLL */
1885     {
1886       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1887       GNUNET_free(record);
1888     }
1889   GNUNET_free(source_info);
1890 }
1891
1892 /**
1893  * Remember this routing request so that if a reply is
1894  * received we can either forward it to the correct peer
1895  * or return the result locally.
1896  *
1897  * @param cls DHT service closure
1898  * @param msg_ctx Context of the route request
1899  *
1900  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1901  */
1902 static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
1903 {
1904   struct DHTQueryRecord *record;
1905   struct DHTRouteSource *source_info;
1906   struct DHTRouteSource *pos;
1907   struct GNUNET_TIME_Absolute now;
1908   unsigned int current_size;
1909
1910   current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1911   while (current_size >= MAX_OUTSTANDING_FORWARDS)
1912     {
1913       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
1914       record = source_info->record;
1915       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1916       if (record->head == NULL) /* No more entries in DLL */
1917         {
1918           GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1919           GNUNET_free(record);
1920         }
1921       GNUNET_SCHEDULER_cancel(sched, source_info->delete_task);
1922       GNUNET_free(source_info);
1923       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1924     }
1925   now = GNUNET_TIME_absolute_get();
1926   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, msg_ctx->key);
1927   if (record != NULL) /* Already know this request! */
1928     {
1929       pos = record->head;
1930       while (pos != NULL)
1931         {
1932           if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
1933             break; /* Already have this peer in reply list! */
1934           pos = pos->next;
1935         }
1936       if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
1937         {
1938           GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.value);
1939           return GNUNET_NO;
1940         }
1941     }
1942   else
1943     {
1944       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
1945       GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1946       memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1947     }
1948
1949   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
1950   source_info->record = record;
1951   source_info->delete_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
1952   memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
1953   GNUNET_CONTAINER_DLL_insert_after(record->head, record->tail, record->tail, source_info);
1954   if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
1955     {
1956       source_info->client = msg_ctx->client;
1957       now = GNUNET_TIME_absolute_get_forever();
1958     }
1959   source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.value);
1960 #if DEBUG_DHT > 1
1961       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1962                   "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
1963                   "DHT", GNUNET_h2s (msg_ctx->key), msg_ctx->unique_id);
1964 #endif
1965   return GNUNET_YES;
1966 }
1967
1968
1969 /**
1970  * Main function that handles whether or not to route a message to other
1971  * peers.
1972  *
1973  * @param msg the message to be routed
1974  *
1975  * @return the number of peers the message was routed to,
1976  *         GNUNET_SYSERR on failure
1977  */
1978 static int route_message(void *cls,
1979                          const struct GNUNET_MessageHeader *msg,
1980                          struct DHT_MessageContext *message_context)
1981 {
1982   int i;
1983   struct PeerInfo *selected;
1984   struct PeerInfo *nearest;
1985   unsigned int forward_count;
1986 #if DEBUG_DHT
1987   char *nearest_buf;
1988 #endif
1989 #if DEBUG_DHT_ROUTING
1990   int ret;
1991 #endif
1992
1993   message_context->closest = am_closest_peer(message_context->key);
1994   forward_count = get_forward_count(message_context->hop_count, message_context->replication);
1995   nearest = find_closest_peer(message_context->key);
1996
1997   if (message_context->bloom == NULL)
1998     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1999   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
2000
2001   if ((stop_on_closest == GNUNET_YES) && (message_context->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
2002     forward_count = 0;
2003
2004 #if DEBUG_DHT_ROUTING
2005   if (forward_count == 0)
2006     ret = GNUNET_SYSERR;
2007   else
2008     ret = GNUNET_NO;
2009
2010   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2011     {
2012       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2013                                    message_context->hop_count, ret,
2014                                    &my_identity, message_context->key, message_context->peer,
2015                                    NULL);
2016     }
2017 #endif
2018
2019   switch (ntohs(msg->type))
2020     {
2021     case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
2022       cache_response (cls, message_context);
2023       if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
2024         forward_count = 0;
2025       break;
2026     case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding?*/
2027       if (message_context->closest == GNUNET_YES)
2028         {
2029 #if DEBUG_DHT_ROUTING
2030           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2031             {
2032               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2033                                            message_context->hop_count, GNUNET_YES,
2034                                            &my_identity, message_context->key, message_context->peer,
2035                                            NULL);
2036             }
2037 #endif
2038           handle_dht_put (cls, msg, message_context);
2039         }
2040 #if DEBUG_DHT_ROUTING
2041         if (message_context->hop_count == 0) /* Locally initiated request */
2042           {
2043             if ((debug_routes) && (dhtlog_handle != NULL))
2044               {
2045                 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
2046                                              message_context->hop_count, GNUNET_NO, &my_identity,
2047                                              message_context->key);
2048               }
2049           }
2050 #endif
2051       break;
2052     case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
2053       if (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))
2054       {
2055         cache_response (cls, message_context);
2056         if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2057           handle_dht_find_peer (cls, msg, message_context);
2058       }
2059       break;
2060     default:
2061       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2062                   "`%s': Message type (%d) not handled\n", "DHT", ntohs(msg->type));
2063     }
2064
2065   for (i = 0; i < forward_count; i++)
2066     {
2067       selected = select_peer(message_context->key, message_context->bloom);
2068
2069       if (selected != NULL)
2070         {
2071           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
2072 #if DEBUG_DHT_ROUTING > 1
2073           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
2074           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2075                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
2076                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&selected->id), nearest_buf, matching_bits(&nearest->id.hashPubKey, message_context->key), distance(&nearest->id.hashPubKey, message_context->key));
2077           GNUNET_free(nearest_buf);
2078 #endif
2079           /* FIXME: statistics */
2080           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2081             {
2082               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2083                                            message_context->hop_count, GNUNET_NO,
2084                                            &my_identity, message_context->key, message_context->peer,
2085                                            &selected->id);
2086             }
2087           forward_message(cls, msg, selected, message_context);
2088         }
2089       else
2090         {
2091 #if DEBUG_DHT
2092           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2093                       "`%s:%s': No peers selected for forwarding.\n", my_short_id,
2094                       "DHT");
2095 #endif
2096         }
2097     }
2098 #if DEBUG_DHT_ROUTING > 1
2099   if (forward_count == 0)
2100     {
2101       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102                   "`%s:%s': NOT Forwarding request key %s uid %llu to any peers\n", my_short_id,
2103                   "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
2104     }
2105 #endif
2106
2107   if (message_context->bloom != NULL)
2108     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2109
2110   return forward_count;
2111 }
2112
2113 /**
2114  * Find a client if it exists, add it otherwise.
2115  *
2116  * @param client the server handle to the client
2117  *
2118  * @return the client if found, a new client otherwise
2119  */
2120 static struct ClientList *
2121 find_active_client (struct GNUNET_SERVER_Client *client)
2122 {
2123   struct ClientList *pos = client_list;
2124   struct ClientList *ret;
2125
2126   while (pos != NULL)
2127     {
2128       if (pos->client_handle == client)
2129         return pos;
2130       pos = pos->next;
2131     }
2132
2133   ret = GNUNET_malloc (sizeof (struct ClientList));
2134   ret->client_handle = client;
2135   ret->next = client_list;
2136   client_list = ret;
2137   return ret;
2138 }
2139
2140 /**
2141  * Task to send a find peer message for our own peer identifier
2142  * so that we can find the closest peers in the network to ourselves
2143  * and attempt to connect to them.
2144  *
2145  * @param cls closure for this task
2146  * @param tc the context under which the task is running
2147  */
2148 static void
2149 send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2150 {
2151   struct GNUNET_MessageHeader *find_peer_msg;
2152   struct DHT_MessageContext message_context;
2153   int ret;
2154
2155   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
2156     return;
2157
2158   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
2159   find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader));
2160   find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
2161   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2162   message_context.key = &my_identity.hashPubKey;
2163   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
2164   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
2165   message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS);
2166   message_context.network_size = estimate_diameter();
2167   message_context.peer = &my_identity;
2168
2169   ret = route_message(NULL, find_peer_msg, &message_context);
2170
2171   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2172               "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT",
2173               "FIND PEER", ret);
2174   GNUNET_SCHEDULER_add_delayed (sched,
2175                                 DHT_DEFAULT_FIND_PEER_INTERVAL,
2176                                 &send_find_peer_message, NULL);
2177 }
2178
2179 /**
2180  * Handler for any generic DHT messages, calls the appropriate handler
2181  * depending on message type, sends confirmation if responses aren't otherwise
2182  * expected.
2183  *
2184  * @param cls closure for the service
2185  * @param client the client we received this message from
2186  * @param message the actual message received
2187  */
2188 static void
2189 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
2190                                 const struct GNUNET_MessageHeader *message)
2191 {
2192   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
2193   const struct GNUNET_MessageHeader *enc_msg;
2194   struct DHT_MessageContext message_context;
2195   size_t enc_type;
2196
2197   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
2198   enc_type = ntohs (enc_msg->type);
2199
2200 #if DEBUG_DHT
2201   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2202               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
2203               my_short_id, "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
2204               GNUNET_ntohll (dht_msg->unique_id));
2205 #endif
2206 #if DEBUG_DHT_ROUTING
2207   if (dhtlog_handle != NULL)
2208     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
2209 #endif
2210   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2211   message_context.client = find_active_client (client);
2212   message_context.key = &dht_msg->key;
2213   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
2214   message_context.replication = ntohl (dht_msg->desired_replication_level);
2215   message_context.msg_options = ntohl (dht_msg->options);
2216   message_context.network_size = estimate_diameter();
2217   message_context.peer = &my_identity;
2218
2219   route_message(cls, enc_msg, &message_context);
2220
2221   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2222
2223 }
2224
2225 /**
2226  * Handler for any generic DHT stop messages, calls the appropriate handler
2227  * depending on message type (if processed locally)
2228  *
2229  * @param cls closure for the service
2230  * @param client the client we received this message from
2231  * @param message the actual message received
2232  *
2233  */
2234 static void
2235 handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
2236                             const struct GNUNET_MessageHeader *message)
2237 {
2238
2239   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
2240     (const struct GNUNET_DHT_StopMessage *) message;
2241   struct DHTQueryRecord *record;
2242   struct DHTRouteSource *pos;
2243   uint64_t uid;
2244 #if DEBUG_DHT
2245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2246               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
2247               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
2248 #endif
2249
2250   uid = GNUNET_ntohll(dht_stop_msg->unique_id);
2251
2252   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
2253   if (record != NULL)
2254     {
2255       pos = record->head;
2256
2257       while (pos != NULL)
2258         {
2259           if ((pos->client != NULL) && (pos->client->client_handle == client))
2260             {
2261               GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
2262               GNUNET_SCHEDULER_add_now(sched, &remove_forward_entry, pos);
2263             }
2264           pos = pos->next;
2265         }
2266     }
2267
2268   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2269 }
2270
2271
2272 /**
2273  * Core handler for p2p route requests.
2274  */
2275 static int
2276 handle_dht_p2p_route_request (void *cls,
2277                               const struct GNUNET_PeerIdentity *peer,
2278                               const struct GNUNET_MessageHeader *message,
2279                               struct GNUNET_TIME_Relative latency, uint32_t distance)
2280 {
2281 #if DEBUG_DHT
2282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2283               "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
2284 #endif
2285   struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
2286   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2287   struct DHT_MessageContext *message_context;
2288
2289   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2290     {
2291       GNUNET_break_op(0);
2292       return GNUNET_YES;
2293     }
2294   //memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2295   message_context = GNUNET_malloc(sizeof (struct DHT_MessageContext));
2296   message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2297   GNUNET_assert(message_context->bloom != NULL);
2298   message_context->hop_count = ntohl(incoming->hop_count);
2299   message_context->key = &incoming->key;
2300   message_context->replication = ntohl(incoming->desired_replication_level);
2301   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
2302   message_context->msg_options = ntohl(incoming->options);
2303   message_context->network_size = ntohl(incoming->network_size);
2304   message_context->peer = peer;
2305   route_message(cls, enc_msg, message_context);
2306   GNUNET_free(message_context);
2307   return GNUNET_YES;
2308 }
2309
2310
2311 /**
2312  * Core handler for p2p route results.
2313  */
2314 static int
2315 handle_dht_p2p_route_result (void *cls,
2316                              const struct GNUNET_PeerIdentity *peer,
2317                              const struct GNUNET_MessageHeader *message,
2318                              struct GNUNET_TIME_Relative latency, uint32_t distance)
2319 {
2320 #if DEBUG_DHT
2321   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2322               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
2323 #endif
2324   struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
2325   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2326   struct DHT_MessageContext message_context;
2327
2328   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2329     {
2330       GNUNET_break_op(0);
2331       return GNUNET_YES;
2332     }
2333   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2334   message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2335   GNUNET_assert(message_context.bloom != NULL);
2336   message_context.key = &incoming->key;
2337   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
2338   message_context.msg_options = ntohl(incoming->options);
2339   message_context.hop_count = ntohl(incoming->hop_count);
2340   message_context.peer = peer;
2341   route_result_message(cls, enc_msg, &message_context);
2342   return GNUNET_YES;
2343 }
2344
2345
2346 /**
2347  * Receive the HELLO from transport service,
2348  * free current and replace if necessary.
2349  *
2350  * @param cls NULL
2351  * @param message HELLO message of peer
2352  */
2353 static void
2354 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2355 {
2356 #if DEBUG_DHT
2357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2358               "Received our `%s' from transport service\n",
2359               "HELLO");
2360 #endif
2361
2362   GNUNET_assert (message != NULL);
2363   GNUNET_free_non_null(my_hello);
2364   my_hello = GNUNET_malloc(ntohs(message->size));
2365   memcpy(my_hello, message, ntohs(message->size));
2366 }
2367
2368 /**
2369  * Task run during shutdown.
2370  *
2371  * @param cls unused
2372  * @param tc unused
2373  */
2374 static void
2375 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2376 {
2377   int bucket_count;
2378   struct PeerInfo *pos;
2379
2380   if (transport_handle != NULL)
2381   {
2382     GNUNET_free_non_null(my_hello);
2383     GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
2384     GNUNET_TRANSPORT_disconnect(transport_handle);
2385   }
2386
2387   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2388     {
2389       while (k_buckets[bucket_count].head != NULL)
2390         {
2391           pos = k_buckets[bucket_count].head;
2392 #if DEBUG_DHT
2393           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2394                       "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
2395 #endif
2396           delete_peer(pos, bucket_count);
2397         }
2398     }
2399   if (coreAPI != NULL)
2400     {
2401 #if DEBUG_DHT
2402       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2403                   "%s:%s Disconnecting core!\n", my_short_id, "DHT");
2404 #endif
2405       GNUNET_CORE_disconnect (coreAPI);
2406     }
2407   if (datacache != NULL)
2408     {
2409 #if DEBUG_DHT
2410       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2411                   "%s:%s Destroying datacache!\n", my_short_id, "DHT");
2412 #endif
2413       GNUNET_DATACACHE_destroy (datacache);
2414     }
2415
2416   if (dhtlog_handle != NULL)
2417     GNUNET_DHTLOG_disconnect(dhtlog_handle);
2418
2419   GNUNET_free_non_null(my_short_id);
2420 }
2421
2422
2423 /**
2424  * To be called on core init/fail.
2425  *
2426  * @param cls service closure
2427  * @param server handle to the server for this service
2428  * @param identity the public identity of this peer
2429  * @param publicKey the public key of this peer
2430  */
2431 void
2432 core_init (void *cls,
2433            struct GNUNET_CORE_Handle *server,
2434            const struct GNUNET_PeerIdentity *identity,
2435            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
2436 {
2437
2438   if (server == NULL)
2439     {
2440 #if DEBUG_DHT
2441   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2442               "%s: Connection to core FAILED!\n", "dht",
2443               GNUNET_i2s (identity));
2444 #endif
2445       GNUNET_SCHEDULER_cancel (sched, cleanup_task);
2446       GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
2447       return;
2448     }
2449 #if DEBUG_DHT
2450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2451               "%s: Core connection initialized, I am peer: %s\n", "dht",
2452               GNUNET_i2s (identity));
2453 #endif
2454   /* Copy our identity so we can use it */
2455   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
2456   my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
2457   /* Set the server to local variable */
2458   coreAPI = server;
2459
2460   if (dhtlog_handle != NULL)
2461     dhtlog_handle->insert_node (NULL, &my_identity);
2462 }
2463
2464
2465 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
2466   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE, 0},
2467   {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
2468   {NULL, NULL, 0, 0}
2469 };
2470
2471
2472 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2473   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
2474   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
2475   {NULL, 0, 0}
2476 };
2477
2478 /**
2479  * Method called whenever a peer connects.
2480  *
2481  * @param cls closure
2482  * @param peer peer identity this notification is about
2483  * @param latency reported latency of the connection with peer
2484  * @param distance reported distance (DV) to peer
2485  */
2486 void handle_core_connect (void *cls,
2487                           const struct GNUNET_PeerIdentity * peer,
2488                           struct GNUNET_TIME_Relative latency,
2489                           uint32_t distance)
2490 {
2491   int ret;
2492
2493 #if DEBUG_DHT
2494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2495               "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
2496 #endif
2497   if (datacache != NULL)
2498     GNUNET_DATACACHE_put(datacache, &peer->hashPubKey, sizeof(struct GNUNET_PeerIdentity), (const char *)peer, 0, GNUNET_TIME_absolute_get_forever());
2499   ret = try_add_peer(peer,
2500                      find_current_bucket(&peer->hashPubKey),
2501                      latency,
2502                      distance);
2503 #if DEBUG_DHT
2504     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2505                 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == GNUNET_YES ? "PEER ADDED" : "NOT ADDED");
2506 #endif
2507 }
2508
2509 /**
2510  * Process dht requests.
2511  *
2512  * @param cls closure
2513  * @param scheduler scheduler to use
2514  * @param server the initialized server
2515  * @param c configuration to use
2516  */
2517 static void
2518 run (void *cls,
2519      struct GNUNET_SCHEDULER_Handle *scheduler,
2520      struct GNUNET_SERVER_Handle *server,
2521      const struct GNUNET_CONFIGURATION_Handle *c)
2522 {
2523   sched = scheduler;
2524   cfg = c;
2525   datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
2526   GNUNET_SERVER_add_handlers (server, plugin_handlers);
2527   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
2528                                  cfg,   /* Main configuration */
2529                                  GNUNET_TIME_UNIT_FOREVER_REL,
2530                                  NULL,  /* Closure passed to DHT functionas around? */
2531                                  &core_init,    /* Call core_init once connected */
2532                                  &handle_core_connect,  /* Don't care about connects */
2533                                  NULL,  /* FIXME: remove peers on disconnects */
2534                                  NULL,  /* Do we care about "status" updates? */
2535                                  NULL,  /* Don't want notified about all incoming messages */
2536                                  GNUNET_NO,     /* For header only inbound notification */
2537                                  NULL,  /* Don't want notified about all outbound messages */
2538                                  GNUNET_NO,     /* For header only outbound notification */
2539                                  core_handlers);        /* Register these handlers */
2540
2541   if (coreAPI == NULL)
2542     return;
2543   transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, 
2544                                               NULL, NULL, NULL, NULL, NULL);
2545   if (transport_handle != NULL)
2546     GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
2547   else
2548     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
2549
2550   lowest_bucket = MAX_BUCKETS - 1;
2551   forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
2552   forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
2553   /* Scheduled the task to clean up when shutdown is called */
2554
2555   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
2556     {
2557       debug_routes = GNUNET_YES;
2558     }
2559
2560   if (GNUNET_YES ==
2561       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2562                                            "stop_on_closest"))
2563     {
2564       stop_on_closest = GNUNET_YES;
2565     }
2566
2567   if (GNUNET_YES ==
2568       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2569                                            "stop_found"))
2570     {
2571       stop_on_found = GNUNET_YES;
2572     }
2573
2574   if (GNUNET_YES ==
2575       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
2576                                            "mysql_logging_extended"))
2577     {
2578       debug_routes = GNUNET_YES;
2579       debug_routes_extended = GNUNET_YES;
2580     }
2581
2582   if (GNUNET_YES == debug_routes)
2583     {
2584       dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
2585       if (dhtlog_handle == NULL)
2586         {
2587           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2588                       "Could not connect to mysql logging server, logging will not happen!");
2589         }
2590     }
2591
2592   GNUNET_SCHEDULER_add_delayed (sched,
2593                                 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30),
2594                                 &send_find_peer_message, NULL);
2595
2596   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
2597                                                GNUNET_TIME_UNIT_FOREVER_REL,
2598                                                &shutdown_task, NULL);
2599 }
2600
2601 /**
2602  * The main function for the dht service.
2603  *
2604  * @param argc number of arguments from the command line
2605  * @param argv command line arguments
2606  * @return 0 ok, 1 on error
2607  */
2608 int
2609 main (int argc, char *const *argv)
2610 {
2611   return (GNUNET_OK ==
2612           GNUNET_SERVICE_run (argc,
2613                               argv,
2614                               "dht",
2615                               GNUNET_SERVICE_OPTION_NONE,
2616                               &run, NULL)) ? 0 : 1;
2617 }