some dht changes
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief main DHT service shell, building block for DHT implementations
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_getopt_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_service_lib.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_signal_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "dhtlog.h"
42 #include "dht.h"
43
44 /**
45  * How many buckets will we allow total.
46  */
47 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
48
49 /**
50  * What is the maximum number of peers in a given bucket.
51  */
52 #define DEFAULT_BUCKET_SIZE 8
53
54 /**
55  * Minimum number of peers we need for "good" routing,
56  * any less than this and we will allow messages to
57  * travel much further through the network!
58  */
59 #define MINIMUM_PEER_THRESHOLD 20
60
61 #define DHT_DEFAULT_FIND_PEER_REPLICATION 20
62
63 #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
64
65 #define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
66
67 /**
68  * Real maximum number of hops, at which point we refuse
69  * to forward the message.
70  */
71 #define MAX_HOPS 20
72
73 /**
74  * Linked list of messages to send to clients.
75  */
76 struct P2PPendingMessage
77 {
78   /**
79    * Pointer to next item in the list
80    */
81   struct P2PPendingMessage *next;
82
83   /**
84    * Pointer to previous item in the list
85    */
86   struct P2PPendingMessage *prev;
87
88   /**
89    * Message importance level.
90    */
91   unsigned int importance;
92
93   /**
94    * How long to wait before sending message.
95    */
96   struct GNUNET_TIME_Relative timeout;
97
98   /**
99    * Actual message to be sent; // avoid allocation
100    */
101   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
102
103 };
104
105 /**
106  * Per-peer information.
107  */
108 struct PeerInfo
109 {
110   /**
111    * Next peer entry (DLL)
112    */
113   struct PeerInfo *next;
114
115   /**
116    *  Prev peer entry (DLL)
117    */
118   struct PeerInfo *prev;
119
120   /**
121    * Head of pending messages to be sent to this peer.
122    */
123   struct P2PPendingMessage *head;
124
125   /**
126    * Tail of pending messages to be sent to this peer.
127    */
128   struct P2PPendingMessage *tail;
129
130   /**
131    * Core handle for sending messages to this peer.
132    */
133   struct GNUNET_CORE_TransmitHandle *th;
134
135   /**
136    * Task for scheduling message sends.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier send_task;
139
140   /**
141    * What is the average latency for replies received?
142    */
143   struct GNUNET_TIME_Relative latency;
144
145   /**
146    * Number of responses received
147    */
148   unsigned long long response_count;
149
150   /**
151    * Number of requests sent
152    */
153   unsigned long long request_count;
154
155   /**
156    * What is the identity of the peer?
157    */
158   struct GNUNET_PeerIdentity id;
159
160   /**
161    * Transport level distance to peer.
162    */
163   unsigned int distance;
164
165 };
166
167 /**
168  * Peers are grouped into buckets.
169  */
170 struct PeerBucket
171 {
172   /**
173    * Head of DLL
174    */
175   struct PeerInfo *head;
176
177   /**
178    * Tail of DLL
179    */
180   struct PeerInfo *tail;
181
182   /**
183    * Number of peers in the bucket.
184    */
185   unsigned int peers_size;
186 };
187
188 /**
189  * Linked list of messages to send to clients.
190  */
191 struct PendingMessage
192 {
193   /**
194    * Pointer to next item in the list
195    */
196   struct PendingMessage *next;
197
198   /**
199    * Pointer to previous item in the list
200    */
201   struct PendingMessage *prev;
202
203   /**
204    * Actual message to be sent; // avoid allocation
205    */
206   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
207
208 };
209
210 /**
211  * Struct containing information about a client,
212  * handle to connect to it, and any pending messages
213  * that need to be sent to it.
214  */
215 struct ClientList
216 {
217   /**
218    * Linked list of active clients
219    */
220   struct ClientList *next;
221
222   /**
223    * The handle to this client
224    */
225   struct GNUNET_SERVER_Client *client_handle;
226
227   /**
228    * Handle to the current transmission request, NULL
229    * if none pending.
230    */
231   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
232
233   /**
234    * Linked list of pending messages for this client
235    */
236   struct PendingMessage *pending_head;
237
238   /**
239    * Tail of linked list of pending messages for this client
240    */
241   struct PendingMessage *pending_tail;
242
243 };
244
245
246 /**
247  * Context containing information about a DHT message received.
248  */
249 struct DHT_MessageContext
250 {
251   /**
252    * The client this request was received from.
253    * (NULL if received from another peer)
254    */
255   struct ClientList *client;
256
257   /**
258    * The peer this request was received from.
259    * (NULL if received from local client)
260    */
261   const struct GNUNET_PeerIdentity *peer;
262
263   /**
264    * The key this request was about
265    */
266   const GNUNET_HashCode *key;
267
268   /**
269    * The unique identifier of this request
270    */
271   uint64_t unique_id;
272
273   /**
274    * Desired replication level
275    */
276   uint32_t replication;
277
278   /**
279    * Network size estimate, either ours or the sum of
280    * those routed to thus far. =~ Log of number of peers
281    * chosen from for this request.
282    */
283   uint32_t network_size;
284
285   /**
286    * Any message options for this request
287    */
288   uint32_t msg_options;
289
290   /**
291    * How many hops has the message already traversed?
292    */
293   uint32_t hop_count;
294
295   /**
296    * Bloomfilter for this routing request.
297    */
298   struct GNUNET_CONTAINER_BloomFilter *bloom;
299
300   /**
301    * Did we forward this message? (may need to remember it!)
302    */
303   int forwarded;
304
305   /**
306    * Are we the closest known peer to this key (out of our neighbors?)
307    */
308   int closest;
309 };
310
311 /**
312  * Record used for remembering what peers are waiting for what
313  * responses (based on search key).
314  */
315 struct DHTRouteSource
316 {
317
318   /**
319    * This is a DLL.
320    */
321   struct DHTRouteSource *next;
322
323   /**
324    * This is a DLL.
325    */
326   struct DHTRouteSource *prev;
327
328   /**
329    * Source of the request.  Replies should be forwarded to
330    * this peer.
331    */
332   struct GNUNET_PeerIdentity source;
333
334   /**
335    * If this was a local request, remember the client; otherwise NULL.
336    */
337   struct ClientList *client;
338
339   /**
340    * Pointer to this nodes heap location (for removal)
341    */
342   struct GNUNET_CONTAINER_HeapNode *hnode;
343
344   /**
345    * Back pointer to the record storing this information.
346    */
347   struct DHTQueryRecord *record;
348
349   /**
350    * Task to remove this entry on timeout.
351    */
352   GNUNET_SCHEDULER_TaskIdentifier delete_task;
353 };
354
355 /**
356  * Entry in the DHT routing table.
357  */
358 struct DHTQueryRecord
359 {
360   /**
361    * Head of DLL for result forwarding.
362    */
363   struct DHTRouteSource *head;
364
365   /**
366    * Tail of DLL for result forwarding.
367    */
368   struct DHTRouteSource *tail;
369
370   /**
371    * Key that the record concerns.
372    */
373   GNUNET_HashCode key;
374
375   /**
376    * GET message of this record (what we already forwarded?).
377    */
378   //DV_DHT_MESSAGE get; Try to get away with not saving this.
379
380   /**
381    * Bloomfilter of the peers we've replied to so far
382    */
383   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
384
385 };
386
387 /**
388  * DHT Routing results structure
389  */
390 struct DHTResults
391 {
392   /*
393    * Min heap for removal upon reaching limit
394    */
395   struct GNUNET_CONTAINER_Heap *minHeap;
396
397   /*
398    * Hashmap for fast key based lookup
399    */
400   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
401
402 };
403
404 /**
405  * Routing option to end routing when closest peer found.
406  */
407 static int stop_on_closest;
408
409 /**
410  * Routing option to end routing when data is found.
411  */
412 static int stop_on_found;
413
414 /**
415  * Container of active queries we should remember
416  */
417 static struct DHTResults forward_list;
418
419 /**
420  * Handle to the datacache service (for inserting/retrieving data)
421  */
422 static struct GNUNET_DATACACHE_Handle *datacache;
423
424 /**
425  * The main scheduler to use for the DHT service
426  */
427 static struct GNUNET_SCHEDULER_Handle *sched;
428
429 /**
430  * The configuration the DHT service is running with
431  */
432 static const struct GNUNET_CONFIGURATION_Handle *cfg;
433
434 /**
435  * Handle to the core service
436  */
437 static struct GNUNET_CORE_Handle *coreAPI;
438
439 /**
440  * Handle to the transport service, for getting our hello
441  */
442 static struct GNUNET_TRANSPORT_Handle *transport_handle;
443
444 /**
445  * The identity of our peer.
446  */
447 static struct GNUNET_PeerIdentity my_identity;
448
449 /**
450  * Short id of the peer, for printing
451  */
452 static char *my_short_id;
453
454 /**
455  * Our HELLO
456  */
457 static struct GNUNET_MessageHeader *my_hello;
458
459 /**
460  * Task to run when we shut down, cleaning up all our trash
461  */
462 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
463
464 /**
465  * The lowest currently used bucket.
466  */
467 static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
468
469 /**
470  * The buckets (Kademlia routing table, complete with growth).
471  * Array of size MAX_BUCKET_SIZE.
472  */
473 static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
474
475 /**
476  * Maximum size for each bucket.
477  */
478 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
479
480 /**
481  * List of active clients.
482  */
483 static struct ClientList *client_list;
484
485 /**
486  * Handle to the DHT logger.
487  */
488 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
489
490 /*
491  * Whether or not to send routing debugging information
492  * to the dht logging server
493  */
494 static unsigned int debug_routes;
495
496 /*
497  * Whether or not to send FULL route information to
498  * logging server
499  */
500 static unsigned int debug_routes_extended;
501
502 /**
503  * Forward declaration.
504  */
505 static size_t send_generic_reply (void *cls, size_t size, void *buf);
506
507 /* Declare here so retry_core_send is aware of it */
508 size_t core_transmit_notify (void *cls,
509                              size_t size, void *buf);
510
511 /**
512  *  Try to send another message from our core send list
513  */
514 static void
515 try_core_send (void *cls,
516                const struct GNUNET_SCHEDULER_TaskContext *tc)
517 {
518   struct PeerInfo *peer = cls;
519   struct P2PPendingMessage *pending;
520   size_t ssize;
521
522   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
523
524   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
525     return;
526
527   if (peer->th != NULL)
528     return; /* Message send already in progress */
529
530   pending = peer->head;
531   if (pending != NULL)
532     {
533       ssize = ntohs(pending->msg->size);
534 #if DEBUG_DHT > 1
535      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
536                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
537                 "DHT", ssize, GNUNET_i2s(&peer->id));
538 #endif
539       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
540                                                    pending->timeout, &peer->id,
541                                                    ssize, &core_transmit_notify, peer);
542     }
543 }
544
545 /**
546  * Function called to send a request out to another peer.
547  * Called both for locally initiated requests and those
548  * received from other peers.
549  *
550  * @param cls DHT service closure argument
551  * @param msg the encapsulated message
552  * @param peer the peer to forward the message to
553  * @param msg_ctx the context of the message (hop count, bloom, etc.)
554  */
555 static void forward_result_message (void *cls,
556                                     const struct GNUNET_MessageHeader *msg,
557                                     struct PeerInfo *peer,
558                                     struct DHT_MessageContext *msg_ctx)
559 {
560   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
561   struct P2PPendingMessage *pending;
562   size_t msize;
563   size_t psize;
564
565   msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
566   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
567   psize = sizeof(struct P2PPendingMessage) + msize;
568   pending = GNUNET_malloc(psize);
569   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
570   pending->importance = DHT_SEND_PRIORITY;
571   pending->timeout = GNUNET_TIME_relative_get_forever();
572   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
573   result_message->header.size = htons(msize);
574   result_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
575   result_message->options = htonl(msg_ctx->msg_options);
576   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
577   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
578   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
579   memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
580   memcpy(&result_message[1], msg, ntohs(msg->size));
581 #if DEBUG_DHT > 1
582   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
583 #endif
584   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
585   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
586     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
587 }
588 /**
589  * Called when core is ready to send a message we asked for
590  * out to the destination.
591  *
592  * @param cls closure (NULL)
593  * @param size number of bytes available in buf
594  * @param buf where the callee should write the message
595  * @return number of bytes written to buf
596  */
597 size_t core_transmit_notify (void *cls,
598                              size_t size, void *buf)
599 {
600   struct PeerInfo *peer = cls;
601   char *cbuf = buf;
602   struct P2PPendingMessage *pending;
603
604   size_t off;
605   size_t msize;
606
607   if (buf == NULL)
608     {
609       /* client disconnected */
610 #if DEBUG_DHT
611       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
612 #endif
613       return 0;
614     }
615
616   if (peer->head == NULL)
617     return 0;
618
619   peer->th = NULL;
620   off = 0;
621   pending = peer->head;
622   msize = ntohs(pending->msg->size);
623   if (msize <= size)
624     {
625       off = msize;
626       memcpy (cbuf, pending->msg, msize);
627       GNUNET_CONTAINER_DLL_remove (peer->head,
628                                    peer->tail,
629                                    pending);
630 #if DEBUG_DHT > 1
631       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Removing pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
632 #endif
633       GNUNET_free (pending);
634     }
635 #if SMART
636   while (NULL != pending &&
637           (size - off >= (msize = ntohs (pending->msg->size))))
638     {
639 #if DEBUG_DHT_ROUTING
640       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d\n", my_short_id, "dht service", msize, size);
641 #endif
642       memcpy (&cbuf[off], pending->msg, msize);
643       off += msize;
644       GNUNET_CONTAINER_DLL_remove (peer->head,
645                                    peer->tail,
646                                    pending);
647       GNUNET_free (pending);
648       pending = peer->head;
649     }
650 #endif
651   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
652     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
653 #if DEBUG_DHT > 1
654   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d, returning %d\n", my_short_id, "dht service", msize, size, off);
655 #endif
656   return off;
657 }
658
659 /**
660  * Determine how many low order bits match in two
661  * GNUNET_HashCodes.  i.e. - 010011 and 011111 share
662  * the first two lowest order bits, and therefore the
663  * return value is two (NOT XOR distance, nor how many
664  * bits match absolutely!).
665  *
666  * @param first the first hashcode
667  * @param second the hashcode to compare first to
668  *
669  * @return the number of bits that match
670  */
671 static unsigned int matching_bits(const GNUNET_HashCode *first, const GNUNET_HashCode *second)
672 {
673   unsigned int i;
674
675   for (i = 0; i < sizeof (GNUNET_HashCode) * 8; i++)
676     if (GNUNET_CRYPTO_hash_get_bit (first, i) != GNUNET_CRYPTO_hash_get_bit (second, i))
677       return i;
678   return sizeof (GNUNET_HashCode) * 8;
679 }
680
681 /**
682  * Compute the distance between have and target as a 32-bit value.
683  * Differences in the lower bits must count stronger than differences
684  * in the higher bits.
685  *
686  * @return 0 if have==target, otherwise a number
687  *           that is larger as the distance between
688  *           the two hash codes increases
689  */
690 static unsigned int
691 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
692 {
693   unsigned int bucket;
694   unsigned int msb;
695   unsigned int lsb;
696   unsigned int i;
697
698   /* We have to represent the distance between two 2^9 (=512)-bit
699      numbers as a 2^5 (=32)-bit number with "0" being used for the
700      two numbers being identical; furthermore, we need to
701      guarantee that a difference in the number of matching
702      bits is always represented in the result.
703
704      We use 2^32/2^9 numerical values to distinguish between
705      hash codes that have the same LSB bit distance and
706      use the highest 2^9 bits of the result to signify the
707      number of (mis)matching LSB bits; if we have 0 matching
708      and hence 512 mismatching LSB bits we return -1 (since
709      512 itself cannot be represented with 9 bits) */
710
711   /* first, calculate the most significant 9 bits of our
712      result, aka the number of LSBs */
713   bucket = matching_bits (target, have);
714   /* bucket is now a value between 0 and 512 */
715   if (bucket == 512)
716     return 0;                   /* perfect match */
717   if (bucket == 0)
718     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
719                                    below, we'd end up with max+1 (overflow)) */
720
721   /* calculate the most significant bits of the final result */
722   msb = (512 - bucket) << (32 - 9);
723   /* calculate the 32-9 least significant bits of the final result by
724      looking at the differences in the 32-9 bits following the
725      mismatching bit at 'bucket' */
726   lsb = 0;
727   for (i = bucket + 1;
728        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
729     {
730       if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
731         lsb |= (1 << (bucket + 32 - 9 - i));    /* first bit set will be 10,
732                                                    last bit set will be 31 -- if
733                                                    i does not reach 512 first... */
734     }
735   return msb | lsb;
736 }
737
738 /**
739  * Return a number that is larger the closer the
740  * "have" GNUNET_hash code is to the "target".
741  *
742  * @return inverse distance metric, non-zero.
743  *         Must fudge the value if NO bits match.
744  */
745 static unsigned int
746 inverse_distance (const GNUNET_HashCode * target,
747                   const GNUNET_HashCode * have)
748 {
749   if (matching_bits(target, have) == 0)
750     return 1; /* Never return 0! */
751   return ((unsigned int) -1) - distance (target, have);
752 }
753
754 /**
755  * Find the optimal bucket for this key, regardless
756  * of the current number of buckets in use.
757  *
758  * @param hc the hashcode to compare our identity to
759  *
760  * @return the proper bucket index, or GNUNET_SYSERR
761  *         on error (same hashcode)
762  */
763 static int find_bucket(const GNUNET_HashCode *hc)
764 {
765   unsigned int bits;
766
767   bits = matching_bits(&my_identity.hashPubKey, hc);
768   if (bits == MAX_BUCKETS)
769     return GNUNET_SYSERR;
770   return MAX_BUCKETS - bits - 1;
771 }
772
773 /**
774  * Find which k-bucket this peer should go into,
775  * taking into account the size of the k-bucket
776  * array.  This means that if more bits match than
777  * there are currently buckets, lowest_bucket will
778  * be returned.
779  *
780  * @param hc GNUNET_HashCode we are finding the bucket for.
781  *
782  * @return the proper bucket index for this key,
783  *         or GNUNET_SYSERR on error (same hashcode)
784  */
785 static int find_current_bucket(const GNUNET_HashCode *hc)
786 {
787   int actual_bucket;
788   actual_bucket = find_bucket(hc);
789
790   if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
791     return GNUNET_SYSERR;
792   else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
793     return lowest_bucket;
794   else
795     return actual_bucket;
796 }
797
798 /**
799  * Find a routing table entry from a peer identity
800  *
801  * @param peer the peer identity to look up
802  *
803  * @return the routing table entry, or NULL if not found
804  */
805 static struct PeerInfo *
806 find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
807 {
808   int bucket;
809   struct PeerInfo *pos;
810   bucket = find_current_bucket(&peer->hashPubKey);
811
812   if (bucket == GNUNET_SYSERR)
813     return NULL;
814
815   pos = k_buckets[bucket].head;
816   while (pos != NULL)
817     {
818       if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
819         return pos;
820       pos = pos->next;
821     }
822   return NULL; /* No such peer. */
823 }
824
825 /**
826  * Really add a peer to a bucket (only do assertions
827  * on size, etc.)
828  *
829  * @param peer GNUNET_PeerIdentity of the peer to add
830  * @param bucket the already figured out bucket to add
831  *        the peer to
832  * @param latency the core reported latency of this peer
833  * @param distance the transport level distance to this peer
834  */
835 static void add_peer(const struct GNUNET_PeerIdentity *peer,
836                      unsigned int bucket,
837                      struct GNUNET_TIME_Relative latency,
838                      unsigned int distance)
839 {
840   struct PeerInfo *new_peer;
841   GNUNET_assert(bucket < MAX_BUCKETS);
842   GNUNET_assert(peer != NULL);
843   new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
844   new_peer->latency = latency;
845   new_peer->distance = distance;
846   memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
847
848   GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
849                                     k_buckets[bucket].tail,
850                                     k_buckets[bucket].tail,
851                                     new_peer);
852   k_buckets[bucket].peers_size++;
853 }
854
855 /**
856  * Given a peer and its corresponding bucket,
857  * remove it from that bucket.  Does not free
858  * the PeerInfo struct, nor cancel messages
859  * or free messages waiting to be sent to this
860  * peer!
861  *
862  * @param peer the peer to remove
863  * @param bucket the bucket the peer belongs to
864  */
865 static void remove_peer (struct PeerInfo *peer,
866                          unsigned int bucket)
867 {
868   GNUNET_assert(k_buckets[bucket].peers_size > 0);
869   GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
870                               k_buckets[bucket].tail,
871                               peer);
872   k_buckets[bucket].peers_size--;
873 }
874
875 /**
876  * Removes peer from a bucket, then frees associated
877  * resources and frees peer.
878  *
879  * @param peer peer to be removed and freed
880  * @param bucket which bucket this peer belongs to
881  */
882 static void delete_peer (struct PeerInfo *peer,
883                          unsigned int bucket)
884 {
885   struct P2PPendingMessage *pos;
886   struct P2PPendingMessage *next;
887   remove_peer(peer, bucket); /* First remove the peer from its bucket */
888
889   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
890     GNUNET_SCHEDULER_cancel(sched, peer->send_task);
891   if (peer->th != NULL)
892     GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
893
894   pos = peer->head;
895   while (pos != NULL) /* Remove any pending messages for this peer */
896     {
897       next = pos->next;
898       GNUNET_free(pos);
899       pos = next;
900     }
901   GNUNET_free(peer);
902 }
903
904 /**
905  * The current lowest bucket is full, so change the lowest
906  * bucket to the next lower down, and move any appropriate
907  * entries in the current lowest bucket to the new bucket.
908  */
909 static void enable_next_bucket()
910 {
911   unsigned int new_bucket;
912   unsigned int to_remove;
913   int i;
914   struct PeerInfo *to_remove_list[bucket_size]; /* We either use CPU by making a list, or memory with array.  Use memory. */
915   struct PeerInfo *pos;
916   GNUNET_assert(lowest_bucket > 0);
917
918   pos = k_buckets[lowest_bucket].head;
919   memset(to_remove_list, 0, sizeof(to_remove_list));
920   to_remove = 0;
921   /* Populate the array of peers which should be in the next lowest bucket */
922   while (pos->next != NULL)
923     {
924       if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
925         {
926           to_remove_list[to_remove] = pos;
927           to_remove++;
928         }
929       pos = pos->next;
930     }
931   new_bucket = lowest_bucket - 1;
932
933   /* Remove peers from lowest bucket, insert into next lowest bucket */
934   for (i = 0; i < bucket_size; i++)
935     {
936       if (to_remove_list[i] != NULL)
937         {
938           remove_peer(to_remove_list[i], lowest_bucket);
939           GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
940                                             k_buckets[new_bucket].tail,
941                                             k_buckets[new_bucket].tail,
942                                             to_remove_list[i]);
943           k_buckets[new_bucket].peers_size++;
944         }
945       else
946         break;
947     }
948   lowest_bucket = new_bucket;
949 }
950 /**
951  * Attempt to add a peer to our k-buckets.
952  *
953  * @param peer, the peer identity of the peer being added
954  *
955  * @return GNUNET_YES if the peer was added,
956  *         GNUNET_NO if not,
957  *         GNUNET_SYSERR on err (peer is us!)
958  */
959 static int try_add_peer(const struct GNUNET_PeerIdentity *peer,
960                         unsigned int bucket,
961                         struct GNUNET_TIME_Relative latency,
962                         unsigned int distance)
963 {
964   int peer_bucket;
965
966   peer_bucket = find_current_bucket(&peer->hashPubKey);
967   if (peer_bucket == GNUNET_SYSERR)
968     return GNUNET_SYSERR;
969
970   GNUNET_assert(peer_bucket >= lowest_bucket);
971   if ((k_buckets[peer_bucket].peers_size) < bucket_size)
972     {
973       add_peer(peer, peer_bucket, latency, distance);
974       return GNUNET_YES;
975     }
976   else if ((peer_bucket == lowest_bucket) && (lowest_bucket > 0))
977     {
978       enable_next_bucket();
979       return try_add_peer(peer, bucket, latency, distance); /* Recurse, if proper bucket still full ping peers */
980     }
981   else if ((k_buckets[peer_bucket].peers_size) == bucket_size)
982     {
983       /* TODO: implement ping_oldest_peer */
984       //ping_oldest_peer(bucket, peer, latency, distance); /* Find oldest peer, ping it.  If no response, remove and add new peer! */
985       return GNUNET_NO;
986     }
987   GNUNET_break(0);
988   return GNUNET_NO;
989 }
990
991
992 /**
993  * Task run to check for messages that need to be sent to a client.
994  *
995  * @param client a ClientList, containing the client and any messages to be sent to it
996  */
997 static void
998 process_pending_messages (struct ClientList *client)
999
1000   if (client->pending_head == NULL) 
1001     return;    
1002   if (client->transmit_handle != NULL) 
1003     return;
1004   client->transmit_handle =
1005     GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1006                                          ntohs (client->pending_head->msg->
1007                                                 size),
1008                                          GNUNET_TIME_UNIT_FOREVER_REL,
1009                                          &send_generic_reply, client);
1010 }
1011
1012 /**
1013  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1014  * request.  A ClientList is passed as closure, take the head of the list
1015  * and copy it into buf, which has the result of sending the message to the
1016  * client.
1017  *
1018  * @param cls closure to this call
1019  * @param size maximum number of bytes available to send
1020  * @param buf where to copy the actual message to
1021  *
1022  * @return the number of bytes actually copied, 0 indicates failure
1023  */
1024 static size_t
1025 send_generic_reply (void *cls, size_t size, void *buf)
1026 {
1027   struct ClientList *client = cls;
1028   char *cbuf = buf;
1029   struct PendingMessage *reply;
1030   size_t off;
1031   size_t msize;
1032
1033   client->transmit_handle = NULL;
1034   if (buf == NULL)             
1035     {
1036       /* client disconnected */
1037       return 0;
1038     }
1039   off = 0;
1040   while ( (NULL != (reply = client->pending_head)) &&
1041           (size >= off + (msize = ntohs (reply->msg->size))))
1042     {
1043       GNUNET_CONTAINER_DLL_remove (client->pending_head,
1044                                    client->pending_tail,
1045                                    reply);
1046       memcpy (&cbuf[off], reply->msg, msize);
1047       GNUNET_free (reply);
1048       off += msize;
1049     }
1050   process_pending_messages (client);
1051   return off;
1052 }
1053
1054
1055 /**
1056  * Add a PendingMessage to the clients list of messages to be sent
1057  *
1058  * @param client the active client to send the message to
1059  * @param pending_message the actual message to send
1060  */
1061 static void
1062 add_pending_message (struct ClientList *client,
1063                      struct PendingMessage *pending_message)
1064 {
1065   GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
1066                                      client->pending_tail,
1067                                      client->pending_tail,
1068                                      pending_message);
1069   process_pending_messages (client);
1070 }
1071
1072
1073
1074
1075 /**
1076  * Called when a reply needs to be sent to a client, as
1077  * a result it found to a GET or FIND PEER request.
1078  *
1079  * @param client the client to send the reply to
1080  * @param message the encapsulated message to send
1081  * @param uid the unique identifier of this request
1082  */
1083 static void
1084 send_reply_to_client (struct ClientList *client,
1085                       const struct GNUNET_MessageHeader *message,
1086                       unsigned long long uid)
1087 {
1088   struct GNUNET_DHT_RouteResultMessage *reply;
1089   struct PendingMessage *pending_message;
1090   uint16_t msize;
1091   size_t tsize;
1092 #if DEBUG_DHT
1093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094               "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
1095 #endif
1096   msize = ntohs (message->size);
1097   tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
1098   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1099     {
1100       GNUNET_break_op (0);
1101       return;
1102     }
1103
1104   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1105   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
1106   reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
1107   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1108   reply->header.size = htons (tsize);
1109   reply->unique_id = GNUNET_htonll (uid);
1110   memcpy (&reply[1], message, msize);
1111
1112   add_pending_message (client, pending_message);
1113 }
1114
1115
1116 /**
1117  * Main function that handles whether or not to route a result
1118  * message to other peers, or to send to our local client.
1119  *
1120  * @param msg the result message to be routed
1121  * @return the number of peers the message was routed to,
1122  *         GNUNET_SYSERR on failure
1123  */
1124 static int route_result_message(void *cls,
1125                                 struct GNUNET_MessageHeader *msg,
1126                                 struct DHT_MessageContext *message_context)
1127 {
1128   struct DHTQueryRecord *record;
1129   struct DHTRouteSource *pos;
1130   struct PeerInfo *peer_info;
1131   struct GNUNET_MessageHeader *hello_msg;
1132
1133   /**
1134    * If a find peer result message is received and contains a valid
1135    * HELLO for another peer, offer it to the transport service.
1136    *
1137    * FIXME: Check whether we need this peer (based on routing table
1138    * fullness) and only try to connect to it conditionally.  This should
1139    * reduce trying to connect to say (500) peers when the bucket size will
1140    * discard most of them.
1141    */
1142   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1143     {
1144       if (ntohs(msg->size) <= sizeof(struct GNUNET_MessageHeader))
1145         GNUNET_break_op(0);
1146
1147       hello_msg = &msg[1];
1148       if (ntohs(hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO)
1149       {
1150         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received non-HELLO message type in find peer result message!\n", my_short_id, "DHT");
1151         GNUNET_break_op(0);
1152       }
1153       else
1154       {
1155         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received HELLO message for another peer, offering to transport!\n", my_short_id, "DHT");
1156         GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg);
1157       }
1158
1159     }
1160   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, message_context->key);
1161   if (record == NULL) /* No record of this message! */
1162     {
1163 #if DEBUG_DHT
1164     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165                 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
1166                 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1167 #endif
1168 #if DEBUG_DHT_ROUTING
1169
1170       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1171         {
1172           dhtlog_handle->insert_route (NULL,
1173                                        message_context->unique_id,
1174                                        DHTLOG_RESULT,
1175                                        message_context->hop_count,
1176                                        GNUNET_SYSERR,
1177                                        &my_identity,
1178                                        message_context->key,
1179                                        message_context->peer, NULL);
1180         }
1181 #endif
1182       if (message_context->bloom != NULL)
1183         {
1184           GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1185           message_context->bloom = NULL;
1186         }
1187       return 0;
1188     }
1189
1190   pos = record->head;
1191   while (pos != NULL)
1192     {
1193       if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
1194         {
1195 #if DEBUG_DHT
1196           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197                       "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
1198                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1199 #endif
1200 #if DEBUG_DHT_ROUTING
1201           if ((debug_routes_extended) && (dhtlog_handle != NULL))
1202             {
1203               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
1204                                            message_context->hop_count,
1205                                            GNUNET_YES, &my_identity, message_context->key,
1206                                            message_context->peer, NULL);
1207             }
1208 #endif
1209           send_reply_to_client(pos->client, msg, message_context->unique_id);
1210         }
1211       else /* Send to peer */
1212         {
1213           peer_info = find_peer_by_id(&pos->source);
1214           if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1215             {
1216               pos = pos->next;
1217               continue;
1218             }
1219
1220           if (message_context->bloom == NULL)
1221             message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1222           GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1223           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (message_context->bloom, &peer_info->id.hashPubKey))
1224             {
1225 #if DEBUG_DHT
1226               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227                           "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
1228                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1229 #endif
1230 #if DEBUG_DHT_ROUTING
1231               if ((debug_routes_extended) && (dhtlog_handle != NULL))
1232                 {
1233                   dhtlog_handle->insert_route (NULL, message_context->unique_id,
1234                                                DHTLOG_RESULT,
1235                                                message_context->hop_count,
1236                                                GNUNET_NO, &my_identity, message_context->key,
1237                                                message_context->peer, &pos->source);
1238                 }
1239 #endif
1240               forward_result_message(cls, msg, peer_info, message_context);
1241             }
1242           else
1243             {
1244 #if DEBUG_DHT
1245               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1246                           "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
1247                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1248 #endif
1249             }
1250         }
1251       pos = pos->next;
1252     }
1253   if (message_context->bloom != NULL)
1254     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1255   return 0;
1256 }
1257
1258 /**
1259  * Iterator for local get request results,
1260  *
1261  * @param cls closure for iterator, a DatacacheGetContext
1262  * @param exp when does this value expire?
1263  * @param key the key this data is stored under
1264  * @param size the size of the data identified by key
1265  * @param data the actual data
1266  * @param type the type of the data
1267  *
1268  * @return GNUNET_OK to continue iteration, anything else
1269  * to stop iteration.
1270  */
1271 static int
1272 datacache_get_iterator (void *cls,
1273                         struct GNUNET_TIME_Absolute exp,
1274                         const GNUNET_HashCode * key,
1275                         uint32_t size, const char *data, uint32_t type)
1276 {
1277   struct DHT_MessageContext *msg_ctx = cls;
1278   struct DHT_MessageContext *new_msg_ctx;
1279   struct GNUNET_DHT_GetResultMessage *get_result;
1280 #if DEBUG_DHT
1281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
1283 #endif
1284   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
1285   memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
1286   get_result =
1287     GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
1288   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1289   get_result->header.size =
1290     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
1291   get_result->expiration = GNUNET_TIME_absolute_hton(exp);
1292   get_result->type = htons (type);
1293   memcpy (&get_result[1], data, size);
1294   new_msg_ctx->peer = &my_identity;
1295   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1296   new_msg_ctx->hop_count = 0;
1297   route_result_message(cls, &get_result->header, new_msg_ctx);
1298   GNUNET_free(new_msg_ctx);
1299   //send_reply_to_client (datacache_get_ctx->client, &get_result->header,
1300   //                      datacache_get_ctx->unique_id);
1301   GNUNET_free (get_result);
1302   return GNUNET_OK;
1303 }
1304
1305
1306 /**
1307  * Server handler for all dht get requests, look for data,
1308  * if found, send response either to clients or other peers.
1309  *
1310  * @param cls closure for service
1311  * @param msg the actual get message
1312  * @param message_context struct containing pertinent information about the get request
1313  *
1314  * @return number of items found for GET request
1315  */
1316 static unsigned int
1317 handle_dht_get (void *cls, 
1318                 const struct GNUNET_MessageHeader *msg,
1319                 struct DHT_MessageContext *message_context)
1320 {
1321   const struct GNUNET_DHT_GetMessage *get_msg;
1322   uint16_t get_type;
1323   unsigned int results;
1324
1325   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
1326   if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
1327     {
1328       GNUNET_break (0);
1329       return 0;
1330     }
1331
1332   get_type = ntohs (get_msg->type);
1333 #if DEBUG_DHT
1334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335               "`%s:%s': Received `%s' request from client, message type %u, key %s, uid %llu\n", my_short_id,
1336               "DHT", "GET", get_type, GNUNET_h2s (message_context->key),
1337               message_context->unique_id);
1338 #endif
1339
1340   results = 0;
1341   if (datacache != NULL)
1342     results =
1343       GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
1344                             &datacache_get_iterator, message_context);
1345
1346   if (results >= 1)
1347     {
1348 #if DEBUG_DHT
1349       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350                   "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
1351                   results, "GET", message_context->unique_id);
1352 #endif
1353 #if DEBUG_DHT_ROUTING
1354       if ((debug_routes) && (dhtlog_handle != NULL))
1355         {
1356           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1357                                 message_context->hop_count, GNUNET_YES, &my_identity,
1358                                 message_context->key);
1359         }
1360
1361       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1362         {
1363           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1364                                        message_context->hop_count, GNUNET_YES,
1365                                        &my_identity, message_context->key, message_context->peer,
1366                                        NULL);
1367         }
1368 #endif
1369     }
1370
1371   if (message_context->hop_count == 0) /* Locally initiated request */
1372     {
1373 #if DEBUG_DHT_ROUTING
1374     if ((debug_routes) && (dhtlog_handle != NULL))
1375       {
1376         dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1377                                       message_context->hop_count, GNUNET_NO, &my_identity,
1378                                       message_context->key);
1379       }
1380 #endif
1381     }
1382
1383   return results;
1384 }
1385
1386
1387 /**
1388  * Server handler for initiating local dht find peer requests
1389  *
1390  * @param cls closure for service
1391  * @param find_msg the actual find peer message
1392  * @param message_context struct containing pertinent information about the request
1393  *
1394  */
1395 static void
1396 handle_dht_find_peer (void *cls, 
1397                       const struct GNUNET_MessageHeader *find_msg,
1398                       struct DHT_MessageContext *message_context)
1399 {
1400   struct GNUNET_MessageHeader *find_peer_result;
1401   struct DHT_MessageContext *new_msg_ctx;
1402   size_t hello_size;
1403   size_t tsize;
1404
1405 #if DEBUG_DHT
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
1408               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
1409               ntohs (find_msg->size),
1410               sizeof (struct GNUNET_MessageHeader));
1411 #endif
1412   if ((my_hello == NULL) || (message_context->closest != GNUNET_YES))
1413   {
1414 #if DEBUG_DHT
1415     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1416                 "`%s': Our HELLO is null, can't return.\n",
1417                 "DHT");
1418 #endif
1419     return;
1420   }
1421   /* Simplistic find_peer functionality, always return our hello */
1422   hello_size = ntohs(my_hello->size);
1423   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
1424
1425   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1426     {
1427       GNUNET_break_op (0);
1428       return;
1429     }
1430
1431   find_peer_result = GNUNET_malloc (tsize);
1432   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
1433   find_peer_result->size = htons (tsize);
1434   memcpy (&find_peer_result[1], my_hello, hello_size);
1435
1436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437               "`%s': Sending hello size %d to requesting peer.\n",
1438               "DHT", hello_size);
1439
1440   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
1441   memcpy(new_msg_ctx, message_context, sizeof(struct DHT_MessageContext));
1442   new_msg_ctx->peer = &my_identity;
1443   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1444   new_msg_ctx->hop_count = 0;
1445   route_result_message(cls, find_peer_result, new_msg_ctx);
1446   //send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
1447   GNUNET_free(find_peer_result);
1448 }
1449
1450
1451 /**
1452  * Server handler for initiating local dht put requests
1453  *
1454  * @param cls closure for service
1455  * @param msg the actual put message
1456  * @param message_context struct containing pertinent information about the request
1457  */
1458 static void
1459 handle_dht_put (void *cls,
1460                 const struct GNUNET_MessageHeader *msg,
1461                 struct DHT_MessageContext *message_context)
1462 {
1463   struct GNUNET_DHT_PutMessage *put_msg;
1464   size_t put_type;
1465   size_t data_size;
1466
1467   GNUNET_assert (ntohs (msg->size) >=
1468                  sizeof (struct GNUNET_DHT_PutMessage));
1469   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
1470   put_type = ntohs (put_msg->type);
1471   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
1472 #if DEBUG_DHT
1473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1474               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1475               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (message_context->key), message_context->unique_id);
1476 #endif
1477 #if DEBUG_DHT_ROUTING
1478
1479   if ((debug_routes) && (dhtlog_handle != NULL))
1480     {
1481       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
1482                                    message_context->hop_count, GNUNET_YES, &my_identity,
1483                                    message_context->key);
1484     }
1485 #endif
1486
1487   if (datacache != NULL)
1488     GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
1489                           (char *) &put_msg[1], put_type,
1490                           GNUNET_TIME_absolute_ntoh(put_msg->expiration));
1491   else
1492     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493                 "`%s:%s': %s request received, but have no datacache!\n",
1494                 my_short_id, "DHT", "PUT");
1495 }
1496
1497 /**
1498  * Estimate the diameter of the network based
1499  * on how many buckets are currently in use.
1500  * Concept here is that the diameter of the network
1501  * is roughly the distance a message must travel in
1502  * order to reach its intended destination.  Since
1503  * at each hop we expect to get one bit closer, and
1504  * we have one bit per bucket, the number of buckets
1505  * in use should be the largest number of hops for
1506  * a sucessful message. (of course, this assumes we
1507  * know all peers in the network!)
1508  *
1509  * @return ballpark diameter figure
1510  */
1511 static unsigned int estimate_diameter()
1512 {
1513   return MAX_BUCKETS - lowest_bucket;
1514 }
1515
1516 /**
1517  * To how many peers should we (on average)
1518  * forward the request to obtain the desired
1519  * target_replication count (on average).
1520  *
1521  * Always 0, 1 or 2 (don't send, send once, split)
1522  */
1523 static unsigned int
1524 get_forward_count (unsigned int hop_count, size_t target_replication)
1525 {
1526   double target_count;
1527   unsigned int target_value;
1528   unsigned int diameter;
1529
1530   /* FIXME: the smaller we think the network is the more lenient we should be for
1531    * routing right?  The estimation below only works if we think we have reasonably
1532    * full routing tables, which for our RR topologies may not be the case!
1533    */
1534   diameter = estimate_diameter ();
1535   if ((hop_count > (diameter + 1) * 2) && (MINIMUM_PEER_THRESHOLD < estimate_diameter() * bucket_size))
1536     {
1537 #if DEBUG_DHT
1538       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1539                   "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
1540                   "DHT", estimate_diameter(), lowest_bucket);
1541 #endif
1542       return 0;
1543     }
1544   else if (hop_count > MAX_HOPS)
1545     {
1546 #if DEBUG_DHT
1547       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1548                   "`%s:%s': Hop count too high (greater than max)\n", my_short_id,
1549                   "DHT");
1550 #endif
1551       return 0;
1552     }
1553   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
1554     target_replication / (target_replication * (hop_count + 1) + diameter);
1555   target_value = 0;
1556
1557 #if NONSENSE
1558   while (target_value < target_count)
1559     target_value++; /* target_value is ALWAYS 1 after this "loop" */
1560 #else
1561   target_value = 1;
1562 #endif
1563   if ((target_count + 1 - target_value) >
1564       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1565                                 RAND_MAX) / RAND_MAX)
1566     target_value++;
1567   return target_value;
1568 }
1569
1570 /**
1571  * Find the closest peer in our routing table to the
1572  * given hashcode.
1573  *
1574  * @return The closest peer in our routing table to the
1575  *         key, or NULL on error.
1576  */
1577 static struct PeerInfo *
1578 find_closest_peer (const GNUNET_HashCode *hc)
1579 {
1580   struct PeerInfo *pos;
1581   struct PeerInfo *current_closest;
1582   unsigned int lowest_distance;
1583   unsigned int temp_distance;
1584   int bucket;
1585
1586   lowest_distance = -1;
1587
1588   if (k_buckets[lowest_bucket].peers_size == 0)
1589     return NULL;
1590
1591   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1592     {
1593       pos = k_buckets[bucket].head;
1594       while (pos != NULL)
1595         {
1596           temp_distance = distance(&pos->id.hashPubKey, hc);
1597           if (temp_distance <= lowest_distance)
1598             {
1599               lowest_distance = temp_distance;
1600               current_closest = pos;
1601             }
1602           pos = pos->next;
1603         }
1604     }
1605   GNUNET_assert(current_closest != NULL);
1606   return current_closest;
1607 }
1608
1609 /*
1610  * Check whether my identity is closer than any known peers.
1611  *
1612  * @param target hash code to check closeness to
1613  *
1614  * Return GNUNET_YES if node location is closest, GNUNET_NO
1615  * otherwise.
1616  */
1617 int
1618 am_closest_peer (const GNUNET_HashCode * target)
1619 {
1620   int bits;
1621   int other_bits;
1622   int bucket_num;
1623   struct PeerInfo *pos;
1624   unsigned int my_distance;
1625
1626   bucket_num = find_current_bucket(target);
1627   if (bucket_num == GNUNET_SYSERR) /* Same key! */
1628     return GNUNET_YES;
1629
1630   bits = matching_bits(&my_identity.hashPubKey, target);
1631   my_distance = distance(&my_identity.hashPubKey, target);
1632
1633   pos = k_buckets[bucket_num].head;
1634   while (pos != NULL)
1635     {
1636       other_bits = matching_bits(&pos->id.hashPubKey, target);
1637       if (other_bits > bits)
1638         return GNUNET_NO;
1639       else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
1640         {
1641           if (distance(&pos->id.hashPubKey, target) < my_distance)
1642             return GNUNET_NO;
1643         }
1644       pos = pos->next;
1645     }
1646
1647 #if DEBUG_TABLE
1648   GNUNET_GE_LOG (coreAPI->ectx,
1649                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1650                  GNUNET_GE_BULK, "closest peer\n");
1651   printPeerBits (&closest);
1652   GNUNET_GE_LOG (coreAPI->ectx,
1653                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1654                  GNUNET_GE_BULK, "me\n");
1655   printPeerBits (coreAPI->my_identity);
1656   GNUNET_GE_LOG (coreAPI->ectx,
1657                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1658                  GNUNET_GE_BULK, "key\n");
1659   printKeyBits (target);
1660   GNUNET_GE_LOG (coreAPI->ectx,
1661                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1662                  GNUNET_GE_BULK,
1663                  "closest peer inverse distance is %u, mine is %u\n",
1664                  inverse_distance (target, &closest.hashPubKey),
1665                  inverse_distance (target,
1666                                    &coreAPI->my_identity->hashPubKey));
1667 #endif
1668
1669   /* No peers closer, we are the closest! */
1670   return GNUNET_YES;
1671
1672 }
1673
1674
1675 /**
1676  * Select a peer from the routing table that would be a good routing
1677  * destination for sending a message for "target".  The resulting peer
1678  * must not be in the set of blocked peers.<p>
1679  *
1680  * Note that we should not ALWAYS select the closest peer to the
1681  * target, peers further away from the target should be chosen with
1682  * exponentially declining probability.
1683  *
1684  * @param target the key we are selecting a peer to route to
1685  * @param bloom a bloomfilter containing entries this request has seen already
1686  *
1687  * @return Peer to route to, or NULL on error
1688  */
1689 static struct PeerInfo *
1690 select_peer (const GNUNET_HashCode * target,
1691              struct GNUNET_CONTAINER_BloomFilter *bloom)
1692 {
1693   unsigned int distance;
1694   unsigned int bc;
1695   struct PeerInfo *pos;
1696 #if USE_KADEMLIA
1697   const struct PeerInfo *chosen;
1698   unsigned long long largest_distance;
1699 #else
1700   unsigned long long total_distance;
1701   unsigned long long selected;
1702 #endif
1703
1704 #if USE_KADEMLIA
1705   largest_distance = 0;
1706   chosen = NULL;
1707   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1708     {
1709       pos = k_buckets[bc].head;
1710       while (pos != NULL)
1711         {
1712           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1713             {
1714               distance = inverse_distance (target, &pos->id.hashPubKey);
1715               if (distance > largest_distance)
1716                 {
1717                   chosen = pos;
1718                   largest_distance = distance;
1719                 }
1720             }
1721           pos = pos->next;
1722         }
1723     }
1724
1725   if ((largest_distance > 0) && (chosen != NULL))
1726     {
1727       GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
1728       return chosen;
1729     }
1730   else
1731     {
1732       return NULL;
1733     }
1734 #else
1735   /* GNUnet-style */
1736   total_distance = 0;
1737   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1738     {
1739       pos = k_buckets[bc].head;
1740       while (pos != NULL)
1741         {
1742           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1743             total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
1744 #if DEBUG_DHT > 1
1745           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1746                       "`%s:%s': Total distance is %llu, distance from %s to %s is %u\n",
1747                       my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey));
1748 #endif
1749           pos = pos->next;
1750         }
1751     }
1752   if (total_distance == 0)
1753     {
1754       return NULL;
1755     }
1756
1757   selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
1758   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1759     {
1760       pos = k_buckets[bc].head;
1761       while (pos != NULL)
1762         {
1763           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1764             {
1765               distance = inverse_distance (target, &pos->id.hashPubKey);
1766               if (distance > selected)
1767                 return pos;
1768               selected -= distance;
1769             }
1770           else
1771             {
1772 #if DEBUG_DHT
1773               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774                           "`%s:%s': peer %s matches bloomfilter.\n",
1775                           my_short_id, "DHT", GNUNET_i2s(&pos->id));
1776 #endif
1777             }
1778           pos = pos->next;
1779         }
1780     }
1781 #if DEBUG_DHT
1782     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783                 "`%s:%s': peer %s matches bloomfilter.\n",
1784                 my_short_id, "DHT", GNUNET_i2s(&pos->id));
1785 #endif
1786   return NULL;
1787 #endif
1788 }
1789
1790 /**
1791  * Function called to send a request out to another peer.
1792  * Called both for locally initiated requests and those
1793  * received from other peers.
1794  *
1795  * @param cls DHT service closure argument
1796  * @param msg the encapsulated message
1797  * @param peer the peer to forward the message to
1798  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1799  */
1800 static void forward_message (void *cls,
1801                              const struct GNUNET_MessageHeader *msg,
1802                              struct PeerInfo *peer,
1803                              struct DHT_MessageContext *msg_ctx)
1804 {
1805   struct GNUNET_DHT_P2PRouteMessage *route_message;
1806   struct P2PPendingMessage *pending;
1807   size_t msize;
1808   size_t psize;
1809
1810   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1811   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1812   psize = sizeof(struct P2PPendingMessage) + msize;
1813   pending = GNUNET_malloc(psize);
1814   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1815   pending->importance = DHT_SEND_PRIORITY;
1816   pending->timeout = GNUNET_TIME_relative_get_forever();
1817   route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1818   route_message->header.size = htons(msize);
1819   route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1820   route_message->options = htonl(msg_ctx->msg_options);
1821   route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1822   route_message->network_size = htonl(msg_ctx->network_size);
1823   route_message->desired_replication_level = htonl(msg_ctx->replication);
1824   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1825   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1826   memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1827   memcpy(&route_message[1], msg, ntohs(msg->size));
1828 #if DEBUG_DHT > 1
1829   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1830 #endif
1831   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1832   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1833     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1834 }
1835
1836 /**
1837  * Task used to remove forwarding entries, either
1838  * after timeout, when full, or on shutdown.
1839  *
1840  * @param cls the entry to remove
1841  * @param tc context, reason, etc.
1842  */
1843 static void
1844 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1845 {
1846   struct DHTRouteSource *source_info = cls;
1847   struct DHTQueryRecord *record;
1848   source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
1849   record = source_info->record;
1850   GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1851
1852   if (record->head == NULL) /* No more entries in DLL */
1853     {
1854       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1855       GNUNET_free(record);
1856     }
1857   GNUNET_free(source_info);
1858 }
1859
1860 /**
1861  * Remember this routing request so that if a reply is
1862  * received we can either forward it to the correct peer
1863  * or return the result locally.
1864  *
1865  * @param cls DHT service closure
1866  * @param msg_ctx Context of the route request
1867  *
1868  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1869  */
1870 static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
1871 {
1872   struct DHTQueryRecord *record;
1873   struct DHTRouteSource *source_info;
1874   struct DHTRouteSource *pos;
1875   struct GNUNET_TIME_Absolute now;
1876   unsigned int current_size;
1877
1878   current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1879   while (current_size >= MAX_OUTSTANDING_FORWARDS)
1880     {
1881       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
1882       record = source_info->record;
1883       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1884       if (record->head == NULL) /* No more entries in DLL */
1885         {
1886           GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1887           GNUNET_free(record);
1888         }
1889       GNUNET_SCHEDULER_cancel(sched, source_info->delete_task);
1890       GNUNET_free(source_info);
1891       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1892     }
1893   now = GNUNET_TIME_absolute_get();
1894   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, msg_ctx->key);
1895   if (record != NULL) /* Already know this request! */
1896     {
1897       pos = record->head;
1898       while (pos != NULL)
1899         {
1900           if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
1901             break; /* Already have this peer in reply list! */
1902           pos = pos->next;
1903         }
1904       if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
1905         {
1906           GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.value);
1907           return GNUNET_NO;
1908         }
1909     }
1910   else
1911     {
1912       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
1913       GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1914       memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1915     }
1916
1917   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
1918   source_info->record = record;
1919   source_info->delete_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
1920   memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
1921   GNUNET_CONTAINER_DLL_insert_after(record->head, record->tail, record->tail, source_info);
1922   if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
1923     {
1924       source_info->client = msg_ctx->client;
1925       now = GNUNET_TIME_absolute_get_forever();
1926     }
1927   source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.value);
1928 #if DEBUG_DHT > 1
1929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1930                   "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
1931                   "DHT", GNUNET_h2s (msg_ctx->key), msg_ctx->unique_id);
1932 #endif
1933   return GNUNET_YES;
1934 }
1935
1936
1937 /**
1938  * Main function that handles whether or not to route a message to other
1939  * peers.
1940  *
1941  * @param msg the message to be routed
1942  *
1943  * @return the number of peers the message was routed to,
1944  *         GNUNET_SYSERR on failure
1945  */
1946 static int route_message(void *cls,
1947                          const struct GNUNET_MessageHeader *msg,
1948                          struct DHT_MessageContext *message_context)
1949 {
1950   int i;
1951   struct PeerInfo *selected;
1952   struct PeerInfo *nearest;
1953   unsigned int forward_count;
1954 #if DEBUG_DHT
1955   char *nearest_buf;
1956 #endif
1957 #if DEBUG_DHT_ROUTING
1958   int ret;
1959 #endif
1960
1961   message_context->closest = am_closest_peer(message_context->key);
1962   forward_count = get_forward_count(message_context->hop_count, message_context->replication);
1963   nearest = find_closest_peer(message_context->key);
1964
1965   if (message_context->bloom == NULL)
1966     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1967   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1968
1969   if ((stop_on_closest == GNUNET_YES) && (message_context->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
1970     forward_count = 0;
1971
1972 #if DEBUG_DHT_ROUTING
1973   if (forward_count == 0)
1974     ret = GNUNET_SYSERR;
1975   else
1976     ret = GNUNET_NO;
1977
1978   if ((debug_routes_extended) && (dhtlog_handle != NULL))
1979     {
1980       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1981                                    message_context->hop_count, ret,
1982                                    &my_identity, message_context->key, message_context->peer,
1983                                    NULL);
1984     }
1985 #endif
1986
1987   switch (ntohs(msg->type))
1988     {
1989     case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
1990       cache_response (cls, message_context);
1991       if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
1992         forward_count = 0;
1993       break;
1994     case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding?*/
1995       if (message_context->closest == GNUNET_YES)
1996         {
1997 #if DEBUG_DHT_ROUTING
1998           if ((debug_routes_extended) && (dhtlog_handle != NULL))
1999             {
2000               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2001                                            message_context->hop_count, GNUNET_YES,
2002                                            &my_identity, message_context->key, message_context->peer,
2003                                            NULL);
2004             }
2005 #endif
2006           handle_dht_put (cls, msg, message_context);
2007         }
2008 #if DEBUG_DHT_ROUTING
2009         if (message_context->hop_count == 0) /* Locally initiated request */
2010           {
2011             if ((debug_routes) && (dhtlog_handle != NULL))
2012               {
2013                 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
2014                                              message_context->hop_count, GNUNET_NO, &my_identity,
2015                                              message_context->key);
2016               }
2017           }
2018 #endif
2019       break;
2020     case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
2021       if (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))
2022       {
2023         cache_response (cls, message_context);
2024         if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2025           handle_dht_find_peer (cls, msg, message_context);
2026       }
2027       break;
2028     default:
2029       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2030                   "`%s': Message type (%d) not handled\n", "DHT", ntohs(msg->type));
2031     }
2032
2033   for (i = 0; i < forward_count; i++)
2034     {
2035       selected = select_peer(message_context->key, message_context->bloom);
2036
2037       if (selected != NULL)
2038         {
2039           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
2040 #if DEBUG_DHT_ROUTING > 1
2041           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
2042           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2043                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
2044                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&selected->id), nearest_buf, matching_bits(&nearest->id.hashPubKey, message_context->key), distance(&nearest->id.hashPubKey, message_context->key));
2045           GNUNET_free(nearest_buf);
2046 #endif
2047           /* FIXME: statistics */
2048           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2049             {
2050               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2051                                            message_context->hop_count, GNUNET_NO,
2052                                            &my_identity, message_context->key, message_context->peer,
2053                                            &selected->id);
2054             }
2055           forward_message(cls, msg, selected, message_context);
2056         }
2057       else
2058         {
2059 #if DEBUG_DHT
2060           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2061                       "`%s:%s': No peers selected for forwarding.\n", my_short_id,
2062                       "DHT");
2063 #endif
2064         }
2065     }
2066 #if DEBUG_DHT_ROUTING > 1
2067   if (forward_count == 0)
2068     {
2069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2070                   "`%s:%s': NOT Forwarding request key %s uid %llu to any peers\n", my_short_id,
2071                   "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
2072     }
2073 #endif
2074
2075   if (message_context->bloom != NULL)
2076     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2077
2078   return forward_count;
2079 }
2080
2081 /**
2082  * Find a client if it exists, add it otherwise.
2083  *
2084  * @param client the server handle to the client
2085  *
2086  * @return the client if found, a new client otherwise
2087  */
2088 static struct ClientList *
2089 find_active_client (struct GNUNET_SERVER_Client *client)
2090 {
2091   struct ClientList *pos = client_list;
2092   struct ClientList *ret;
2093
2094   while (pos != NULL)
2095     {
2096       if (pos->client_handle == client)
2097         return pos;
2098       pos = pos->next;
2099     }
2100
2101   ret = GNUNET_malloc (sizeof (struct ClientList));
2102   ret->client_handle = client;
2103   ret->next = client_list;
2104   client_list = ret;
2105   return ret;
2106 }
2107
2108 /**
2109  * Task to send a find peer message for our own peer identifier
2110  * so that we can find the closest peers in the network to ourselves
2111  * and attempt to connect to them.
2112  *
2113  * @param cls closure for this task
2114  * @param tc the context under which the task is running
2115  */
2116 static void
2117 send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2118 {
2119   struct GNUNET_MessageHeader *find_peer_msg;
2120   struct DHT_MessageContext message_context;
2121   int ret;
2122
2123   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
2124     return;
2125
2126   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
2127   find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader));
2128   find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
2129   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2130   message_context.key = &my_identity.hashPubKey;
2131   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
2132   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
2133   message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS);
2134   message_context.network_size = estimate_diameter();
2135   message_context.peer = &my_identity;
2136
2137   ret = route_message(NULL, find_peer_msg, &message_context);
2138
2139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2140               "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT",
2141               "FIND PEER", ret);
2142   GNUNET_SCHEDULER_add_delayed (sched,
2143                                 DHT_DEFAULT_FIND_PEER_INTERVAL,
2144                                 &send_find_peer_message, NULL);
2145 }
2146
2147 /**
2148  * Handler for any generic DHT messages, calls the appropriate handler
2149  * depending on message type, sends confirmation if responses aren't otherwise
2150  * expected.
2151  *
2152  * @param cls closure for the service
2153  * @param client the client we received this message from
2154  * @param message the actual message received
2155  */
2156 static void
2157 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
2158                                 const struct GNUNET_MessageHeader *message)
2159 {
2160   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
2161   const struct GNUNET_MessageHeader *enc_msg;
2162   struct DHT_MessageContext message_context;
2163   size_t enc_type;
2164
2165   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
2166   enc_type = ntohs (enc_msg->type);
2167
2168 #if DEBUG_DHT
2169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
2171               my_short_id, "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
2172               GNUNET_ntohll (dht_msg->unique_id));
2173 #endif
2174 #if DEBUG_DHT_ROUTING
2175   if (dhtlog_handle != NULL)
2176     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
2177 #endif
2178   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2179   message_context.client = find_active_client (client);
2180   message_context.key = &dht_msg->key;
2181   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
2182   message_context.replication = ntohl (dht_msg->desired_replication_level);
2183   message_context.msg_options = ntohl (dht_msg->options);
2184   message_context.network_size = estimate_diameter();
2185   message_context.peer = &my_identity;
2186
2187   route_message(cls, enc_msg, &message_context);
2188
2189   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2190
2191 }
2192
2193 /**
2194  * Handler for any generic DHT stop messages, calls the appropriate handler
2195  * depending on message type (if processed locally)
2196  *
2197  * @param cls closure for the service
2198  * @param client the client we received this message from
2199  * @param message the actual message received
2200  *
2201  */
2202 static void
2203 handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
2204                             const struct GNUNET_MessageHeader *message)
2205 {
2206
2207   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
2208     (const struct GNUNET_DHT_StopMessage *) message;
2209   struct DHTQueryRecord *record;
2210   struct DHTRouteSource *pos;
2211   uint64_t uid;
2212 #if DEBUG_DHT
2213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2214               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
2215               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
2216 #endif
2217
2218   uid = GNUNET_ntohll(dht_stop_msg->unique_id);
2219
2220   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
2221   if (record != NULL)
2222     {
2223       pos = record->head;
2224
2225       while (pos != NULL)
2226         {
2227           if ((pos->client != NULL) && (pos->client->client_handle == client))
2228             {
2229               GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
2230               GNUNET_SCHEDULER_add_now(sched, &remove_forward_entry, pos);
2231             }
2232           pos = pos->next;
2233         }
2234     }
2235
2236   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2237 }
2238
2239
2240 /**
2241  * Core handler for p2p route requests.
2242  */
2243 static int
2244 handle_dht_p2p_route_request (void *cls,
2245                               const struct GNUNET_PeerIdentity *peer,
2246                               const struct GNUNET_MessageHeader *message,
2247                               struct GNUNET_TIME_Relative latency, uint32_t distance)
2248 {
2249 #if DEBUG_DHT
2250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2251               "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
2252 #endif
2253   struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
2254   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2255   struct DHT_MessageContext *message_context;
2256
2257   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2258     {
2259       GNUNET_break_op(0);
2260       return GNUNET_YES;
2261     }
2262   //memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2263   message_context = GNUNET_malloc(sizeof (struct DHT_MessageContext));
2264   message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2265   GNUNET_assert(message_context->bloom != NULL);
2266   message_context->hop_count = ntohl(incoming->hop_count);
2267   message_context->key = &incoming->key;
2268   message_context->replication = ntohl(incoming->desired_replication_level);
2269   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
2270   message_context->msg_options = ntohl(incoming->options);
2271   message_context->network_size = ntohl(incoming->network_size);
2272   message_context->peer = peer;
2273   route_message(cls, enc_msg, message_context);
2274   GNUNET_free(message_context);
2275   return GNUNET_YES;
2276 }
2277
2278
2279 /**
2280  * Core handler for p2p route results.
2281  */
2282 static int
2283 handle_dht_p2p_route_result (void *cls,
2284                              const struct GNUNET_PeerIdentity *peer,
2285                              const struct GNUNET_MessageHeader *message,
2286                              struct GNUNET_TIME_Relative latency, uint32_t distance)
2287 {
2288 #if DEBUG_DHT
2289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2290               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
2291 #endif
2292   struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
2293   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2294   struct DHT_MessageContext message_context;
2295
2296   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2297     {
2298       GNUNET_break_op(0);
2299       return GNUNET_YES;
2300     }
2301   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2302   message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2303   GNUNET_assert(message_context.bloom != NULL);
2304   message_context.key = &incoming->key;
2305   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
2306   message_context.msg_options = ntohl(incoming->options);
2307   message_context.hop_count = ntohl(incoming->hop_count);
2308   message_context.peer = peer;
2309   route_result_message(cls, enc_msg, &message_context);
2310   return GNUNET_YES;
2311 }
2312
2313
2314 /**
2315  * Receive the HELLO from transport service,
2316  * free current and replace if necessary.
2317  *
2318  * @param cls NULL
2319  * @param message HELLO message of peer
2320  */
2321 static void
2322 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2323 {
2324 #if DEBUG_DHT
2325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326               "Received our `%s' from transport service\n",
2327               "HELLO");
2328 #endif
2329
2330   GNUNET_assert (message != NULL);
2331   GNUNET_free_non_null(my_hello);
2332   my_hello = GNUNET_malloc(ntohs(message->size));
2333   memcpy(my_hello, message, ntohs(message->size));
2334 }
2335
2336 /**
2337  * Task run during shutdown.
2338  *
2339  * @param cls unused
2340  * @param tc unused
2341  */
2342 static void
2343 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2344 {
2345   int bucket_count;
2346   struct PeerInfo *pos;
2347
2348   if (transport_handle != NULL)
2349   {
2350     GNUNET_free_non_null(my_hello);
2351     GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
2352     GNUNET_TRANSPORT_disconnect(transport_handle);
2353   }
2354
2355   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2356     {
2357       while (k_buckets[bucket_count].head != NULL)
2358         {
2359           pos = k_buckets[bucket_count].head;
2360 #if DEBUG_DHT
2361           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362                       "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
2363 #endif
2364           delete_peer(pos, bucket_count);
2365         }
2366     }
2367   if (coreAPI != NULL)
2368     {
2369 #if DEBUG_DHT
2370       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2371                   "%s:%s Disconnecting core!\n", my_short_id, "DHT");
2372 #endif
2373       GNUNET_CORE_disconnect (coreAPI);
2374     }
2375   if (datacache != NULL)
2376     {
2377 #if DEBUG_DHT
2378       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2379                   "%s:%s Destroying datacache!\n", my_short_id, "DHT");
2380 #endif
2381       GNUNET_DATACACHE_destroy (datacache);
2382     }
2383
2384   if (dhtlog_handle != NULL)
2385     GNUNET_DHTLOG_disconnect(dhtlog_handle);
2386
2387   GNUNET_free_non_null(my_short_id);
2388 }
2389
2390
2391 /**
2392  * To be called on core init/fail.
2393  *
2394  * @param cls service closure
2395  * @param server handle to the server for this service
2396  * @param identity the public identity of this peer
2397  * @param publicKey the public key of this peer
2398  */
2399 void
2400 core_init (void *cls,
2401            struct GNUNET_CORE_Handle *server,
2402            const struct GNUNET_PeerIdentity *identity,
2403            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
2404 {
2405
2406   if (server == NULL)
2407     {
2408 #if DEBUG_DHT
2409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2410               "%s: Connection to core FAILED!\n", "dht",
2411               GNUNET_i2s (identity));
2412 #endif
2413       GNUNET_SCHEDULER_cancel (sched, cleanup_task);
2414       GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
2415       return;
2416     }
2417 #if DEBUG_DHT
2418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2419               "%s: Core connection initialized, I am peer: %s\n", "dht",
2420               GNUNET_i2s (identity));
2421 #endif
2422   /* Copy our identity so we can use it */
2423   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
2424   my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
2425   /* Set the server to local variable */
2426   coreAPI = server;
2427
2428   if (dhtlog_handle != NULL)
2429     dhtlog_handle->insert_node (NULL, &my_identity);
2430 }
2431
2432
2433 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
2434   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE, 0},
2435   {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
2436   {NULL, NULL, 0, 0}
2437 };
2438
2439
2440 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2441   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
2442   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
2443   {NULL, 0, 0}
2444 };
2445
2446 /**
2447  * Method called whenever a peer connects.
2448  *
2449  * @param cls closure
2450  * @param peer peer identity this notification is about
2451  * @param latency reported latency of the connection with peer
2452  * @param distance reported distance (DV) to peer
2453  */
2454 void handle_core_connect (void *cls,
2455                           const struct GNUNET_PeerIdentity * peer,
2456                           struct GNUNET_TIME_Relative latency,
2457                           uint32_t distance)
2458 {
2459   int ret;
2460
2461 #if DEBUG_DHT
2462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2463               "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
2464 #endif
2465   if (datacache != NULL)
2466     GNUNET_DATACACHE_put(datacache, &peer->hashPubKey, sizeof(struct GNUNET_PeerIdentity), (const char *)peer, 0, GNUNET_TIME_absolute_get_forever());
2467   ret = try_add_peer(peer,
2468                      find_current_bucket(&peer->hashPubKey),
2469                      latency,
2470                      distance);
2471 #if DEBUG_DHT
2472     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2473                 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == GNUNET_YES ? "PEER ADDED" : "NOT ADDED");
2474 #endif
2475 }
2476
2477 /**
2478  * Process dht requests.
2479  *
2480  * @param cls closure
2481  * @param scheduler scheduler to use
2482  * @param server the initialized server
2483  * @param c configuration to use
2484  */
2485 static void
2486 run (void *cls,
2487      struct GNUNET_SCHEDULER_Handle *scheduler,
2488      struct GNUNET_SERVER_Handle *server,
2489      const struct GNUNET_CONFIGURATION_Handle *c)
2490 {
2491   sched = scheduler;
2492   cfg = c;
2493   datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
2494   GNUNET_SERVER_add_handlers (server, plugin_handlers);
2495   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
2496                                  cfg,   /* Main configuration */
2497                                  GNUNET_TIME_UNIT_FOREVER_REL,
2498                                  NULL,  /* Closure passed to DHT functionas around? */
2499                                  &core_init,    /* Call core_init once connected */
2500                                  &handle_core_connect,  /* Don't care about connects */
2501                                  NULL,  /* FIXME: remove peers on disconnects */
2502                                  NULL,  /* Do we care about "status" updates? */
2503                                  NULL,  /* Don't want notified about all incoming messages */
2504                                  GNUNET_NO,     /* For header only inbound notification */
2505                                  NULL,  /* Don't want notified about all outbound messages */
2506                                  GNUNET_NO,     /* For header only outbound notification */
2507                                  core_handlers);        /* Register these handlers */
2508
2509   if (coreAPI == NULL)
2510     return;
2511   transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, 
2512                                               NULL, NULL, NULL, NULL, NULL);
2513   if (transport_handle != NULL)
2514     GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
2515   else
2516     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
2517
2518   lowest_bucket = MAX_BUCKETS - 1;
2519   forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
2520   forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
2521   /* Scheduled the task to clean up when shutdown is called */
2522
2523   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
2524     {
2525       debug_routes = GNUNET_YES;
2526     }
2527
2528   if (GNUNET_YES ==
2529       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2530                                            "stop_on_closest"))
2531     {
2532       stop_on_closest = GNUNET_YES;
2533     }
2534
2535   if (GNUNET_YES ==
2536       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2537                                            "stop_found"))
2538     {
2539       stop_on_found = GNUNET_YES;
2540     }
2541
2542   if (GNUNET_YES ==
2543       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
2544                                            "mysql_logging_extended"))
2545     {
2546       debug_routes = GNUNET_YES;
2547       debug_routes_extended = GNUNET_YES;
2548     }
2549
2550   if (GNUNET_YES == debug_routes)
2551     {
2552       dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
2553       if (dhtlog_handle == NULL)
2554         {
2555           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2556                       "Could not connect to mysql logging server, logging will not happen!");
2557         }
2558     }
2559
2560   GNUNET_SCHEDULER_add_delayed (sched,
2561                                 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30),
2562                                 &send_find_peer_message, NULL);
2563
2564   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
2565                                                GNUNET_TIME_UNIT_FOREVER_REL,
2566                                                &shutdown_task, NULL);
2567 }
2568
2569 /**
2570  * The main function for the dht service.
2571  *
2572  * @param argc number of arguments from the command line
2573  * @param argv command line arguments
2574  * @return 0 ok, 1 on error
2575  */
2576 int
2577 main (int argc, char *const *argv)
2578 {
2579   return (GNUNET_OK ==
2580           GNUNET_SERVICE_run (argc,
2581                               argv,
2582                               "dht",
2583                               GNUNET_SERVICE_OPTION_NONE,
2584                               &run, NULL)) ? 0 : 1;
2585 }