missing file
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief main DHT service shell, building block for DHT implementations
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_getopt_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_service_lib.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_signal_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "dhtlog.h"
42 #include "dht.h"
43
44 /**
45  * How many buckets will we allow total.
46  */
47 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
48
49 /**
50  * What is the maximum number of peers in a given bucket.
51  */
52 #define DEFAULT_BUCKET_SIZE 8
53
54 /**
55  * Minimum number of peers we need for "good" routing,
56  * any less than this and we will allow messages to
57  * travel much further through the network!
58  */
59 #define MINIMUM_PEER_THRESHOLD 20
60
61 /**
62  * Real maximum number of hops, at which point we refuse
63  * to forward the message.
64  */
65 #define MAX_HOPS 20
66
67 /**
68  * Linked list of messages to send to clients.
69  */
70 struct P2PPendingMessage
71 {
72   /**
73    * Pointer to next item in the list
74    */
75   struct P2PPendingMessage *next;
76
77   /**
78    * Pointer to previous item in the list
79    */
80   struct P2PPendingMessage *prev;
81
82   /**
83    * Message importance level.
84    */
85   unsigned int importance;
86
87   /**
88    * How long to wait before sending message.
89    */
90   struct GNUNET_TIME_Relative timeout;
91
92   /**
93    * Actual message to be sent; // avoid allocation
94    */
95   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
96
97 };
98
99 /**
100  * Per-peer information.
101  */
102 struct PeerInfo
103 {
104   /**
105    * Next peer entry (DLL)
106    */
107   struct PeerInfo *next;
108
109   /**
110    *  Prev peer entry (DLL)
111    */
112   struct PeerInfo *prev;
113
114   /**
115    * Head of pending messages to be sent to this peer.
116    */
117   struct P2PPendingMessage *head;
118
119   /**
120    * Tail of pending messages to be sent to this peer.
121    */
122   struct P2PPendingMessage *tail;
123
124   /**
125    * Core handle for sending messages to this peer.
126    */
127   struct GNUNET_CORE_TransmitHandle *th;
128
129   /**
130    * Task for scheduling message sends.
131    */
132   GNUNET_SCHEDULER_TaskIdentifier send_task;
133
134   /**
135    * What is the average latency for replies received?
136    */
137   struct GNUNET_TIME_Relative latency;
138
139   /**
140    * Number of responses received
141    */
142   unsigned long long response_count;
143
144   /**
145    * Number of requests sent
146    */
147   unsigned long long request_count;
148
149   /**
150    * What is the identity of the peer?
151    */
152   struct GNUNET_PeerIdentity id;
153
154   /**
155    * Transport level distance to peer.
156    */
157   unsigned int distance;
158
159 };
160
161 /**
162  * Peers are grouped into buckets.
163  */
164 struct PeerBucket
165 {
166   /**
167    * Head of DLL
168    */
169   struct PeerInfo *head;
170
171   /**
172    * Tail of DLL
173    */
174   struct PeerInfo *tail;
175
176   /**
177    * Number of peers in the bucket.
178    */
179   unsigned int peers_size;
180 };
181
182 /**
183  * Linked list of messages to send to clients.
184  */
185 struct PendingMessage
186 {
187   /**
188    * Pointer to next item in the list
189    */
190   struct PendingMessage *next;
191
192   /**
193    * Pointer to previous item in the list
194    */
195   struct PendingMessage *prev;
196
197   /**
198    * Actual message to be sent; // avoid allocation
199    */
200   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
201
202 };
203
204 /**
205  * Struct containing information about a client,
206  * handle to connect to it, and any pending messages
207  * that need to be sent to it.
208  */
209 struct ClientList
210 {
211   /**
212    * Linked list of active clients
213    */
214   struct ClientList *next;
215
216   /**
217    * The handle to this client
218    */
219   struct GNUNET_SERVER_Client *client_handle;
220
221   /**
222    * Handle to the current transmission request, NULL
223    * if none pending.
224    */
225   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
226
227   /**
228    * Linked list of pending messages for this client
229    */
230   struct PendingMessage *pending_head;
231
232   /**
233    * Tail of linked list of pending messages for this client
234    */
235   struct PendingMessage *pending_tail;
236
237 };
238
239
240 /**
241  * Context containing information about a DHT message received.
242  */
243 struct DHT_MessageContext
244 {
245   /**
246    * The client this request was received from.
247    * (NULL if received from another peer)
248    */
249   struct ClientList *client;
250
251   /**
252    * The peer this request was received from.
253    * (NULL if received from local client)
254    */
255   const struct GNUNET_PeerIdentity *peer;
256
257   /**
258    * The key this request was about
259    */
260   const GNUNET_HashCode *key;
261
262   /**
263    * The unique identifier of this request
264    */
265   uint64_t unique_id;
266
267   /**
268    * Desired replication level
269    */
270   uint32_t replication;
271
272   /**
273    * Network size estimate, either ours or the sum of
274    * those routed to thus far. =~ Log of number of peers
275    * chosen from for this request.
276    */
277   uint32_t network_size;
278
279   /**
280    * Any message options for this request
281    */
282   uint32_t msg_options;
283
284   /**
285    * How many hops has the message already traversed?
286    */
287   uint32_t hop_count;
288
289   /**
290    * Bloomfilter for this routing request.
291    */
292   struct GNUNET_CONTAINER_BloomFilter *bloom;
293
294   /**
295    * Did we forward this message? (may need to remember it!)
296    */
297   int forwarded;
298
299   /**
300    * Are we the closest known peer to this key (out of our neighbors?)
301    */
302   int closest;
303 };
304
305 /**
306  * Record used for remembering what peers are waiting for what
307  * responses (based on search key).
308  */
309 struct DHTRouteSource
310 {
311
312   /**
313    * This is a DLL.
314    */
315   struct DHTRouteSource *next;
316
317   /**
318    * This is a DLL.
319    */
320   struct DHTRouteSource *prev;
321
322   /**
323    * Source of the request.  Replies should be forwarded to
324    * this peer.
325    */
326   struct GNUNET_PeerIdentity source;
327
328   /**
329    * If this was a local request, remember the client; otherwise NULL.
330    */
331   struct ClientList *client;
332
333   /**
334    * Pointer to this nodes heap location (for removal)
335    */
336   struct GNUNET_CONTAINER_HeapNode *hnode;
337
338   /**
339    * Back pointer to the record storing this information.
340    */
341   struct DHTQueryRecord *record;
342
343   /**
344    * Task to remove this entry on timeout.
345    */
346   GNUNET_SCHEDULER_TaskIdentifier delete_task;
347 };
348
349 /**
350  * Entry in the DHT routing table.
351  */
352 struct DHTQueryRecord
353 {
354   /**
355    * Head of DLL for result forwarding.
356    */
357   struct DHTRouteSource *head;
358
359   /**
360    * Tail of DLL for result forwarding.
361    */
362   struct DHTRouteSource *tail;
363
364   /**
365    * Key that the record concerns.
366    */
367   GNUNET_HashCode key;
368
369   /**
370    * GET message of this record (what we already forwarded?).
371    */
372   //DV_DHT_MESSAGE get; Try to get away with not saving this.
373
374   /**
375    * Bloomfilter of the peers we've replied to so far
376    */
377   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
378
379 };
380
381 /**
382  * DHT Routing results structure
383  */
384 struct DHTResults
385 {
386   /*
387    * Min heap for removal upon reaching limit
388    */
389   struct GNUNET_CONTAINER_Heap *minHeap;
390
391   /*
392    * Hashmap for fast key based lookup
393    */
394   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
395
396 };
397
398 /**
399  * Routing option to end routing when closest peer found.
400  */
401 static int stop_on_closest;
402
403 /**
404  * Routing option to end routing when data is found.
405  */
406 static int stop_on_found;
407
408 /**
409  * Container of active queries we should remember
410  */
411 static struct DHTResults forward_list;
412
413 /**
414  * Handle to the datacache service (for inserting/retrieving data)
415  */
416 static struct GNUNET_DATACACHE_Handle *datacache;
417
418 /**
419  * The main scheduler to use for the DHT service
420  */
421 static struct GNUNET_SCHEDULER_Handle *sched;
422
423 /**
424  * The configuration the DHT service is running with
425  */
426 static const struct GNUNET_CONFIGURATION_Handle *cfg;
427
428 /**
429  * Handle to the core service
430  */
431 static struct GNUNET_CORE_Handle *coreAPI;
432
433 /**
434  * Handle to the transport service, for getting our hello
435  */
436 static struct GNUNET_TRANSPORT_Handle *transport_handle;
437
438 /**
439  * The identity of our peer.
440  */
441 static struct GNUNET_PeerIdentity my_identity;
442
443 /**
444  * Short id of the peer, for printing
445  */
446 static char *my_short_id;
447
448 /**
449  * Our HELLO
450  */
451 static struct GNUNET_MessageHeader *my_hello;
452
453 /**
454  * Task to run when we shut down, cleaning up all our trash
455  */
456 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
457
458 /**
459  * The lowest currently used bucket.
460  */
461 static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
462
463 /**
464  * The buckets (Kademlia routing table, complete with growth).
465  * Array of size MAX_BUCKET_SIZE.
466  */
467 static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
468
469 /**
470  * Maximum size for each bucket.
471  */
472 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
473
474 /**
475  * List of active clients.
476  */
477 static struct ClientList *client_list;
478
479 /**
480  * Handle to the DHT logger.
481  */
482 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
483
484 /*
485  * Whether or not to send routing debugging information
486  * to the dht logging server
487  */
488 static unsigned int debug_routes;
489
490 /*
491  * Whether or not to send FULL route information to
492  * logging server
493  */
494 static unsigned int debug_routes_extended;
495
496 /**
497  * Forward declaration.
498  */
499 static size_t send_generic_reply (void *cls, size_t size, void *buf);
500
501 /* Declare here so retry_core_send is aware of it */
502 size_t core_transmit_notify (void *cls,
503                              size_t size, void *buf);
504
505 /**
506  *  Try to send another message from our core send list
507  */
508 static void
509 try_core_send (void *cls,
510                const struct GNUNET_SCHEDULER_TaskContext *tc)
511 {
512   struct PeerInfo *peer = cls;
513   struct P2PPendingMessage *pending;
514   size_t ssize;
515
516   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
517
518   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
519     return;
520
521   if (peer->th != NULL)
522     return; /* Message send already in progress */
523
524   pending = peer->head;
525   if (pending != NULL)
526     {
527       ssize = ntohs(pending->msg->size);
528 #if DEBUG_DHT > 1
529      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
531                 "DHT", ssize, GNUNET_i2s(&peer->id));
532 #endif
533       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
534                                                    pending->timeout, &peer->id,
535                                                    ssize, &core_transmit_notify, peer);
536     }
537 }
538
539 /**
540  * Function called to send a request out to another peer.
541  * Called both for locally initiated requests and those
542  * received from other peers.
543  *
544  * @param cls DHT service closure argument
545  * @param msg the encapsulated message
546  * @param peer the peer to forward the message to
547  * @param msg_ctx the context of the message (hop count, bloom, etc.)
548  */
549 static void forward_result_message (void *cls,
550                                     const struct GNUNET_MessageHeader *msg,
551                                     struct PeerInfo *peer,
552                                     struct DHT_MessageContext *msg_ctx)
553 {
554   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
555   struct P2PPendingMessage *pending;
556   size_t msize;
557   size_t psize;
558
559   msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
560   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
561   psize = sizeof(struct P2PPendingMessage) + msize;
562   pending = GNUNET_malloc(psize);
563   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
564   pending->importance = DHT_SEND_PRIORITY;
565   pending->timeout = GNUNET_TIME_relative_get_forever();
566   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
567   result_message->header.size = htons(msize);
568   result_message->header.type = htons(GNUNET_MESSAGE_TYPE_P2P_DHT_ROUTE_RESULT);
569   result_message->options = htonl(msg_ctx->msg_options);
570   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
571   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
572   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
573   memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
574   memcpy(&result_message[1], msg, ntohs(msg->size));
575 #if DEBUG_DHT > 1
576   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
577 #endif
578   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
579   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
580     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
581 }
582 /**
583  * Called when core is ready to send a message we asked for
584  * out to the destination.
585  *
586  * @param cls closure (NULL)
587  * @param size number of bytes available in buf
588  * @param buf where the callee should write the message
589  * @return number of bytes written to buf
590  */
591 size_t core_transmit_notify (void *cls,
592                              size_t size, void *buf)
593 {
594   struct PeerInfo *peer = cls;
595   char *cbuf = buf;
596   struct P2PPendingMessage *pending;
597
598   size_t off;
599   size_t msize;
600
601   if (buf == NULL)
602     {
603       /* client disconnected */
604 #if DEBUG_DHT
605       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
606 #endif
607       return 0;
608     }
609
610   if (peer->head == NULL)
611     return 0;
612
613   peer->th = NULL;
614   off = 0;
615   pending = peer->head;
616   msize = ntohs(pending->msg->size);
617   if (msize <= size)
618     {
619       off = msize;
620       memcpy (cbuf, pending->msg, msize);
621       GNUNET_CONTAINER_DLL_remove (peer->head,
622                                    peer->tail,
623                                    pending);
624 #if DEBUG_DHT > 1
625       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Removing pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
626 #endif
627       GNUNET_free (pending);
628     }
629 #if SMART
630   while (NULL != pending &&
631           (size - off >= (msize = ntohs (pending->msg->size))))
632     {
633 #if DEBUG_DHT_ROUTING
634       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d\n", my_short_id, "dht service", msize, size);
635 #endif
636       memcpy (&cbuf[off], pending->msg, msize);
637       off += msize;
638       GNUNET_CONTAINER_DLL_remove (peer->head,
639                                    peer->tail,
640                                    pending);
641       GNUNET_free (pending);
642       pending = peer->head;
643     }
644 #endif
645   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
646     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
647 #if DEBUG_DHT > 1
648   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d, returning %d\n", my_short_id, "dht service", msize, size, off);
649 #endif
650   return off;
651 }
652
653 /**
654  * Determine how many low order bits match in two
655  * GNUNET_HashCodes.  i.e. - 010011 and 011111 share
656  * the first two lowest order bits, and therefore the
657  * return value is two (NOT XOR distance, nor how many
658  * bits match absolutely!).
659  *
660  * @param first the first hashcode
661  * @param second the hashcode to compare first to
662  *
663  * @return the number of bits that match
664  */
665 static unsigned int matching_bits(const GNUNET_HashCode *first, const GNUNET_HashCode *second)
666 {
667   unsigned int i;
668
669   for (i = 0; i < sizeof (GNUNET_HashCode) * 8; i++)
670     if (GNUNET_CRYPTO_hash_get_bit (first, i) != GNUNET_CRYPTO_hash_get_bit (second, i))
671       return i;
672   return sizeof (GNUNET_HashCode) * 8;
673 }
674
675 /**
676  * Compute the distance between have and target as a 32-bit value.
677  * Differences in the lower bits must count stronger than differences
678  * in the higher bits.
679  *
680  * @return 0 if have==target, otherwise a number
681  *           that is larger as the distance between
682  *           the two hash codes increases
683  */
684 static unsigned int
685 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
686 {
687   unsigned int bucket;
688   unsigned int msb;
689   unsigned int lsb;
690   unsigned int i;
691
692   /* We have to represent the distance between two 2^9 (=512)-bit
693      numbers as a 2^5 (=32)-bit number with "0" being used for the
694      two numbers being identical; furthermore, we need to
695      guarantee that a difference in the number of matching
696      bits is always represented in the result.
697
698      We use 2^32/2^9 numerical values to distinguish between
699      hash codes that have the same LSB bit distance and
700      use the highest 2^9 bits of the result to signify the
701      number of (mis)matching LSB bits; if we have 0 matching
702      and hence 512 mismatching LSB bits we return -1 (since
703      512 itself cannot be represented with 9 bits) */
704
705   /* first, calculate the most significant 9 bits of our
706      result, aka the number of LSBs */
707   bucket = matching_bits (target, have);
708   /* bucket is now a value between 0 and 512 */
709   if (bucket == 512)
710     return 0;                   /* perfect match */
711   if (bucket == 0)
712     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
713                                    below, we'd end up with max+1 (overflow)) */
714
715   /* calculate the most significant bits of the final result */
716   msb = (512 - bucket) << (32 - 9);
717   /* calculate the 32-9 least significant bits of the final result by
718      looking at the differences in the 32-9 bits following the
719      mismatching bit at 'bucket' */
720   lsb = 0;
721   for (i = bucket + 1;
722        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
723     {
724       if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
725         lsb |= (1 << (bucket + 32 - 9 - i));    /* first bit set will be 10,
726                                                    last bit set will be 31 -- if
727                                                    i does not reach 512 first... */
728     }
729   return msb | lsb;
730 }
731
732 /**
733  * Return a number that is larger the closer the
734  * "have" GNUNET_hash code is to the "target".
735  *
736  * @return inverse distance metric, non-zero.
737  *         Must fudge the value if NO bits match.
738  */
739 static unsigned int
740 inverse_distance (const GNUNET_HashCode * target,
741                   const GNUNET_HashCode * have)
742 {
743   if (matching_bits(target, have) == 0)
744     return 1; /* Never return 0! */
745   return ((unsigned int) -1) - distance (target, have);
746 }
747
748 /**
749  * Find the optimal bucket for this key, regardless
750  * of the current number of buckets in use.
751  *
752  * @param hc the hashcode to compare our identity to
753  *
754  * @return the proper bucket index, or GNUNET_SYSERR
755  *         on error (same hashcode)
756  */
757 static int find_bucket(const GNUNET_HashCode *hc)
758 {
759   unsigned int bits;
760
761   bits = matching_bits(&my_identity.hashPubKey, hc);
762   if (bits == MAX_BUCKETS)
763     return GNUNET_SYSERR;
764   return MAX_BUCKETS - bits - 1;
765 }
766
767 /**
768  * Find which k-bucket this peer should go into,
769  * taking into account the size of the k-bucket
770  * array.  This means that if more bits match than
771  * there are currently buckets, lowest_bucket will
772  * be returned.
773  *
774  * @param hc GNUNET_HashCode we are finding the bucket for.
775  *
776  * @return the proper bucket index for this key,
777  *         or GNUNET_SYSERR on error (same hashcode)
778  */
779 static int find_current_bucket(const GNUNET_HashCode *hc)
780 {
781   int actual_bucket;
782   actual_bucket = find_bucket(hc);
783
784   if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
785     return GNUNET_SYSERR;
786   else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
787     return lowest_bucket;
788   else
789     return actual_bucket;
790 }
791
792 /**
793  * Find a routing table entry from a peer identity
794  *
795  * @param peer the peer identity to look up
796  *
797  * @return the routing table entry, or NULL if not found
798  */
799 static struct PeerInfo *
800 find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
801 {
802   int bucket;
803   struct PeerInfo *pos;
804   bucket = find_current_bucket(&peer->hashPubKey);
805
806   if (bucket == GNUNET_SYSERR)
807     return NULL;
808
809   pos = k_buckets[bucket].head;
810   while (pos != NULL)
811     {
812       if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
813         return pos;
814       pos = pos->next;
815     }
816   return NULL; /* No such peer. */
817 }
818
819 /**
820  * Really add a peer to a bucket (only do assertions
821  * on size, etc.)
822  *
823  * @param peer GNUNET_PeerIdentity of the peer to add
824  * @param bucket the already figured out bucket to add
825  *        the peer to
826  * @param latency the core reported latency of this peer
827  * @param distance the transport level distance to this peer
828  */
829 static void add_peer(const struct GNUNET_PeerIdentity *peer,
830                      unsigned int bucket,
831                      struct GNUNET_TIME_Relative latency,
832                      unsigned int distance)
833 {
834   struct PeerInfo *new_peer;
835   GNUNET_assert(bucket < MAX_BUCKETS);
836   GNUNET_assert(peer != NULL);
837   new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
838   new_peer->latency = latency;
839   new_peer->distance = distance;
840   memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
841
842   GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
843                                     k_buckets[bucket].tail,
844                                     k_buckets[bucket].tail,
845                                     new_peer);
846   k_buckets[bucket].peers_size++;
847 }
848
849 /**
850  * Given a peer and its corresponding bucket,
851  * remove it from that bucket.  Does not free
852  * the PeerInfo struct, nor cancel messages
853  * or free messages waiting to be sent to this
854  * peer!
855  *
856  * @param peer the peer to remove
857  * @param bucket the bucket the peer belongs to
858  */
859 static void remove_peer (struct PeerInfo *peer,
860                          unsigned int bucket)
861 {
862   GNUNET_assert(k_buckets[bucket].peers_size > 0);
863   GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
864                               k_buckets[bucket].tail,
865                               peer);
866   k_buckets[bucket].peers_size--;
867 }
868
869 /**
870  * Removes peer from a bucket, then frees associated
871  * resources and frees peer.
872  *
873  * @param peer peer to be removed and freed
874  * @param bucket which bucket this peer belongs to
875  */
876 static void delete_peer (struct PeerInfo *peer,
877                          unsigned int bucket)
878 {
879   struct P2PPendingMessage *pos;
880   struct P2PPendingMessage *next;
881   remove_peer(peer, bucket); /* First remove the peer from its bucket */
882
883   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
884     GNUNET_SCHEDULER_cancel(sched, peer->send_task);
885   if (peer->th != NULL)
886     GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
887
888   pos = peer->head;
889   while (pos != NULL) /* Remove any pending messages for this peer */
890     {
891       next = pos->next;
892       GNUNET_free(pos);
893       pos = next;
894     }
895   GNUNET_free(peer);
896 }
897
898 /**
899  * The current lowest bucket is full, so change the lowest
900  * bucket to the next lower down, and move any appropriate
901  * entries in the current lowest bucket to the new bucket.
902  */
903 static void enable_next_bucket()
904 {
905   unsigned int new_bucket;
906   unsigned int to_remove;
907   int i;
908   struct PeerInfo *to_remove_list[bucket_size]; /* We either use CPU by making a list, or memory with array.  Use memory. */
909   struct PeerInfo *pos;
910   GNUNET_assert(lowest_bucket > 0);
911
912   pos = k_buckets[lowest_bucket].head;
913   memset(to_remove_list, 0, sizeof(to_remove_list));
914   to_remove = 0;
915   /* Populate the array of peers which should be in the next lowest bucket */
916   while (pos->next != NULL)
917     {
918       if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
919         {
920           to_remove_list[to_remove] = pos;
921           to_remove++;
922         }
923       pos = pos->next;
924     }
925   new_bucket = lowest_bucket - 1;
926
927   /* Remove peers from lowest bucket, insert into next lowest bucket */
928   for (i = 0; i < bucket_size; i++)
929     {
930       if (to_remove_list[i] != NULL)
931         {
932           remove_peer(to_remove_list[i], lowest_bucket);
933           GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
934                                             k_buckets[new_bucket].tail,
935                                             k_buckets[new_bucket].tail,
936                                             to_remove_list[i]);
937           k_buckets[new_bucket].peers_size++;
938         }
939       else
940         break;
941     }
942   lowest_bucket = new_bucket;
943 }
944 /**
945  * Attempt to add a peer to our k-buckets.
946  *
947  * @param peer, the peer identity of the peer being added
948  *
949  * @return GNUNET_YES if the peer was added,
950  *         GNUNET_NO if not,
951  *         GNUNET_SYSERR on err (peer is us!)
952  */
953 static int try_add_peer(const struct GNUNET_PeerIdentity *peer,
954                         unsigned int bucket,
955                         struct GNUNET_TIME_Relative latency,
956                         unsigned int distance)
957 {
958   int peer_bucket;
959
960   peer_bucket = find_current_bucket(&peer->hashPubKey);
961   if (peer_bucket == GNUNET_SYSERR)
962     return GNUNET_SYSERR;
963
964   GNUNET_assert(peer_bucket >= lowest_bucket);
965   if ((k_buckets[peer_bucket].peers_size) < bucket_size)
966     {
967       add_peer(peer, peer_bucket, latency, distance);
968       return GNUNET_YES;
969     }
970   else if ((peer_bucket == lowest_bucket) && (lowest_bucket > 0))
971     {
972       enable_next_bucket();
973       return try_add_peer(peer, bucket, latency, distance); /* Recurse, if proper bucket still full ping peers */
974     }
975   else if ((k_buckets[peer_bucket].peers_size) == bucket_size)
976     {
977       /* TODO: implement ping_oldest_peer */
978       //ping_oldest_peer(bucket, peer, latency, distance); /* Find oldest peer, ping it.  If no response, remove and add new peer! */
979       return GNUNET_NO;
980     }
981   GNUNET_break(0);
982   return GNUNET_NO;
983 }
984
985
986 /**
987  * Task run to check for messages that need to be sent to a client.
988  *
989  * @param client a ClientList, containing the client and any messages to be sent to it
990  */
991 static void
992 process_pending_messages (struct ClientList *client)
993
994   if (client->pending_head == NULL) 
995     return;    
996   if (client->transmit_handle != NULL) 
997     return;
998   client->transmit_handle =
999     GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1000                                          ntohs (client->pending_head->msg->
1001                                                 size),
1002                                          GNUNET_TIME_UNIT_FOREVER_REL,
1003                                          &send_generic_reply, client);
1004 }
1005
1006 /**
1007  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1008  * request.  A ClientList is passed as closure, take the head of the list
1009  * and copy it into buf, which has the result of sending the message to the
1010  * client.
1011  *
1012  * @param cls closure to this call
1013  * @param size maximum number of bytes available to send
1014  * @param buf where to copy the actual message to
1015  *
1016  * @return the number of bytes actually copied, 0 indicates failure
1017  */
1018 static size_t
1019 send_generic_reply (void *cls, size_t size, void *buf)
1020 {
1021   struct ClientList *client = cls;
1022   char *cbuf = buf;
1023   struct PendingMessage *reply;
1024   size_t off;
1025   size_t msize;
1026
1027   client->transmit_handle = NULL;
1028   if (buf == NULL)             
1029     {
1030       /* client disconnected */
1031       return 0;
1032     }
1033   off = 0;
1034   while ( (NULL != (reply = client->pending_head)) &&
1035           (size >= off + (msize = ntohs (reply->msg->size))))
1036     {
1037       GNUNET_CONTAINER_DLL_remove (client->pending_head,
1038                                    client->pending_tail,
1039                                    reply);
1040       memcpy (&cbuf[off], reply->msg, msize);
1041       GNUNET_free (reply);
1042       off += msize;
1043     }
1044   process_pending_messages (client);
1045   return off;
1046 }
1047
1048
1049 /**
1050  * Add a PendingMessage to the clients list of messages to be sent
1051  *
1052  * @param client the active client to send the message to
1053  * @param pending_message the actual message to send
1054  */
1055 static void
1056 add_pending_message (struct ClientList *client,
1057                      struct PendingMessage *pending_message)
1058 {
1059   GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
1060                                      client->pending_tail,
1061                                      client->pending_tail,
1062                                      pending_message);
1063   process_pending_messages (client);
1064 }
1065
1066
1067 /**
1068  * Called when a reply needs to be sent to a client, as
1069  * a result it found to a GET or FIND PEER request.
1070  *
1071  * @param client the client to send the reply to
1072  * @param message the encapsulated message to send
1073  * @param uid the unique identifier of this request
1074  */
1075 static void
1076 send_reply_to_client (struct ClientList *client,
1077                       const struct GNUNET_MessageHeader *message,
1078                       unsigned long long uid)
1079 {
1080   struct GNUNET_DHT_RouteResultMessage *reply;
1081   struct PendingMessage *pending_message;
1082   uint16_t msize;
1083   size_t tsize;
1084 #if DEBUG_DHT
1085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086               "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
1087 #endif
1088   msize = ntohs (message->size);
1089   tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
1090   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1091     {
1092       GNUNET_break_op (0);
1093       return;
1094     }
1095
1096   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1097   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
1098   reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
1099   reply->header.type = htons (GNUNET_MESSAGE_TYPE_LOCAL_DHT_ROUTE_RESULT);
1100   reply->header.size = htons (tsize);
1101   reply->unique_id = GNUNET_htonll (uid);
1102   memcpy (&reply[1], message, msize);
1103
1104   add_pending_message (client, pending_message);
1105 }
1106
1107
1108 /**
1109  * Main function that handles whether or not to route a result
1110  * message to other peers, or to send to our local client.
1111  *
1112  * @param msg the result message to be routed
1113  * @return the number of peers the message was routed to,
1114  *         GNUNET_SYSERR on failure
1115  */
1116 static int route_result_message(void *cls,
1117                                 struct GNUNET_MessageHeader *msg,
1118                                 struct DHT_MessageContext *message_context)
1119 {
1120   struct DHTQueryRecord *record;
1121   struct DHTRouteSource *pos;
1122   struct PeerInfo *peer_info;
1123
1124   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, message_context->key);
1125   if (record == NULL) /* No record of this message! */
1126     {
1127 #if DEBUG_DHT
1128     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129                 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
1130                 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1131 #endif
1132 #if DEBUG_DHT_ROUTING
1133
1134       /*if ((debug_routes) && (dhtlog_handle != NULL))
1135         {
1136           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_RESULT,
1137                                        message_context->hop_count, GNUNET_SYSERR,
1138                                        &my_identity, message_context->key);
1139         }*/
1140
1141       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1142         {
1143           dhtlog_handle->insert_route (NULL,
1144                                        message_context->unique_id,
1145                                        DHTLOG_RESULT,
1146                                        message_context->hop_count,
1147                                        GNUNET_SYSERR,
1148                                        &my_identity,
1149                                        message_context->key,
1150                                        message_context->peer, NULL);
1151         }
1152 #endif
1153       if (message_context->bloom != NULL)
1154         {
1155           GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1156           message_context->bloom = NULL;
1157         }
1158       return 0;
1159     }
1160
1161   pos = record->head;
1162   while (pos != NULL)
1163     {
1164       if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client initiated request! */
1165         {
1166 #if DEBUG_DHT
1167           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1168                       "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
1169                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1170 #endif
1171 #if DEBUG_DHT_ROUTING
1172           /*
1173           if ((debug_routes) && (dhtlog_handle != NULL))
1174             {
1175               dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_RESULT,
1176                                            message_context->hop_count, GNUNET_YES,
1177                                            &my_identity, message_context->key);
1178             }*/
1179
1180           if ((debug_routes_extended) && (dhtlog_handle != NULL))
1181             {
1182               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
1183                                            message_context->hop_count,
1184                                            GNUNET_YES, &my_identity, message_context->key,
1185                                            message_context->peer, NULL);
1186             }
1187 #endif
1188           send_reply_to_client(pos->client, msg, message_context->unique_id);
1189         }
1190       else /* Send to peer */
1191         {
1192           peer_info = find_peer_by_id(&pos->source);
1193           if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1194             {
1195               pos = pos->next;
1196               continue;
1197             }
1198
1199           if (message_context->bloom == NULL)
1200             message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1201           GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1202           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (message_context->bloom, &peer_info->id.hashPubKey))
1203             {
1204 #if DEBUG_DHT
1205               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206                           "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
1207                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1208 #endif
1209 #if DEBUG_DHT_ROUTING
1210               if ((debug_routes_extended) && (dhtlog_handle != NULL))
1211                 {
1212                   dhtlog_handle->insert_route (NULL, message_context->unique_id,
1213                                                DHTLOG_RESULT,
1214                                                message_context->hop_count,
1215                                                GNUNET_NO, &my_identity, message_context->key,
1216                                                message_context->peer, &pos->source);
1217                 }
1218 #endif
1219               forward_result_message(cls, msg, peer_info, message_context);
1220             }
1221           else
1222             {
1223 #if DEBUG_DHT
1224               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225                           "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
1226                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1227 #endif
1228             }
1229         }
1230       pos = pos->next;
1231     }
1232   if (message_context->bloom != NULL)
1233     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1234   return 0;
1235 }
1236
1237 /**
1238  * Iterator for local get request results,
1239  *
1240  * @param cls closure for iterator, a DatacacheGetContext
1241  * @param exp when does this value expire?
1242  * @param key the key this data is stored under
1243  * @param size the size of the data identified by key
1244  * @param data the actual data
1245  * @param type the type of the data
1246  *
1247  * @return GNUNET_OK to continue iteration, anything else
1248  * to stop iteration.
1249  */
1250 static int
1251 datacache_get_iterator (void *cls,
1252                         struct GNUNET_TIME_Absolute exp,
1253                         const GNUNET_HashCode * key,
1254                         uint32_t size, const char *data, uint32_t type)
1255 {
1256   struct DHT_MessageContext *msg_ctx = cls;
1257   struct DHT_MessageContext *new_msg_ctx;
1258   struct GNUNET_DHT_GetResultMessage *get_result;
1259 #if DEBUG_DHT
1260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
1262 #endif
1263   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
1264   memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
1265   get_result =
1266     GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
1267   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1268   get_result->header.size =
1269     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
1270   get_result->expiration = GNUNET_TIME_absolute_hton(exp);
1271   get_result->type = htons (type);
1272   memcpy (&get_result[1], data, size);
1273   new_msg_ctx->peer = &my_identity;
1274   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1275   new_msg_ctx->hop_count = 0;
1276   route_result_message(cls, &get_result->header, new_msg_ctx);
1277   GNUNET_free(new_msg_ctx);
1278   //send_reply_to_client (datacache_get_ctx->client, &get_result->header,
1279   //                      datacache_get_ctx->unique_id);
1280   GNUNET_free (get_result);
1281   return GNUNET_OK;
1282 }
1283
1284
1285 /**
1286  * Server handler for all dht get requests, look for data,
1287  * if found, send response either to clients or other peers.
1288  *
1289  * @param cls closure for service
1290  * @param msg the actual get message
1291  * @param message_context struct containing pertinent information about the get request
1292  *
1293  * @return number of items found for GET request
1294  */
1295 static unsigned int
1296 handle_dht_get (void *cls, 
1297                 const struct GNUNET_MessageHeader *msg,
1298                 struct DHT_MessageContext *message_context)
1299 {
1300   const struct GNUNET_DHT_GetMessage *get_msg;
1301   uint16_t get_type;
1302   unsigned int results;
1303
1304   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
1305   if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
1306     {
1307       GNUNET_break (0);
1308       return 0;
1309     }
1310
1311   get_type = ntohs (get_msg->type);
1312 #if DEBUG_DHT
1313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1314               "`%s:%s': Received `%s' request from client, message type %u, key %s, uid %llu\n", my_short_id,
1315               "DHT", "GET", get_type, GNUNET_h2s (message_context->key),
1316               message_context->unique_id);
1317 #endif
1318
1319   results = 0;
1320   if (datacache != NULL)
1321     results =
1322       GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
1323                             &datacache_get_iterator, message_context);
1324
1325   if (results >= 1)
1326     {
1327 #if DEBUG_DHT
1328       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1329                   "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
1330                   results, "GET", message_context->unique_id);
1331 #endif
1332 #if DEBUG_DHT_ROUTING
1333       if ((debug_routes) && (dhtlog_handle != NULL))
1334         {
1335           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1336                                 message_context->hop_count, GNUNET_YES, &my_identity,
1337                                 message_context->key);
1338         }
1339
1340       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1341         {
1342           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1343                                        message_context->hop_count, GNUNET_YES,
1344                                        &my_identity, message_context->key, message_context->peer,
1345                                        NULL);
1346         }
1347 #endif
1348     }
1349
1350   if (message_context->hop_count == 0) /* Locally initiated request */
1351     {
1352 #if DEBUG_DHT_ROUTING
1353     if ((debug_routes) && (dhtlog_handle != NULL))
1354       {
1355         dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1356                                       message_context->hop_count, GNUNET_NO, &my_identity,
1357                                       message_context->key);
1358       }
1359 #endif
1360     }
1361
1362   return results;
1363 }
1364
1365
1366 /**
1367  * Server handler for initiating local dht find peer requests
1368  *
1369  * @param cls closure for service
1370  * @param find_msg the actual find peer message
1371  * @param message_context struct containing pertinent information about the request
1372  *
1373  */
1374 static void
1375 handle_dht_find_peer (void *cls, 
1376                       const struct GNUNET_MessageHeader *find_msg,
1377                       struct DHT_MessageContext *message_context)
1378 {
1379   struct GNUNET_MessageHeader *find_peer_result;
1380   size_t hello_size;
1381   size_t tsize;
1382
1383 #if DEBUG_DHT
1384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1385               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
1386               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
1387               ntohs (find_msg->size),
1388               sizeof (struct GNUNET_MessageHeader));
1389 #endif
1390   if ((my_hello == NULL) || (message_context->closest != GNUNET_YES))
1391   {
1392 #if DEBUG_DHT
1393     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394                 "`%s': Our HELLO is null, can't return.\n",
1395                 "DHT");
1396 #endif
1397     return;
1398   }
1399   /* Simplistic find_peer functionality, always return our hello */
1400   hello_size = ntohs(my_hello->size);
1401   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
1402
1403   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1404     {
1405       GNUNET_break_op (0);
1406       return;
1407     }
1408   find_peer_result = GNUNET_malloc (tsize);
1409   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
1410   find_peer_result->size = htons (tsize);
1411   memcpy (&find_peer_result[1], my_hello, hello_size);
1412 #if DEBUG_DHT_HELLO
1413     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1414                 "`%s': Sending hello size %d to client.\n",
1415                 "DHT", hello_size);
1416 #endif
1417   if (message_context->bloom != NULL)
1418     GNUNET_CONTAINER_bloomfilter_clear(message_context->bloom);
1419
1420   message_context->hop_count = 0;
1421   message_context->peer = &my_identity;
1422   route_result_message(cls, find_peer_result, message_context);
1423   //send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
1424   GNUNET_free(find_peer_result);
1425 }
1426
1427
1428 /**
1429  * Server handler for initiating local dht put requests
1430  *
1431  * @param cls closure for service
1432  * @param msg the actual put message
1433  * @param message_context struct containing pertinent information about the request
1434  */
1435 static void
1436 handle_dht_put (void *cls,
1437                 const struct GNUNET_MessageHeader *msg,
1438                 struct DHT_MessageContext *message_context)
1439 {
1440   struct GNUNET_DHT_PutMessage *put_msg;
1441   size_t put_type;
1442   size_t data_size;
1443
1444   GNUNET_assert (ntohs (msg->size) >=
1445                  sizeof (struct GNUNET_DHT_PutMessage));
1446   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
1447   put_type = ntohs (put_msg->type);
1448   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
1449 #if DEBUG_DHT
1450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1451               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1452               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (message_context->key), message_context->unique_id);
1453 #endif
1454 #if DEBUG_DHT_ROUTING
1455
1456   if ((debug_routes) && (dhtlog_handle != NULL))
1457     {
1458       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
1459                                    message_context->hop_count, GNUNET_YES, &my_identity,
1460                                    message_context->key);
1461     }
1462 #endif
1463
1464   if (datacache != NULL)
1465     GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
1466                           (char *) &put_msg[1], put_type,
1467                           GNUNET_TIME_absolute_ntoh(put_msg->expiration));
1468   else
1469     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1470                 "`%s:%s': %s request received, but have no datacache!\n",
1471                 my_short_id, "DHT", "PUT");
1472 }
1473
1474 /**
1475  * Estimate the diameter of the network based
1476  * on how many buckets are currently in use.
1477  * Concept here is that the diameter of the network
1478  * is roughly the distance a message must travel in
1479  * order to reach its intended destination.  Since
1480  * at each hop we expect to get one bit closer, and
1481  * we have one bit per bucket, the number of buckets
1482  * in use should be the largest number of hops for
1483  * a sucessful message. (of course, this assumes we
1484  * know all peers in the network!)
1485  *
1486  * @return ballpark diameter figure
1487  */
1488 static unsigned int estimate_diameter()
1489 {
1490   return MAX_BUCKETS - lowest_bucket;
1491 }
1492
1493 /**
1494  * To how many peers should we (on average)
1495  * forward the request to obtain the desired
1496  * target_replication count (on average).
1497  *
1498  * Always 0, 1 or 2 (don't send, send once, split)
1499  */
1500 static unsigned int
1501 get_forward_count (unsigned int hop_count, size_t target_replication)
1502 {
1503   double target_count;
1504   unsigned int target_value;
1505   unsigned int diameter;
1506
1507   /* FIXME: the smaller we think the network is the more lenient we should be for
1508    * routing right?  The estimation below only works if we think we have reasonably
1509    * full routing tables, which for our RR topologies may not be the case!
1510    */
1511   diameter = estimate_diameter ();
1512   if ((hop_count > (diameter + 1) * 2) && (MINIMUM_PEER_THRESHOLD < estimate_diameter() * bucket_size))
1513     {
1514 #if DEBUG_DHT
1515       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516                   "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
1517                   "DHT", estimate_diameter(), lowest_bucket);
1518 #endif
1519       return 0;
1520     }
1521   else if (hop_count > MAX_HOPS)
1522     {
1523 #if DEBUG_DHT
1524       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525                   "`%s:%s': Hop count too high (greater than max)\n", my_short_id,
1526                   "DHT");
1527 #endif
1528       return 0;
1529     }
1530   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
1531     target_replication / (target_replication * (hop_count + 1) + diameter);
1532   target_value = 0;
1533
1534 #if NONSENSE
1535   while (target_value < target_count)
1536     target_value++; /* target_value is ALWAYS 1 after this "loop" */
1537 #else
1538   target_value = 1;
1539 #endif
1540   if ((target_count + 1 - target_value) >
1541       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1542                                 RAND_MAX) / RAND_MAX)
1543     target_value++;
1544   return target_value;
1545 }
1546
1547 /**
1548  * Find the closest peer in our routing table to the
1549  * given hashcode.
1550  *
1551  * @return The closest peer in our routing table to the
1552  *         key, or NULL on error.
1553  */
1554 static struct PeerInfo *
1555 find_closest_peer (const GNUNET_HashCode *hc)
1556 {
1557   struct PeerInfo *pos;
1558   struct PeerInfo *current_closest;
1559   unsigned int lowest_distance;
1560   unsigned int temp_distance;
1561   int bucket;
1562
1563   lowest_distance = -1;
1564
1565   if (k_buckets[lowest_bucket].peers_size == 0)
1566     return NULL;
1567
1568   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1569     {
1570       pos = k_buckets[bucket].head;
1571       while (pos != NULL)
1572         {
1573           temp_distance = distance(&pos->id.hashPubKey, hc);
1574           if (temp_distance <= lowest_distance)
1575             {
1576               lowest_distance = temp_distance;
1577               current_closest = pos;
1578             }
1579           pos = pos->next;
1580         }
1581     }
1582   GNUNET_assert(current_closest != NULL);
1583   return current_closest;
1584 }
1585
1586 /*
1587  * Check whether my identity is closer than any known peers.
1588  *
1589  * @param target hash code to check closeness to
1590  *
1591  * Return GNUNET_YES if node location is closest, GNUNET_NO
1592  * otherwise.
1593  */
1594 int
1595 am_closest_peer (const GNUNET_HashCode * target)
1596 {
1597   int bits;
1598   int other_bits;
1599   int bucket_num;
1600   struct PeerInfo *pos;
1601   unsigned int my_distance;
1602
1603   bucket_num = find_current_bucket(target);
1604   if (bucket_num == GNUNET_SYSERR) /* Same key! */
1605     return GNUNET_YES;
1606
1607   bits = matching_bits(&my_identity.hashPubKey, target);
1608   my_distance = distance(&my_identity.hashPubKey, target);
1609
1610   pos = k_buckets[bucket_num].head;
1611   while (pos != NULL)
1612     {
1613       other_bits = matching_bits(&pos->id.hashPubKey, target);
1614       if (other_bits > bits)
1615         return GNUNET_NO;
1616       else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
1617         {
1618           if (distance(&pos->id.hashPubKey, target) < my_distance)
1619             return GNUNET_NO;
1620         }
1621       pos = pos->next;
1622     }
1623
1624 #if DEBUG_TABLE
1625   GNUNET_GE_LOG (coreAPI->ectx,
1626                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1627                  GNUNET_GE_BULK, "closest peer\n");
1628   printPeerBits (&closest);
1629   GNUNET_GE_LOG (coreAPI->ectx,
1630                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1631                  GNUNET_GE_BULK, "me\n");
1632   printPeerBits (coreAPI->my_identity);
1633   GNUNET_GE_LOG (coreAPI->ectx,
1634                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1635                  GNUNET_GE_BULK, "key\n");
1636   printKeyBits (target);
1637   GNUNET_GE_LOG (coreAPI->ectx,
1638                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1639                  GNUNET_GE_BULK,
1640                  "closest peer inverse distance is %u, mine is %u\n",
1641                  inverse_distance (target, &closest.hashPubKey),
1642                  inverse_distance (target,
1643                                    &coreAPI->my_identity->hashPubKey));
1644 #endif
1645
1646   /* No peers closer, we are the closest! */
1647   return GNUNET_YES;
1648
1649 }
1650
1651
1652 /**
1653  * Select a peer from the routing table that would be a good routing
1654  * destination for sending a message for "target".  The resulting peer
1655  * must not be in the set of blocked peers.<p>
1656  *
1657  * Note that we should not ALWAYS select the closest peer to the
1658  * target, peers further away from the target should be chosen with
1659  * exponentially declining probability.
1660  *
1661  * @param target the key we are selecting a peer to route to
1662  * @param bloom a bloomfilter containing entries this request has seen already
1663  *
1664  * @return Peer to route to, or NULL on error
1665  */
1666 static struct PeerInfo *
1667 select_peer (const GNUNET_HashCode * target,
1668              struct GNUNET_CONTAINER_BloomFilter *bloom)
1669 {
1670   unsigned int distance;
1671   unsigned int bc;
1672   struct PeerInfo *pos;
1673 #if USE_KADEMLIA
1674   const struct PeerInfo *chosen;
1675   unsigned long long largest_distance;
1676 #else
1677   unsigned long long total_distance;
1678   unsigned long long selected;
1679 #endif
1680
1681 #if USE_KADEMLIA
1682   largest_distance = 0;
1683   chosen = NULL;
1684   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1685     {
1686       pos = k_buckets[bc].head;
1687       while (pos != NULL)
1688         {
1689           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1690             {
1691               distance = inverse_distance (target, &pos->id.hashPubKey);
1692               if (distance > largest_distance)
1693                 {
1694                   chosen = pos;
1695                   largest_distance = distance;
1696                 }
1697             }
1698           pos = pos->next;
1699         }
1700     }
1701
1702   if ((largest_distance > 0) && (chosen != NULL))
1703     {
1704       GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
1705       return chosen;
1706     }
1707   else
1708     {
1709       return NULL;
1710     }
1711 #else
1712   /* GNUnet-style */
1713   total_distance = 0;
1714   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1715     {
1716       pos = k_buckets[bc].head;
1717       while (pos != NULL)
1718         {
1719           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1720             total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
1721 #if DEBUG_DHT > 1
1722           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1723                       "`%s:%s': Total distance is %llu, distance from %s to %s is %u\n",
1724                       my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey));
1725 #endif
1726           pos = pos->next;
1727         }
1728     }
1729   if (total_distance == 0)
1730     {
1731       return NULL;
1732     }
1733
1734   selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
1735   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1736     {
1737       pos = k_buckets[bc].head;
1738       while (pos != NULL)
1739         {
1740           if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1741             {
1742               distance = inverse_distance (target, &pos->id.hashPubKey);
1743               if (distance > selected)
1744                 return pos;
1745               selected -= distance;
1746             }
1747           else
1748             {
1749 #if DEBUG_DHT
1750               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1751                           "`%s:%s': peer %s matches bloomfilter.\n",
1752                           my_short_id, "DHT", GNUNET_i2s(&pos->id));
1753 #endif
1754             }
1755           pos = pos->next;
1756         }
1757     }
1758 #if DEBUG_DHT
1759     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1760                 "`%s:%s': peer %s matches bloomfilter.\n",
1761                 my_short_id, "DHT", GNUNET_i2s(&pos->id));
1762 #endif
1763   return NULL;
1764 #endif
1765 }
1766
1767 /**
1768  * Function called to send a request out to another peer.
1769  * Called both for locally initiated requests and those
1770  * received from other peers.
1771  *
1772  * @param cls DHT service closure argument
1773  * @param msg the encapsulated message
1774  * @param peer the peer to forward the message to
1775  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1776  */
1777 static void forward_message (void *cls,
1778                              const struct GNUNET_MessageHeader *msg,
1779                              struct PeerInfo *peer,
1780                              struct DHT_MessageContext *msg_ctx)
1781 {
1782   struct GNUNET_DHT_P2PRouteMessage *route_message;
1783   struct P2PPendingMessage *pending;
1784   size_t msize;
1785   size_t psize;
1786
1787   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1788   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1789   psize = sizeof(struct P2PPendingMessage) + msize;
1790   pending = GNUNET_malloc(psize);
1791   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1792   pending->importance = DHT_SEND_PRIORITY;
1793   pending->timeout = GNUNET_TIME_relative_get_forever();
1794   route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1795   route_message->header.size = htons(msize);
1796   route_message->header.type = htons(GNUNET_MESSAGE_TYPE_P2P_DHT_ROUTE);
1797   route_message->options = htonl(msg_ctx->msg_options);
1798   route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1799   route_message->network_size = htonl(msg_ctx->network_size);
1800   route_message->desired_replication_level = htonl(msg_ctx->replication);
1801   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1802   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1803   memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1804   memcpy(&route_message[1], msg, ntohs(msg->size));
1805 #if DEBUG_DHT > 1
1806   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1807 #endif
1808   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1809   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1810     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1811 }
1812
1813 /**
1814  * Task used to remove forwarding entries, either
1815  * after timeout, when full, or on shutdown.
1816  *
1817  * @param cls the entry to remove
1818  * @param tc context, reason, etc.
1819  */
1820 static void
1821 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1822 {
1823   struct DHTRouteSource *source_info = cls;
1824   struct DHTQueryRecord *record;
1825   source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
1826   record = source_info->record;
1827   GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1828
1829   if (record->head == NULL) /* No more entries in DLL */
1830     {
1831       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1832       GNUNET_free(record);
1833     }
1834   GNUNET_free(source_info);
1835 }
1836
1837 /**
1838  * Remember this routing request so that if a reply is
1839  * received we can either forward it to the correct peer
1840  * or return the result locally.
1841  *
1842  * @param cls DHT service closure
1843  * @param msg_ctx Context of the route request
1844  *
1845  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1846  */
1847 static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
1848 {
1849   struct DHTQueryRecord *record;
1850   struct DHTRouteSource *source_info;
1851   struct DHTRouteSource *pos;
1852   struct GNUNET_TIME_Absolute now;
1853   unsigned int current_size;
1854
1855   current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1856   while (current_size >= MAX_OUTSTANDING_FORWARDS)
1857     {
1858       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
1859       record = source_info->record;
1860       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1861       if (record->head == NULL) /* No more entries in DLL */
1862         {
1863           GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1864           GNUNET_free(record);
1865         }
1866       GNUNET_SCHEDULER_cancel(sched, source_info->delete_task);
1867       GNUNET_free(source_info);
1868       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1869     }
1870   now = GNUNET_TIME_absolute_get();
1871   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, msg_ctx->key);
1872   if (record != NULL) /* Already know this request! */
1873     {
1874       pos = record->head;
1875       while (pos != NULL)
1876         {
1877           if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
1878             break; /* Already have this peer in reply list! */
1879           pos = pos->next;
1880         }
1881       if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
1882         {
1883           GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.value);
1884           return GNUNET_NO;
1885         }
1886     }
1887   else
1888     {
1889       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
1890       GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1891       memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1892     }
1893
1894   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
1895   source_info->record = record;
1896   source_info->delete_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
1897   memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
1898   GNUNET_CONTAINER_DLL_insert_after(record->head, record->tail, record->tail, source_info);
1899   if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
1900     {
1901       source_info->client = msg_ctx->client;
1902       now = GNUNET_TIME_absolute_get_forever();
1903     }
1904   source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.value);
1905 #if DEBUG_DHT > 1
1906       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1907                   "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
1908                   "DHT", GNUNET_h2s (msg_ctx->key), msg_ctx->unique_id);
1909 #endif
1910   return GNUNET_YES;
1911 }
1912
1913
1914 /**
1915  * Main function that handles whether or not to route a message to other
1916  * peers.
1917  *
1918  * @param msg the message to be routed
1919  *
1920  * @return the number of peers the message was routed to,
1921  *         GNUNET_SYSERR on failure
1922  */
1923 static int route_message(void *cls,
1924                          const struct GNUNET_MessageHeader *msg,
1925                          struct DHT_MessageContext *message_context)
1926 {
1927   int i;
1928   struct PeerInfo *selected;
1929   struct PeerInfo *nearest;
1930   unsigned int forward_count;
1931 #if DEBUG_DHT
1932   char *nearest_buf;
1933 #endif
1934 #if DEBUG_DHT_ROUTING
1935   int ret;
1936 #endif
1937
1938   message_context->closest = am_closest_peer(message_context->key);
1939   forward_count = get_forward_count(message_context->hop_count, message_context->replication);
1940   nearest = find_closest_peer(message_context->key);
1941
1942   if (message_context->bloom == NULL)
1943     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1944   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1945
1946   if ((stop_on_closest == GNUNET_YES) && (message_context->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
1947     forward_count = 0;
1948
1949 #if DEBUG_DHT_ROUTING
1950   if (forward_count == 0)
1951     ret = GNUNET_SYSERR;
1952   else
1953     ret = GNUNET_NO;
1954
1955   if ((debug_routes_extended) && (dhtlog_handle != NULL))
1956     {
1957       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1958                                    message_context->hop_count, ret,
1959                                    &my_identity, message_context->key, message_context->peer,
1960                                    NULL);
1961     }
1962 #endif
1963
1964   switch (ntohs(msg->type))
1965     {
1966     case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
1967       cache_response (cls, message_context);
1968       if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
1969         forward_count = 0;
1970       break;
1971     case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding?*/
1972       if (message_context->closest == GNUNET_YES)
1973         {
1974 #if DEBUG_DHT_ROUTING
1975           if ((debug_routes_extended) && (dhtlog_handle != NULL))
1976             {
1977               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1978                                            message_context->hop_count, GNUNET_YES,
1979                                            &my_identity, message_context->key, message_context->peer,
1980                                            NULL);
1981             }
1982 #endif
1983           handle_dht_put (cls, msg, message_context);
1984         }
1985 #if DEBUG_DHT_ROUTING
1986         if (message_context->hop_count == 0) /* Locally initiated request */
1987           {
1988             if ((debug_routes) && (dhtlog_handle != NULL))
1989               {
1990                 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
1991                                              message_context->hop_count, GNUNET_NO, &my_identity,
1992                                              message_context->key);
1993               }
1994           }
1995 #endif
1996       break;
1997     case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest, check options, add to requests seen */
1998       cache_response (cls, message_context);
1999       if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2000         handle_dht_find_peer (cls, msg, message_context);
2001       break;
2002     default:
2003       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2004                   "`%s': Message type (%d) not handled\n", "DHT", ntohs(msg->type));
2005     }
2006
2007   for (i = 0; i < forward_count; i++)
2008     {
2009       selected = select_peer(message_context->key, message_context->bloom);
2010
2011       if (selected != NULL)
2012         {
2013           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
2014 #if DEBUG_DHT_ROUTING > 1
2015           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
2016           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2017                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
2018                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&selected->id), nearest_buf, matching_bits(&nearest->id.hashPubKey, message_context->key), distance(&nearest->id.hashPubKey, message_context->key));
2019           GNUNET_free(nearest_buf);
2020 #endif
2021           /* FIXME: statistics */
2022           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2023             {
2024               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2025                                            message_context->hop_count, GNUNET_NO,
2026                                            &my_identity, message_context->key, message_context->peer,
2027                                            &selected->id);
2028             }
2029           forward_message(cls, msg, selected, message_context);
2030         }
2031       else
2032         {
2033 #if DEBUG_DHT
2034           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2035                       "`%s:%s': No peers selected for forwarding.\n", my_short_id,
2036                       "DHT");
2037 #endif
2038         }
2039     }
2040 #if DEBUG_DHT_ROUTING > 1
2041   if (forward_count == 0)
2042     {
2043       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2044                   "`%s:%s': NOT Forwarding request key %s uid %llu to any peers\n", my_short_id,
2045                   "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
2046     }
2047 #endif
2048
2049   if (message_context->bloom != NULL)
2050     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2051
2052   return forward_count;
2053 }
2054
2055 /**
2056  * Find a client if it exists, add it otherwise.
2057  *
2058  * @param client the server handle to the client
2059  *
2060  * @return the client if found, a new client otherwise
2061  */
2062 static struct ClientList *
2063 find_active_client (struct GNUNET_SERVER_Client *client)
2064 {
2065   struct ClientList *pos = client_list;
2066   struct ClientList *ret;
2067
2068   while (pos != NULL)
2069     {
2070       if (pos->client_handle == client)
2071         return pos;
2072       pos = pos->next;
2073     }
2074
2075   ret = GNUNET_malloc (sizeof (struct ClientList));
2076   ret->client_handle = client;
2077   ret->next = client_list;
2078   client_list = ret;
2079   return ret;
2080 }
2081
2082 /**
2083  * Handler for any generic DHT messages, calls the appropriate handler
2084  * depending on message type, sends confirmation if responses aren't otherwise
2085  * expected.
2086  *
2087  * @param cls closure for the service
2088  * @param client the client we received this message from
2089  * @param message the actual message received
2090  */
2091 static void
2092 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
2093                                 const struct GNUNET_MessageHeader *message)
2094 {
2095   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
2096   const struct GNUNET_MessageHeader *enc_msg;
2097   struct DHT_MessageContext message_context;
2098   size_t enc_type;
2099
2100   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
2101   enc_type = ntohs (enc_msg->type);
2102
2103 #if DEBUG_DHT
2104   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
2106               my_short_id, "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
2107               GNUNET_ntohll (dht_msg->unique_id));
2108 #endif
2109 #if DEBUG_DHT_ROUTING
2110   if (dhtlog_handle != NULL)
2111     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
2112 #endif
2113   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2114   message_context.client = find_active_client (client);
2115   message_context.key = &dht_msg->key;
2116   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
2117   message_context.replication = ntohl (dht_msg->desired_replication_level);
2118   message_context.msg_options = ntohl (dht_msg->options);
2119   message_context.network_size = estimate_diameter();
2120   message_context.peer = &my_identity;
2121
2122   route_message(cls, enc_msg, &message_context);
2123
2124   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2125
2126 }
2127
2128 /**
2129  * Handler for any generic DHT stop messages, calls the appropriate handler
2130  * depending on message type (if processed locally)
2131  *
2132  * @param cls closure for the service
2133  * @param client the client we received this message from
2134  * @param message the actual message received
2135  *
2136  */
2137 static void
2138 handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
2139                             const struct GNUNET_MessageHeader *message)
2140 {
2141
2142   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
2143     (const struct GNUNET_DHT_StopMessage *) message;
2144   struct DHTQueryRecord *record;
2145   struct DHTRouteSource *pos;
2146   uint64_t uid;
2147 #if DEBUG_DHT
2148   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2149               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
2150               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
2151 #endif
2152
2153   uid = GNUNET_ntohll(dht_stop_msg->unique_id);
2154
2155   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
2156   if (record != NULL)
2157     {
2158       pos = record->head;
2159
2160       while (pos != NULL)
2161         {
2162           if ((pos->client != NULL) && (pos->client->client_handle == client))
2163             {
2164               GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
2165               GNUNET_SCHEDULER_add_now(sched, &remove_forward_entry, pos);
2166             }
2167           pos = pos->next;
2168         }
2169     }
2170
2171   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2172 }
2173
2174
2175 /**
2176  * Core handler for p2p route requests.
2177  */
2178 static int
2179 handle_dht_p2p_route_request (void *cls,
2180                               const struct GNUNET_PeerIdentity *peer,
2181                               const struct GNUNET_MessageHeader *message,
2182                               struct GNUNET_TIME_Relative latency, uint32_t distance)
2183 {
2184 #if DEBUG_DHT
2185   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2186               "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
2187 #endif
2188   struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
2189   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2190   struct DHT_MessageContext *message_context;
2191
2192   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2193     {
2194       GNUNET_break_op(0);
2195       return GNUNET_YES;
2196     }
2197   //memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2198   message_context = GNUNET_malloc(sizeof (struct DHT_MessageContext));
2199   message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2200   GNUNET_assert(message_context->bloom != NULL);
2201   message_context->hop_count = ntohl(incoming->hop_count);
2202   message_context->key = &incoming->key;
2203   message_context->replication = ntohl(incoming->desired_replication_level);
2204   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
2205   message_context->msg_options = ntohl(incoming->options);
2206   message_context->network_size = ntohl(incoming->network_size);
2207   message_context->peer = peer;
2208   route_message(cls, enc_msg, message_context);
2209   GNUNET_free(message_context);
2210   return GNUNET_YES;
2211 }
2212
2213
2214 /**
2215  * Core handler for p2p route results.
2216  */
2217 static int
2218 handle_dht_p2p_route_result (void *cls,
2219                              const struct GNUNET_PeerIdentity *peer,
2220                              const struct GNUNET_MessageHeader *message,
2221                              struct GNUNET_TIME_Relative latency, uint32_t distance)
2222 {
2223 #if DEBUG_DHT
2224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2225               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
2226 #endif
2227   struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
2228   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2229   struct DHT_MessageContext message_context;
2230
2231   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2232     {
2233       GNUNET_break_op(0);
2234       return GNUNET_YES;
2235     }
2236   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2237   message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2238   GNUNET_assert(message_context.bloom != NULL);
2239   message_context.key = &incoming->key;
2240   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
2241   message_context.msg_options = ntohl(incoming->options);
2242   message_context.hop_count = ntohl(incoming->hop_count);
2243   message_context.peer = peer;
2244   route_result_message(cls, enc_msg, &message_context);
2245   return GNUNET_YES;
2246 }
2247
2248
2249 /**
2250  * Receive the HELLO from transport service,
2251  * free current and replace if necessary.
2252  *
2253  * @param cls NULL
2254  * @param message HELLO message of peer
2255  */
2256 static void
2257 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2258 {
2259 #if DEBUG_DHT
2260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2261               "Received our `%s' from transport service\n",
2262               "HELLO");
2263 #endif
2264
2265   GNUNET_assert (message != NULL);
2266   GNUNET_free_non_null(my_hello);
2267   my_hello = GNUNET_malloc(ntohs(message->size));
2268   memcpy(my_hello, message, ntohs(message->size));
2269 }
2270
2271 /**
2272  * Task run during shutdown.
2273  *
2274  * @param cls unused
2275  * @param tc unused
2276  */
2277 static void
2278 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2279 {
2280   int bucket_count;
2281   struct PeerInfo *pos;
2282
2283   if (transport_handle != NULL)
2284   {
2285     GNUNET_free_non_null(my_hello);
2286     GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
2287     GNUNET_TRANSPORT_disconnect(transport_handle);
2288   }
2289
2290   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2291     {
2292       while (k_buckets[bucket_count].head != NULL)
2293         {
2294           pos = k_buckets[bucket_count].head;
2295 #if DEBUG_DHT
2296           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2297                       "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
2298 #endif
2299           delete_peer(pos, bucket_count);
2300         }
2301     }
2302   if (coreAPI != NULL)
2303     {
2304 #if DEBUG_DHT
2305       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306                   "%s:%s Disconnecting core!\n", my_short_id, "DHT");
2307 #endif
2308       GNUNET_CORE_disconnect (coreAPI);
2309     }
2310   if (datacache != NULL)
2311     {
2312 #if DEBUG_DHT
2313       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2314                   "%s:%s Destroying datacache!\n", my_short_id, "DHT");
2315 #endif
2316       GNUNET_DATACACHE_destroy (datacache);
2317     }
2318
2319   if (dhtlog_handle != NULL)
2320     GNUNET_DHTLOG_disconnect(dhtlog_handle);
2321
2322   GNUNET_free_non_null(my_short_id);
2323 }
2324
2325
2326 /**
2327  * To be called on core init/fail.
2328  *
2329  * @param cls service closure
2330  * @param server handle to the server for this service
2331  * @param identity the public identity of this peer
2332  * @param publicKey the public key of this peer
2333  */
2334 void
2335 core_init (void *cls,
2336            struct GNUNET_CORE_Handle *server,
2337            const struct GNUNET_PeerIdentity *identity,
2338            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
2339 {
2340
2341   if (server == NULL)
2342     {
2343 #if DEBUG_DHT
2344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2345               "%s: Connection to core FAILED!\n", "dht",
2346               GNUNET_i2s (identity));
2347 #endif
2348       GNUNET_SCHEDULER_cancel (sched, cleanup_task);
2349       GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
2350       return;
2351     }
2352 #if DEBUG_DHT
2353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2354               "%s: Core connection initialized, I am peer: %s\n", "dht",
2355               GNUNET_i2s (identity));
2356 #endif
2357   /* Copy our identity so we can use it */
2358   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
2359   my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
2360   /* Set the server to local variable */
2361   coreAPI = server;
2362
2363   if (dhtlog_handle != NULL)
2364     dhtlog_handle->insert_node (NULL, &my_identity);
2365 }
2366
2367
2368 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
2369   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_LOCAL_DHT_ROUTE, 0},
2370   {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE_STOP, 0},
2371   {NULL, NULL, 0, 0}
2372 };
2373
2374
2375 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2376   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_P2P_DHT_ROUTE, 0},
2377   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_P2P_DHT_ROUTE_RESULT, 0},
2378   {NULL, 0, 0}
2379 };
2380
2381 /**
2382  * Method called whenever a peer connects.
2383  *
2384  * @param cls closure
2385  * @param peer peer identity this notification is about
2386  * @param latency reported latency of the connection with peer
2387  * @param distance reported distance (DV) to peer
2388  */
2389 void handle_core_connect (void *cls,
2390                           const struct GNUNET_PeerIdentity * peer,
2391                           struct GNUNET_TIME_Relative latency,
2392                           uint32_t distance)
2393 {
2394   int ret;
2395
2396 #if DEBUG_DHT
2397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2398               "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
2399 #endif
2400
2401   ret = try_add_peer(peer,
2402                      find_current_bucket(&peer->hashPubKey),
2403                      latency,
2404                      distance);
2405 #if DEBUG_DHT
2406     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2407                 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == GNUNET_YES ? "PEER ADDED" : "NOT ADDED");
2408 #endif
2409 }
2410
2411 /**
2412  * Process dht requests.
2413  *
2414  * @param cls closure
2415  * @param scheduler scheduler to use
2416  * @param server the initialized server
2417  * @param c configuration to use
2418  */
2419 static void
2420 run (void *cls,
2421      struct GNUNET_SCHEDULER_Handle *scheduler,
2422      struct GNUNET_SERVER_Handle *server,
2423      const struct GNUNET_CONFIGURATION_Handle *c)
2424 {
2425   sched = scheduler;
2426   cfg = c;
2427   datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
2428   GNUNET_SERVER_add_handlers (server, plugin_handlers);
2429   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
2430                                  cfg,   /* Main configuration */
2431                                  GNUNET_TIME_UNIT_FOREVER_REL,
2432                                  NULL,  /* Closure passed to DHT functionas around? */
2433                                  &core_init,    /* Call core_init once connected */
2434                                  &handle_core_connect,  /* Don't care about connects */
2435                                  NULL,  /* FIXME: remove peers on disconnects */
2436                                  NULL,  /* Do we care about "status" updates? */
2437                                  NULL,  /* Don't want notified about all incoming messages */
2438                                  GNUNET_NO,     /* For header only inbound notification */
2439                                  NULL,  /* Don't want notified about all outbound messages */
2440                                  GNUNET_NO,     /* For header only outbound notification */
2441                                  core_handlers);        /* Register these handlers */
2442
2443   if (coreAPI == NULL)
2444     return;
2445   transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, NULL, NULL, NULL, NULL);
2446   if (transport_handle != NULL)
2447     GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
2448   else
2449     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
2450
2451   lowest_bucket = MAX_BUCKETS - 1;
2452   forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
2453   forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
2454   /* Scheduled the task to clean up when shutdown is called */
2455
2456   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
2457     {
2458       debug_routes = GNUNET_YES;
2459     }
2460
2461   if (GNUNET_YES ==
2462       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2463                                            "stop_on_closest"))
2464     {
2465       stop_on_closest = GNUNET_YES;
2466     }
2467
2468   if (GNUNET_YES ==
2469       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2470                                            "stop_found"))
2471     {
2472       stop_on_found = GNUNET_YES;
2473     }
2474
2475   if (GNUNET_YES ==
2476       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
2477                                            "mysql_logging_extended"))
2478     {
2479       debug_routes = GNUNET_YES;
2480       debug_routes_extended = GNUNET_YES;
2481     }
2482
2483   if (GNUNET_YES == debug_routes)
2484     {
2485       dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
2486       if (dhtlog_handle == NULL)
2487         {
2488           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2489                       "Could not connect to mysql logging server, logging will not happen!");
2490           return;
2491         }
2492     }
2493
2494   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
2495                                                GNUNET_TIME_UNIT_FOREVER_REL,
2496                                                &shutdown_task, NULL);
2497 }
2498
2499 /**
2500  * The main function for the dht service.
2501  *
2502  * @param argc number of arguments from the command line
2503  * @param argv command line arguments
2504  * @return 0 ok, 1 on error
2505  */
2506 int
2507 main (int argc, char *const *argv)
2508 {
2509   return (GNUNET_OK ==
2510           GNUNET_SERVICE_run (argc,
2511                               argv,
2512                               "dht",
2513                               GNUNET_SERVICE_OPTION_NONE,
2514                               &run, NULL)) ? 0 : 1;
2515 }