core request connect in addition to hello, minor driver stuff
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief main DHT service shell, building block for DHT implementations
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_getopt_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_service_lib.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_signal_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "dhtlog.h"
43 #include "dht.h"
44
45 #define PRINT_TABLES GNUNET_NO
46
47 #define EXTRA_CHECKS GNUNET_NO
48 /**
49  * How many buckets will we allow total.
50  */
51 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
52
53 /**
54  * Should the DHT issue FIND_PEER requests to get better routing tables?
55  */
56 #define DO_FIND_PEER GNUNET_YES
57
58 /**
59  * What is the maximum number of peers in a given bucket.
60  */
61 #define DEFAULT_BUCKET_SIZE 4
62
63 /**
64  * Minimum number of peers we need for "good" routing,
65  * any less than this and we will allow messages to
66  * travel much further through the network!
67  */
68 #define MINIMUM_PEER_THRESHOLD 20
69
70 #define DHT_MAX_RECENT 1000
71
72 #define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
73
74 /**
75  * Default time to wait to send messages on behalf of other peers.
76  */
77 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
78
79 /**
80  * Default importance for handling messages on behalf of other peers.
81  */
82 #define DHT_DEFAULT_P2P_IMPORTANCE 0
83
84 /**
85  * How long to keep recent requests around by default.
86  */
87 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
88
89 /**
90  * Default time to wait to send find peer messages sent by the dht service.
91  */
92 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
93
94 /**
95  * Default importance for find peer messages sent by the dht service.
96  */
97 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
98
99 /**
100  * Default replication parameter for find peer messages sent by the dht service.
101  */
102 #define DHT_DEFAULT_FIND_PEER_REPLICATION 2
103
104 /**
105  * Default options for find peer requests sent by the dht service.
106  */
107 #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
108 /*#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE*/
109
110 /**
111  * How long at least to wait before sending another find peer request.
112  */
113 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
114
115 /**
116  * How long at most to wait before sending another find peer request.
117  */
118 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
119
120 /**
121  * How often to update our preference levels for peers in our routing tables.
122  */
123 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
124
125 /**
126  * How long at most on average will we allow a reply forward to take
127  * (before we quit sending out new requests)
128  */
129 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
130
131 /**
132  * How many initial requests to send out (in true Kademlia fashion)
133  */
134 #define DHT_KADEMLIA_REPLICATION 3
135
136 /*
137  * Default frequency for sending malicious get messages
138  */
139 #define DEFAULT_MALICIOUS_GET_FREQUENCY 1000 /* Number of milliseconds */
140
141 /*
142  * Default frequency for sending malicious put messages
143  */
144 #define DEFAULT_MALICIOUS_PUT_FREQUENCY 1000 /* Default is in milliseconds */
145
146 /**
147  * Type for a malicious request, so we can ignore it during testing
148  */
149 #define DHT_MALICIOUS_MESSAGE_TYPE 42
150
151 #define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
152
153 /**
154  * Real maximum number of hops, at which point we refuse
155  * to forward the message.
156  */
157 #define MAX_HOPS 10
158
159 /**
160  * How many time differences between requesting a core send and
161  * the actual callback to remember.
162  */
163 #define MAX_REPLY_TIMES 8
164
165 /**
166  * Linked list of messages to send to clients.
167  */
168 struct P2PPendingMessage
169 {
170   /**
171    * Pointer to next item in the list
172    */
173   struct P2PPendingMessage *next;
174
175   /**
176    * Pointer to previous item in the list
177    */
178   struct P2PPendingMessage *prev;
179
180   /**
181    * Message importance level.
182    */
183   unsigned int importance;
184
185   /**
186    * Time when this request was scheduled to be sent.
187    */
188   struct GNUNET_TIME_Absolute scheduled;
189
190   /**
191    * How long to wait before sending message.
192    */
193   struct GNUNET_TIME_Relative timeout;
194
195   /**
196    * Actual message to be sent; // avoid allocation
197    */
198   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
199
200 };
201
202 /**
203  * Per-peer information.
204  */
205 struct PeerInfo
206 {
207   /**
208    * Next peer entry (DLL)
209    */
210   struct PeerInfo *next;
211
212   /**
213    *  Prev peer entry (DLL)
214    */
215   struct PeerInfo *prev;
216
217   /**
218    * Head of pending messages to be sent to this peer.
219    */
220   struct P2PPendingMessage *head;
221
222   /**
223    * Tail of pending messages to be sent to this peer.
224    */
225   struct P2PPendingMessage *tail;
226
227   /**
228    * Core handle for sending messages to this peer.
229    */
230   struct GNUNET_CORE_TransmitHandle *th;
231
232   /**
233    * Task for scheduling message sends.
234    */
235   GNUNET_SCHEDULER_TaskIdentifier send_task;
236
237   /**
238    * Task for scheduling preference updates
239    */
240   GNUNET_SCHEDULER_TaskIdentifier preference_task;
241
242   /**
243    * Preference update context
244    */
245   struct GNUNET_CORE_InformationRequestContext *info_ctx;
246
247   /**
248    * What is the average latency for replies received?
249    */
250   struct GNUNET_TIME_Relative latency;
251
252   /**
253    * Number of responses received
254    */
255   unsigned long long response_count;
256
257   /**
258    * Number of requests sent
259    */
260   unsigned long long request_count;
261
262   /**
263    * What is the identity of the peer?
264    */
265   struct GNUNET_PeerIdentity id;
266
267   /**
268    * Transport level distance to peer.
269    */
270   unsigned int distance;
271
272   /**
273    * Task for scheduling periodic ping messages for this peer.
274    */
275   GNUNET_SCHEDULER_TaskIdentifier ping_task;
276 };
277
278 /**
279  * Peers are grouped into buckets.
280  */
281 struct PeerBucket
282 {
283   /**
284    * Head of DLL
285    */
286   struct PeerInfo *head;
287
288   /**
289    * Tail of DLL
290    */
291   struct PeerInfo *tail;
292
293   /**
294    * Number of peers in the bucket.
295    */
296   unsigned int peers_size;
297 };
298
299 /**
300  * Linked list of messages to send to clients.
301  */
302 struct PendingMessage
303 {
304   /**
305    * Pointer to next item in the list
306    */
307   struct PendingMessage *next;
308
309   /**
310    * Pointer to previous item in the list
311    */
312   struct PendingMessage *prev;
313
314   /**
315    * Actual message to be sent; // avoid allocation
316    */
317   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
318
319 };
320
321 /**
322  * Struct containing information about a client,
323  * handle to connect to it, and any pending messages
324  * that need to be sent to it.
325  */
326 struct ClientList
327 {
328   /**
329    * Linked list of active clients
330    */
331   struct ClientList *next;
332
333   /**
334    * The handle to this client
335    */
336   struct GNUNET_SERVER_Client *client_handle;
337
338   /**
339    * Handle to the current transmission request, NULL
340    * if none pending.
341    */
342   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
343
344   /**
345    * Linked list of pending messages for this client
346    */
347   struct PendingMessage *pending_head;
348
349   /**
350    * Tail of linked list of pending messages for this client
351    */
352   struct PendingMessage *pending_tail;
353 };
354
355
356 /**
357  * Context containing information about a DHT message received.
358  */
359 struct DHT_MessageContext
360 {
361   /**
362    * The client this request was received from.
363    * (NULL if received from another peer)
364    */
365   struct ClientList *client;
366
367   /**
368    * The peer this request was received from.
369    * (NULL if received from local client)
370    */
371   const struct GNUNET_PeerIdentity *peer;
372
373   /**
374    * The key this request was about
375    */
376   GNUNET_HashCode key;
377
378   /**
379    * The unique identifier of this request
380    */
381   uint64_t unique_id;
382
383   /**
384    * Desired replication level
385    */
386   uint32_t replication;
387
388   /**
389    * Network size estimate, either ours or the sum of
390    * those routed to thus far. =~ Log of number of peers
391    * chosen from for this request.
392    */
393   uint32_t network_size;
394
395   /**
396    * Any message options for this request
397    */
398   uint32_t msg_options;
399
400   /**
401    * How many hops has the message already traversed?
402    */
403   uint32_t hop_count;
404
405   /**
406    * How important is this message?
407    */
408   unsigned int importance;
409
410   /**
411    * How long should we wait to transmit this request?
412    */
413   struct GNUNET_TIME_Relative timeout;
414
415   /**
416    * Bloomfilter for this routing request.
417    */
418   struct GNUNET_CONTAINER_BloomFilter *bloom;
419
420   /**
421    * Did we forward this message? (may need to remember it!)
422    */
423   int forwarded;
424
425   /**
426    * Are we the closest known peer to this key (out of our neighbors?)
427    */
428   int closest;
429 };
430
431 /**
432  * Record used for remembering what peers are waiting for what
433  * responses (based on search key).
434  */
435 struct DHTRouteSource
436 {
437   /**
438    * This is a DLL.
439    */
440   struct DHTRouteSource *next;
441
442   /**
443    * This is a DLL.
444    */
445   struct DHTRouteSource *prev;
446
447   /**
448    * Source of the request.  Replies should be forwarded to
449    * this peer.
450    */
451   struct GNUNET_PeerIdentity source;
452
453   /**
454    * If this was a local request, remember the client; otherwise NULL.
455    */
456   struct ClientList *client;
457
458   /**
459    * Pointer to this nodes heap location (for removal)
460    */
461   struct GNUNET_CONTAINER_HeapNode *hnode;
462
463   /**
464    * Back pointer to the record storing this information.
465    */
466   struct DHTQueryRecord *record;
467
468   /**
469    * Task to remove this entry on timeout.
470    */
471   GNUNET_SCHEDULER_TaskIdentifier delete_task;
472
473   /**
474    * Bloomfilter of peers we have already sent back as
475    * replies to the initial request.  Allows us to not
476    * forward the same peer multiple times for a find peer
477    * request.
478    */
479   struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
480
481 };
482
483 /**
484  * Entry in the DHT routing table.
485  */
486 struct DHTQueryRecord
487 {
488   /**
489    * Head of DLL for result forwarding.
490    */
491   struct DHTRouteSource *head;
492
493   /**
494    * Tail of DLL for result forwarding.
495    */
496   struct DHTRouteSource *tail;
497
498   /**
499    * Key that the record concerns.
500    */
501   GNUNET_HashCode key;
502
503   /**
504    * GET message of this record (what we already forwarded?).
505    */
506   //DV_DHT_MESSAGE get; Try to get away with not saving this.
507
508   /**
509    * Bloomfilter of the peers we've replied to so far
510    */
511   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
512
513 };
514
515 /**
516  * Context used to calculate the number of find peer messages
517  * per X time units since our last scheduled find peer message
518  * was sent.  If we have seen too many messages, delay or don't
519  * send our own out.
520  */
521 struct FindPeerMessageContext
522 {
523   unsigned int count;
524
525   struct GNUNET_TIME_Absolute start;
526
527   struct GNUNET_TIME_Absolute end;
528 };
529
530 /**
531  * DHT Routing results structure
532  */
533 struct DHTResults
534 {
535   /*
536    * Min heap for removal upon reaching limit
537    */
538   struct GNUNET_CONTAINER_Heap *minHeap;
539
540   /*
541    * Hashmap for fast key based lookup
542    */
543   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
544
545 };
546
547 /**
548  * DHT structure for recent requests.
549  */
550 struct RecentRequests
551 {
552   /*
553    * Min heap for removal upon reaching limit
554    */
555   struct GNUNET_CONTAINER_Heap *minHeap;
556
557   /*
558    * Hashmap for key based lookup
559    */
560   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
561 };
562
563 struct RecentRequest
564 {
565   /**
566    * Position of this node in the min heap.
567    */
568   struct GNUNET_CONTAINER_HeapNode *heap_node;
569
570   /**
571    * Bloomfilter containing entries for peers
572    * we forwarded this request to.
573    */
574   struct GNUNET_CONTAINER_BloomFilter *bloom;
575
576   /**
577    * Timestamp of this request, for ordering
578    * the min heap.
579    */
580   struct GNUNET_TIME_Absolute timestamp;
581
582   /**
583    * Key of this request.
584    */
585   GNUNET_HashCode key;
586
587   /**
588    * Unique identifier for this request.
589    */
590   uint64_t uid;
591
592   /**
593    * Task to remove this entry on timeout.
594    */
595   GNUNET_SCHEDULER_TaskIdentifier remove_task;
596 };
597
598 /**
599  * Recent requests by hash/uid and by time inserted.
600  */
601 static struct RecentRequests recent;
602
603 /**
604  * Context to use to calculate find peer rates.
605  */
606 static struct FindPeerMessageContext find_peer_context;
607
608 /**
609  * Don't use our routing algorithm, always route
610  * to closest peer; initially send requests to 3
611  * peers.
612  */
613 static int strict_kademlia;
614
615 /**
616  * Routing option to end routing when closest peer found.
617  */
618 static int stop_on_closest;
619
620 /**
621  * Routing option to end routing when data is found.
622  */
623 static int stop_on_found;
624
625 /**
626  * Whether DHT needs to manage find peer requests, or
627  * an external force will do it on behalf of the DHT.
628  */
629 static int do_find_peer;
630
631 /**
632  * How many peers have we added since we sent out our last
633  * find peer request?
634  */
635 static unsigned int newly_found_peers;
636
637 /**
638  * Container of active queries we should remember
639  */
640 static struct DHTResults forward_list;
641
642 /**
643  * Handle to the datacache service (for inserting/retrieving data)
644  */
645 static struct GNUNET_DATACACHE_Handle *datacache;
646
647 /**
648  * Handle for the statistics service.
649  */
650 struct GNUNET_STATISTICS_Handle *stats;
651
652 /**
653  * The main scheduler to use for the DHT service
654  */
655 static struct GNUNET_SCHEDULER_Handle *sched;
656
657 /**
658  * The configuration the DHT service is running with
659  */
660 static const struct GNUNET_CONFIGURATION_Handle *cfg;
661
662 /**
663  * Handle to the core service
664  */
665 static struct GNUNET_CORE_Handle *coreAPI;
666
667 /**
668  * Handle to the transport service, for getting our hello
669  */
670 static struct GNUNET_TRANSPORT_Handle *transport_handle;
671
672 /**
673  * The identity of our peer.
674  */
675 static struct GNUNET_PeerIdentity my_identity;
676
677 /**
678  * Short id of the peer, for printing
679  */
680 static char *my_short_id;
681
682 /**
683  * Our HELLO
684  */
685 static struct GNUNET_MessageHeader *my_hello;
686
687 /**
688  * Task to run when we shut down, cleaning up all our trash
689  */
690 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
691
692 /**
693  * The lowest currently used bucket.
694  */
695 static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
696
697 /**
698  * The buckets (Kademlia routing table, complete with growth).
699  * Array of size MAX_BUCKET_SIZE.
700  */
701 static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
702
703 /**
704  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
705  */
706 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
707
708 /**
709  * Recently seen find peer requests.
710  */
711 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
712
713 /**
714  * Maximum size for each bucket.
715  */
716 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
717
718 /**
719  * List of active clients.
720  */
721 static struct ClientList *client_list;
722
723 /**
724  * Handle to the DHT logger.
725  */
726 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
727
728 /*
729  * Whether or not to send routing debugging information
730  * to the dht logging server
731  */
732 static unsigned int debug_routes;
733
734 /*
735  * Whether or not to send FULL route information to
736  * logging server
737  */
738 static unsigned int debug_routes_extended;
739
740 /*
741  * GNUNET_YES or GNUNET_NO, whether or not to act as
742  * a malicious node which drops all messages
743  */
744 static unsigned int malicious_dropper;
745
746 /*
747  * GNUNET_YES or GNUNET_NO, whether or not to act as
748  * a malicious node which sends out lots of GETS
749  */
750 static unsigned int malicious_getter;
751
752 /**
753  * GNUNET_YES or GNUNET_NO, whether or not to act as
754  * a malicious node which sends out lots of PUTS
755  */
756 static unsigned int malicious_putter;
757
758 /**
759  * Frequency for malicious get requests.
760  */
761 static unsigned long long malicious_get_frequency;
762
763 /**
764  * Frequency for malicious put requests.
765  */
766 static unsigned long long malicious_put_frequency;
767
768 /**
769  * Reply times for requests, if we are busy, don't send any
770  * more requests!
771  */
772 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
773
774 /**
775  * Current counter for replies.
776  */
777 static unsigned int reply_counter;
778
779 /**
780  * Forward declaration.
781  */
782 static size_t send_generic_reply (void *cls, size_t size, void *buf);
783
784 /** Declare here so retry_core_send is aware of it */
785 size_t core_transmit_notify (void *cls,
786                              size_t size, void *buf);
787
788 /**
789  * Convert unique ID to hash code.
790  *
791  * @param uid unique ID to convert
792  * @param hash set to uid (extended with zeros)
793  */
794 static void
795 hash_from_uid (uint64_t uid,
796                GNUNET_HashCode *hash)
797 {
798   memset (hash, 0, sizeof(GNUNET_HashCode));
799   *((uint64_t*)hash) = uid;
800 }
801
802 #if AVG
803 /**
804  * Calculate the average send time between messages so that we can
805  * ignore certain requests if we get too busy.
806  *
807  * @return the average time between asking core to send a message
808  *         and when the buffer for copying it is passed
809  */
810 static struct GNUNET_TIME_Relative get_average_send_delay()
811 {
812   unsigned int i;
813   unsigned int divisor;
814   struct GNUNET_TIME_Relative average_time;
815   average_time = GNUNET_TIME_relative_get_zero();
816   divisor = 0;
817   for (i = 0; i < MAX_REPLY_TIMES; i++)
818   {
819     average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]);
820     if (reply_times[i].value == (uint64_t)0)
821       continue;
822     else
823       divisor++;
824   }
825   if (divisor == 0)
826   {
827     return average_time;
828   }
829
830   average_time = GNUNET_TIME_relative_divide(average_time, divisor);
831   fprintf(stderr, "Avg send delay: %u sends is %llu\n", divisor, (long long unsigned int)average_time.value);
832   return average_time;
833 }
834 #endif
835
836 /**
837  * Given the largest send delay, artificially decrease it
838  * so the next time around we may have a chance at sending
839  * again.
840  */
841 static void decrease_max_send_delay(struct GNUNET_TIME_Relative max_time)
842 {
843   unsigned int i;
844   for (i = 0; i < MAX_REPLY_TIMES; i++)
845     {
846       if (reply_times[i].value == max_time.value)
847         {
848           reply_times[i].value = reply_times[i].value / 2;
849           return;
850         }
851     }
852 }
853
854 /**
855  * Find the maximum send time of the recently sent values.
856  *
857  * @return the average time between asking core to send a message
858  *         and when the buffer for copying it is passed
859  */
860 static struct GNUNET_TIME_Relative get_max_send_delay()
861 {
862   unsigned int i;
863   struct GNUNET_TIME_Relative max_time;
864   max_time = GNUNET_TIME_relative_get_zero();
865
866   for (i = 0; i < MAX_REPLY_TIMES; i++)
867   {
868     if (reply_times[i].value > max_time.value)
869       max_time.value = reply_times[i].value;
870   }
871
872   if (max_time.value > MAX_REQUEST_TIME.value)
873     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", (long long unsigned int)max_time.value);
874   return max_time;
875 }
876
877 static void
878 increment_stats(const char *value)
879 {
880   if (stats != NULL)
881     {
882       GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
883     }
884 }
885
886 /**
887  *  Try to send another message from our core send list
888  */
889 static void
890 try_core_send (void *cls,
891                const struct GNUNET_SCHEDULER_TaskContext *tc)
892 {
893   struct PeerInfo *peer = cls;
894   struct P2PPendingMessage *pending;
895   size_t ssize;
896
897   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
898
899   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
900     return;
901
902   if (peer->th != NULL)
903     return; /* Message send already in progress */
904
905   pending = peer->head;
906   if (pending != NULL)
907     {
908       ssize = ntohs(pending->msg->size);
909 #if DEBUG_DHT > 1
910      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
912                 "DHT", ssize, GNUNET_i2s(&peer->id));
913 #endif
914       pending->scheduled = GNUNET_TIME_absolute_get();
915       reply_counter++;
916       if (reply_counter >= MAX_REPLY_TIMES)
917         reply_counter = 0;
918       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
919                                                    pending->timeout, &peer->id,
920                                                    ssize, &core_transmit_notify, peer);
921     }
922 }
923
924 /**
925  * Function called to send a request out to another peer.
926  * Called both for locally initiated requests and those
927  * received from other peers.
928  *
929  * @param cls DHT service closure argument
930  * @param msg the encapsulated message
931  * @param peer the peer to forward the message to
932  * @param msg_ctx the context of the message (hop count, bloom, etc.)
933  */
934 static void forward_result_message (void *cls,
935                                     const struct GNUNET_MessageHeader *msg,
936                                     struct PeerInfo *peer,
937                                     struct DHT_MessageContext *msg_ctx)
938 {
939   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
940   struct P2PPendingMessage *pending;
941   size_t msize;
942   size_t psize;
943
944   increment_stats(STAT_RESULT_FORWARDS);
945   msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
946   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
947   psize = sizeof(struct P2PPendingMessage) + msize;
948   pending = GNUNET_malloc(psize);
949   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
950   pending->importance = DHT_SEND_PRIORITY;
951   pending->timeout = GNUNET_TIME_relative_get_forever();
952   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
953   result_message->header.size = htons(msize);
954   result_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
955   result_message->options = htonl(msg_ctx->msg_options);
956   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
957   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
958   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
959   memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
960   memcpy(&result_message[1], msg, ntohs(msg->size));
961 #if DEBUG_DHT > 1
962   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
963 #endif
964   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
965   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
966     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
967 }
968 /**
969  * Called when core is ready to send a message we asked for
970  * out to the destination.
971  *
972  * @param cls closure (NULL)
973  * @param size number of bytes available in buf
974  * @param buf where the callee should write the message
975  * @return number of bytes written to buf
976  */
977 size_t core_transmit_notify (void *cls,
978                              size_t size, void *buf)
979 {
980   struct PeerInfo *peer = cls;
981   char *cbuf = buf;
982   struct P2PPendingMessage *pending;
983
984   size_t off;
985   size_t msize;
986
987   if (buf == NULL)
988     {
989       /* client disconnected */
990 #if DEBUG_DHT
991       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
992 #endif
993       return 0;
994     }
995
996   if (peer->head == NULL)
997     return 0;
998
999   peer->th = NULL;
1000   off = 0;
1001   pending = peer->head;
1002   reply_times[reply_counter] = GNUNET_TIME_absolute_get_difference(pending->scheduled, GNUNET_TIME_absolute_get());
1003   msize = ntohs(pending->msg->size);
1004   if (msize <= size)
1005     {
1006       off = msize;
1007       memcpy (cbuf, pending->msg, msize);
1008       GNUNET_CONTAINER_DLL_remove (peer->head,
1009                                    peer->tail,
1010                                    pending);
1011 #if DEBUG_DHT > 1
1012       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Removing pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1013 #endif
1014       GNUNET_free (pending);
1015     }
1016 #if SMART
1017   while (NULL != pending &&
1018           (size - off >= (msize = ntohs (pending->msg->size))))
1019     {
1020 #if DEBUG_DHT_ROUTING
1021       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d\n", my_short_id, "dht service", msize, size);
1022 #endif
1023       memcpy (&cbuf[off], pending->msg, msize);
1024       off += msize;
1025       GNUNET_CONTAINER_DLL_remove (peer->head,
1026                                    peer->tail,
1027                                    pending);
1028       GNUNET_free (pending);
1029       pending = peer->head;
1030     }
1031 #endif
1032   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
1033     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1034 #if DEBUG_DHT > 1
1035   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d, returning %d\n", my_short_id, "dht service", msize, size, off);
1036 #endif
1037   return off;
1038 }
1039
1040 /**
1041  * Determine how many low order bits match in two
1042  * GNUNET_HashCodes.  i.e. - 010011 and 011111 share
1043  * the first two lowest order bits, and therefore the
1044  * return value is two (NOT XOR distance, nor how many
1045  * bits match absolutely!).
1046  *
1047  * @param first the first hashcode
1048  * @param second the hashcode to compare first to
1049  *
1050  * @return the number of bits that match
1051  */
1052 static unsigned int matching_bits(const GNUNET_HashCode *first, const GNUNET_HashCode *second)
1053 {
1054   unsigned int i;
1055
1056   for (i = 0; i < sizeof (GNUNET_HashCode) * 8; i++)
1057     if (GNUNET_CRYPTO_hash_get_bit (first, i) != GNUNET_CRYPTO_hash_get_bit (second, i))
1058       return i;
1059   return sizeof (GNUNET_HashCode) * 8;
1060 }
1061
1062 /**
1063  * Compute the distance between have and target as a 32-bit value.
1064  * Differences in the lower bits must count stronger than differences
1065  * in the higher bits.
1066  *
1067  * @return 0 if have==target, otherwise a number
1068  *           that is larger as the distance between
1069  *           the two hash codes increases
1070  */
1071 static unsigned int
1072 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1073 {
1074   unsigned int bucket;
1075   unsigned int msb;
1076   unsigned int lsb;
1077   unsigned int i;
1078
1079   /* We have to represent the distance between two 2^9 (=512)-bit
1080      numbers as a 2^5 (=32)-bit number with "0" being used for the
1081      two numbers being identical; furthermore, we need to
1082      guarantee that a difference in the number of matching
1083      bits is always represented in the result.
1084
1085      We use 2^32/2^9 numerical values to distinguish between
1086      hash codes that have the same LSB bit distance and
1087      use the highest 2^9 bits of the result to signify the
1088      number of (mis)matching LSB bits; if we have 0 matching
1089      and hence 512 mismatching LSB bits we return -1 (since
1090      512 itself cannot be represented with 9 bits) */
1091
1092   /* first, calculate the most significant 9 bits of our
1093      result, aka the number of LSBs */
1094   bucket = matching_bits (target, have);
1095   /* bucket is now a value between 0 and 512 */
1096   if (bucket == 512)
1097     return 0;                   /* perfect match */
1098   if (bucket == 0)
1099     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1100                                    below, we'd end up with max+1 (overflow)) */
1101
1102   /* calculate the most significant bits of the final result */
1103   msb = (512 - bucket) << (32 - 9);
1104   /* calculate the 32-9 least significant bits of the final result by
1105      looking at the differences in the 32-9 bits following the
1106      mismatching bit at 'bucket' */
1107   lsb = 0;
1108   for (i = bucket + 1;
1109        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1110     {
1111       if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
1112         lsb |= (1 << (bucket + 32 - 9 - i));    /* first bit set will be 10,
1113                                                    last bit set will be 31 -- if
1114                                                    i does not reach 512 first... */
1115     }
1116   return msb | lsb;
1117 }
1118
1119 /**
1120  * Return a number that is larger the closer the
1121  * "have" GNUNET_hash code is to the "target".
1122  *
1123  * @return inverse distance metric, non-zero.
1124  *         Must fudge the value if NO bits match.
1125  */
1126 static unsigned int
1127 inverse_distance (const GNUNET_HashCode * target,
1128                   const GNUNET_HashCode * have)
1129 {
1130   if (matching_bits(target, have) == 0)
1131     return 1; /* Never return 0! */
1132   return ((unsigned int) -1) - distance (target, have);
1133 }
1134
1135 /**
1136  * Find the optimal bucket for this key, regardless
1137  * of the current number of buckets in use.
1138  *
1139  * @param hc the hashcode to compare our identity to
1140  *
1141  * @return the proper bucket index, or GNUNET_SYSERR
1142  *         on error (same hashcode)
1143  */
1144 static int find_bucket(const GNUNET_HashCode *hc)
1145 {
1146   unsigned int bits;
1147
1148   bits = matching_bits(&my_identity.hashPubKey, hc);
1149   if (bits == MAX_BUCKETS)
1150     return GNUNET_SYSERR;
1151   return MAX_BUCKETS - bits - 1;
1152 }
1153
1154 /**
1155  * Find which k-bucket this peer should go into,
1156  * taking into account the size of the k-bucket
1157  * array.  This means that if more bits match than
1158  * there are currently buckets, lowest_bucket will
1159  * be returned.
1160  *
1161  * @param hc GNUNET_HashCode we are finding the bucket for.
1162  *
1163  * @return the proper bucket index for this key,
1164  *         or GNUNET_SYSERR on error (same hashcode)
1165  */
1166 static int find_current_bucket(const GNUNET_HashCode *hc)
1167 {
1168   int actual_bucket;
1169   actual_bucket = find_bucket(hc);
1170
1171   if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
1172     return GNUNET_SYSERR;
1173   else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
1174     return lowest_bucket;
1175   else
1176     return actual_bucket;
1177 }
1178
1179 #if EXTRA_CHECKS
1180 /**
1181  * Find a routing table entry from a peer identity
1182  *
1183  * @param peer the peer to look up
1184  *
1185  * @return the bucket number holding the peer, GNUNET_SYSERR if not found
1186  */
1187 static int
1188 find_bucket_by_peer(const struct PeerInfo *peer)
1189 {
1190   int bucket;
1191   struct PeerInfo *pos;
1192
1193   for (bucket = lowest_bucket; bucket < MAX_BUCKETS - 1; bucket++)
1194     {
1195       pos = k_buckets[bucket].head;
1196       while (pos != NULL)
1197         {
1198           if (peer == pos)
1199             return bucket;
1200           pos = pos->next;
1201         }
1202     }
1203
1204   return GNUNET_SYSERR; /* No such peer. */
1205 }
1206 #endif
1207
1208 #if PRINT_TABLES
1209 /**
1210  * Print the complete routing table for this peer.
1211  */
1212 static void
1213 print_routing_table ()
1214 {
1215   int bucket;
1216   struct PeerInfo *pos;
1217   char char_buf[30000];
1218   int char_pos;
1219   memset(char_buf, 0, sizeof(char_buf));
1220   char_pos = 0;
1221   char_pos += sprintf(&char_buf[char_pos], "Printing routing table for peer %s\n", my_short_id);
1222   //fprintf(stderr, "Printing routing table for peer %s\n", my_short_id);
1223   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1224     {
1225       pos = k_buckets[bucket].head;
1226       char_pos += sprintf(&char_buf[char_pos], "Bucket %d:\n", bucket);
1227       //fprintf(stderr, "Bucket %d:\n", bucket);
1228       while (pos != NULL)
1229         {
1230           //fprintf(stderr, "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1231           char_pos += sprintf(&char_buf[char_pos], "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1232           pos = pos->next;
1233         }
1234     }
1235   fprintf(stderr, "%s", char_buf);
1236   fflush(stderr);
1237 }
1238 #endif
1239
1240 /**
1241  * Find a routing table entry from a peer identity
1242  *
1243  * @param peer the peer identity to look up
1244  *
1245  * @return the routing table entry, or NULL if not found
1246  */
1247 static struct PeerInfo *
1248 find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
1249 {
1250   int bucket;
1251   struct PeerInfo *pos;
1252   bucket = find_current_bucket(&peer->hashPubKey);
1253
1254   if (bucket == GNUNET_SYSERR)
1255     return NULL;
1256
1257   pos = k_buckets[bucket].head;
1258   while (pos != NULL)
1259     {
1260       if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
1261         return pos;
1262       pos = pos->next;
1263     }
1264   return NULL; /* No such peer. */
1265 }
1266
1267 /* Forward declaration */
1268 static void
1269 update_core_preference (void *cls,
1270                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1271 /**
1272  * Function called with statistics about the given peer.
1273  *
1274  * @param cls closure
1275  * @param peer identifies the peer
1276  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1277  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1278  * @param latency current latency estimate, "FOREVER" if we have been
1279  *                disconnected
1280  * @param amount set to the amount that was actually reserved or unreserved;
1281  *               either the full requested amount or zero (no partial reservations)
1282  * @param preference current traffic preference for the given peer
1283  */
1284 static void
1285 update_core_preference_finish (void *cls,
1286                                const struct
1287                                GNUNET_PeerIdentity * peer,
1288                                struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1289                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1290                                int amount,
1291                                uint64_t preference)
1292 {
1293   struct PeerInfo *peer_info = cls;
1294   peer_info->info_ctx = NULL;
1295   GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PREFERENCE_INTERVAL, &update_core_preference, peer_info);
1296 }
1297
1298 static void
1299 update_core_preference (void *cls,
1300                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1301 {
1302   struct PeerInfo *peer = cls;
1303   uint64_t preference;
1304
1305   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1306     {
1307       return;
1308     }
1309
1310   preference = 2 << matching_bits(&my_identity.hashPubKey, &peer->id.hashPubKey);
1311   peer->info_ctx = GNUNET_CORE_peer_change_preference (sched, cfg,
1312                                                        &peer->id,
1313                                                        GNUNET_TIME_relative_get_forever(),
1314                                                        GNUNET_BANDWIDTH_value_init (UINT32_MAX),
1315                                                        0,
1316                                                        preference,
1317                                                        &update_core_preference_finish,
1318                                                        peer);
1319 }
1320
1321 /**
1322  * Really add a peer to a bucket (only do assertions
1323  * on size, etc.)
1324  *
1325  * @param peer GNUNET_PeerIdentity of the peer to add
1326  * @param bucket the already figured out bucket to add
1327  *        the peer to
1328  * @param latency the core reported latency of this peer
1329  * @param distance the transport level distance to this peer
1330  *
1331  * @return the newly added PeerInfo
1332  */
1333 static struct PeerInfo *
1334 add_peer(const struct GNUNET_PeerIdentity *peer,
1335          unsigned int bucket,
1336          struct GNUNET_TIME_Relative latency,
1337          unsigned int distance)
1338 {
1339   struct PeerInfo *new_peer;
1340   GNUNET_assert(bucket < MAX_BUCKETS);
1341   GNUNET_assert(peer != NULL);
1342   new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
1343   new_peer->latency = latency;
1344   new_peer->distance = distance;
1345
1346   memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
1347
1348   GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
1349                                     k_buckets[bucket].tail,
1350                                     k_buckets[bucket].tail,
1351                                     new_peer);
1352   k_buckets[bucket].peers_size++;
1353
1354   if ((matching_bits(&my_identity.hashPubKey, &peer->hashPubKey) > 0) && (k_buckets[bucket].peers_size <= bucket_size))
1355     {
1356       new_peer->preference_task = GNUNET_SCHEDULER_add_now(sched, &update_core_preference, new_peer);
1357     }
1358
1359   return new_peer;
1360 }
1361
1362 /**
1363  * Given a peer and its corresponding bucket,
1364  * remove it from that bucket.  Does not free
1365  * the PeerInfo struct, nor cancel messages
1366  * or free messages waiting to be sent to this
1367  * peer!
1368  *
1369  * @param peer the peer to remove
1370  * @param bucket the bucket the peer belongs to
1371  */
1372 static void remove_peer (struct PeerInfo *peer,
1373                          unsigned int bucket)
1374 {
1375   GNUNET_assert(k_buckets[bucket].peers_size > 0);
1376   GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
1377                               k_buckets[bucket].tail,
1378                               peer);
1379   k_buckets[bucket].peers_size--;
1380   if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) && (lowest_bucket < MAX_BUCKETS - 1))
1381     lowest_bucket++;
1382 }
1383
1384 /**
1385  * Removes peer from a bucket, then frees associated
1386  * resources and frees peer.
1387  *
1388  * @param peer peer to be removed and freed
1389  * @param bucket which bucket this peer belongs to
1390  */
1391 static void delete_peer (struct PeerInfo *peer,
1392                          unsigned int bucket)
1393 {
1394   struct P2PPendingMessage *pos;
1395   struct P2PPendingMessage *next;
1396 #if EXTRA_CHECKS
1397   struct PeerInfo *peer_pos;
1398
1399   peer_pos = k_buckets[bucket].head;
1400   while ((peer_pos != NULL) && (peer_pos != peer))
1401     peer_pos = peer_pos->next;
1402   if (peer_pos == NULL)
1403     {
1404       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Expected peer `%s' in bucket %d\n", my_short_id, "DHT", GNUNET_i2s(&peer->id), bucket);
1405       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Lowest bucket: %d, find_current_bucket: %d, peer resides in bucket: %d\n", my_short_id, "DHT", lowest_bucket, find_current_bucket(&peer->id.hashPubKey), find_bucket_by_peer(peer));
1406     }
1407   GNUNET_assert(peer_pos != NULL);
1408 #endif
1409   remove_peer(peer, bucket); /* First remove the peer from its bucket */
1410
1411   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
1412     GNUNET_SCHEDULER_cancel(sched, peer->send_task);
1413   if (peer->th != NULL)
1414     GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
1415
1416   pos = peer->head;
1417   while (pos != NULL) /* Remove any pending messages for this peer */
1418     {
1419       next = pos->next;
1420       GNUNET_free(pos);
1421       pos = next;
1422     }
1423
1424   GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey));
1425   GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer);
1426   GNUNET_free(peer);
1427 }
1428
1429
1430 /**
1431  * Iterator over hash map entries.
1432  *
1433  * @param cls closure
1434  * @param key current key code
1435  * @param value PeerInfo of the peer to move to new lowest bucket
1436  * @return GNUNET_YES if we should continue to
1437  *         iterate,
1438  *         GNUNET_NO if not.
1439  */
1440 static int move_lowest_bucket (void *cls,
1441                                const GNUNET_HashCode * key,
1442                                void *value)
1443 {
1444   struct PeerInfo *peer = value;
1445   int new_bucket;
1446
1447   GNUNET_assert(lowest_bucket > 0);
1448   new_bucket = lowest_bucket - 1;
1449   remove_peer(peer, lowest_bucket);
1450   GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
1451                                     k_buckets[new_bucket].tail,
1452                                     k_buckets[new_bucket].tail,
1453                                     peer);
1454   k_buckets[new_bucket].peers_size++;
1455   return GNUNET_YES;
1456 }
1457
1458
1459 /**
1460  * The current lowest bucket is full, so change the lowest
1461  * bucket to the next lower down, and move any appropriate
1462  * entries in the current lowest bucket to the new bucket.
1463  */
1464 static void enable_next_bucket()
1465 {
1466   struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1467   struct PeerInfo *pos;
1468   GNUNET_assert(lowest_bucket > 0);
1469   to_remove = GNUNET_CONTAINER_multihashmap_create(bucket_size);
1470   pos = k_buckets[lowest_bucket].head;
1471
1472 #if PRINT_TABLES
1473   fprintf(stderr, "Printing RT before new bucket\n");
1474   print_routing_table();
1475 #endif
1476   /* Populate the array of peers which should be in the next lowest bucket */
1477   while (pos != NULL)
1478     {
1479       if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
1480         GNUNET_CONTAINER_multihashmap_put(to_remove, &pos->id.hashPubKey, pos, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1481       pos = pos->next;
1482     }
1483
1484   /* Remove peers from lowest bucket, insert into next lowest bucket */
1485   GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL);
1486   GNUNET_CONTAINER_multihashmap_destroy(to_remove);
1487   lowest_bucket = lowest_bucket - 1;
1488 #if PRINT_TABLES
1489   fprintf(stderr, "Printing RT after new bucket\n");
1490   print_routing_table();
1491 #endif
1492 }
1493
1494 /**
1495  * Find the closest peer in our routing table to the
1496  * given hashcode.
1497  *
1498  * @return The closest peer in our routing table to the
1499  *         key, or NULL on error.
1500  */
1501 static struct PeerInfo *
1502 find_closest_peer (const GNUNET_HashCode *hc)
1503 {
1504   struct PeerInfo *pos;
1505   struct PeerInfo *current_closest;
1506   unsigned int lowest_distance;
1507   unsigned int temp_distance;
1508   int bucket;
1509   int count;
1510
1511   lowest_distance = -1;
1512
1513   if (k_buckets[lowest_bucket].peers_size == 0)
1514     return NULL;
1515
1516   current_closest = NULL;
1517   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1518     {
1519       pos = k_buckets[bucket].head;
1520       count = 0;
1521       while ((pos != NULL) && (count < bucket_size))
1522         {
1523           temp_distance = distance(&pos->id.hashPubKey, hc);
1524           if (temp_distance <= lowest_distance)
1525             {
1526               lowest_distance = temp_distance;
1527               current_closest = pos;
1528             }
1529           pos = pos->next;
1530           count++;
1531         }
1532     }
1533   GNUNET_assert(current_closest != NULL);
1534   return current_closest;
1535 }
1536
1537
1538 /**
1539  * Function called to send a request out to another peer.
1540  * Called both for locally initiated requests and those
1541  * received from other peers.
1542  *
1543  * @param cls DHT service closure argument (unused)
1544  * @param msg the encapsulated message
1545  * @param peer the peer to forward the message to
1546  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1547  */
1548 static void forward_message (void *cls,
1549                              const struct GNUNET_MessageHeader *msg,
1550                              struct PeerInfo *peer,
1551                              struct DHT_MessageContext *msg_ctx)
1552 {
1553   struct GNUNET_DHT_P2PRouteMessage *route_message;
1554   struct P2PPendingMessage *pending;
1555   size_t msize;
1556   size_t psize;
1557
1558   increment_stats(STAT_ROUTE_FORWARDS);
1559
1560   if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(&msg_ctx->key)))
1561     increment_stats(STAT_ROUTE_FORWARDS_CLOSEST);
1562
1563   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1564   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1565   psize = sizeof(struct P2PPendingMessage) + msize;
1566   pending = GNUNET_malloc(psize);
1567   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1568   pending->importance = msg_ctx->importance;
1569   pending->timeout = msg_ctx->timeout;
1570   route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1571   route_message->header.size = htons(msize);
1572   route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1573   route_message->options = htonl(msg_ctx->msg_options);
1574   route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1575   route_message->network_size = htonl(msg_ctx->network_size);
1576   route_message->desired_replication_level = htonl(msg_ctx->replication);
1577   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1578   if (msg_ctx->bloom != NULL)
1579     GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1580   memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
1581   memcpy(&route_message[1], msg, ntohs(msg->size));
1582 #if DEBUG_DHT > 1
1583   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1584 #endif
1585   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1586   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1587     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1588 }
1589
1590 #if DO_PING
1591 /**
1592  * Task used to send ping messages to peers so that
1593  * they don't get disconnected.
1594  *
1595  * @param cls the peer to send a ping message to
1596  * @param tc context, reason, etc.
1597  */
1598 static void
1599 periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1600 {
1601   struct PeerInfo *peer = cls;
1602   struct GNUNET_MessageHeader ping_message;
1603   struct DHT_MessageContext message_context;
1604
1605   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1606     return;
1607
1608   ping_message.size = htons(sizeof(struct GNUNET_MessageHeader));
1609   ping_message.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_PING);
1610
1611   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
1612 #if DEBUG_PING
1613   GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n", my_short_id, "DHT", GNUNET_i2s(&peer->id));
1614 #endif
1615   forward_message(NULL, &ping_message, peer, &message_context);
1616   peer->ping_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PING_DELAY, &periodic_ping_task, peer);
1617 }
1618
1619 /**
1620  * Schedule PING messages for the top X peers in each
1621  * bucket of the routing table (so core won't disconnect them!)
1622  */
1623 void schedule_ping_messages()
1624 {
1625   unsigned int bucket;
1626   unsigned int count;
1627   struct PeerInfo *pos;
1628   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1629     {
1630       pos = k_buckets[bucket].head;
1631       count = 0;
1632       while (pos != NULL)
1633         {
1634           if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK))
1635             GNUNET_SCHEDULER_add_now(sched, &periodic_ping_task, pos);
1636           else if ((count >= bucket_size) && (pos->ping_task != GNUNET_SCHEDULER_NO_TASK))
1637             {
1638               GNUNET_SCHEDULER_cancel(sched, pos->ping_task);
1639               pos->ping_task = GNUNET_SCHEDULER_NO_TASK;
1640             }
1641           pos = pos->next;
1642           count++;
1643         }
1644     }
1645 }
1646 #endif
1647
1648 /**
1649  * Attempt to add a peer to our k-buckets.
1650  *
1651  * @param peer, the peer identity of the peer being added
1652  *
1653  * @return NULL if the peer was not added,
1654  *         pointer to PeerInfo for new peer otherwise
1655  */
1656 static struct PeerInfo *
1657 try_add_peer(const struct GNUNET_PeerIdentity *peer,
1658              unsigned int bucket,
1659              struct GNUNET_TIME_Relative latency,
1660              unsigned int distance)
1661 {
1662   int peer_bucket;
1663   struct PeerInfo *new_peer;
1664   peer_bucket = find_current_bucket(&peer->hashPubKey);
1665   if (peer_bucket == GNUNET_SYSERR)
1666     return NULL;
1667
1668   GNUNET_assert(peer_bucket >= lowest_bucket);
1669   new_peer = add_peer(peer, peer_bucket, latency, distance);
1670
1671   if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
1672     enable_next_bucket();
1673 #if DO_PING
1674   schedule_ping_messages();
1675 #endif
1676   return new_peer;
1677 }
1678
1679
1680 /**
1681  * Task run to check for messages that need to be sent to a client.
1682  *
1683  * @param client a ClientList, containing the client and any messages to be sent to it
1684  */
1685 static void
1686 process_pending_messages (struct ClientList *client)
1687
1688   if (client->pending_head == NULL) 
1689     return;    
1690   if (client->transmit_handle != NULL) 
1691     return;
1692   client->transmit_handle =
1693     GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1694                                          ntohs (client->pending_head->msg->
1695                                                 size),
1696                                          GNUNET_TIME_UNIT_FOREVER_REL,
1697                                          &send_generic_reply, client);
1698 }
1699
1700 /**
1701  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1702  * request.  A ClientList is passed as closure, take the head of the list
1703  * and copy it into buf, which has the result of sending the message to the
1704  * client.
1705  *
1706  * @param cls closure to this call
1707  * @param size maximum number of bytes available to send
1708  * @param buf where to copy the actual message to
1709  *
1710  * @return the number of bytes actually copied, 0 indicates failure
1711  */
1712 static size_t
1713 send_generic_reply (void *cls, size_t size, void *buf)
1714 {
1715   struct ClientList *client = cls;
1716   char *cbuf = buf;
1717   struct PendingMessage *reply;
1718   size_t off;
1719   size_t msize;
1720
1721   client->transmit_handle = NULL;
1722   if (buf == NULL)             
1723     {
1724       /* client disconnected */
1725       return 0;
1726     }
1727   off = 0;
1728   while ( (NULL != (reply = client->pending_head)) &&
1729           (size >= off + (msize = ntohs (reply->msg->size))))
1730     {
1731       GNUNET_CONTAINER_DLL_remove (client->pending_head,
1732                                    client->pending_tail,
1733                                    reply);
1734       memcpy (&cbuf[off], reply->msg, msize);
1735       GNUNET_free (reply);
1736       off += msize;
1737     }
1738   process_pending_messages (client);
1739   return off;
1740 }
1741
1742
1743 /**
1744  * Add a PendingMessage to the clients list of messages to be sent
1745  *
1746  * @param client the active client to send the message to
1747  * @param pending_message the actual message to send
1748  */
1749 static void
1750 add_pending_message (struct ClientList *client,
1751                      struct PendingMessage *pending_message)
1752 {
1753   GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
1754                                      client->pending_tail,
1755                                      client->pending_tail,
1756                                      pending_message);
1757   process_pending_messages (client);
1758 }
1759
1760
1761
1762
1763 /**
1764  * Called when a reply needs to be sent to a client, as
1765  * a result it found to a GET or FIND PEER request.
1766  *
1767  * @param client the client to send the reply to
1768  * @param message the encapsulated message to send
1769  * @param uid the unique identifier of this request
1770  */
1771 static void
1772 send_reply_to_client (struct ClientList *client,
1773                       const struct GNUNET_MessageHeader *message,
1774                       unsigned long long uid)
1775 {
1776   struct GNUNET_DHT_RouteResultMessage *reply;
1777   struct PendingMessage *pending_message;
1778   uint16_t msize;
1779   size_t tsize;
1780 #if DEBUG_DHT
1781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782               "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
1783 #endif
1784   msize = ntohs (message->size);
1785   tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
1786   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1787     {
1788       GNUNET_break_op (0);
1789       return;
1790     }
1791
1792   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1793   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
1794   reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
1795   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1796   reply->header.size = htons (tsize);
1797   reply->unique_id = GNUNET_htonll (uid);
1798   memcpy (&reply[1], message, msize);
1799
1800   add_pending_message (client, pending_message);
1801 }
1802
1803 /**
1804  * Consider whether or not we would like to have this peer added to
1805  * our routing table.  Check whether bucket for this peer is full,
1806  * if so return negative; if not return positive.  Since peers are
1807  * only added on CORE level connect, this doesn't actually add the
1808  * peer to the routing table.
1809  *
1810  * @param peer the peer we are considering adding
1811  *
1812  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1813  *         already full)
1814  *
1815  * FIXME: Think about making a context for this call so that we can
1816  *        ping the oldest peer in the current bucket and consider
1817  *        removing it in lieu of the new peer.
1818  */
1819 static int consider_peer (struct GNUNET_PeerIdentity *peer)
1820 {
1821   int bucket;
1822
1823   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
1824     return GNUNET_NO; /* We already know this peer (are connected even!) */
1825   bucket = find_current_bucket(&peer->hashPubKey);
1826   if (bucket == GNUNET_SYSERR)
1827     return GNUNET_NO;
1828   if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1829     return GNUNET_YES;
1830
1831   return GNUNET_NO;
1832 }
1833
1834 /**
1835  * Main function that handles whether or not to route a result
1836  * message to other peers, or to send to our local client.
1837  *
1838  * @param msg the result message to be routed
1839  * @return the number of peers the message was routed to,
1840  *         GNUNET_SYSERR on failure
1841  */
1842 static int route_result_message(void *cls,
1843                                 struct GNUNET_MessageHeader *msg,
1844                                 struct DHT_MessageContext *message_context)
1845 {
1846   struct GNUNET_PeerIdentity new_peer;
1847   struct DHTQueryRecord *record;
1848   struct DHTRouteSource *pos;
1849   struct PeerInfo *peer_info;
1850   const struct GNUNET_MessageHeader *hello_msg;
1851
1852   increment_stats(STAT_RESULTS);
1853   /**
1854    * If a find peer result message is received and contains a valid
1855    * HELLO for another peer, offer it to the transport service.
1856    */
1857   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1858     {
1859       if (ntohs(msg->size) <= sizeof(struct GNUNET_MessageHeader))
1860         GNUNET_break_op(0);
1861
1862       hello_msg = &msg[1];
1863       if ((ntohs(hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) || (GNUNET_SYSERR == GNUNET_HELLO_get_id((const struct GNUNET_HELLO_Message *)hello_msg, &new_peer)))
1864       {
1865         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received non-HELLO message type in find peer result message!\n", my_short_id, "DHT");
1866         GNUNET_break_op(0);
1867         return GNUNET_NO;
1868       }
1869       else /* We have a valid hello, and peer id stored in new_peer */
1870       {
1871         find_peer_context.count++;
1872         increment_stats(STAT_FIND_PEER_REPLY);
1873         if (GNUNET_YES == consider_peer(&new_peer))
1874         {
1875           increment_stats(STAT_HELLOS_PROVIDED);
1876           GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg);
1877           GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, &new_peer, NULL, NULL);
1878           /* peer_request_connect call causes service to segfault */
1879           /* FIXME: Do we need this (peer_request_connect call)??? */
1880         }
1881       }
1882     }
1883
1884   if (malicious_dropper == GNUNET_YES)
1885     record = NULL;
1886   else
1887     record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &message_context->key);
1888
1889   if (record == NULL) /* No record of this message! */
1890     {
1891 #if DEBUG_DHT
1892     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1893                 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
1894                 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1895 #endif
1896 #if DEBUG_DHT_ROUTING
1897
1898       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1899         {
1900           dhtlog_handle->insert_route (NULL,
1901                                        message_context->unique_id,
1902                                        DHTLOG_RESULT,
1903                                        message_context->hop_count,
1904                                        GNUNET_SYSERR,
1905                                        &my_identity,
1906                                        &message_context->key,
1907                                        message_context->peer, NULL);
1908         }
1909 #endif
1910       if (message_context->bloom != NULL)
1911         {
1912           GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1913           message_context->bloom = NULL;
1914         }
1915       return 0;
1916     }
1917
1918   pos = record->head;
1919   while (pos != NULL)
1920     {
1921 #if STRICT_FORWARDING
1922       if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) /* If we have already forwarded this peer id, don't do it again! */
1923         {
1924           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pos->find_peers_responded, &new_peer.hashPubKey))
1925           {
1926             increment_stats("# find peer responses NOT forwarded (bloom match)");
1927             pos = pos->next;
1928             continue;
1929           }
1930           else
1931             GNUNET_CONTAINER_bloomfilter_add(pos->find_peers_responded, &new_peer.hashPubKey);
1932         }
1933 #endif
1934
1935       if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
1936         {
1937 #if DEBUG_DHT
1938           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939                       "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
1940                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1941 #endif
1942 #if DEBUG_DHT_ROUTING
1943           if ((debug_routes_extended) && (dhtlog_handle != NULL))
1944             {
1945               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
1946                                            message_context->hop_count,
1947                                            GNUNET_YES, &my_identity, &message_context->key,
1948                                            message_context->peer, NULL);
1949             }
1950 #endif
1951           increment_stats(STAT_RESULTS_TO_CLIENT);
1952           if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
1953             increment_stats(STAT_GET_REPLY);
1954
1955           send_reply_to_client(pos->client, msg, message_context->unique_id);
1956         }
1957       else /* Send to peer */
1958         {
1959           peer_info = find_peer_by_id(&pos->source);
1960           if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1961             {
1962               pos = pos->next;
1963               continue;
1964             }
1965
1966           if (message_context->bloom == NULL)
1967             message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1968           GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1969           if ((GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (message_context->bloom, &peer_info->id.hashPubKey)))
1970             {
1971 #if DEBUG_DHT
1972               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973                           "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
1974                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1975 #endif
1976 #if DEBUG_DHT_ROUTING
1977               if ((debug_routes_extended) && (dhtlog_handle != NULL))
1978                 {
1979                   dhtlog_handle->insert_route (NULL, message_context->unique_id,
1980                                                DHTLOG_RESULT,
1981                                                message_context->hop_count,
1982                                                GNUNET_NO, &my_identity, &message_context->key,
1983                                                message_context->peer, &pos->source);
1984                 }
1985 #endif
1986               forward_result_message(cls, msg, peer_info, message_context);
1987             }
1988           else
1989             {
1990 #if DEBUG_DHT
1991               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1992                           "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
1993                           "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1994 #endif
1995             }
1996         }
1997       pos = pos->next;
1998     }
1999   if (message_context->bloom != NULL)
2000     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2001   return 0;
2002 }
2003
2004 /**
2005  * Iterator for local get request results,
2006  *
2007  * @param cls closure for iterator, a DatacacheGetContext
2008  * @param exp when does this value expire?
2009  * @param key the key this data is stored under
2010  * @param size the size of the data identified by key
2011  * @param data the actual data
2012  * @param type the type of the data
2013  *
2014  * @return GNUNET_OK to continue iteration, anything else
2015  * to stop iteration.
2016  */
2017 static int
2018 datacache_get_iterator (void *cls,
2019                         struct GNUNET_TIME_Absolute exp,
2020                         const GNUNET_HashCode * key,
2021                         uint32_t size, const char *data, uint32_t type)
2022 {
2023   struct DHT_MessageContext *msg_ctx = cls;
2024   struct DHT_MessageContext *new_msg_ctx;
2025   struct GNUNET_DHT_GetResultMessage *get_result;
2026 #if DEBUG_DHT
2027   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2028               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
2029 #endif
2030   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
2031   memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
2032   get_result =
2033     GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
2034   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2035   get_result->header.size =
2036     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
2037   get_result->expiration = GNUNET_TIME_absolute_hton(exp);
2038   get_result->type = htons (type);
2039   memcpy (&get_result[1], data, size);
2040   new_msg_ctx->peer = &my_identity;
2041   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2042   new_msg_ctx->hop_count = 0;
2043   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make result routing a higher priority */
2044   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2045   increment_stats(STAT_GET_RESPONSE_START);
2046   route_result_message(cls, &get_result->header, new_msg_ctx);
2047   GNUNET_free(new_msg_ctx);
2048   //send_reply_to_client (datacache_get_ctx->client, &get_result->header,
2049   //                      datacache_get_ctx->unique_id);
2050   GNUNET_free (get_result);
2051   return GNUNET_OK;
2052 }
2053
2054
2055 /**
2056  * Server handler for all dht get requests, look for data,
2057  * if found, send response either to clients or other peers.
2058  *
2059  * @param cls closure for service
2060  * @param msg the actual get message
2061  * @param message_context struct containing pertinent information about the get request
2062  *
2063  * @return number of items found for GET request
2064  */
2065 static unsigned int
2066 handle_dht_get (void *cls, 
2067                 const struct GNUNET_MessageHeader *msg,
2068                 struct DHT_MessageContext *message_context)
2069 {
2070   const struct GNUNET_DHT_GetMessage *get_msg;
2071   uint16_t get_type;
2072   unsigned int results;
2073
2074   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
2075   if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
2076     {
2077       GNUNET_break (0);
2078       return 0;
2079     }
2080
2081   get_type = ntohs (get_msg->type);
2082 #if DEBUG_DHT
2083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2084               "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n", my_short_id,
2085               "DHT", "GET", get_type, GNUNET_h2s (message_context->key),
2086               message_context->unique_id);
2087 #endif
2088   increment_stats(STAT_GETS);
2089   results = 0;
2090   if (get_type == DHT_MALICIOUS_MESSAGE_TYPE)
2091     return results;
2092
2093   if (datacache != NULL)
2094     results =
2095       GNUNET_DATACACHE_get (datacache, &message_context->key, get_type,
2096                             &datacache_get_iterator, message_context);
2097
2098   if (results >= 1)
2099     {
2100 #if DEBUG_DHT
2101       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102                   "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
2103                   results, "GET", message_context->unique_id);
2104 #endif
2105 #if DEBUG_DHT_ROUTING
2106       if ((debug_routes) && (dhtlog_handle != NULL))
2107         {
2108           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
2109                                 message_context->hop_count, GNUNET_YES, &my_identity,
2110                                 &message_context->key);
2111         }
2112
2113       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2114         {
2115           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2116                                        message_context->hop_count, GNUNET_YES,
2117                                        &my_identity, &message_context->key, message_context->peer,
2118                                        NULL);
2119         }
2120 #endif
2121     }
2122
2123   if (message_context->hop_count == 0) /* Locally initiated request */
2124     {
2125 #if DEBUG_DHT_ROUTING
2126     if ((debug_routes) && (dhtlog_handle != NULL))
2127       {
2128         dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
2129                                       message_context->hop_count, GNUNET_NO, &my_identity,
2130                                       &message_context->key);
2131       }
2132 #endif
2133     }
2134
2135   return results;
2136 }
2137
2138 static void
2139 remove_recent_find_peer(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2140 {
2141   GNUNET_HashCode *key = cls;
2142   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent_find_peer_requests, key, key))
2143     {
2144       GNUNET_free(key);
2145     }
2146 }
2147
2148 /**
2149  * Server handler for initiating local dht find peer requests
2150  *
2151  * @param cls closure for service
2152  * @param find_msg the actual find peer message
2153  * @param message_context struct containing pertinent information about the request
2154  *
2155  */
2156 static void
2157 handle_dht_find_peer (void *cls, 
2158                       const struct GNUNET_MessageHeader *find_msg,
2159                       struct DHT_MessageContext *message_context)
2160 {
2161   struct GNUNET_MessageHeader *find_peer_result;
2162   struct GNUNET_DHT_FindPeerMessage *find_peer_message;
2163   struct DHT_MessageContext *new_msg_ctx;
2164   struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
2165   size_t hello_size;
2166   size_t tsize;
2167   GNUNET_HashCode *recent_hash;
2168 #if RESTRICT_FIND_PEER
2169   struct GNUNET_PeerIdentity peer_id;
2170 #endif
2171
2172   find_peer_message = (struct GNUNET_DHT_FindPeerMessage *)find_msg;
2173 #if DEBUG_DHT
2174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2175               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
2176               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
2177               ntohs (find_msg->size),
2178               sizeof (struct GNUNET_MessageHeader));
2179 #endif
2180   if (my_hello == NULL)
2181   {
2182 #if DEBUG_DHT
2183     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2184                 "`%s': Our HELLO is null, can't return.\n",
2185                 "DHT");
2186 #endif
2187     return;
2188   }
2189
2190   incoming_bloom = GNUNET_CONTAINER_bloomfilter_init(find_peer_message->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2191   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(incoming_bloom, &my_identity.hashPubKey))
2192     {
2193       increment_stats(STAT_BLOOM_FIND_PEER);
2194       GNUNET_CONTAINER_bloomfilter_free(incoming_bloom);
2195       return; /* We match the bloomfilter, do not send a response to this peer (they likely already know us!)*/
2196     }
2197   GNUNET_CONTAINER_bloomfilter_free(incoming_bloom);
2198
2199 #if RESTRICT_FIND_PEER
2200
2201   /**
2202    * Ignore any find peer requests from a peer we have seen very recently.
2203    */
2204   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent_find_peer_requests, &message_context->key)) /* We have recently responded to a find peer request for this peer! */
2205   {
2206     increment_stats("# dht find peer requests ignored (recently seen!)");
2207     return;
2208   }
2209
2210   /**
2211    * Use this check to only allow the peer to respond to find peer requests if
2212    * it would be beneficial to have the requesting peer in this peers routing
2213    * table.  Can be used to thwart peers flooding the network with find peer
2214    * requests that we don't care about.  However, if a new peer is joining
2215    * the network and has no other peers this is a problem (assume all buckets
2216    * full, no one will respond!).
2217    */
2218   memcpy(&peer_id.hashPubKey, &message_context->key, sizeof(GNUNET_HashCode));
2219   if (GNUNET_NO == consider_peer(&peer_id))
2220     {
2221       increment_stats("# dht find peer requests ignored (do not need!)");
2222       return;
2223     }
2224 #endif
2225
2226   recent_hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
2227   memcpy(recent_hash, &message_context->key, sizeof(GNUNET_HashCode));
2228   GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests, &message_context->key, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2229   GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30), &remove_recent_find_peer, &recent_hash);
2230
2231   /* Simplistic find_peer functionality, always return our hello */
2232   hello_size = ntohs(my_hello->size);
2233   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
2234
2235   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2236     {
2237       GNUNET_break_op (0);
2238       return;
2239     }
2240
2241   find_peer_result = GNUNET_malloc (tsize);
2242   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
2243   find_peer_result->size = htons (tsize);
2244   memcpy (&find_peer_result[1], my_hello, hello_size);
2245
2246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2247               "`%s': Sending hello size %d to requesting peer.\n",
2248               "DHT", hello_size);
2249
2250   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
2251   memcpy(new_msg_ctx, message_context, sizeof(struct DHT_MessageContext));
2252   new_msg_ctx->peer = &my_identity;
2253   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2254   new_msg_ctx->hop_count = 0;
2255   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make find peer requests a higher priority */
2256   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2257   increment_stats(STAT_FIND_PEER_ANSWER);
2258   route_result_message(cls, find_peer_result, new_msg_ctx);
2259   GNUNET_free(new_msg_ctx);
2260 #if DEBUG_DHT_ROUTING
2261   if ((debug_routes) && (dhtlog_handle != NULL))
2262     {
2263       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
2264                                    message_context->hop_count, GNUNET_YES, &my_identity,
2265                                    &message_context->key);
2266     }
2267 #endif
2268   GNUNET_free(find_peer_result);
2269 }
2270
2271
2272 /**
2273  * Server handler for initiating local dht put requests
2274  *
2275  * @param cls closure for service
2276  * @param msg the actual put message
2277  * @param message_context struct containing pertinent information about the request
2278  */
2279 static void
2280 handle_dht_put (void *cls,
2281                 const struct GNUNET_MessageHeader *msg,
2282                 struct DHT_MessageContext *message_context)
2283 {
2284   struct GNUNET_DHT_PutMessage *put_msg;
2285   size_t put_type;
2286   size_t data_size;
2287
2288   GNUNET_assert (ntohs (msg->size) >=
2289                  sizeof (struct GNUNET_DHT_PutMessage));
2290
2291
2292   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
2293   put_type = ntohs (put_msg->type);
2294
2295   if (put_type == DHT_MALICIOUS_MESSAGE_TYPE)
2296     return;
2297
2298   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
2299 #if DEBUG_DHT
2300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2301               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2302               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (message_context->key), message_context->unique_id);
2303 #endif
2304 #if DEBUG_DHT_ROUTING
2305   if (message_context->hop_count == 0) /* Locally initiated request */
2306     {
2307       if ((debug_routes) && (dhtlog_handle != NULL))
2308         {
2309           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
2310                                        message_context->hop_count, GNUNET_NO, &my_identity,
2311                                        &message_context->key);
2312         }
2313     }
2314 #endif
2315
2316   if (message_context->closest != GNUNET_YES)
2317     return;
2318
2319 #if DEBUG_DHT_ROUTING
2320   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2321     {
2322       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2323                                    message_context->hop_count, GNUNET_YES,
2324                                    &my_identity, &message_context->key, message_context->peer,
2325                                    NULL);
2326     }
2327
2328   if ((debug_routes) && (dhtlog_handle != NULL))
2329     {
2330       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
2331                                    message_context->hop_count, GNUNET_YES, &my_identity,
2332                                    &message_context->key);
2333     }
2334 #endif
2335
2336   increment_stats(STAT_PUTS_INSERTED);
2337   if (datacache != NULL)
2338     GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
2339                           (char *) &put_msg[1], put_type,
2340                           GNUNET_TIME_absolute_ntoh(put_msg->expiration));
2341   else
2342     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2343                 "`%s:%s': %s request received, but have no datacache!\n",
2344                 my_short_id, "DHT", "PUT");
2345 }
2346
2347 /**
2348  * Estimate the diameter of the network based
2349  * on how many buckets are currently in use.
2350  * Concept here is that the diameter of the network
2351  * is roughly the distance a message must travel in
2352  * order to reach its intended destination.  Since
2353  * at each hop we expect to get one bit closer, and
2354  * we have one bit per bucket, the number of buckets
2355  * in use should be the largest number of hops for
2356  * a sucessful message. (of course, this assumes we
2357  * know all peers in the network!)
2358  *
2359  * @return ballpark diameter figure
2360  */
2361 static unsigned int estimate_diameter()
2362 {
2363   return MAX_BUCKETS - lowest_bucket;
2364 }
2365
2366 /**
2367  * To how many peers should we (on average)
2368  * forward the request to obtain the desired
2369  * target_replication count (on average).
2370  *
2371  * Always 0, 1 or 2 (don't send, send once, split)
2372  */
2373 static unsigned int
2374 get_forward_count (unsigned int hop_count, size_t target_replication)
2375 {
2376 #if DOUBLE
2377   double target_count;
2378   double random_probability;
2379 #else
2380   uint32_t random_value;
2381 #endif
2382   unsigned int target_value;
2383   unsigned int diameter;
2384
2385   /**
2386    * If we are behaving in strict kademlia mode, send multiple initial requests,
2387    * but then only send to 1 or 0 peers based strictly on the number of hops.
2388    */
2389   if (strict_kademlia == GNUNET_YES)
2390     {
2391       if (hop_count == 0)
2392         return DHT_KADEMLIA_REPLICATION;
2393       else if (hop_count < MAX_HOPS)
2394         return 1;
2395       else
2396         return 0;
2397     }
2398
2399   /* FIXME: the smaller we think the network is the more lenient we should be for
2400    * routing right?  The estimation below only works if we think we have reasonably
2401    * full routing tables, which for our RR topologies may not be the case!
2402    */
2403   diameter = estimate_diameter ();
2404   if ((hop_count > (diameter + 1) * 2) && (MINIMUM_PEER_THRESHOLD < estimate_diameter() * bucket_size))
2405     {
2406 #if DEBUG_DHT
2407       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2408                   "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
2409                   "DHT", estimate_diameter(), lowest_bucket);
2410 #endif
2411       return 0;
2412     }
2413   else if (hop_count > MAX_HOPS)
2414     {
2415 #if DEBUG_DHT
2416       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2417                   "`%s:%s': Hop count too high (greater than max)\n", my_short_id,
2418                   "DHT");
2419 #endif
2420       return 0;
2421     }
2422
2423 #if DOUBLE
2424   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Replication %d, hop_count %u, diameter %u\n", target_replication, hop_count, diameter);
2425   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Numerator %f, denominator %f\n", (double)target_replication, ((double)target_replication * (hop_count + 1) + diameter));
2426   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
2427     (double)target_replication / ((double)target_replication * (hop_count + 1) + diameter);
2428   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Target count is %f\n", target_count);
2429   random_probability = ((double)GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2430       RAND_MAX)) / RAND_MAX;
2431   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Random is %f\n", random_probability);
2432
2433   target_value = 0;
2434   //while (target_value < target_count)
2435   if (target_value < target_count)
2436     target_value++; /* target_value is ALWAYS 1 after this "loop", right?  Because target_count is always > 0, right?  Or does it become 0.00000... at some point because the hop count is so high? */
2437
2438
2439   //if ((target_count + 1 - (double)target_value) > random_probability)
2440   if ((target_count) > random_probability)
2441     target_value++;
2442 #endif
2443
2444   random_value = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_STRONG, target_replication * (hop_count + 1) + diameter) + 1;
2445   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "replication %u, at hop %d, will split with probability %f\n", target_replication, hop_count, target_replication / (double)((target_replication * (hop_count + 1) + diameter) + 1));
2446   target_value = 1;
2447   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "random %u, target %u, max %u\n", random_value, target_replication, target_replication * (hop_count + 1) + diameter);
2448   if (random_value < target_replication)
2449     target_value++;
2450
2451   return target_value;
2452 }
2453
2454 /*
2455  * Check whether my identity is closer than any known peers.
2456  * If a non-null bloomfilter is given, check if this is the closest
2457  * peer that hasn't already been routed to.
2458  *
2459  * @param target hash code to check closeness to
2460  * @param bloom bloomfilter, exclude these entries from the decision
2461  *
2462  * Return GNUNET_YES if node location is closest, GNUNET_NO
2463  * otherwise.
2464  */
2465 int
2466 am_closest_peer (const GNUNET_HashCode * target, struct GNUNET_CONTAINER_BloomFilter *bloom)
2467 {
2468   int bits;
2469   int other_bits;
2470   int bucket_num;
2471   int count;
2472   struct PeerInfo *pos;
2473 #if INTEGER_DISTANCE
2474   unsigned int my_distance;
2475 #endif
2476   bucket_num = find_current_bucket(target);
2477   if (bucket_num == GNUNET_SYSERR) /* Same key! */
2478     return GNUNET_YES;
2479
2480   bits = matching_bits(&my_identity.hashPubKey, target);
2481 #if INTEGER_DISTANCE
2482   my_distance = distance(&my_identity.hashPubKey, target);
2483 #endif
2484   pos = k_buckets[bucket_num].head;
2485   count = 0;
2486   while ((pos != NULL) && (count < bucket_size))
2487     {
2488       if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey)))
2489         {
2490           pos = pos->next;
2491           continue; /* Skip already checked entries */
2492         }
2493
2494       other_bits = matching_bits(&pos->id.hashPubKey, target);
2495       if (other_bits > bits)
2496         return GNUNET_NO;
2497       else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
2498         {
2499           return GNUNET_YES;
2500           /* FIXME: why not just return GNUNET_YES here?  We are certainly close. */
2501           /*if (distance(&pos->id.hashPubKey, target) < my_distance)
2502             return GNUNET_NO;*/
2503         }
2504       pos = pos->next;
2505     }
2506
2507 #if DEBUG_TABLE
2508   GNUNET_GE_LOG (coreAPI->ectx,
2509                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2510                  GNUNET_GE_BULK, "closest peer\n");
2511   printPeerBits (&closest);
2512   GNUNET_GE_LOG (coreAPI->ectx,
2513                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2514                  GNUNET_GE_BULK, "me\n");
2515   printPeerBits (coreAPI->my_identity);
2516   GNUNET_GE_LOG (coreAPI->ectx,
2517                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2518                  GNUNET_GE_BULK, "key\n");
2519   printKeyBits (target);
2520   GNUNET_GE_LOG (coreAPI->ectx,
2521                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2522                  GNUNET_GE_BULK,
2523                  "closest peer inverse distance is %u, mine is %u\n",
2524                  inverse_distance (target, &closest.hashPubKey),
2525                  inverse_distance (target,
2526                                    &coreAPI->my_identity->hashPubKey));
2527 #endif
2528
2529   /* No peers closer, we are the closest! */
2530   return GNUNET_YES;
2531
2532 }
2533
2534 /**
2535  * Select a peer from the routing table that would be a good routing
2536  * destination for sending a message for "target".  The resulting peer
2537  * must not be in the set of blocked peers.<p>
2538  *
2539  * Note that we should not ALWAYS select the closest peer to the
2540  * target, peers further away from the target should be chosen with
2541  * exponentially declining probability.
2542  *
2543  * @param target the key we are selecting a peer to route to
2544  * @param bloom a bloomfilter containing entries this request has seen already
2545  *
2546  * @return Peer to route to, or NULL on error
2547  */
2548 static struct PeerInfo *
2549 select_peer (const GNUNET_HashCode * target,
2550              struct GNUNET_CONTAINER_BloomFilter *bloom)
2551 {
2552   unsigned int distance;
2553   unsigned int bc;
2554   unsigned int count;
2555   struct PeerInfo *pos;
2556   struct PeerInfo *chosen;
2557   unsigned long long largest_distance;
2558   unsigned long long total_distance;
2559   unsigned long long selected;
2560
2561   if (strict_kademlia == GNUNET_YES)
2562     {
2563       largest_distance = 0;
2564       chosen = NULL;
2565       for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2566         {
2567           pos = k_buckets[bc].head;
2568           count = 0;
2569           while ((pos != NULL) && (count < bucket_size))
2570             {
2571               /* If we are doing strict Kademlia like routing, then checking the bloomfilter is basically cheating! */
2572               if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2573                 {
2574                   distance = inverse_distance (target, &pos->id.hashPubKey);
2575                   if (distance > largest_distance)
2576                     {
2577                       chosen = pos;
2578                       largest_distance = distance;
2579                     }
2580                 }
2581               count++;
2582               pos = pos->next;
2583             }
2584         }
2585
2586       if ((largest_distance > 0) && (chosen != NULL))
2587         {
2588           GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
2589           return chosen;
2590         }
2591       else
2592         {
2593           return NULL;
2594         }
2595     }
2596   else
2597     {
2598       /* GNUnet-style */
2599       total_distance = 0;
2600       for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2601         {
2602           pos = k_buckets[bc].head;
2603           count = 0;
2604           while ((pos != NULL) && (count < bucket_size))
2605             {
2606               if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2607                 total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
2608   #if DEBUG_DHT > 1
2609               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2610                           "`%s:%s': Total distance is %llu, distance from %s to %s is %u\n",
2611                           my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey));
2612   #endif
2613               pos = pos->next;
2614               count++;
2615             }
2616         }
2617       if (total_distance == 0)
2618         {
2619           return NULL;
2620         }
2621
2622       selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
2623       for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2624         {
2625           pos = k_buckets[bc].head;
2626           count = 0;
2627           while ((pos != NULL) && (count < bucket_size))
2628             {
2629               if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2630                 {
2631                   distance = inverse_distance (target, &pos->id.hashPubKey);
2632                   if (distance > selected)
2633                     return pos;
2634                   selected -= distance;
2635                 }
2636               else
2637                 {
2638   #if DEBUG_DHT
2639                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2640                               "`%s:%s': peer %s matches bloomfilter.\n",
2641                               my_short_id, "DHT", GNUNET_i2s(&pos->id));
2642   #endif
2643                 }
2644               pos = pos->next;
2645               count++;
2646             }
2647         }
2648   #if DEBUG_DHT
2649         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2650                     "`%s:%s': peer %s matches bloomfilter.\n",
2651                     my_short_id, "DHT", GNUNET_i2s(&pos->id));
2652   #endif
2653       return NULL;
2654     }
2655 }
2656
2657 /**
2658  * Task used to remove recent entries, either
2659  * after timeout, when full, or on shutdown.
2660  *
2661  * @param cls the entry to remove
2662  * @param tc context, reason, etc.
2663  */
2664 static void
2665 remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2666 {
2667   struct RecentRequest *req = cls;
2668   static GNUNET_HashCode hash;
2669
2670   GNUNET_assert(req != NULL);
2671   hash_from_uid(req->uid, &hash);
2672   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req));
2673   GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node);
2674   GNUNET_CONTAINER_bloomfilter_free(req->bloom);
2675   GNUNET_free(req);
2676
2677   if ((tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
2678   {
2679     GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
2680     GNUNET_CONTAINER_heap_destroy(recent.minHeap);
2681   }
2682 }
2683
2684
2685 /**
2686  * Task used to remove forwarding entries, either
2687  * after timeout, when full, or on shutdown.
2688  *
2689  * @param cls the entry to remove
2690  * @param tc context, reason, etc.
2691  */
2692 static void
2693 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2694 {
2695   struct DHTRouteSource *source_info = cls;
2696   struct DHTQueryRecord *record;
2697   source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
2698   record = source_info->record;
2699   GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
2700
2701   if (record->head == NULL) /* No more entries in DLL */
2702     {
2703       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
2704       GNUNET_free(record);
2705     }
2706   if (source_info->find_peers_responded != NULL)
2707     GNUNET_CONTAINER_bloomfilter_free(source_info->find_peers_responded);
2708   GNUNET_free(source_info);
2709 }
2710
2711 /**
2712  * Remember this routing request so that if a reply is
2713  * received we can either forward it to the correct peer
2714  * or return the result locally.
2715  *
2716  * @param cls DHT service closure
2717  * @param msg_ctx Context of the route request
2718  *
2719  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
2720  */
2721 static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
2722 {
2723   struct DHTQueryRecord *record;
2724   struct DHTRouteSource *source_info;
2725   struct DHTRouteSource *pos;
2726   struct GNUNET_TIME_Absolute now;
2727   unsigned int current_size;
2728
2729   current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
2730   while (current_size >= MAX_OUTSTANDING_FORWARDS)
2731     {
2732       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
2733       GNUNET_assert(source_info != NULL);
2734       record = source_info->record;
2735       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
2736       if (record->head == NULL) /* No more entries in DLL */
2737         {
2738           GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
2739           GNUNET_free(record);
2740         }
2741       GNUNET_SCHEDULER_cancel(sched, source_info->delete_task);
2742       if (source_info->find_peers_responded != NULL)
2743         GNUNET_CONTAINER_bloomfilter_free(source_info->find_peers_responded);
2744       GNUNET_free(source_info);
2745       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
2746     }
2747   now = GNUNET_TIME_absolute_get();
2748   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key);
2749   if (record != NULL) /* Already know this request! */
2750     {
2751       pos = record->head;
2752       while (pos != NULL)
2753         {
2754           if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
2755             break; /* Already have this peer in reply list! */
2756           pos = pos->next;
2757         }
2758       if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
2759         {
2760           GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.value);
2761           return GNUNET_NO;
2762         }
2763     }
2764   else
2765     {
2766       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
2767       GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2768       memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
2769     }
2770
2771   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
2772   source_info->record = record;
2773   source_info->delete_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
2774   source_info->find_peers_responded = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2775   memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
2776   GNUNET_CONTAINER_DLL_insert_after(record->head, record->tail, record->tail, source_info);
2777   if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
2778     {
2779       source_info->client = msg_ctx->client;
2780       now = GNUNET_TIME_absolute_get_forever();
2781     }
2782   source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.value);
2783 #if DEBUG_DHT > 1
2784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2785                   "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
2786                   "DHT", GNUNET_h2s (msg_ctx->key), msg_ctx->unique_id);
2787 #endif
2788   return GNUNET_YES;
2789 }
2790
2791
2792 /**
2793  * Main function that handles whether or not to route a message to other
2794  * peers.
2795  *
2796  * @param cls closure for dht service (NULL)
2797  * @param msg the message to be routed
2798  * @param message_context the context containing all pertinent information about the message
2799  *
2800  * @return the number of peers the message was routed to,
2801  *         GNUNET_SYSERR on failure
2802  */
2803 static int route_message(void *cls,
2804                          const struct GNUNET_MessageHeader *msg,
2805                          struct DHT_MessageContext *message_context)
2806 {
2807   int i;
2808   int global_closest;
2809   struct PeerInfo *selected;
2810 #if DEBUG_DHT_ROUTING > 1
2811   struct PeerInfo *nearest;
2812 #endif
2813   unsigned int forward_count;
2814   struct RecentRequest *recent_req;
2815   GNUNET_HashCode unique_hash;
2816   char *stat_forward_count;
2817 #if DEBUG_DHT_ROUTING
2818   int ret;
2819 #endif
2820
2821   if (malicious_dropper == GNUNET_YES)
2822     {
2823 #if DEBUG_DHT_ROUTING
2824       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2825         {
2826           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2827                                        message_context->hop_count, GNUNET_SYSERR,
2828                                        &my_identity, &message_context->key, message_context->peer,
2829                                        NULL);
2830         }
2831 #endif
2832       if (message_context->bloom != NULL)
2833         GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2834       return 0;
2835     }
2836
2837   increment_stats(STAT_ROUTES);
2838   /* Semantics of this call means we find whether we are the closest peer out of those already
2839    * routed to on this messages path.
2840    */
2841   global_closest = am_closest_peer(&message_context->key, NULL);
2842   message_context->closest = am_closest_peer(&message_context->key, message_context->bloom);
2843   forward_count = get_forward_count(message_context->hop_count, message_context->replication);
2844   GNUNET_asprintf(&stat_forward_count, "# forward counts of %d", forward_count);
2845   increment_stats(stat_forward_count);
2846   GNUNET_free(stat_forward_count);
2847   if (message_context->bloom == NULL)
2848     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2849
2850   if ((stop_on_closest == GNUNET_YES) && (global_closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
2851     forward_count = 0;
2852
2853 #if DEBUG_DHT_ROUTING
2854   if (forward_count == 0)
2855     ret = GNUNET_SYSERR;
2856   else
2857     ret = GNUNET_NO;
2858
2859   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2860     {
2861       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2862                                    message_context->hop_count, ret,
2863                                    &my_identity, &message_context->key, message_context->peer,
2864                                    NULL);
2865     }
2866 #endif
2867
2868   switch (ntohs(msg->type))
2869     {
2870     case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
2871       cache_response (cls, message_context);
2872       if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
2873         forward_count = 0;
2874       break;
2875     case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding to reduce complexity?*/
2876       increment_stats(STAT_PUTS);
2877       message_context->closest = global_closest;
2878       handle_dht_put (cls, msg, message_context);
2879       break;
2880     case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
2881       increment_stats(STAT_FIND_PEER);
2882       if (((message_context->hop_count > 0) && (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))) || (message_context->client != NULL))
2883       {
2884         cache_response (cls, message_context);
2885         if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2886           handle_dht_find_peer (cls, msg, message_context);
2887       }
2888 #if DEBUG_DHT_ROUTING
2889       if (message_context->hop_count == 0) /* Locally initiated request */
2890         {
2891           if ((debug_routes) && (dhtlog_handle != NULL))
2892             {
2893               dhtlog_handle->insert_dhtkey(NULL, &message_context->key);
2894               dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
2895                                            message_context->hop_count, GNUNET_NO, &my_identity,
2896                                            &message_context->key);
2897             }
2898         }
2899 #endif
2900       break;
2901     default:
2902       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2903                   "`%s': Message type (%d) not handled\n", "DHT", ntohs(msg->type));
2904     }
2905
2906   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
2907   hash_from_uid(message_context->unique_id, &unique_hash);
2908   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent.hashmap, &unique_hash))
2909   {
2910       recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap, &unique_hash);
2911       GNUNET_assert(recent_req != NULL);
2912       if (0 != memcmp(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode)))
2913         increment_stats(STAT_DUPLICATE_UID);
2914       else
2915       {
2916         increment_stats(STAT_RECENT_SEEN);
2917         GNUNET_CONTAINER_bloomfilter_or2(message_context->bloom, recent_req->bloom, DHT_BLOOM_SIZE);
2918       }
2919     }
2920   else
2921     {
2922       recent_req = GNUNET_malloc(sizeof(struct RecentRequest));
2923       recent_req->uid = message_context->unique_id;
2924       memcpy(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode));
2925       recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req);
2926       recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap, recent_req, GNUNET_TIME_absolute_get().value);
2927       recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2928       GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2929     }
2930
2931   if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT)
2932     {
2933       recent_req = GNUNET_CONTAINER_heap_peek(recent.minHeap);
2934       GNUNET_assert(recent_req != NULL);
2935       GNUNET_SCHEDULER_cancel(sched, recent_req->remove_task);
2936       GNUNET_SCHEDULER_add_now(sched, &remove_recent, recent_req);
2937     }
2938
2939   for (i = 0; i < forward_count; i++)
2940     {
2941       selected = select_peer(&message_context->key, message_context->bloom);
2942
2943       if (selected != NULL)
2944         {
2945           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
2946 #if DEBUG_DHT_ROUTING > 1
2947           nearest = find_closest_peer(&message_context->key);
2948           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
2949           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2950                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
2951                       "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&selected->id), nearest_buf, matching_bits(&nearest->id.hashPubKey, message_context->key), distance(&nearest->id.hashPubKey, message_context->key));
2952           GNUNET_free(nearest_buf);
2953 #endif
2954           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2955             {
2956               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2957                                            message_context->hop_count, GNUNET_NO,
2958                                            &my_identity, &message_context->key, message_context->peer,
2959                                            &selected->id);
2960             }
2961           forward_message(cls, msg, selected, message_context);
2962         }
2963       else
2964         {
2965 #if DEBUG_DHT
2966           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2967                       "`%s:%s': No peers selected for forwarding.\n", my_short_id,
2968                       "DHT");
2969 #endif
2970         }
2971     }
2972 #if DEBUG_DHT_ROUTING > 1
2973   if (forward_count == 0)
2974     {
2975       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2976                   "`%s:%s': NOT Forwarding request key %s uid %llu to any peers\n", my_short_id,
2977                   "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
2978     }
2979 #endif
2980
2981   if (message_context->bloom != NULL)
2982     {
2983       GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom, message_context->bloom, DHT_BLOOM_SIZE);
2984       GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2985     }
2986
2987   return forward_count;
2988 }
2989
2990 /**
2991  * Find a client if it exists, add it otherwise.
2992  *
2993  * @param client the server handle to the client
2994  *
2995  * @return the client if found, a new client otherwise
2996  */
2997 static struct ClientList *
2998 find_active_client (struct GNUNET_SERVER_Client *client)
2999 {
3000   struct ClientList *pos = client_list;
3001   struct ClientList *ret;
3002
3003   while (pos != NULL)
3004     {
3005       if (pos->client_handle == client)
3006         return pos;
3007       pos = pos->next;
3008     }
3009
3010   ret = GNUNET_malloc (sizeof (struct ClientList));
3011   ret->client_handle = client;
3012   ret->next = client_list;
3013   client_list = ret;
3014   return ret;
3015 }
3016
3017 /**
3018  * Task to send a malicious put message across the network.
3019  *
3020  * @param cls closure for this task
3021  * @param tc the context under which the task is running
3022  */
3023 static void
3024 malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3025 {
3026   static struct GNUNET_DHT_PutMessage put_message;
3027   static struct DHT_MessageContext message_context;
3028   static GNUNET_HashCode key;
3029   uint32_t random_key;
3030
3031   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3032     return;
3033
3034   put_message.header.size = htons(sizeof(struct GNUNET_DHT_PutMessage));
3035   put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
3036   put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
3037   put_message.expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever());
3038   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3039   message_context.client = NULL;
3040   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
3041   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
3042   memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
3043   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3044   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3045   message_context.msg_options = ntohl (0);
3046   message_context.network_size = estimate_diameter();
3047   message_context.peer = &my_identity;
3048   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE; /* Make result routing a higher priority */
3049   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3050   if (dhtlog_handle != NULL)
3051     dhtlog_handle->insert_dhtkey(NULL, &key);
3052   increment_stats(STAT_PUT_START);
3053   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious PUT message with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
3054   route_message(NULL, &put_message.header, &message_context);
3055   GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_put_frequency), &malicious_put_task, NULL);
3056
3057 }
3058
3059 /**
3060  * Task to send a malicious put message across the network.
3061  *
3062  * @param cls closure for this task
3063  * @param tc the context under which the task is running
3064  */
3065 static void
3066 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3067 {
3068   static struct GNUNET_DHT_GetMessage get_message;
3069   struct DHT_MessageContext message_context;
3070   static GNUNET_HashCode key;
3071   uint32_t random_key;
3072
3073   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3074     return;
3075
3076   get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
3077   get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
3078   get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
3079   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3080   message_context.client = NULL;
3081   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
3082   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
3083   memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
3084   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3085   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3086   message_context.msg_options = ntohl (0);
3087   message_context.network_size = estimate_diameter();
3088   message_context.peer = &my_identity;
3089   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE; /* Make result routing a higher priority */
3090   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3091   if (dhtlog_handle != NULL)
3092     dhtlog_handle->insert_dhtkey(NULL, &key);
3093   increment_stats(STAT_GET_START);
3094   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
3095   route_message (NULL, &get_message.header, &message_context);
3096   GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_get_frequency), &malicious_get_task, NULL);
3097 }
3098
3099 /**
3100  * Iterator over hash map entries.
3101  *
3102  * @param cls closure
3103  * @param key current key code
3104  * @param value value in the hash map
3105  * @return GNUNET_YES if we should continue to
3106  *         iterate,
3107  *         GNUNET_NO if not.
3108  */
3109 static int
3110 add_known_to_bloom (void *cls,
3111                     const GNUNET_HashCode * key,
3112                     void *value)
3113 {
3114   struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
3115   GNUNET_CONTAINER_bloomfilter_add (bloom, key);
3116   return GNUNET_YES;
3117 }
3118
3119 /**
3120  * Task to send a find peer message for our own peer identifier
3121  * so that we can find the closest peers in the network to ourselves
3122  * and attempt to connect to them.
3123  *
3124  * @param cls closure for this task
3125  * @param tc the context under which the task is running
3126  */
3127 static void
3128 send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3129 {
3130   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
3131   struct DHT_MessageContext message_context;
3132   int ret;
3133   struct GNUNET_TIME_Relative next_send_time;
3134   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
3135 #if COUNT_INTERVAL
3136   struct GNUNET_TIME_Relative time_diff;
3137   struct GNUNET_TIME_Absolute end;
3138   double multiplier;
3139   double count_per_interval;
3140 #endif
3141   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3142     return;
3143
3144   if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If we are finding peers already, no need to send out our request right now! */
3145     {
3146       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since last find peer message sent!\n", newly_found_peers);
3147       GNUNET_SCHEDULER_add_delayed (sched,
3148                                     GNUNET_TIME_UNIT_MINUTES,
3149                                     &send_find_peer_message, NULL);
3150       newly_found_peers = 0;
3151       return;
3152     }
3153     
3154   increment_stats(STAT_FIND_PEER_START);
3155 #if COUNT_INTERVAL
3156   end = GNUNET_TIME_absolute_get();
3157   time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start, end);
3158
3159   if (time_diff.value > FIND_PEER_CALC_INTERVAL.value)
3160     {
3161       multiplier = time_diff.value / FIND_PEER_CALC_INTERVAL.value;
3162       count_per_interval = find_peer_context.count / multiplier;
3163     }
3164   else
3165     {
3166       multiplier = FIND_PEER_CALC_INTERVAL.value / time_diff.value;
3167       count_per_interval = find_peer_context.count * multiplier;
3168     }
3169 #endif
3170
3171   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage));
3172   find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
3173   find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
3174   temp_bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3175   GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom, temp_bloom);
3176   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom, find_peer_msg->bloomfilter, DHT_BLOOM_SIZE));
3177   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3178   memcpy(&message_context.key, &my_identity.hashPubKey, sizeof(GNUNET_HashCode));
3179   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
3180   message_context.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
3181   message_context.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
3182   message_context.network_size = estimate_diameter();
3183   message_context.peer = &my_identity;
3184   message_context.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
3185   message_context.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
3186
3187   ret = route_message(NULL, &find_peer_msg->header, &message_context);
3188   GNUNET_free(find_peer_msg);
3189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3190               "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT",
3191               "FIND PEER", ret);
3192   if (newly_found_peers < bucket_size)
3193     {
3194       next_send_time.value = (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) +
3195                               GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
3196                                                        DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2);
3197     }
3198   else
3199     {
3200       next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
3201                              GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
3202                                                       DHT_MAXIMUM_FIND_PEER_INTERVAL.value - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
3203     }
3204
3205   GNUNET_assert (next_send_time.value != 0);
3206   find_peer_context.count = 0;
3207   newly_found_peers = 0;
3208   find_peer_context.start = GNUNET_TIME_absolute_get();
3209   if (GNUNET_YES == do_find_peer)
3210   {
3211     GNUNET_SCHEDULER_add_delayed (sched,
3212                                   next_send_time,
3213                                   &send_find_peer_message, NULL);
3214   }
3215 }
3216
3217 /**
3218  * Handler for any generic DHT messages, calls the appropriate handler
3219  * depending on message type, sends confirmation if responses aren't otherwise
3220  * expected.
3221  *
3222  * @param cls closure for the service
3223  * @param client the client we received this message from
3224  * @param message the actual message received
3225  */
3226 static void
3227 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
3228                                 const struct GNUNET_MessageHeader *message)
3229 {
3230   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
3231   const struct GNUNET_MessageHeader *enc_msg;
3232   struct DHT_MessageContext message_context;
3233   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
3234 #if DEBUG_DHT
3235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3236               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
3237               my_short_id, "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
3238               GNUNET_ntohll (dht_msg->unique_id));
3239 #endif
3240 #if DEBUG_DHT_ROUTING
3241   if (dhtlog_handle != NULL)
3242     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
3243 #endif
3244   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3245   message_context.client = find_active_client (client);
3246   memcpy(&message_context.key, &dht_msg->key, sizeof(GNUNET_HashCode));
3247   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
3248   message_context.replication = ntohl (dht_msg->desired_replication_level);
3249   message_context.msg_options = ntohl (dht_msg->options);
3250   message_context.network_size = estimate_diameter();
3251   message_context.peer = &my_identity;
3252   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE * 4; /* Make local routing a higher priority */
3253   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3254   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
3255     increment_stats(STAT_GET_START);
3256   else if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
3257     increment_stats(STAT_PUT_START);
3258   else if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER)
3259     increment_stats(STAT_FIND_PEER_START);
3260
3261   route_message(cls, enc_msg, &message_context);
3262
3263   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3264
3265 }
3266
3267 /**
3268  * Handler for any locally received DHT control messages,
3269  * sets malicious flags mostly for now.
3270  *
3271  * @param cls closure for the service
3272  * @param client the client we received this message from
3273  * @param message the actual message received
3274  *
3275  */
3276 static void
3277 handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client,
3278                             const struct GNUNET_MessageHeader *message)
3279 {
3280   const struct GNUNET_DHT_ControlMessage *dht_control_msg =
3281       (const struct GNUNET_DHT_ControlMessage *) message;
3282 #if DEBUG_DHT
3283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3284               "`%s:%s': Received `%s' request from client, command %d\n", my_short_id, "DHT",
3285               "CONTROL", ntohs(dht_control_msg->command));
3286 #endif
3287
3288   switch (ntohs(dht_control_msg->command))
3289   {
3290   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
3291     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending self seeking find peer request!\n");
3292     GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL);
3293     break;
3294   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
3295     if (ntohs(dht_control_msg->variable) > 0)
3296       malicious_get_frequency = ntohs(dht_control_msg->variable);
3297     if (malicious_get_frequency == 0)
3298       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
3299     if (malicious_getter != GNUNET_YES)
3300       GNUNET_SCHEDULER_add_now(sched, &malicious_get_task, NULL);
3301     malicious_getter = GNUNET_YES;
3302     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Initiating malicious GET behavior, frequency %d\n", my_short_id, "DHT", malicious_get_frequency);
3303     break;
3304   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT:
3305     if (ntohs(dht_control_msg->variable) > 0)
3306       malicious_put_frequency = ntohs(dht_control_msg->variable);
3307     if (malicious_put_frequency == 0)
3308       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
3309     if (malicious_putter != GNUNET_YES)
3310       GNUNET_SCHEDULER_add_now(sched, &malicious_put_task, NULL);
3311     malicious_putter = GNUNET_YES;
3312     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Initiating malicious PUT behavior, frequency %d\n", my_short_id, "DHT", malicious_put_frequency);
3313     break;
3314   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP:
3315     if ((malicious_dropper != GNUNET_YES) && (dhtlog_handle != NULL))
3316       dhtlog_handle->set_malicious(&my_identity);
3317     malicious_dropper = GNUNET_YES;
3318     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Initiating malicious DROP behavior\n", my_short_id, "DHT");
3319     break;
3320   default:
3321     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Unknown control command type `%d'!\n", ntohs(dht_control_msg->command));
3322   }
3323
3324   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3325 }
3326
3327 /**
3328  * Handler for any generic DHT stop messages, calls the appropriate handler
3329  * depending on message type (if processed locally)
3330  *
3331  * @param cls closure for the service
3332  * @param client the client we received this message from
3333  * @param message the actual message received
3334  *
3335  */
3336 static void
3337 handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
3338                             const struct GNUNET_MessageHeader *message)
3339 {
3340
3341   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
3342     (const struct GNUNET_DHT_StopMessage *) message;
3343   struct DHTQueryRecord *record;
3344   struct DHTRouteSource *pos;
3345 #if DEBUG_DHT
3346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3347               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
3348               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
3349 #endif
3350   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
3351   if (record != NULL)
3352     {
3353       pos = record->head;
3354
3355       while (pos != NULL)
3356         {
3357           if ((pos->client != NULL) && (pos->client->client_handle == client))
3358             {
3359               GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
3360               GNUNET_SCHEDULER_add_now(sched, &remove_forward_entry, pos);
3361             }
3362           pos = pos->next;
3363         }
3364     }
3365
3366   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3367 }
3368
3369
3370 /**
3371  * Core handler for p2p route requests.
3372  */
3373 static int
3374 handle_dht_p2p_route_request (void *cls,
3375                               const struct GNUNET_PeerIdentity *peer,
3376                               const struct GNUNET_MessageHeader *message,
3377                               struct GNUNET_TIME_Relative latency, uint32_t distance)
3378 {
3379 #if DEBUG_DHT
3380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3381               "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
3382 #endif
3383   struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
3384   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
3385   struct DHT_MessageContext *message_context;
3386
3387   if (get_max_send_delay().value > MAX_REQUEST_TIME.value)
3388   {
3389     fprintf(stderr, "Sending of previous replies took far too long, backing off!\n");
3390     decrease_max_send_delay(get_max_send_delay());
3391     return GNUNET_YES;
3392   }
3393
3394   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/
3395     {
3396 #if DEBUG_PING
3397       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n", my_short_id, "DHT");
3398 #endif
3399       return GNUNET_YES;
3400     }
3401
3402   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
3403     {
3404       GNUNET_break_op(0);
3405       return GNUNET_YES;
3406     }
3407   message_context = GNUNET_malloc(sizeof (struct DHT_MessageContext));
3408   message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3409   GNUNET_assert(message_context->bloom != NULL);
3410   message_context->hop_count = ntohl(incoming->hop_count);
3411   memcpy(&message_context->key, &incoming->key, sizeof(GNUNET_HashCode));
3412   message_context->replication = ntohl(incoming->desired_replication_level);
3413   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
3414   message_context->msg_options = ntohl(incoming->options);
3415   message_context->network_size = ntohl(incoming->network_size);
3416   message_context->peer = peer;
3417   message_context->importance = DHT_DEFAULT_P2P_IMPORTANCE;
3418   message_context->timeout = DHT_DEFAULT_P2P_TIMEOUT;
3419   route_message(cls, enc_msg, message_context);
3420   GNUNET_free(message_context);
3421   return GNUNET_YES;
3422 }
3423
3424
3425 /**
3426  * Core handler for p2p route results.
3427  */
3428 static int
3429 handle_dht_p2p_route_result (void *cls,
3430                              const struct GNUNET_PeerIdentity *peer,
3431                              const struct GNUNET_MessageHeader *message,
3432                              struct GNUNET_TIME_Relative latency, uint32_t distance)
3433 {
3434 #if DEBUG_DHT
3435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3436               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
3437 #endif
3438   struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
3439   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
3440   struct DHT_MessageContext message_context;
3441
3442   if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
3443     {
3444       GNUNET_break_op(0);
3445       return GNUNET_YES;
3446     }
3447
3448   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3449   message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3450   GNUNET_assert(message_context.bloom != NULL);
3451   memcpy(&message_context.key, &incoming->key, sizeof(GNUNET_HashCode));
3452   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
3453   message_context.msg_options = ntohl(incoming->options);
3454   message_context.hop_count = ntohl(incoming->hop_count);
3455   message_context.peer = peer;
3456   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make result routing a higher priority */
3457   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3458   route_result_message(cls, enc_msg, &message_context);
3459   return GNUNET_YES;
3460 }
3461
3462
3463 /**
3464  * Receive the HELLO from transport service,
3465  * free current and replace if necessary.
3466  *
3467  * @param cls NULL
3468  * @param message HELLO message of peer
3469  */
3470 static void
3471 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
3472 {
3473 #if DEBUG_DHT
3474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3475               "Received our `%s' from transport service\n",
3476               "HELLO");
3477 #endif
3478
3479   GNUNET_assert (message != NULL);
3480   GNUNET_free_non_null(my_hello);
3481   my_hello = GNUNET_malloc(ntohs(message->size));
3482   memcpy(my_hello, message, ntohs(message->size));
3483 }
3484
3485
3486 /**
3487  * Task run during shutdown.
3488  *
3489  * @param cls unused
3490  * @param tc unused
3491  */
3492 static void
3493 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3494 {
3495   int bucket_count;
3496   struct PeerInfo *pos;
3497   if (transport_handle != NULL)
3498   {
3499     GNUNET_free_non_null(my_hello);
3500     GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
3501     GNUNET_TRANSPORT_disconnect(transport_handle);
3502   }
3503
3504   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
3505     {
3506       while (k_buckets[bucket_count].head != NULL)
3507         {
3508           pos = k_buckets[bucket_count].head;
3509 #if DEBUG_DHT
3510           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3511                       "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
3512 #endif
3513           delete_peer(pos, bucket_count);
3514         }
3515     }
3516   if (coreAPI != NULL)
3517     {
3518 #if DEBUG_DHT
3519       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3520                   "%s:%s Disconnecting core!\n", my_short_id, "DHT");
3521 #endif
3522       GNUNET_CORE_disconnect (coreAPI);
3523     }
3524   if (datacache != NULL)
3525     {
3526 #if DEBUG_DHT
3527       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3528                   "%s:%s Destroying datacache!\n", my_short_id, "DHT");
3529 #endif
3530       GNUNET_DATACACHE_destroy (datacache);
3531     }
3532
3533   if (stats != NULL)
3534     {
3535       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
3536     }
3537
3538   if (dhtlog_handle != NULL)
3539     GNUNET_DHTLOG_disconnect(dhtlog_handle);
3540
3541   GNUNET_free_non_null(my_short_id);
3542 }
3543
3544
3545 /**
3546  * To be called on core init/fail.
3547  *
3548  * @param cls service closure
3549  * @param server handle to the server for this service
3550  * @param identity the public identity of this peer
3551  * @param publicKey the public key of this peer
3552  */
3553 void
3554 core_init (void *cls,
3555            struct GNUNET_CORE_Handle *server,
3556            const struct GNUNET_PeerIdentity *identity,
3557            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
3558 {
3559
3560   if (server == NULL)
3561     {
3562 #if DEBUG_DHT
3563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3564               "%s: Connection to core FAILED!\n", "dht",
3565               GNUNET_i2s (identity));
3566 #endif
3567       GNUNET_SCHEDULER_cancel (sched, cleanup_task);
3568       GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
3569       return;
3570     }
3571 #if DEBUG_DHT
3572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3573               "%s: Core connection initialized, I am peer: %s\n", "dht",
3574               GNUNET_i2s (identity));
3575 #endif
3576
3577   /* Copy our identity so we can use it */
3578   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
3579   if (my_short_id != NULL)
3580     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n", "DHT SERVICE");
3581   my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
3582   /* Set the server to local variable */
3583   coreAPI = server;
3584
3585   if (dhtlog_handle != NULL)
3586     dhtlog_handle->insert_node (NULL, &my_identity);
3587 }
3588
3589
3590 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
3591   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE, 0},
3592   {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
3593   {&handle_dht_control_message, NULL, GNUNET_MESSAGE_TYPE_DHT_CONTROL, 0},
3594   {NULL, NULL, 0, 0}
3595 };
3596
3597
3598 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
3599   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
3600   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
3601   {NULL, 0, 0}
3602 };
3603
3604 /**
3605  * Method called whenever a peer connects.
3606  *
3607  * @param cls closure
3608  * @param peer peer identity this notification is about
3609  * @param latency reported latency of the connection with peer
3610  * @param distance reported distance (DV) to peer
3611  */
3612 void handle_core_connect (void *cls,
3613                           const struct GNUNET_PeerIdentity * peer,
3614                           struct GNUNET_TIME_Relative latency,
3615                           uint32_t distance)
3616 {
3617   struct PeerInfo *ret;
3618
3619 #if DEBUG_DHT
3620   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3621               "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
3622 #endif
3623
3624   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
3625     {
3626       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received %s message for peer %s, but already have peer in RT!", my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s(peer));
3627       return;
3628     }
3629
3630   if (datacache != NULL)
3631     GNUNET_DATACACHE_put(datacache, &peer->hashPubKey, sizeof(struct GNUNET_PeerIdentity), (const char *)peer, 0, GNUNET_TIME_absolute_get_forever());
3632   ret = try_add_peer(peer,
3633                      find_current_bucket(&peer->hashPubKey),
3634                      latency,
3635                      distance);
3636   if (ret != NULL)
3637     {
3638       newly_found_peers++;
3639       GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3640     }
3641 #if DEBUG_DHT
3642     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3643                 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED");
3644 #endif
3645 }
3646
3647 /**
3648  * Method called whenever a peer disconnects.
3649  *
3650  * @param cls closure
3651  * @param peer peer identity this notification is about
3652  */
3653 void handle_core_disconnect (void *cls,
3654                              const struct
3655                              GNUNET_PeerIdentity * peer)
3656 {
3657   struct PeerInfo *to_remove;
3658   int current_bucket;
3659
3660   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: Received peer disconnect message for peer `%s' from %s\n", my_short_id, "DHT", GNUNET_i2s(peer), "CORE");
3661
3662   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
3663     {
3664       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: do not have peer `%s' in RT, can't disconnect!\n", my_short_id, "DHT", GNUNET_i2s(peer));
3665       return;
3666     }
3667   increment_stats(STAT_DISCONNECTS);
3668   GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey));
3669   to_remove = GNUNET_CONTAINER_multihashmap_get(all_known_peers, &peer->hashPubKey);
3670   GNUNET_assert(0 == memcmp(peer, &to_remove->id, sizeof(struct GNUNET_PeerIdentity)));
3671   current_bucket = find_current_bucket(&to_remove->id.hashPubKey);
3672   delete_peer(to_remove, current_bucket);
3673 }
3674
3675 /**
3676  * Process dht requests.
3677  *
3678  * @param cls closure
3679  * @param scheduler scheduler to use
3680  * @param server the initialized server
3681  * @param c configuration to use
3682  */
3683 static void
3684 run (void *cls,
3685      struct GNUNET_SCHEDULER_Handle *scheduler,
3686      struct GNUNET_SERVER_Handle *server,
3687      const struct GNUNET_CONFIGURATION_Handle *c)
3688 {
3689 #if DO_FIND_PEER
3690   struct GNUNET_TIME_Relative next_send_time;
3691 #endif
3692   sched = scheduler;
3693   cfg = c;
3694   datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
3695   GNUNET_SERVER_add_handlers (server, plugin_handlers);
3696   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
3697                                  cfg,   /* Main configuration */
3698                                  GNUNET_TIME_UNIT_FOREVER_REL,
3699                                  NULL,  /* Closure passed to DHT functions */
3700                                  &core_init,    /* Call core_init once connected */
3701                                  &handle_core_connect,  /* Handle connects */
3702                                  &handle_core_disconnect,  /* remove peers on disconnects */
3703                                  NULL,  /* Do we care about "status" updates? */
3704                                  NULL,  /* Don't want notified about all incoming messages */
3705                                  GNUNET_NO,     /* For header only inbound notification */
3706                                  NULL,  /* Don't want notified about all outbound messages */
3707                                  GNUNET_NO,     /* For header only outbound notification */
3708                                  core_handlers);        /* Register these handlers */
3709
3710   if (coreAPI == NULL)
3711     return;
3712   transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, 
3713                                               NULL, NULL, NULL, NULL, NULL);
3714   if (transport_handle != NULL)
3715     GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
3716   else
3717     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
3718
3719   lowest_bucket = MAX_BUCKETS - 1;
3720   forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
3721   forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
3722   all_known_peers = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
3723   recent_find_peer_requests = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
3724   GNUNET_assert(all_known_peers != NULL);
3725   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
3726     {
3727       debug_routes = GNUNET_YES;
3728     }
3729
3730   if (GNUNET_YES ==
3731       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3732                                            "strict_kademlia"))
3733     {
3734       strict_kademlia = GNUNET_YES;
3735     }
3736
3737   if (GNUNET_YES ==
3738       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3739                                            "stop_on_closest"))
3740     {
3741       stop_on_closest = GNUNET_YES;
3742     }
3743
3744   if (GNUNET_YES ==
3745       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3746                                            "stop_found"))
3747     {
3748       stop_on_found = GNUNET_YES;
3749     }
3750
3751   if (GNUNET_YES ==
3752       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3753                                            "malicious_getter"))
3754     {
3755       malicious_getter = GNUNET_YES;
3756       if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
3757                                             "MALICIOUS_GET_FREQUENCY",
3758                                             &malicious_get_frequency))
3759         malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
3760     }
3761
3762   if (GNUNET_YES ==
3763       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3764                                            "malicious_putter"))
3765     {
3766       malicious_putter = GNUNET_YES;
3767       if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
3768                                             "MALICIOUS_PUT_FREQUENCY",
3769                                             &malicious_put_frequency))
3770         malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
3771     }
3772
3773   if (GNUNET_YES ==
3774           GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3775                                                "malicious_dropper"))
3776     {
3777       malicious_dropper = GNUNET_YES;
3778     }
3779
3780   if (GNUNET_NO ==
3781         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
3782                                              "do_find_peer"))
3783     {
3784       do_find_peer = GNUNET_NO;
3785     }
3786   else
3787     do_find_peer = GNUNET_YES;
3788
3789   if (GNUNET_YES ==
3790       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
3791                                            "mysql_logging_extended"))
3792     {
3793       debug_routes = GNUNET_YES;
3794       debug_routes_extended = GNUNET_YES;
3795     }
3796
3797   if (GNUNET_YES == debug_routes)
3798     {
3799       dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
3800       if (dhtlog_handle == NULL)
3801         {
3802           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3803                       "Could not connect to mysql logging server, logging will not happen!");
3804         }
3805     }
3806
3807   stats = GNUNET_STATISTICS_create(sched, "dht", cfg);
3808
3809   if (stats != NULL)
3810     {
3811       GNUNET_STATISTICS_set(stats, STAT_ROUTES, 0, GNUNET_NO);
3812       GNUNET_STATISTICS_set(stats, STAT_ROUTE_FORWARDS, 0, GNUNET_NO);
3813       GNUNET_STATISTICS_set(stats, STAT_ROUTE_FORWARDS_CLOSEST, 0, GNUNET_NO);
3814       GNUNET_STATISTICS_set(stats, STAT_RESULTS, 0, GNUNET_NO);
3815       GNUNET_STATISTICS_set(stats, STAT_RESULTS_TO_CLIENT, 0, GNUNET_NO);
3816       GNUNET_STATISTICS_set(stats, STAT_RESULT_FORWARDS, 0, GNUNET_NO);
3817       GNUNET_STATISTICS_set(stats, STAT_GETS, 0, GNUNET_NO);
3818       GNUNET_STATISTICS_set(stats, STAT_PUTS, 0, GNUNET_NO);
3819       GNUNET_STATISTICS_set(stats, STAT_PUTS_INSERTED, 0, GNUNET_NO);
3820       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER, 0, GNUNET_NO);
3821       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_START, 0, GNUNET_NO);
3822       GNUNET_STATISTICS_set(stats, STAT_GET_START, 0, GNUNET_NO);
3823       GNUNET_STATISTICS_set(stats, STAT_PUT_START, 0, GNUNET_NO);
3824       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_REPLY, 0, GNUNET_NO);
3825       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_ANSWER, 0, GNUNET_NO);
3826       GNUNET_STATISTICS_set(stats, STAT_BLOOM_FIND_PEER, 0, GNUNET_NO);
3827       GNUNET_STATISTICS_set(stats, STAT_GET_REPLY, 0, GNUNET_NO);
3828       GNUNET_STATISTICS_set(stats, STAT_GET_RESPONSE_START, 0, GNUNET_NO);
3829       GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
3830       GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO);
3831     }
3832   /* FIXME: if there are no recent requests then these never get freed, but alternative is _annoying_! */
3833   recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2);
3834   recent.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
3835   if (GNUNET_YES == do_find_peer)
3836   {
3837     next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
3838                            GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
3839                                                     (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
3840     find_peer_context.start = GNUNET_TIME_absolute_get();
3841     GNUNET_SCHEDULER_add_delayed (sched,
3842                                   next_send_time,
3843                                   &send_find_peer_message, &find_peer_context);
3844   }
3845
3846   /* Scheduled the task to clean up when shutdown is called */
3847   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
3848                                                GNUNET_TIME_UNIT_FOREVER_REL,
3849                                                &shutdown_task, NULL);
3850 }
3851
3852 /**
3853  * The main function for the dht service.
3854  *
3855  * @param argc number of arguments from the command line
3856  * @param argv command line arguments
3857  * @return 0 ok, 1 on error
3858  */
3859 int
3860 main (int argc, char *const *argv)
3861 {
3862   return (GNUNET_OK ==
3863           GNUNET_SERVICE_run (argc,
3864                               argv,
3865                               "dht",
3866                               GNUNET_SERVICE_OPTION_NONE,
3867                               &run, NULL)) ? 0 : 1;
3868 }