count number of messages queued vs. number sent out
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief GNUnet DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_client_lib.h"
31 #include "gnunet_getopt_lib.h"
32 #include "gnunet_os_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_service_lib.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_signal_lib.h"
37 #include "gnunet_util_lib.h"
38 #include "gnunet_datacache_lib.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_dht_service.h"
42 #include "gnunet_statistics_service.h"
43 #include "dhtlog.h"
44 #include "dht.h"
45 #include <fenv.h>
46
47 #define PRINT_TABLES GNUNET_NO
48
49 #define REAL_DISTANCE GNUNET_NO
50
51 #define EXTRA_CHECKS GNUNET_NO
52
53 /**
54  * How many buckets will we allow total.
55  */
56 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
57
58 /**
59  * Should the DHT issue FIND_PEER requests to get better routing tables?
60  */
61 #define DEFAULT_DO_FIND_PEER GNUNET_YES
62
63 /**
64  * Defines whether find peer requests send their HELLO's outgoing,
65  * or expect replies to contain hellos.
66  */
67 #define FIND_PEER_WITH_HELLO GNUNET_YES
68
69 /**
70  * What is the maximum number of peers in a given bucket.
71  */
72 #define DEFAULT_BUCKET_SIZE 4
73
74 #define DEFAULT_CORE_QUEUE_SIZE 32
75
76 /**
77  * Minimum number of peers we need for "good" routing,
78  * any less than this and we will allow messages to
79  * travel much further through the network!
80  */
81 #define MINIMUM_PEER_THRESHOLD 20
82
83 #define DHT_MAX_RECENT 1000
84
85 #define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
86
87 /**
88  * Default time to wait to send messages on behalf of other peers.
89  */
90 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
91
92 /**
93  * Default importance for handling messages on behalf of other peers.
94  */
95 #define DHT_DEFAULT_P2P_IMPORTANCE 0
96
97 /**
98  * How long to keep recent requests around by default.
99  */
100 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
101
102 /**
103  * Default time to wait to send find peer messages sent by the dht service.
104  */
105 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
106
107 /**
108  * Default importance for find peer messages sent by the dht service.
109  */
110 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
111
112 /**
113  * Default replication parameter for find peer messages sent by the dht service.
114  */
115 #define DHT_DEFAULT_FIND_PEER_REPLICATION 4
116
117 /**
118  * Default options for find peer requests sent by the dht service.
119  */
120 #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
121 /*#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE*/
122
123 /**
124  * How long at least to wait before sending another find peer request.
125  */
126 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
127
128 /**
129  * How long at most to wait before sending another find peer request.
130  */
131 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
132
133 /**
134  * How often to update our preference levels for peers in our routing tables.
135  */
136 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
137
138 /**
139  * How long at most on average will we allow a reply forward to take
140  * (before we quit sending out new requests)
141  */
142 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
143
144 /**
145  * How many initial requests to send out (in true Kademlia fashion)
146  */
147 #define DEFAULT_KADEMLIA_REPLICATION 3
148
149 /*
150  * Default frequency for sending malicious get messages
151  */
152 #define DEFAULT_MALICIOUS_GET_FREQUENCY 1000 /* Number of milliseconds */
153
154 /*
155  * Default frequency for sending malicious put messages
156  */
157 #define DEFAULT_MALICIOUS_PUT_FREQUENCY 1000 /* Default is in milliseconds */
158
159
160 #define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
161
162 /**
163  * Real maximum number of hops, at which point we refuse
164  * to forward the message.
165  */
166 #define DEFAULT_MAX_HOPS 10
167
168 /**
169  * How many time differences between requesting a core send and
170  * the actual callback to remember.
171  */
172 #define MAX_REPLY_TIMES 8
173
174 enum ConvergenceOptions
175 {
176    /**
177     * Use the linear method for convergence.
178     */
179    DHT_CONVERGE_LINEAR,
180
181    /**
182     * Converge using a fast converging square
183     * function.
184     */
185    DHT_CONVERGE_SQUARE,
186
187    /**
188     * Converge using a slower exponential
189     * function.
190     */
191    DHT_CONVERGE_EXPONENTIAL,
192
193    /**
194     * Don't do any special convergence, allow
195     * the algorithm to hopefully route to closer
196     * peers more often.
197     */
198    DHT_CONVERGE_RANDOM,
199
200    /**
201     * Binary convergence, start routing to closest
202     * only after set number of hops.
203     */
204    DHT_CONVERGE_BINARY
205 };
206
207 /**
208  * Linked list of messages to send to clients.
209  */
210 struct P2PPendingMessage
211 {
212   /**
213    * Pointer to next item in the list
214    */
215   struct P2PPendingMessage *next;
216
217   /**
218    * Pointer to previous item in the list
219    */
220   struct P2PPendingMessage *prev;
221
222   /**
223    * Message importance level.
224    */
225   unsigned int importance;
226
227   /**
228    * Time when this request was scheduled to be sent.
229    */
230   struct GNUNET_TIME_Absolute scheduled;
231
232   /**
233    * How long to wait before sending message.
234    */
235   struct GNUNET_TIME_Relative timeout;
236
237   /**
238    * Actual message to be sent; // avoid allocation
239    */
240   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
241
242 };
243
244 /**
245  * Per-peer information.
246  */
247 struct PeerInfo
248 {
249   /**
250    * Next peer entry (DLL)
251    */
252   struct PeerInfo *next;
253
254   /**
255    *  Prev peer entry (DLL)
256    */
257   struct PeerInfo *prev;
258
259   /**
260    * Count of outstanding messages for peer.
261    */
262   unsigned int pending_count;
263
264   /**
265    * Head of pending messages to be sent to this peer.
266    */
267   struct P2PPendingMessage *head;
268
269   /**
270    * Tail of pending messages to be sent to this peer.
271    */
272   struct P2PPendingMessage *tail;
273
274   /**
275    * Core handle for sending messages to this peer.
276    */
277   struct GNUNET_CORE_TransmitHandle *th;
278
279   /**
280    * Task for scheduling message sends.
281    */
282   GNUNET_SCHEDULER_TaskIdentifier send_task;
283
284   /**
285    * Task for scheduling preference updates
286    */
287   GNUNET_SCHEDULER_TaskIdentifier preference_task;
288
289   /**
290    * Preference update context
291    */
292   struct GNUNET_CORE_InformationRequestContext *info_ctx;
293
294   /**
295    * What is the identity of the peer?
296    */
297   struct GNUNET_PeerIdentity id;
298
299 #if 0
300   /**
301    * What is the average latency for replies received?
302    */
303   struct GNUNET_TIME_Relative latency;
304
305   /**
306    * Transport level distance to peer.
307    */
308   unsigned int distance;
309 #endif
310
311   /**
312    * Holds matching bits from peer to current target,
313    * used for distance comparisons between peers. May
314    * be considered a really bad idea.
315    * FIXME: remove this value (create struct which holds
316    *        a single peerinfo and the matching bits, use
317    *        that to pass to comparator)
318    */
319   unsigned int matching_bits;
320
321   /**
322    * Task for scheduling periodic ping messages for this peer.
323    */
324   GNUNET_SCHEDULER_TaskIdentifier ping_task;
325 };
326
327 /**
328  * Peers are grouped into buckets.
329  */
330 struct PeerBucket
331 {
332   /**
333    * Head of DLL
334    */
335   struct PeerInfo *head;
336
337   /**
338    * Tail of DLL
339    */
340   struct PeerInfo *tail;
341
342   /**
343    * Number of peers in the bucket.
344    */
345   unsigned int peers_size;
346 };
347
348 /**
349  * Linked list of messages to send to clients.
350  */
351 struct PendingMessage
352 {
353   /**
354    * Pointer to next item in the list
355    */
356   struct PendingMessage *next;
357
358   /**
359    * Pointer to previous item in the list
360    */
361   struct PendingMessage *prev;
362
363   /**
364    * Actual message to be sent; // avoid allocation
365    */
366   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
367
368 };
369
370 /**
371  * Struct containing information about a client,
372  * handle to connect to it, and any pending messages
373  * that need to be sent to it.
374  */
375 struct ClientList
376 {
377   /**
378    * Linked list of active clients
379    */
380   struct ClientList *next;
381
382   /**
383    * The handle to this client
384    */
385   struct GNUNET_SERVER_Client *client_handle;
386
387   /**
388    * Handle to the current transmission request, NULL
389    * if none pending.
390    */
391   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
392
393   /**
394    * Linked list of pending messages for this client
395    */
396   struct PendingMessage *pending_head;
397
398   /**
399    * Tail of linked list of pending messages for this client
400    */
401   struct PendingMessage *pending_tail;
402 };
403
404
405 /**
406  * Context containing information about a DHT message received.
407  */
408 struct DHT_MessageContext
409 {
410   /**
411    * The client this request was received from.
412    * (NULL if received from another peer)
413    */
414   struct ClientList *client;
415
416   /**
417    * The peer this request was received from.
418    * (NULL if received from local client)
419    */
420   const struct GNUNET_PeerIdentity *peer;
421
422   /**
423    * Bloomfilter for this routing request.
424    */
425   struct GNUNET_CONTAINER_BloomFilter *bloom;
426
427   /**
428    * extended query (see gnunet_block_lib.h).
429    */
430   const void *xquery;
431
432   /**
433    * Bloomfilter to filter out duplicate replies.
434    */
435   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
436
437   /**
438    * The key this request was about
439    */
440   GNUNET_HashCode key;
441
442   /**
443    * How long should we wait to transmit this request?
444    */
445   struct GNUNET_TIME_Relative timeout;
446
447   /**
448    * The unique identifier of this request
449    */
450   uint64_t unique_id;
451
452   /**
453    * Number of bytes in xquery.
454    */
455   size_t xquery_size;
456
457   /**
458    * Mutator value for the reply_bf, see gnunet_block_lib.h
459    */
460   uint32_t reply_bf_mutator;
461
462   /**
463    * Desired replication level
464    */
465   uint32_t replication;
466
467   /**
468    * Network size estimate, either ours or the sum of
469    * those routed to thus far. =~ Log of number of peers
470    * chosen from for this request.
471    */
472   uint32_t network_size;
473
474   /**
475    * Any message options for this request
476    */
477   uint32_t msg_options;
478
479   /**
480    * How many hops has the message already traversed?
481    */
482   uint32_t hop_count;
483
484   /**
485    * How important is this message?
486    */
487   unsigned int importance;
488
489   /**
490    * Should we (still) forward the request on to other peers?
491    */
492   int do_forward;
493
494   /**
495    * Did we forward this message? (may need to remember it!)
496    */
497   int forwarded;
498
499   /**
500    * Are we the closest known peer to this key (out of our neighbors?)
501    */
502   int closest;
503 };
504
505 /**
506  * Record used for remembering what peers are waiting for what
507  * responses (based on search key).
508  */
509 struct DHTRouteSource
510 {
511   /**
512    * This is a DLL.
513    */
514   struct DHTRouteSource *next;
515
516   /**
517    * This is a DLL.
518    */
519   struct DHTRouteSource *prev;
520
521   /**
522    * Source of the request.  Replies should be forwarded to
523    * this peer.
524    */
525   struct GNUNET_PeerIdentity source;
526
527   /**
528    * If this was a local request, remember the client; otherwise NULL.
529    */
530   struct ClientList *client;
531
532   /**
533    * Pointer to this nodes heap location (for removal)
534    */
535   struct GNUNET_CONTAINER_HeapNode *hnode;
536
537   /**
538    * Back pointer to the record storing this information.
539    */
540   struct DHTQueryRecord *record;
541
542   /**
543    * Task to remove this entry on timeout.
544    */
545   GNUNET_SCHEDULER_TaskIdentifier delete_task;
546
547   /**
548    * Bloomfilter of peers we have already sent back as
549    * replies to the initial request.  Allows us to not
550    * forward the same peer multiple times for a find peer
551    * request.
552    */
553   struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
554
555 };
556
557 /**
558  * Entry in the DHT routing table.
559  */
560 struct DHTQueryRecord
561 {
562   /**
563    * Head of DLL for result forwarding.
564    */
565   struct DHTRouteSource *head;
566
567   /**
568    * Tail of DLL for result forwarding.
569    */
570   struct DHTRouteSource *tail;
571
572   /**
573    * Key that the record concerns.
574    */
575   GNUNET_HashCode key;
576
577   /**
578    * GET message of this record (what we already forwarded?).
579    */
580   //DV_DHT_MESSAGE get; Try to get away with not saving this.
581
582   /**
583    * Bloomfilter of the peers we've replied to so far
584    */
585   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
586
587 };
588
589 /**
590  * Context used to calculate the number of find peer messages
591  * per X time units since our last scheduled find peer message
592  * was sent.  If we have seen too many messages, delay or don't
593  * send our own out.
594  */
595 struct FindPeerMessageContext
596 {
597   unsigned int count;
598
599   struct GNUNET_TIME_Absolute start;
600
601   struct GNUNET_TIME_Absolute end;
602 };
603
604 /**
605  * DHT Routing results structure
606  */
607 struct DHTResults
608 {
609   /*
610    * Min heap for removal upon reaching limit
611    */
612   struct GNUNET_CONTAINER_Heap *minHeap;
613
614   /*
615    * Hashmap for fast key based lookup
616    */
617   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
618
619 };
620
621 /**
622  * DHT structure for recent requests.
623  */
624 struct RecentRequests
625 {
626   /*
627    * Min heap for removal upon reaching limit
628    */
629   struct GNUNET_CONTAINER_Heap *minHeap;
630
631   /*
632    * Hashmap for key based lookup
633    */
634   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
635 };
636
637 struct RecentRequest
638 {
639   /**
640    * Position of this node in the min heap.
641    */
642   struct GNUNET_CONTAINER_HeapNode *heap_node;
643
644   /**
645    * Bloomfilter containing entries for peers
646    * we forwarded this request to.
647    */
648   struct GNUNET_CONTAINER_BloomFilter *bloom;
649
650   /**
651    * Timestamp of this request, for ordering
652    * the min heap.
653    */
654   struct GNUNET_TIME_Absolute timestamp;
655
656   /**
657    * Key of this request.
658    */
659   GNUNET_HashCode key;
660
661   /**
662    * Unique identifier for this request.
663    */
664   uint64_t uid;
665
666   /**
667    * Task to remove this entry on timeout.
668    */
669   GNUNET_SCHEDULER_TaskIdentifier remove_task;
670 };
671
672 struct RepublishContext
673 {
674   /**
675    * Key to republish.
676    */
677   GNUNET_HashCode key;
678
679   /**
680    * Type of the data.
681    */
682   unsigned int type;
683
684 };
685
686 /**
687  * Which kind of convergence will we be using?
688  */
689 static enum ConvergenceOptions converge_option;
690
691 /**
692  * Modifier for the convergence function
693  */
694 static float converge_modifier;
695
696 /**
697  * Recent requests by hash/uid and by time inserted.
698  */
699 static struct RecentRequests recent;
700
701 /**
702  * Context to use to calculate find peer rates.
703  */
704 static struct FindPeerMessageContext find_peer_context;
705
706 /**
707  * Don't use our routing algorithm, always route
708  * to closest peer; initially send requests to 3
709  * peers.
710  */
711 static unsigned int strict_kademlia;
712
713 /**
714  * Routing option to end routing when closest peer found.
715  */
716 static unsigned int stop_on_closest;
717
718 /**
719  * Routing option to end routing when data is found.
720  */
721 static unsigned int stop_on_found;
722
723 /**
724  * Whether DHT needs to manage find peer requests, or
725  * an external force will do it on behalf of the DHT.
726  */
727 static unsigned int do_find_peer;
728
729 /**
730  * Once we have stored an item in the DHT, refresh it
731  * according to our republish interval.
732  */
733 static unsigned int do_republish;
734
735 /**
736  * Use the "real" distance metric when selecting the
737  * next routing hop.  Can be less accurate.
738  */
739 static unsigned int use_real_distance;
740
741 /**
742  * How many peers have we added since we sent out our last
743  * find peer request?
744  */
745 static unsigned int newly_found_peers;
746
747 /**
748  * Container of active queries we should remember
749  */
750 static struct DHTResults forward_list;
751
752 /**
753  * Handle to the datacache service (for inserting/retrieving data)
754  */
755 static struct GNUNET_DATACACHE_Handle *datacache;
756
757 /**
758  * Handle for the statistics service.
759  */
760 struct GNUNET_STATISTICS_Handle *stats;
761
762
763 /**
764  * The configuration the DHT service is running with
765  */
766 static const struct GNUNET_CONFIGURATION_Handle *cfg;
767
768 /**
769  * Handle to the core service
770  */
771 static struct GNUNET_CORE_Handle *coreAPI;
772
773 /**
774  * Handle to the transport service, for getting our hello
775  */
776 static struct GNUNET_TRANSPORT_Handle *transport_handle;
777
778 /**
779  * The identity of our peer.
780  */
781 static struct GNUNET_PeerIdentity my_identity;
782
783 /**
784  * Short id of the peer, for printing
785  */
786 static char *my_short_id;
787
788 /**
789  * Our HELLO
790  */
791 static struct GNUNET_MessageHeader *my_hello;
792
793 /**
794  * Task to run when we shut down, cleaning up all our trash
795  */
796 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
797
798 /**
799  * The lowest currently used bucket.
800  */
801 static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
802
803 /**
804  * The maximum number of hops before we stop routing messages.
805  */
806 static unsigned long long max_hops;
807
808 /**
809  * How often to republish content we have previously stored.
810  */
811 static struct GNUNET_TIME_Relative dht_republish_frequency;
812
813 /**
814  * GNUNET_YES to stop at max_hops, GNUNET_NO to heuristically decide when to stop forwarding.
815  */
816 static int use_max_hops;
817
818 /**
819  * The buckets (Kademlia routing table, complete with growth).
820  * Array of size MAX_BUCKET_SIZE.
821  */
822 static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
823
824 /**
825  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
826  */
827 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
828
829 /**
830  * Recently seen find peer requests.
831  */
832 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
833
834 /**
835  * Maximum size for each bucket.
836  */
837 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
838
839 /**
840  * List of active clients.
841  */
842 static struct ClientList *client_list;
843
844 /**
845  * Handle to the DHT logger.
846  */
847 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
848
849 /*
850  * Whether or not to send routing debugging information
851  * to the dht logging server
852  */
853 static unsigned int debug_routes;
854
855 /*
856  * Whether or not to send FULL route information to
857  * logging server
858  */
859 static unsigned int debug_routes_extended;
860
861 /*
862  * GNUNET_YES or GNUNET_NO, whether or not to act as
863  * a malicious node which drops all messages
864  */
865 static unsigned int malicious_dropper;
866
867 /*
868  * GNUNET_YES or GNUNET_NO, whether or not to act as
869  * a malicious node which sends out lots of GETS
870  */
871 static unsigned int malicious_getter;
872
873 /**
874  * GNUNET_YES or GNUNET_NO, whether or not to act as
875  * a malicious node which sends out lots of PUTS
876  */
877 static unsigned int malicious_putter;
878
879 /**
880  * Frequency for malicious get requests.
881  */
882 static unsigned long long malicious_get_frequency;
883
884 /**
885  * Frequency for malicious put requests.
886  */
887 static unsigned long long malicious_put_frequency;
888
889 /**
890  * Kademlia replication
891  */
892 static unsigned long long kademlia_replication;
893
894 /**
895  * Reply times for requests, if we are busy, don't send any
896  * more requests!
897  */
898 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
899
900 /**
901  * Current counter for replies.
902  */
903 static unsigned int reply_counter;
904
905 /**
906  * Our handle to the BLOCK library.
907  */
908 static struct GNUNET_BLOCK_Context *block_context;
909
910
911 /**
912  * Forward declaration.
913  */
914 static size_t 
915 send_generic_reply (void *cls, size_t size, void *buf);
916
917
918 /** Declare here so retry_core_send is aware of it */
919 static size_t 
920 core_transmit_notify (void *cls,
921                       size_t size, void *buf);
922
923 /**
924  * Convert unique ID to hash code.
925  *
926  * @param uid unique ID to convert
927  * @param hash set to uid (extended with zeros)
928  */
929 static void
930 hash_from_uid (uint64_t uid,
931                GNUNET_HashCode *hash)
932 {
933   memset (hash, 0, sizeof(GNUNET_HashCode));
934   *((uint64_t*)hash) = uid;
935 }
936
937 #if AVG
938 /**
939  * Calculate the average send time between messages so that we can
940  * ignore certain requests if we get too busy.
941  *
942  * @return the average time between asking core to send a message
943  *         and when the buffer for copying it is passed
944  */
945 static struct GNUNET_TIME_Relative get_average_send_delay()
946 {
947   unsigned int i;
948   unsigned int divisor;
949   struct GNUNET_TIME_Relative average_time;
950   average_time = GNUNET_TIME_relative_get_zero();
951   divisor = 0;
952   for (i = 0; i < MAX_REPLY_TIMES; i++)
953   {
954     average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]);
955     if (reply_times[i].abs_value == (uint64_t)0)
956       continue;
957     else
958       divisor++;
959   }
960   if (divisor == 0)
961   {
962     return average_time;
963   }
964
965   average_time = GNUNET_TIME_relative_divide(average_time, divisor);
966   fprintf(stderr, 
967           "Avg send delay: %u sends is %llu\n",
968           divisor, 
969           (unsigned long long) average_time.abs_value);
970   return average_time;
971 }
972 #endif
973
974 /**
975  * Given the largest send delay, artificially decrease it
976  * so the next time around we may have a chance at sending
977  * again.
978  */
979 static void decrease_max_send_delay(struct GNUNET_TIME_Relative max_time)
980 {
981   unsigned int i;
982   for (i = 0; i < MAX_REPLY_TIMES; i++)
983     {
984       if (reply_times[i].rel_value == max_time.rel_value)
985         {
986           reply_times[i].rel_value = reply_times[i].rel_value / 2;
987           return;
988         }
989     }
990 }
991
992 /**
993  * Find the maximum send time of the recently sent values.
994  *
995  * @return the average time between asking core to send a message
996  *         and when the buffer for copying it is passed
997  */
998 static struct GNUNET_TIME_Relative get_max_send_delay()
999 {
1000   unsigned int i;
1001   struct GNUNET_TIME_Relative max_time;
1002   max_time = GNUNET_TIME_relative_get_zero();
1003
1004   for (i = 0; i < MAX_REPLY_TIMES; i++)
1005   {
1006     if (reply_times[i].rel_value > max_time.rel_value)
1007       max_time.rel_value = reply_times[i].rel_value;
1008   }
1009
1010   if (max_time.rel_value > MAX_REQUEST_TIME.rel_value)
1011     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", 
1012                (unsigned long long) max_time.rel_value);
1013   return max_time;
1014 }
1015
1016 static void
1017 increment_stats(const char *value)
1018 {
1019   if (stats != NULL)
1020     {
1021       GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
1022     }
1023 }
1024
1025 /**
1026  *  Try to send another message from our core send list
1027  */
1028 static void
1029 try_core_send (void *cls,
1030                const struct GNUNET_SCHEDULER_TaskContext *tc)
1031 {
1032   struct PeerInfo *peer = cls;
1033   struct P2PPendingMessage *pending;
1034   size_t ssize;
1035
1036   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
1037
1038   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1039     return;
1040
1041   if (peer->th != NULL)
1042     return; /* Message send already in progress */
1043
1044   pending = peer->head;
1045   if (pending != NULL)
1046     {
1047       ssize = ntohs(pending->msg->size);
1048 #if DEBUG_DHT > 1
1049      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1050                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
1051                 "DHT", ssize, GNUNET_i2s(&peer->id));
1052 #endif
1053       pending->scheduled = GNUNET_TIME_absolute_get();
1054       reply_counter++;
1055       if (reply_counter >= MAX_REPLY_TIMES)
1056         reply_counter = 0;
1057       peer->th = GNUNET_CORE_notify_transmit_ready (coreAPI, pending->importance,
1058                                                     pending->timeout, &peer->id,
1059                                                     ssize, &core_transmit_notify, peer);
1060       if (peer->th == NULL)
1061         increment_stats("# notify transmit ready failed");
1062     }
1063 }
1064
1065 /**
1066  * Function called to send a request out to another peer.
1067  * Called both for locally initiated requests and those
1068  * received from other peers.
1069  *
1070  * @param msg the encapsulated message
1071  * @param peer the peer to forward the message to
1072  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1073  */
1074 static void 
1075 forward_result_message (const struct GNUNET_MessageHeader *msg,
1076                         struct PeerInfo *peer,
1077                         struct DHT_MessageContext *msg_ctx)
1078 {
1079   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
1080   struct P2PPendingMessage *pending;
1081   size_t msize;
1082   size_t psize;
1083
1084   increment_stats(STAT_RESULT_FORWARDS);
1085   msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
1086   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1087   psize = sizeof(struct P2PPendingMessage) + msize;
1088   pending = GNUNET_malloc(psize);
1089   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1090   pending->importance = DHT_SEND_PRIORITY;
1091   pending->timeout = GNUNET_TIME_relative_get_forever();
1092   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
1093   result_message->header.size = htons(msize);
1094   result_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
1095   result_message->put_path_length = htons(0); /* FIXME: implement */
1096   result_message->get_path_length = htons(0); /* FIXME: implement */
1097   result_message->options = htonl(msg_ctx->msg_options);
1098   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
1099   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
1100   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1101   memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
1102   memcpy(&result_message[1], msg, ntohs(msg->size));
1103 #if DEBUG_DHT > 1
1104   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1105 #endif
1106   peer->pending_count++;
1107   increment_stats("# pending messages scheduled");
1108   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1109   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1110     peer->send_task = GNUNET_SCHEDULER_add_now(&try_core_send, peer);
1111 }
1112
1113
1114 /**
1115  * Called when core is ready to send a message we asked for
1116  * out to the destination.
1117  *
1118  * @param cls closure (NULL)
1119  * @param size number of bytes available in buf
1120  * @param buf where the callee should write the message
1121  * @return number of bytes written to buf
1122  */
1123 static size_t 
1124 core_transmit_notify (void *cls,
1125                       size_t size, void *buf)
1126 {
1127   struct PeerInfo *peer = cls;
1128   char *cbuf = buf;
1129   struct P2PPendingMessage *pending;
1130
1131   size_t off;
1132   size_t msize;
1133   peer->th = NULL;
1134   if (buf == NULL)
1135     {
1136       /* client disconnected */
1137 #if DEBUG_DHT
1138       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
1139 #endif
1140       return 0;
1141     }
1142
1143   if (peer->head == NULL)
1144     return 0;
1145
1146   off = 0;
1147   pending = peer->head;
1148   reply_times[reply_counter] = GNUNET_TIME_absolute_get_difference(pending->scheduled, GNUNET_TIME_absolute_get());
1149   msize = ntohs(pending->msg->size);
1150   if (msize <= size)
1151     {
1152       off = msize;
1153       memcpy (cbuf, pending->msg, msize);
1154       peer->pending_count--;
1155       increment_stats("# pending messages sent");
1156       GNUNET_assert(peer->pending_count >= 0);
1157       GNUNET_CONTAINER_DLL_remove (peer->head,
1158                                    peer->tail,
1159                                    pending);
1160       GNUNET_free (pending);
1161     }
1162 #if SMART
1163   while (NULL != pending &&
1164           (size - off >= (msize = ntohs (pending->msg->size))))
1165     {
1166       memcpy (&cbuf[off], pending->msg, msize);
1167       off += msize;
1168       peer->pending_count--;
1169       GNUNET_assert(peer->pending_count >= 0);
1170       GNUNET_CONTAINER_DLL_remove (peer->head,
1171                                    peer->tail,
1172                                    pending);
1173       GNUNET_free (pending);
1174       pending = peer->head;
1175     }
1176 #endif
1177   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
1178     peer->send_task = GNUNET_SCHEDULER_add_now(&try_core_send, peer);
1179
1180   return off;
1181 }
1182
1183
1184 /**
1185  * Compute the distance between have and target as a 32-bit value.
1186  * Differences in the lower bits must count stronger than differences
1187  * in the higher bits.
1188  *
1189  * @return 0 if have==target, otherwise a number
1190  *           that is larger as the distance between
1191  *           the two hash codes increases
1192  */
1193 static unsigned int
1194 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1195 {
1196   unsigned int bucket;
1197   unsigned int msb;
1198   unsigned int lsb;
1199   unsigned int i;
1200
1201   /* We have to represent the distance between two 2^9 (=512)-bit
1202      numbers as a 2^5 (=32)-bit number with "0" being used for the
1203      two numbers being identical; furthermore, we need to
1204      guarantee that a difference in the number of matching
1205      bits is always represented in the result.
1206
1207      We use 2^32/2^9 numerical values to distinguish between
1208      hash codes that have the same LSB bit distance and
1209      use the highest 2^9 bits of the result to signify the
1210      number of (mis)matching LSB bits; if we have 0 matching
1211      and hence 512 mismatching LSB bits we return -1 (since
1212      512 itself cannot be represented with 9 bits) */
1213
1214   /* first, calculate the most significant 9 bits of our
1215      result, aka the number of LSBs */
1216   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1217   /* bucket is now a value between 0 and 512 */
1218   if (bucket == 512)
1219     return 0;                   /* perfect match */
1220   if (bucket == 0)
1221     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1222                                    below, we'd end up with max+1 (overflow)) */
1223
1224   /* calculate the most significant bits of the final result */
1225   msb = (512 - bucket) << (32 - 9);
1226   /* calculate the 32-9 least significant bits of the final result by
1227      looking at the differences in the 32-9 bits following the
1228      mismatching bit at 'bucket' */
1229   lsb = 0;
1230   for (i = bucket + 1;
1231        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1232     {
1233       if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
1234         lsb |= (1 << (bucket + 32 - 9 - i));    /* first bit set will be 10,
1235                                                    last bit set will be 31 -- if
1236                                                    i does not reach 512 first... */
1237     }
1238   return msb | lsb;
1239 }
1240
1241 /**
1242  * Return a number that is larger the closer the
1243  * "have" GNUNET_hash code is to the "target".
1244  *
1245  * @return inverse distance metric, non-zero.
1246  *         Must fudge the value if NO bits match.
1247  */
1248 static unsigned int
1249 inverse_distance (const GNUNET_HashCode * target,
1250                   const GNUNET_HashCode * have)
1251 {
1252   if (GNUNET_CRYPTO_hash_matching_bits(target, have) == 0)
1253     return 1; /* Never return 0! */
1254   return ((unsigned int) -1) - distance (target, have);
1255 }
1256
1257 /**
1258  * Find the optimal bucket for this key, regardless
1259  * of the current number of buckets in use.
1260  *
1261  * @param hc the hashcode to compare our identity to
1262  *
1263  * @return the proper bucket index, or GNUNET_SYSERR
1264  *         on error (same hashcode)
1265  */
1266 static int find_bucket(const GNUNET_HashCode *hc)
1267 {
1268   unsigned int bits;
1269
1270   bits = GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, hc);
1271   if (bits == MAX_BUCKETS)
1272     return GNUNET_SYSERR;
1273   return MAX_BUCKETS - bits - 1;
1274 }
1275
1276 /**
1277  * Find which k-bucket this peer should go into,
1278  * taking into account the size of the k-bucket
1279  * array.  This means that if more bits match than
1280  * there are currently buckets, lowest_bucket will
1281  * be returned.
1282  *
1283  * @param hc GNUNET_HashCode we are finding the bucket for.
1284  *
1285  * @return the proper bucket index for this key,
1286  *         or GNUNET_SYSERR on error (same hashcode)
1287  */
1288 static int find_current_bucket(const GNUNET_HashCode *hc)
1289 {
1290   int actual_bucket;
1291   actual_bucket = find_bucket(hc);
1292
1293   if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
1294     return lowest_bucket;
1295   else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
1296     return lowest_bucket;
1297   else
1298     return actual_bucket;
1299 }
1300
1301 #if EXTRA_CHECKS
1302 /**
1303  * Find a routing table entry from a peer identity
1304  *
1305  * @param peer the peer to look up
1306  *
1307  * @return the bucket number holding the peer, GNUNET_SYSERR if not found
1308  */
1309 static int
1310 find_bucket_by_peer(const struct PeerInfo *peer)
1311 {
1312   int bucket;
1313   struct PeerInfo *pos;
1314
1315   for (bucket = lowest_bucket; bucket < MAX_BUCKETS - 1; bucket++)
1316     {
1317       pos = k_buckets[bucket].head;
1318       while (pos != NULL)
1319         {
1320           if (peer == pos)
1321             return bucket;
1322           pos = pos->next;
1323         }
1324     }
1325
1326   return GNUNET_SYSERR; /* No such peer. */
1327 }
1328 #endif
1329
1330 #if PRINT_TABLES
1331 /**
1332  * Print the complete routing table for this peer.
1333  */
1334 static void
1335 print_routing_table ()
1336 {
1337   int bucket;
1338   struct PeerInfo *pos;
1339   char char_buf[30000];
1340   int char_pos;
1341   memset(char_buf, 0, sizeof(char_buf));
1342   char_pos = 0;
1343   char_pos += sprintf(&char_buf[char_pos], "Printing routing table for peer %s\n", my_short_id);
1344   //fprintf(stderr, "Printing routing table for peer %s\n", my_short_id);
1345   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1346     {
1347       pos = k_buckets[bucket].head;
1348       char_pos += sprintf(&char_buf[char_pos], "Bucket %d:\n", bucket);
1349       //fprintf(stderr, "Bucket %d:\n", bucket);
1350       while (pos != NULL)
1351         {
1352           //fprintf(stderr, "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1353           char_pos += sprintf(&char_buf[char_pos], "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1354           pos = pos->next;
1355         }
1356     }
1357   fprintf(stderr, "%s", char_buf);
1358   fflush(stderr);
1359 }
1360 #endif
1361
1362 /**
1363  * Find a routing table entry from a peer identity
1364  *
1365  * @param peer the peer identity to look up
1366  *
1367  * @return the routing table entry, or NULL if not found
1368  */
1369 static struct PeerInfo *
1370 find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
1371 {
1372   int bucket;
1373   struct PeerInfo *pos;
1374   bucket = find_current_bucket(&peer->hashPubKey);
1375
1376   if (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity)))
1377     return NULL;
1378
1379   pos = k_buckets[bucket].head;
1380   while (pos != NULL)
1381     {
1382       if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
1383         return pos;
1384       pos = pos->next;
1385     }
1386   return NULL; /* No such peer. */
1387 }
1388
1389 /* Forward declaration */
1390 static void
1391 update_core_preference (void *cls,
1392                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1393 /**
1394  * Function called with statistics about the given peer.
1395  *
1396  * @param cls closure
1397  * @param peer identifies the peer
1398  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1399  * @param amount set to the amount that was actually reserved or unreserved;
1400  *               either the full requested amount or zero (no partial reservations)
1401  * @param preference current traffic preference for the given peer
1402  */
1403 static void
1404 update_core_preference_finish (void *cls,
1405                                const struct GNUNET_PeerIdentity * peer,
1406                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1407                                int amount, uint64_t preference)
1408 {
1409   struct PeerInfo *peer_info = cls;
1410   peer_info->info_ctx = NULL;
1411   GNUNET_SCHEDULER_add_delayed(DHT_DEFAULT_PREFERENCE_INTERVAL, &update_core_preference, peer_info);
1412 }
1413
1414 static void
1415 update_core_preference (void *cls,
1416                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1417 {
1418   struct PeerInfo *peer = cls;
1419   uint64_t preference;
1420   unsigned int matching;
1421   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1422     {
1423       return;
1424     }
1425   matching = GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, &peer->id.hashPubKey);
1426   if (matching >= 64)
1427     {
1428       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer identifier matches by %u bits, only shifting as much as we can!\n", matching);
1429       matching = 63;
1430     }
1431   preference = 1LL << matching;
1432   peer->info_ctx = GNUNET_CORE_peer_change_preference (coreAPI,
1433                                                        &peer->id,
1434                                                        GNUNET_TIME_relative_get_forever(),
1435                                                        GNUNET_BANDWIDTH_value_init (UINT32_MAX),
1436                                                        0,
1437                                                        preference,
1438                                                        &update_core_preference_finish,
1439                                                        peer);
1440 }
1441
1442 /**
1443  * Really add a peer to a bucket (only do assertions
1444  * on size, etc.)
1445  *
1446  * @param peer GNUNET_PeerIdentity of the peer to add
1447  * @param bucket the already figured out bucket to add
1448  *        the peer to
1449  * @param atsi performance information
1450  *
1451  * @return the newly added PeerInfo
1452  */
1453 static struct PeerInfo *
1454 add_peer(const struct GNUNET_PeerIdentity *peer,
1455          unsigned int bucket,
1456          const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1457 {
1458   struct PeerInfo *new_peer;
1459   GNUNET_assert(bucket < MAX_BUCKETS);
1460   GNUNET_assert(peer != NULL);
1461   new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
1462 #if 0
1463   new_peer->latency = latency;
1464   new_peer->distance = distance;
1465 #endif
1466
1467   memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
1468
1469   GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
1470                                     k_buckets[bucket].tail,
1471                                     k_buckets[bucket].tail,
1472                                     new_peer);
1473   k_buckets[bucket].peers_size++;
1474
1475   if ((GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, &peer->hashPubKey) > 0) && (k_buckets[bucket].peers_size <= bucket_size))
1476     {
1477 #if DO_UPDATE_PREFERENCE
1478       new_peer->preference_task = GNUNET_SCHEDULER_add_now(&update_core_preference, new_peer);
1479 #endif
1480     }
1481
1482   return new_peer;
1483 }
1484
1485 /**
1486  * Given a peer and its corresponding bucket,
1487  * remove it from that bucket.  Does not free
1488  * the PeerInfo struct, nor cancel messages
1489  * or free messages waiting to be sent to this
1490  * peer!
1491  *
1492  * @param peer the peer to remove
1493  * @param bucket the bucket the peer belongs to
1494  */
1495 static void remove_peer (struct PeerInfo *peer,
1496                          unsigned int bucket)
1497 {
1498   GNUNET_assert(k_buckets[bucket].peers_size > 0);
1499   GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
1500                               k_buckets[bucket].tail,
1501                               peer);
1502   k_buckets[bucket].peers_size--;
1503 #if CHANGE_LOWEST
1504   if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) && (lowest_bucket < MAX_BUCKETS - 1))
1505     lowest_bucket++;
1506 #endif
1507 }
1508
1509 /**
1510  * Removes peer from a bucket, then frees associated
1511  * resources and frees peer.
1512  *
1513  * @param peer peer to be removed and freed
1514  * @param bucket which bucket this peer belongs to
1515  */
1516 static void delete_peer (struct PeerInfo *peer,
1517                          unsigned int bucket)
1518 {
1519   struct P2PPendingMessage *pos;
1520   struct P2PPendingMessage *next;
1521 #if EXTRA_CHECKS
1522   struct PeerInfo *peer_pos;
1523
1524   peer_pos = k_buckets[bucket].head;
1525   while ((peer_pos != NULL) && (peer_pos != peer))
1526     peer_pos = peer_pos->next;
1527   if (peer_pos == NULL)
1528     {
1529       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Expected peer `%s' in bucket %d\n", my_short_id, "DHT", GNUNET_i2s(&peer->id), bucket);
1530       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Lowest bucket: %d, find_current_bucket: %d, peer resides in bucket: %d\n", my_short_id, "DHT", lowest_bucket, find_current_bucket(&peer->id.hashPubKey), find_bucket_by_peer(peer));
1531     }
1532   GNUNET_assert(peer_pos != NULL);
1533 #endif
1534   remove_peer(peer, bucket); /* First remove the peer from its bucket */
1535
1536   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
1537     GNUNET_SCHEDULER_cancel(peer->send_task);
1538   if ((peer->th != NULL) && (coreAPI != NULL))
1539     GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
1540
1541   pos = peer->head;
1542   while (pos != NULL) /* Remove any pending messages for this peer */
1543     {
1544       next = pos->next;
1545       GNUNET_free(pos);
1546       pos = next;
1547     }
1548
1549   GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey));
1550   GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer));
1551   GNUNET_free(peer);
1552 }
1553
1554
1555 /**
1556  * Iterator over hash map entries.
1557  *
1558  * @param cls closure
1559  * @param key current key code
1560  * @param value PeerInfo of the peer to move to new lowest bucket
1561  * @return GNUNET_YES if we should continue to
1562  *         iterate,
1563  *         GNUNET_NO if not.
1564  */
1565 static int move_lowest_bucket (void *cls,
1566                                const GNUNET_HashCode * key,
1567                                void *value)
1568 {
1569   struct PeerInfo *peer = value;
1570   int new_bucket;
1571
1572   GNUNET_assert(lowest_bucket > 0);
1573   new_bucket = lowest_bucket - 1;
1574   remove_peer(peer, lowest_bucket);
1575   GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
1576                                     k_buckets[new_bucket].tail,
1577                                     k_buckets[new_bucket].tail,
1578                                     peer);
1579   k_buckets[new_bucket].peers_size++;
1580   return GNUNET_YES;
1581 }
1582
1583
1584 /**
1585  * The current lowest bucket is full, so change the lowest
1586  * bucket to the next lower down, and move any appropriate
1587  * entries in the current lowest bucket to the new bucket.
1588  */
1589 static void enable_next_bucket()
1590 {
1591   struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1592   struct PeerInfo *pos;
1593   GNUNET_assert(lowest_bucket > 0);
1594   to_remove = GNUNET_CONTAINER_multihashmap_create(bucket_size);
1595   pos = k_buckets[lowest_bucket].head;
1596
1597 #if PRINT_TABLES
1598   fprintf(stderr, "Printing RT before new bucket\n");
1599   print_routing_table();
1600 #endif
1601   /* Populate the array of peers which should be in the next lowest bucket */
1602   while (pos != NULL)
1603     {
1604       if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
1605         GNUNET_CONTAINER_multihashmap_put(to_remove, &pos->id.hashPubKey, pos, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1606       pos = pos->next;
1607     }
1608
1609   /* Remove peers from lowest bucket, insert into next lowest bucket */
1610   GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL);
1611   GNUNET_CONTAINER_multihashmap_destroy(to_remove);
1612   lowest_bucket = lowest_bucket - 1;
1613 #if PRINT_TABLES
1614   fprintf(stderr, "Printing RT after new bucket\n");
1615   print_routing_table();
1616 #endif
1617 }
1618
1619 /**
1620  * Find the closest peer in our routing table to the
1621  * given hashcode.
1622  *
1623  * @return The closest peer in our routing table to the
1624  *         key, or NULL on error.
1625  */
1626 static struct PeerInfo *
1627 find_closest_peer (const GNUNET_HashCode *hc)
1628 {
1629   struct PeerInfo *pos;
1630   struct PeerInfo *current_closest;
1631   unsigned int lowest_distance;
1632   unsigned int temp_distance;
1633   int bucket;
1634   int count;
1635
1636   lowest_distance = -1;
1637
1638   if (k_buckets[lowest_bucket].peers_size == 0)
1639     return NULL;
1640
1641   current_closest = NULL;
1642   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1643     {
1644       pos = k_buckets[bucket].head;
1645       count = 0;
1646       while ((pos != NULL) && (count < bucket_size))
1647         {
1648           temp_distance = distance(&pos->id.hashPubKey, hc);
1649           if (temp_distance <= lowest_distance)
1650             {
1651               lowest_distance = temp_distance;
1652               current_closest = pos;
1653             }
1654           pos = pos->next;
1655           count++;
1656         }
1657     }
1658   GNUNET_assert(current_closest != NULL);
1659   return current_closest;
1660 }
1661
1662
1663 /**
1664  * Function called to send a request out to another peer.
1665  * Called both for locally initiated requests and those
1666  * received from other peers.
1667  *
1668  * @param msg the encapsulated message
1669  * @param peer the peer to forward the message to
1670  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1671  */
1672 static void forward_message (const struct GNUNET_MessageHeader *msg,
1673                              struct PeerInfo *peer,
1674                              struct DHT_MessageContext *msg_ctx)
1675 {
1676   struct GNUNET_DHT_P2PRouteMessage *route_message;
1677   struct P2PPendingMessage *pending;
1678   size_t msize;
1679   size_t psize;
1680
1681   increment_stats(STAT_ROUTE_FORWARDS);
1682   GNUNET_assert(peer != NULL);
1683   if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(&msg_ctx->key)))
1684     increment_stats(STAT_ROUTE_FORWARDS_CLOSEST);
1685
1686   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1687   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1688   psize = sizeof(struct P2PPendingMessage) + msize;
1689   pending = GNUNET_malloc(psize);
1690   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1691   pending->importance = msg_ctx->importance;
1692   pending->timeout = msg_ctx->timeout;
1693   route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1694   route_message->header.size = htons(msize);
1695   route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1696   route_message->options = htonl(msg_ctx->msg_options);
1697   route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1698   route_message->network_size = htonl(msg_ctx->network_size);
1699   route_message->desired_replication_level = htonl(msg_ctx->replication);
1700   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1701   if (msg_ctx->bloom != NULL)
1702     GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1703   memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
1704   memcpy(&route_message[1], msg, ntohs(msg->size));
1705 #if DEBUG_DHT > 1
1706   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1707 #endif
1708   peer->pending_count++;
1709   increment_stats("# pending messages scheduled");
1710   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1711   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1712     peer->send_task = GNUNET_SCHEDULER_add_now(&try_core_send, peer);
1713 }
1714
1715 #if DO_PING
1716 /**
1717  * Task used to send ping messages to peers so that
1718  * they don't get disconnected.
1719  *
1720  * @param cls the peer to send a ping message to
1721  * @param tc context, reason, etc.
1722  */
1723 static void
1724 periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1725 {
1726   struct PeerInfo *peer = cls;
1727   struct GNUNET_MessageHeader ping_message;
1728   struct DHT_MessageContext msg_ctx;
1729
1730   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1731     return;
1732
1733   ping_message.size = htons(sizeof(struct GNUNET_MessageHeader));
1734   ping_message.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_PING);
1735
1736   memset(&msg_ctx, 0, sizeof(struct DHT_MessageContext));
1737 #if DEBUG_PING
1738   GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n", my_short_id, "DHT", GNUNET_i2s(&peer->id));
1739 #endif
1740   forward_message(&ping_message, peer, &msg_ctx);
1741   peer->ping_task = GNUNET_SCHEDULER_add_delayed(DHT_DEFAULT_PING_DELAY, &periodic_ping_task, peer);
1742 }
1743
1744 /**
1745  * Schedule PING messages for the top X peers in each
1746  * bucket of the routing table (so core won't disconnect them!)
1747  */
1748 void schedule_ping_messages()
1749 {
1750   unsigned int bucket;
1751   unsigned int count;
1752   struct PeerInfo *pos;
1753   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1754     {
1755       pos = k_buckets[bucket].head;
1756       count = 0;
1757       while (pos != NULL)
1758         {
1759           if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK))
1760             GNUNET_SCHEDULER_add_now(&periodic_ping_task, pos);
1761           else if ((count >= bucket_size) && (pos->ping_task != GNUNET_SCHEDULER_NO_TASK))
1762             {
1763               GNUNET_SCHEDULER_cancel(pos->ping_task);
1764               pos->ping_task = GNUNET_SCHEDULER_NO_TASK;
1765             }
1766           pos = pos->next;
1767           count++;
1768         }
1769     }
1770 }
1771 #endif
1772
1773 /**
1774  * Attempt to add a peer to our k-buckets.
1775  *
1776  * @param peer the peer identity of the peer being added
1777  * @param bucket the bucket that we want this peer to go in
1778  * @param latency transport latency of this peer
1779  * @param distance transport distance to this peer
1780  *
1781  * @return NULL if the peer was not added,
1782  *         pointer to PeerInfo for new peer otherwise
1783  */
1784 static struct PeerInfo *
1785 try_add_peer(const struct GNUNET_PeerIdentity *peer,
1786              unsigned int bucket,
1787              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1788 {
1789   int peer_bucket;
1790   struct PeerInfo *new_peer;
1791
1792   if (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity)))
1793     return NULL;
1794
1795   peer_bucket = find_current_bucket(&peer->hashPubKey);
1796
1797   GNUNET_assert(peer_bucket >= lowest_bucket);
1798   new_peer = add_peer(peer, peer_bucket, atsi);
1799
1800   if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
1801     enable_next_bucket();
1802 #if DO_PING
1803   schedule_ping_messages();
1804 #endif
1805   return new_peer;
1806 }
1807
1808
1809 /**
1810  * Task run to check for messages that need to be sent to a client.
1811  *
1812  * @param client a ClientList, containing the client and any messages to be sent to it
1813  */
1814 static void
1815 process_pending_messages (struct ClientList *client)
1816
1817   if (client->pending_head == NULL) 
1818     return;    
1819   if (client->transmit_handle != NULL) 
1820     return;
1821
1822   client->transmit_handle =
1823     GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1824                                          ntohs (client->pending_head->msg->
1825                                                 size),
1826                                          GNUNET_TIME_UNIT_FOREVER_REL,
1827                                          &send_generic_reply, client);
1828 }
1829
1830 /**
1831  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1832  * request.  A ClientList is passed as closure, take the head of the list
1833  * and copy it into buf, which has the result of sending the message to the
1834  * client.
1835  *
1836  * @param cls closure to this call
1837  * @param size maximum number of bytes available to send
1838  * @param buf where to copy the actual message to
1839  *
1840  * @return the number of bytes actually copied, 0 indicates failure
1841  */
1842 static size_t
1843 send_generic_reply (void *cls, size_t size, void *buf)
1844 {
1845   struct ClientList *client = cls;
1846   char *cbuf = buf;
1847   struct PendingMessage *reply;
1848   size_t off;
1849   size_t msize;
1850
1851   client->transmit_handle = NULL;
1852   if (buf == NULL)             
1853     {
1854       /* client disconnected */
1855       return 0;
1856     }
1857   off = 0;
1858   while ( (NULL != (reply = client->pending_head)) &&
1859           (size >= off + (msize = ntohs (reply->msg->size))))
1860     {
1861       GNUNET_CONTAINER_DLL_remove (client->pending_head,
1862                                    client->pending_tail,
1863                                    reply);
1864       memcpy (&cbuf[off], reply->msg, msize);
1865       GNUNET_free (reply);
1866       off += msize;
1867     }
1868   process_pending_messages (client);
1869 #if DEBUG_DHT
1870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871               "Transmitted %u bytes of replies to client\n",
1872               (unsigned int) off);
1873 #endif
1874   return off;
1875 }
1876
1877
1878 /**
1879  * Add a PendingMessage to the clients list of messages to be sent
1880  *
1881  * @param client the active client to send the message to
1882  * @param pending_message the actual message to send
1883  */
1884 static void
1885 add_pending_message (struct ClientList *client,
1886                      struct PendingMessage *pending_message)
1887 {
1888   GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
1889                                      client->pending_tail,
1890                                      client->pending_tail,
1891                                      pending_message);
1892   process_pending_messages (client);
1893 }
1894
1895
1896 /**
1897  * Called when a reply needs to be sent to a client, as
1898  * a result it found to a GET or FIND PEER request.
1899  *
1900  * @param client the client to send the reply to
1901  * @param message the encapsulated message to send
1902  * @param uid the unique identifier of this request
1903  */
1904 static void
1905 send_reply_to_client (struct ClientList *client,
1906                       const struct GNUNET_MessageHeader *message,
1907                       unsigned long long uid,
1908                       const GNUNET_HashCode *key)
1909 {
1910   struct GNUNET_DHT_RouteResultMessage *reply;
1911   struct PendingMessage *pending_message;
1912   uint16_t msize;
1913   size_t tsize;
1914 #if DEBUG_DHT
1915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1916               "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
1917 #endif
1918   msize = ntohs (message->size);
1919   tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
1920   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1921     {
1922       GNUNET_break_op (0);
1923       return;
1924     }
1925   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1926   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
1927   reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
1928   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1929   reply->header.size = htons (tsize);
1930   reply->put_path_length = htons(0); /* FIXME: implement */
1931   reply->get_path_length = htons(0); /* FIXME: implement */
1932   reply->unique_id = GNUNET_htonll (uid);
1933   reply->key = *key;
1934   memcpy (&reply[1], message, msize);
1935   add_pending_message (client, pending_message);
1936 }
1937
1938 /**
1939  * Consider whether or not we would like to have this peer added to
1940  * our routing table.  Check whether bucket for this peer is full,
1941  * if so return negative; if not return positive.  Since peers are
1942  * only added on CORE level connect, this doesn't actually add the
1943  * peer to the routing table.
1944  *
1945  * @param peer the peer we are considering adding
1946  *
1947  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1948  *         already full)
1949  */
1950 static int consider_peer (struct GNUNET_PeerIdentity *peer)
1951 {
1952   int bucket;
1953
1954   if ((GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey)) || (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity))))
1955     return GNUNET_NO; /* We already know this peer (are connected even!) */
1956   bucket = find_current_bucket(&peer->hashPubKey);
1957
1958   if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1959     return GNUNET_YES;
1960
1961   return GNUNET_NO;
1962 }
1963
1964 /**
1965  * Main function that handles whether or not to route a result
1966  * message to other peers, or to send to our local client.
1967  *
1968  * @param msg the result message to be routed
1969  * @param msg_ctx context of the message we are routing
1970  *
1971  * @return the number of peers the message was routed to,
1972  *         GNUNET_SYSERR on failure
1973  */
1974 static int route_result_message(struct GNUNET_MessageHeader *msg,
1975                                 struct DHT_MessageContext *msg_ctx)
1976 {
1977   struct GNUNET_PeerIdentity new_peer;
1978   struct DHTQueryRecord *record;
1979   struct DHTRouteSource *pos;
1980   struct PeerInfo *peer_info;
1981   const struct GNUNET_MessageHeader *hello_msg;
1982
1983   increment_stats(STAT_RESULTS);
1984   /**
1985    * If a find peer result message is received and contains a valid
1986    * HELLO for another peer, offer it to the transport service.
1987    */
1988   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1989     {
1990       if (ntohs(msg->size) <= sizeof(struct GNUNET_MessageHeader))
1991         GNUNET_break_op(0);
1992
1993       hello_msg = &msg[1];
1994       if ((ntohs(hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) || (GNUNET_SYSERR == GNUNET_HELLO_get_id((const struct GNUNET_HELLO_Message *)hello_msg, &new_peer)))
1995       {
1996         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received non-HELLO message type in find peer result message!\n", my_short_id, "DHT");
1997         GNUNET_break_op(0);
1998         return GNUNET_NO;
1999       }
2000       else /* We have a valid hello, and peer id stored in new_peer */
2001       {
2002         find_peer_context.count++;
2003         increment_stats(STAT_FIND_PEER_REPLY);
2004         if (GNUNET_YES == consider_peer(&new_peer))
2005         {
2006           increment_stats(STAT_HELLOS_PROVIDED);
2007           GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg);
2008           GNUNET_CORE_peer_request_connect(coreAPI, 
2009                                            GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5), &new_peer, NULL, NULL);
2010         }
2011       }
2012     }
2013
2014   if (malicious_dropper == GNUNET_YES)
2015     record = NULL;
2016   else
2017     record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key);
2018
2019   if (record == NULL) /* No record of this message! */
2020     {
2021 #if DEBUG_DHT
2022     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023                 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
2024                 "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id);
2025 #endif
2026 #if DEBUG_DHT_ROUTING
2027       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2028         {
2029           dhtlog_handle->insert_route (NULL,
2030                                        msg_ctx->unique_id,
2031                                        DHTLOG_RESULT,
2032                                        msg_ctx->hop_count,
2033                                        GNUNET_SYSERR,
2034                                        &my_identity,
2035                                        &msg_ctx->key,
2036                                        msg_ctx->peer, NULL);
2037         }
2038 #endif
2039       if (msg_ctx->bloom != NULL)
2040         {
2041           GNUNET_CONTAINER_bloomfilter_free(msg_ctx->bloom);
2042           msg_ctx->bloom = NULL;
2043         }
2044       return 0;
2045     }
2046
2047   pos = record->head;
2048   while (pos != NULL)
2049     {
2050 #if STRICT_FORWARDING
2051       if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) /* If we have already forwarded this peer id, don't do it again! */
2052         {
2053           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pos->find_peers_responded, &new_peer.hashPubKey))
2054           {
2055             increment_stats("# find peer responses NOT forwarded (bloom match)");
2056             pos = pos->next;
2057             continue;
2058           }
2059           else
2060             GNUNET_CONTAINER_bloomfilter_add(pos->find_peers_responded, &new_peer.hashPubKey);
2061         }
2062 #endif
2063
2064       if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
2065         {
2066 #if DEBUG_DHT
2067           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068                       "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
2069                       "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id);
2070 #endif
2071 #if DEBUG_DHT_ROUTING
2072           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2073             {
2074               dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
2075                                            msg_ctx->hop_count,
2076                                            GNUNET_YES, &my_identity, &msg_ctx->key,
2077                                            msg_ctx->peer, NULL);
2078             }
2079 #endif
2080           increment_stats(STAT_RESULTS_TO_CLIENT);
2081           if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
2082             increment_stats(STAT_GET_REPLY);
2083
2084           send_reply_to_client(pos->client, msg, 
2085                                msg_ctx->unique_id,
2086                                &msg_ctx->key);
2087         }
2088       else /* Send to peer */
2089         {
2090           peer_info = find_peer_by_id(&pos->source);
2091           if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
2092             {
2093               pos = pos->next;
2094               continue;
2095             }
2096
2097           if (msg_ctx->bloom == NULL)
2098             msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2099           GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom, &my_identity.hashPubKey);
2100           if ((GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (msg_ctx->bloom, &peer_info->id.hashPubKey)))
2101             {
2102 #if DEBUG_DHT
2103               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104                           "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
2105                           "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id, GNUNET_i2s(&peer_info->id));
2106 #endif
2107 #if DEBUG_DHT_ROUTING
2108               if ((debug_routes_extended) && (dhtlog_handle != NULL))
2109                 {
2110                   dhtlog_handle->insert_route (NULL, msg_ctx->unique_id,
2111                                                DHTLOG_RESULT,
2112                                                msg_ctx->hop_count,
2113                                                GNUNET_NO, &my_identity, &msg_ctx->key,
2114                                                msg_ctx->peer, &pos->source);
2115                 }
2116 #endif
2117               forward_result_message(msg, peer_info, msg_ctx);
2118             }
2119           else
2120             {
2121 #if DEBUG_DHT
2122               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2123                           "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
2124                           "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id, GNUNET_i2s(&peer_info->id));
2125 #endif
2126             }
2127         }
2128       pos = pos->next;
2129     }
2130   if (msg_ctx->bloom != NULL)
2131   {
2132     GNUNET_CONTAINER_bloomfilter_free(msg_ctx->bloom);
2133     msg_ctx->bloom = NULL;
2134   }
2135   return 0;
2136 }
2137
2138 /**
2139  * Iterator for local get request results,
2140  *
2141  * @param cls closure for iterator, a DatacacheGetContext
2142  * @param exp when does this value expire?
2143  * @param key the key this data is stored under
2144  * @param size the size of the data identified by key
2145  * @param data the actual data
2146  * @param type the type of the data
2147  *
2148  * @return GNUNET_OK to continue iteration, anything else
2149  * to stop iteration.
2150  */
2151 static int
2152 datacache_get_iterator (void *cls,
2153                         struct GNUNET_TIME_Absolute exp,
2154                         const GNUNET_HashCode * key,
2155                         size_t size, const char *data, 
2156                         enum GNUNET_BLOCK_Type type)
2157 {
2158   struct DHT_MessageContext *msg_ctx = cls;
2159   struct DHT_MessageContext *new_msg_ctx;
2160   struct GNUNET_DHT_GetResultMessage *get_result;
2161   enum GNUNET_BLOCK_EvaluationResult eval;
2162
2163 #if DEBUG_DHT
2164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2165               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
2166 #endif  
2167   eval = GNUNET_BLOCK_evaluate (block_context,
2168                                 type,
2169                                 key,
2170                                 &msg_ctx->reply_bf,
2171                                 msg_ctx->reply_bf_mutator,
2172                                 msg_ctx->xquery,
2173                                 msg_ctx->xquery_size,
2174                                 data,
2175                                 size);
2176   switch (eval)
2177     {
2178     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2179       msg_ctx->do_forward = GNUNET_NO;
2180     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2181       new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
2182       memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
2183       get_result =
2184         GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
2185       get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2186       get_result->header.size =
2187         htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
2188       get_result->expiration = GNUNET_TIME_absolute_hton(exp);
2189       get_result->type = htons (type);
2190       memcpy (&get_result[1], data, size);
2191       new_msg_ctx->peer = &my_identity;
2192       new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2193       new_msg_ctx->hop_count = 0;
2194       new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
2195       new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2196       increment_stats(STAT_GET_RESPONSE_START);
2197       route_result_message(&get_result->header, new_msg_ctx);
2198       GNUNET_free(new_msg_ctx);
2199       GNUNET_free (get_result);
2200       break;
2201     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2202 #if DEBUG_DHT
2203       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2204                   "`%s:%s': Duplicate block error\n", my_short_id, "DHT");
2205 #endif
2206       break;
2207     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2208 #if DEBUG_DHT
2209       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2210                   "`%s:%s': Invalid request error\n", my_short_id, "DHT");
2211 #endif
2212       break;
2213     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2214 #if DEBUG_DHT
2215       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2216                   "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
2217 #endif
2218       GNUNET_break (0);
2219       break;
2220     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2221       GNUNET_break_op (0);
2222       msg_ctx->do_forward = GNUNET_NO;
2223       break;
2224     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2225 #if DEBUG_DHT
2226       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2227                   "`%s:%s': Unsupported block type (%u) in response!\n", my_short_id, "DHT", type);
2228 #endif
2229       /* msg_ctx->do_forward = GNUNET_NO;  // not sure... */
2230       break;
2231     }  
2232   return GNUNET_OK;
2233 }
2234
2235
2236 /**
2237  * Main function that handles whether or not to route a message to other
2238  * peers.
2239  *
2240  * @param msg the message to be routed
2241  * @param msg_ctx the context containing all pertinent information about the message
2242  */
2243 static void
2244 route_message(const struct GNUNET_MessageHeader *msg,
2245                struct DHT_MessageContext *msg_ctx);
2246
2247
2248 /**
2249  * Server handler for all dht get requests, look for data,
2250  * if found, send response either to clients or other peers.
2251  *
2252  * @param msg the actual get message
2253  * @param msg_ctx struct containing pertinent information about the get request
2254  *
2255  * @return number of items found for GET request
2256  */
2257 static unsigned int
2258 handle_dht_get (const struct GNUNET_MessageHeader *msg,
2259                 struct DHT_MessageContext *msg_ctx)
2260 {
2261   const struct GNUNET_DHT_GetMessage *get_msg;
2262   uint16_t msize;
2263   uint16_t bf_size;
2264   unsigned int results;
2265   const char *end;
2266   enum GNUNET_BLOCK_Type type;
2267
2268   msize = ntohs (msg->size);
2269   if (msize < sizeof (struct GNUNET_DHT_GetMessage))
2270     {
2271       GNUNET_break (0);
2272       return 0;
2273     }
2274   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
2275   bf_size = ntohs (get_msg->bf_size);
2276   msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
2277   msg_ctx->reply_bf_mutator = get_msg->bf_mutator; /* FIXME: ntohl? */
2278   if (msize != sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
2279     {
2280       GNUNET_break (0);
2281       return 0;
2282     }
2283   end = (const char*) &get_msg[1];
2284   if (msg_ctx->xquery_size == 0)
2285     {
2286       msg_ctx->xquery = NULL;
2287     }
2288   else
2289     {
2290       msg_ctx->xquery = (const void*) end;
2291       end += msg_ctx->xquery_size;
2292     }
2293   if (bf_size == 0)
2294     {
2295       msg_ctx->reply_bf = NULL;
2296     }
2297   else
2298     {
2299       msg_ctx->reply_bf = GNUNET_CONTAINER_bloomfilter_init (end,
2300                                                                      bf_size,
2301                                                                      GNUNET_DHT_GET_BLOOMFILTER_K);
2302     }
2303   type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
2304 #if DEBUG_DHT
2305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306               "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
2307               my_short_id,
2308               "DHT", "GET", 
2309               type,
2310               GNUNET_h2s (&msg_ctx->key),
2311               msg_ctx->unique_id);
2312 #endif
2313   increment_stats(STAT_GETS);
2314   results = 0;
2315 #if HAVE_MALICIOUS
2316   if (type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE)
2317     {
2318       GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
2319       return results;
2320     }
2321 #endif
2322   msg_ctx->do_forward = GNUNET_YES;
2323   if (datacache != NULL)
2324     results
2325       = GNUNET_DATACACHE_get (datacache,
2326                               &msg_ctx->key, type,
2327                               &datacache_get_iterator,
2328                               msg_ctx);
2329 #if DEBUG_DHT
2330       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2331                   "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
2332                   results, "GET", msg_ctx->unique_id);
2333 #endif
2334   if (results >= 1)
2335     {
2336 #if DEBUG_DHT_ROUTING
2337       if ((debug_routes) && (dhtlog_handle != NULL))
2338         {
2339           dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_GET,
2340                                 msg_ctx->hop_count, GNUNET_YES, &my_identity,
2341                                 &msg_ctx->key);
2342         }
2343
2344       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2345         {
2346           dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2347                                        msg_ctx->hop_count, GNUNET_YES,
2348                                        &my_identity, &msg_ctx->key, msg_ctx->peer,
2349                                        NULL);
2350         }
2351 #endif
2352     }
2353   else
2354     {
2355       /* check query valid */
2356       if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID
2357           == GNUNET_BLOCK_evaluate (block_context,
2358                                     type,
2359                                     &msg_ctx->key,
2360                                     &msg_ctx->reply_bf,
2361                                     msg_ctx->reply_bf_mutator,
2362                                     msg_ctx->xquery,
2363                                     msg_ctx->xquery_size,
2364                                     NULL, 0))
2365         {
2366           GNUNET_break_op (0);
2367           msg_ctx->do_forward = GNUNET_NO;
2368         }
2369     }
2370
2371   if (msg_ctx->hop_count == 0) /* Locally initiated request */
2372     {
2373 #if DEBUG_DHT_ROUTING
2374     if ((debug_routes) && (dhtlog_handle != NULL))
2375       {
2376         dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_GET,
2377                                       msg_ctx->hop_count, GNUNET_NO, &my_identity,
2378                                       &msg_ctx->key);
2379       }
2380 #endif
2381     }
2382   if (msg_ctx->do_forward == GNUNET_YES)
2383     route_message (msg, msg_ctx);
2384   GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
2385   return results;
2386 }
2387
2388 static void
2389 remove_recent_find_peer(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2390 {
2391   GNUNET_HashCode *key = cls;
2392   
2393   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent_find_peer_requests, key, NULL));
2394   GNUNET_free (key);
2395 }
2396
2397 /**
2398  * Server handler for initiating local dht find peer requests
2399  *
2400  * @param find_msg the actual find peer message
2401  * @param msg_ctx struct containing pertinent information about the request
2402  *
2403  */
2404 static void
2405 handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
2406                       struct DHT_MessageContext *msg_ctx)
2407 {
2408   struct GNUNET_MessageHeader *find_peer_result;
2409   struct GNUNET_DHT_FindPeerMessage *find_peer_message;
2410   struct DHT_MessageContext *new_msg_ctx;
2411   struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
2412   size_t hello_size;
2413   size_t tsize;
2414   GNUNET_HashCode *recent_hash;
2415   struct GNUNET_MessageHeader *other_hello;
2416   size_t other_hello_size;
2417   struct GNUNET_PeerIdentity peer_id;
2418
2419   find_peer_message = (struct GNUNET_DHT_FindPeerMessage *)find_msg;
2420   GNUNET_break_op(ntohs(find_msg->size) >= (sizeof(struct GNUNET_DHT_FindPeerMessage)));
2421   if (ntohs(find_msg->size) < sizeof(struct GNUNET_DHT_FindPeerMessage))
2422     return;
2423   other_hello = NULL;
2424   other_hello_size = 0;
2425   if (ntohs(find_msg->size) > sizeof(struct GNUNET_DHT_FindPeerMessage))
2426     {
2427       other_hello_size = ntohs(find_msg->size) - sizeof(struct GNUNET_DHT_FindPeerMessage);
2428       other_hello = GNUNET_malloc(other_hello_size);
2429       memcpy(other_hello, &find_peer_message[1], other_hello_size);
2430       if ((GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)other_hello) == 0) || (GNUNET_OK != GNUNET_HELLO_get_id((struct GNUNET_HELLO_Message *)other_hello, &peer_id)))
2431         {
2432           GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Received invalid HELLO message in find peer request!\n");
2433           GNUNET_free(other_hello);
2434           return;
2435         }
2436 #if FIND_PEER_WITH_HELLO
2437       if (GNUNET_YES == consider_peer(&peer_id))
2438         {
2439           increment_stats(STAT_HELLOS_PROVIDED);
2440           GNUNET_TRANSPORT_offer_hello(transport_handle, other_hello);
2441           GNUNET_CORE_peer_request_connect(coreAPI, 
2442                                            GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5), &peer_id, NULL, NULL);
2443           route_message (find_msg, msg_ctx);
2444           GNUNET_free (other_hello);
2445           return;
2446         }
2447       else /* We don't want this peer! */
2448         {
2449           route_message (find_msg, msg_ctx);
2450           GNUNET_free (other_hello);
2451           return;
2452         }
2453 #endif
2454     }
2455
2456 #if DEBUG_DHT
2457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2458               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
2459               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&msg_ctx->key),
2460               ntohs (find_msg->size),
2461               sizeof (struct GNUNET_MessageHeader));
2462 #endif
2463   if (my_hello == NULL)
2464   {
2465 #if DEBUG_DHT
2466     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2467                 "`%s': Our HELLO is null, can't return.\n",
2468                 "DHT");
2469 #endif
2470     GNUNET_free_non_null (other_hello);
2471     route_message (find_msg, msg_ctx);
2472     return;
2473   }
2474
2475   incoming_bloom = GNUNET_CONTAINER_bloomfilter_init(find_peer_message->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2476   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(incoming_bloom, &my_identity.hashPubKey))
2477     {
2478       increment_stats(STAT_BLOOM_FIND_PEER);
2479       GNUNET_CONTAINER_bloomfilter_free(incoming_bloom);
2480       GNUNET_free_non_null(other_hello);
2481       route_message (find_msg, msg_ctx);
2482       return; /* We match the bloomfilter, do not send a response to this peer (they likely already know us!)*/
2483     }
2484   GNUNET_CONTAINER_bloomfilter_free(incoming_bloom);
2485
2486 #if RESTRICT_FIND_PEER
2487
2488   /**
2489    * Ignore any find peer requests from a peer we have seen very recently.
2490    */
2491   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent_find_peer_requests, &msg_ctx->key)) /* We have recently responded to a find peer request for this peer! */
2492   {
2493     increment_stats("# dht find peer requests ignored (recently seen!)");
2494     GNUNET_free_non_null(other_hello);
2495     return;
2496   }
2497
2498   /**
2499    * Use this check to only allow the peer to respond to find peer requests if
2500    * it would be beneficial to have the requesting peer in this peers routing
2501    * table.  Can be used to thwart peers flooding the network with find peer
2502    * requests that we don't care about.  However, if a new peer is joining
2503    * the network and has no other peers this is a problem (assume all buckets
2504    * full, no one will respond!).
2505    */
2506   memcpy(&peer_id.hashPubKey, &msg_ctx->key, sizeof(GNUNET_HashCode));
2507   if (GNUNET_NO == consider_peer(&peer_id))
2508     {
2509       increment_stats("# dht find peer requests ignored (do not need!)");
2510       GNUNET_free_non_null(other_hello);
2511       route_message (find_msg, msg_ctx);
2512       return;
2513     }
2514 #endif
2515
2516   recent_hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
2517   memcpy(recent_hash, &msg_ctx->key, sizeof(GNUNET_HashCode));
2518   if (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests,
2519                                      &msg_ctx->key, NULL,
2520                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
2521     {
2522       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Adding recent remove task for key `%s`!\n", GNUNET_h2s(&msg_ctx->key));
2523       /* Only add a task if there wasn't one for this key already! */
2524       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30),
2525                                     &remove_recent_find_peer, recent_hash);
2526     }
2527   else
2528     {
2529       GNUNET_free(recent_hash);
2530       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received duplicate find peer request too soon!\n");
2531     }
2532
2533   /* Simplistic find_peer functionality, always return our hello */
2534   hello_size = ntohs(my_hello->size);
2535   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
2536
2537   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2538     {
2539       GNUNET_break_op (0);
2540       GNUNET_free_non_null(other_hello);
2541       return;
2542     }
2543
2544   find_peer_result = GNUNET_malloc (tsize);
2545   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
2546   find_peer_result->size = htons (tsize);
2547   memcpy (&find_peer_result[1], my_hello, hello_size);
2548
2549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2550               "`%s': Sending hello size %d to requesting peer.\n",
2551               "DHT", hello_size);
2552
2553   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
2554   memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
2555   new_msg_ctx->peer = &my_identity;
2556   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2557   new_msg_ctx->hop_count = 0;
2558   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make find peer requests a higher priority */
2559   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2560   increment_stats(STAT_FIND_PEER_ANSWER);
2561   route_result_message(find_peer_result, new_msg_ctx);
2562   GNUNET_free(new_msg_ctx);
2563 #if DEBUG_DHT_ROUTING
2564   if ((debug_routes) && (dhtlog_handle != NULL))
2565     {
2566       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_FIND_PEER,
2567                                    msg_ctx->hop_count, GNUNET_YES, &my_identity,
2568                                    &msg_ctx->key);
2569     }
2570 #endif
2571   GNUNET_free_non_null(other_hello);
2572   GNUNET_free(find_peer_result);
2573   route_message (find_msg, msg_ctx);
2574 }
2575
2576 /**
2577  * Task used to republish data.
2578  * Forward declaration; function call loop.
2579  *
2580  * @param cls closure (a struct RepublishContext)
2581  * @param tc runtime context for this task
2582  */
2583 static void
2584 republish_content(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
2585
2586 /**
2587  * Server handler for initiating local dht put requests
2588  *
2589  * @param msg the actual put message
2590  * @param msg_ctx struct containing pertinent information about the request
2591  */
2592 static void
2593 handle_dht_put (const struct GNUNET_MessageHeader *msg,
2594                 struct DHT_MessageContext *msg_ctx)
2595 {
2596   const struct GNUNET_DHT_PutMessage *put_msg;
2597   enum GNUNET_BLOCK_Type put_type;
2598   size_t data_size;
2599   int ret;
2600   struct RepublishContext *put_context;
2601   GNUNET_HashCode key;
2602
2603   GNUNET_assert (ntohs (msg->size) >=
2604                  sizeof (struct GNUNET_DHT_PutMessage));
2605
2606
2607   put_msg = (const struct GNUNET_DHT_PutMessage *)msg;
2608   put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
2609 #if HAVE_MALICIOUS
2610   if (put_type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE)
2611     {
2612 #if DEBUG_DHT_ROUTING
2613       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2614         {
2615           /** Log routes that die due to high load! */
2616           dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2617                                        msg_ctx->hop_count, GNUNET_SYSERR,
2618                                        &my_identity, &msg_ctx->key, msg_ctx->peer,
2619                                        NULL);
2620         }
2621 #endif
2622       return;
2623     }
2624 #endif
2625   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
2626   ret = GNUNET_BLOCK_get_key (block_context,
2627                               put_type,
2628                               &put_msg[1],
2629                               data_size,
2630                               &key);
2631   if (GNUNET_NO == ret)
2632     {
2633 #if DEBUG_DHT_ROUTING
2634       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2635         {
2636           /** Log routes that die due to high load! */
2637           dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2638                                        msg_ctx->hop_count, GNUNET_SYSERR,
2639                                        &my_identity, &msg_ctx->key, msg_ctx->peer,
2640                                        NULL);
2641         }
2642 #endif
2643       /* invalid reply */
2644       GNUNET_break_op (0);
2645       return;
2646     }
2647   if ( (GNUNET_YES == ret) &&
2648        (0 != memcmp (&key,
2649                      &msg_ctx->key,
2650                      sizeof (GNUNET_HashCode))) )
2651     {
2652 #if DEBUG_DHT_ROUTING
2653       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2654         {
2655           /** Log routes that die due to high load! */
2656           dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2657                                        msg_ctx->hop_count, GNUNET_SYSERR,
2658                                        &my_identity, &msg_ctx->key, msg_ctx->peer,
2659                                        NULL);
2660         }
2661 #endif
2662       /* invalid wrapper: key mismatch! */
2663       GNUNET_break_op (0);
2664       return;
2665     }
2666   /* ret == GNUNET_SYSERR means that there is no known relationship between
2667      data and the key, so we cannot check it */
2668 #if DEBUG_DHT
2669   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2671               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id);
2672 #endif
2673 #if DEBUG_DHT_ROUTING
2674   if (msg_ctx->hop_count == 0) /* Locally initiated request */
2675     {
2676       if ((debug_routes) && (dhtlog_handle != NULL))
2677         {
2678           dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_PUT,
2679                                        msg_ctx->hop_count, GNUNET_NO, &my_identity,
2680                                        &msg_ctx->key);
2681         }
2682     }
2683 #endif
2684
2685   if (msg_ctx->closest != GNUNET_YES)
2686     {
2687       route_message (msg, msg_ctx);
2688       return;
2689     }
2690
2691 #if DEBUG_DHT
2692   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2693               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2694               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id);
2695 #endif
2696
2697 #if DEBUG_DHT_ROUTING
2698   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2699     {
2700       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2701                                    msg_ctx->hop_count, GNUNET_YES,
2702                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
2703                                    NULL);
2704     }
2705
2706   if ((debug_routes) && (dhtlog_handle != NULL))
2707     {
2708       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_PUT,
2709                                    msg_ctx->hop_count, GNUNET_YES, &my_identity,
2710                                    &msg_ctx->key);
2711     }
2712 #endif
2713
2714   increment_stats(STAT_PUTS_INSERTED);
2715   if (datacache != NULL)
2716     {
2717       ret = GNUNET_DATACACHE_put (datacache, &msg_ctx->key, data_size,
2718                                   (char *) &put_msg[1], put_type,
2719                                   GNUNET_TIME_absolute_ntoh(put_msg->expiration));
2720
2721       if ((ret == GNUNET_YES) && (do_republish == GNUNET_YES))
2722         {
2723           put_context = GNUNET_malloc(sizeof(struct RepublishContext));
2724           memcpy(&put_context->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
2725           put_context->type = put_type;
2726           GNUNET_SCHEDULER_add_delayed (dht_republish_frequency, &republish_content, put_context);
2727         }
2728     }
2729   else
2730     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2731                 "`%s:%s': %s request received, but have no datacache!\n",
2732                 my_short_id, "DHT", "PUT");
2733
2734   if (stop_on_closest == GNUNET_NO)
2735     route_message (msg, msg_ctx);
2736 }
2737
2738 /**
2739  * Estimate the diameter of the network based
2740  * on how many buckets are currently in use.
2741  * Concept here is that the diameter of the network
2742  * is roughly the distance a message must travel in
2743  * order to reach its intended destination.  Since
2744  * at each hop we expect to get one bit closer, and
2745  * we have one bit per bucket, the number of buckets
2746  * in use should be the largest number of hops for
2747  * a successful message. (of course, this assumes we
2748  * know all peers in the network!)
2749  *
2750  * @return ballpark diameter figure
2751  */
2752 static unsigned int estimate_diameter()
2753 {
2754   return MAX_BUCKETS - lowest_bucket;
2755 }
2756
2757 /**
2758  * To how many peers should we (on average)
2759  * forward the request to obtain the desired
2760  * target_replication count (on average).
2761  *
2762  * returns: target_replication / (est. hops) + (target_replication * hop_count)
2763  * where est. hops is typically 2 * the routing table depth
2764  *
2765  * @param hop_count number of hops the message has traversed
2766  * @param target_replication the number of total paths desired
2767  *
2768  * @return Some number of peers to forward the message to
2769  */
2770 static unsigned int
2771 get_forward_count (unsigned int hop_count, size_t target_replication)
2772 {
2773   uint32_t random_value;
2774   unsigned int forward_count;
2775   float target_value;
2776   unsigned int diameter;
2777
2778   diameter = estimate_diameter ();
2779
2780   if (GNUNET_NO == use_max_hops)
2781     max_hops = (diameter + 1) * 2;
2782
2783   /**
2784    * If we are behaving in strict kademlia mode, send multiple initial requests,
2785    * but then only send to 1 or 0 peers based strictly on the number of hops.
2786    */
2787   if (strict_kademlia == GNUNET_YES)
2788     {
2789       if (hop_count == 0)
2790         return kademlia_replication;
2791       else if (hop_count < max_hops)
2792         return 1;
2793       else
2794         return 0;
2795     }
2796
2797   /* FIXME: the smaller we think the network is the more lenient we should be for
2798    * routing right?  The estimation below only works if we think we have reasonably
2799    * full routing tables, which for our RR topologies may not be the case!
2800    */
2801   if (hop_count > max_hops)
2802     {
2803 #if DEBUG_DHT
2804       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2805                   "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
2806                   "DHT", estimate_diameter(), lowest_bucket);
2807 #endif
2808       return 0;
2809     }
2810
2811   random_value = 0;
2812   /* FIXME: we use diameter as the expected number of hops, but with randomized routing we will likely route to more! */
2813   target_value = target_replication / (diameter + ((float)target_replication * hop_count));
2814   if (target_value > 1)
2815     return (unsigned int)target_value;
2816   else
2817     random_value = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int)-1);
2818
2819   if (random_value < (target_value * (unsigned int)-1))
2820     forward_count = 2;
2821   else
2822     forward_count = 1;
2823
2824   return forward_count;
2825 }
2826
2827 /*
2828  * Check whether my identity is closer than any known peers.
2829  * If a non-null bloomfilter is given, check if this is the closest
2830  * peer that hasn't already been routed to.
2831  *
2832  * @param target hash code to check closeness to
2833  * @param bloom bloomfilter, exclude these entries from the decision
2834  *
2835  * Return GNUNET_YES if node location is closest, GNUNET_NO
2836  * otherwise.
2837  */
2838 int
2839 am_closest_peer (const GNUNET_HashCode * target, struct GNUNET_CONTAINER_BloomFilter *bloom)
2840 {
2841   int bits;
2842   int other_bits;
2843   int bucket_num;
2844   int count;
2845   struct PeerInfo *pos;
2846   unsigned int my_distance;
2847
2848   if (0 == memcmp(&my_identity.hashPubKey, target, sizeof(GNUNET_HashCode)))
2849     return GNUNET_YES;
2850
2851   bucket_num = find_current_bucket(target);
2852
2853   bits = GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, target);
2854   my_distance = distance(&my_identity.hashPubKey, target);
2855   pos = k_buckets[bucket_num].head;
2856   count = 0;
2857   while ((pos != NULL) && (count < bucket_size))
2858     {
2859       if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey)))
2860         {
2861           pos = pos->next;
2862           continue; /* Skip already checked entries */
2863         }
2864
2865       other_bits = GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target);
2866       if (other_bits > bits)
2867         return GNUNET_NO;
2868       else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
2869         {
2870           if (strict_kademlia != GNUNET_YES) /* Return that we at as close as any other peer */
2871             return GNUNET_YES;
2872           else if (distance(&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
2873             return GNUNET_NO;
2874         }
2875       pos = pos->next;
2876     }
2877
2878   /* No peers closer, we are the closest! */
2879   return GNUNET_YES;
2880 }
2881
2882
2883 /**
2884  * Return this peers adjusted value based on the convergence
2885  * function chosen.  This is the key function for randomized
2886  * routing decisions.
2887  *
2888  * @param target the key of the request
2889  * @param peer the peer we would like the value of
2890  * @param hops number of hops this message has already traveled
2891  *
2892  * @return bit distance from target to peer raised to an exponent
2893  *         adjusted based on the current routing convergence algorithm
2894  *
2895  */
2896 static unsigned long long
2897 converge_distance (const GNUNET_HashCode *target,
2898                    struct PeerInfo *peer,
2899                    unsigned int hops)
2900 {
2901   unsigned long long ret;
2902   unsigned int other_matching_bits;
2903   double base_converge_modifier = .1; /* Value that "looks" good (when plotted), have to start somewhere */
2904   double temp_modifier;
2905   double calc_value;
2906   double exponent;
2907   int curr_max_hops;
2908
2909   if (use_max_hops)
2910     curr_max_hops = max_hops;
2911   else
2912     curr_max_hops = (estimate_diameter() + 1) * 2;
2913
2914   if (converge_modifier > 0)
2915     temp_modifier = converge_modifier * base_converge_modifier;
2916   else
2917     {
2918       temp_modifier = base_converge_modifier;
2919       base_converge_modifier = 0.0;
2920     }
2921
2922   GNUNET_assert(temp_modifier > 0);
2923
2924   other_matching_bits = GNUNET_CRYPTO_hash_matching_bits(target, &peer->id.hashPubKey);
2925
2926   switch (converge_option)
2927     {
2928       case DHT_CONVERGE_RANDOM:
2929         return 1; /* Always return 1, choose equally among all peers */
2930       case DHT_CONVERGE_LINEAR:
2931         calc_value = hops * curr_max_hops * temp_modifier;
2932         break;
2933       case DHT_CONVERGE_SQUARE:
2934         /**
2935          * Simple square based curve.
2936          */
2937         calc_value = (sqrt(hops) / sqrt(curr_max_hops)) * (curr_max_hops / (curr_max_hops * temp_modifier));
2938         break;
2939       case DHT_CONVERGE_EXPONENTIAL:
2940         /**
2941          * Simple exponential curve.
2942          */
2943         if (base_converge_modifier > 0)
2944           calc_value = (temp_modifier * hops * hops) / curr_max_hops;
2945         else
2946           calc_value = (hops * hops) / curr_max_hops;
2947         break;
2948       case DHT_CONVERGE_BINARY:
2949         /**
2950          * If below the cutoff, route randomly (return 1),
2951          * If above the cutoff, return the maximum possible
2952          * value first (always route to closest, because
2953          * they are sorted.)
2954          */
2955
2956         if (hops > converge_modifier) /* Past cutoff */
2957           {
2958             return ULLONG_MAX;
2959           }
2960         /* Fall through */
2961       default:
2962         return 1;
2963     }
2964
2965   /* Take the log (base e) of the number of bits matching the other peer */
2966   exponent = log(other_matching_bits);
2967
2968   /* Check if we would overflow; our largest possible value is 2^64 approx. e^44.361419555836498 */
2969   if (exponent * calc_value >= 44.361419555836498)
2970     return ULLONG_MAX;
2971
2972   /* Clear errno and all math exceptions */
2973   errno = 0;
2974   feclearexcept(FE_ALL_EXCEPT);
2975   ret = (unsigned long long)pow(other_matching_bits, calc_value);
2976   if ((errno != 0) || fetestexcept(FE_INVALID | FE_DIVBYZERO | FE_OVERFLOW |
2977       FE_UNDERFLOW))
2978     {
2979       if (0 != fetestexcept(FE_OVERFLOW))
2980         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "FE_OVERFLOW\n");
2981       if (0 != fetestexcept(FE_INVALID))
2982         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "FE_INVALID\n");
2983       if (0 != fetestexcept(FE_UNDERFLOW))
2984         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "FE_UNDERFLOW\n");
2985       return 0;
2986     }
2987   else
2988     return ret;
2989 }
2990
2991 /**
2992  * Comparison function for two struct PeerInfo's
2993  * which have already had their matching bits to
2994  * some target calculated.
2995  *
2996  * @param p1 a pointer pointer to a struct PeerInfo
2997  * @param p2 a pointer pointer to a struct PeerInfo
2998  *
2999  * @return 0 if equidistant to target,
3000  *        -1 if p1 is closer,
3001  *         1 if p2 is closer
3002  */
3003 static int
3004 compare_peers (const void *p1, const void *p2)
3005 {
3006   struct PeerInfo **first = (struct PeerInfo **)p1;
3007   struct PeerInfo **second = (struct PeerInfo **)p2;
3008
3009   if ((*first)->matching_bits > (*second)->matching_bits)
3010     return -1;
3011   if ((*first)->matching_bits < (*second)->matching_bits)
3012     return 1;
3013   else
3014     return 0;
3015 }
3016
3017
3018 /**
3019  * Select a peer from the routing table that would be a good routing
3020  * destination for sending a message for "target".  The resulting peer
3021  * must not be in the set of blocked peers.<p>
3022  *
3023  * Note that we should not ALWAYS select the closest peer to the
3024  * target, peers further away from the target should be chosen with
3025  * exponentially declining probability.
3026  *
3027  * @param target the key we are selecting a peer to route to
3028  * @param bloom a bloomfilter containing entries this request has seen already
3029  *
3030  * @return Peer to route to, or NULL on error
3031  */
3032 static struct PeerInfo *
3033 select_peer (const GNUNET_HashCode * target,
3034              struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
3035 {
3036   unsigned int bc;
3037   unsigned int i;
3038   unsigned int count;
3039   unsigned int offset;
3040   unsigned int my_matching_bits;
3041   int closest_bucket;
3042   struct PeerInfo *pos;
3043   struct PeerInfo *sorted_closest[bucket_size];
3044   unsigned long long temp_converge_distance;
3045   unsigned long long total_distance;
3046   unsigned long long selected;
3047 #if DEBUG_DHT > 1
3048   unsigned long long stats_total_distance;
3049   double sum;
3050 #endif
3051   /* For kademlia */
3052   unsigned int distance;
3053   unsigned int largest_distance;
3054   struct PeerInfo *chosen;
3055
3056   my_matching_bits = GNUNET_CRYPTO_hash_matching_bits(target, &my_identity.hashPubKey);
3057
3058   total_distance = 0;
3059   if (strict_kademlia == GNUNET_YES)
3060     {
3061       largest_distance = 0;
3062       chosen = NULL;
3063       for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
3064         {
3065           pos = k_buckets[bc].head;
3066           count = 0;
3067           while ((pos != NULL) && (count < bucket_size))
3068             {
3069               /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
3070               if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3071                 {
3072                   distance = inverse_distance (target, &pos->id.hashPubKey);
3073                   if (distance > largest_distance)
3074                     {
3075                       chosen = pos;
3076                       largest_distance = distance;
3077                     }
3078                 }
3079               count++;
3080               pos = pos->next;
3081             }
3082         }
3083
3084       if ((largest_distance > 0) && (chosen != NULL))
3085         {
3086           GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
3087           return chosen;
3088         }
3089       else
3090         {
3091           return NULL;
3092         }
3093     }
3094
3095   /* GNUnet-style */
3096   total_distance = 0;
3097   /* Three steps: order peers in closest bucket (most matching bits).
3098    * Then go over all LOWER buckets (matching same bits we do)
3099    * Then go over all HIGHER buckets (matching less then we do)
3100    */
3101
3102   closest_bucket = find_current_bucket(target);
3103   GNUNET_assert(closest_bucket >= lowest_bucket);
3104   pos = k_buckets[closest_bucket].head;
3105   count = 0;
3106   offset = 0; /* Need offset as well as count in case peers are bloomfiltered */
3107   memset(sorted_closest, 0, sizeof(sorted_closest));
3108   /* Put any peers in the closest bucket in the sorting array */
3109   while ((pos != NULL) && (count < bucket_size))
3110     {
3111       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3112         {
3113           count++;
3114           pos = pos->next;
3115           continue; /* Ignore bloomfiltered peers */
3116         }
3117       pos->matching_bits = GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target);
3118       sorted_closest[offset] = pos;
3119       pos = pos->next;
3120       offset++;
3121       count++;
3122     }
3123
3124   /* Sort the peers in descending order */
3125   qsort(&sorted_closest[0], offset, sizeof(struct PeerInfo *), &compare_peers);
3126
3127   /* Put the sorted closest peers into the possible bins first, in case of overflow. */
3128   for (i = 0; i < offset; i++)
3129     {
3130       temp_converge_distance = converge_distance(target, sorted_closest[i], hops);
3131       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &sorted_closest[i]->id.hashPubKey))
3132         break; /* Ignore bloomfiltered peers */
3133       if ((temp_converge_distance <= ULLONG_MAX) && (total_distance + temp_converge_distance > total_distance)) /* Handle largest case and overflow */
3134         total_distance += temp_converge_distance;
3135       else
3136         break; /* overflow case */
3137     }
3138
3139   /* Now handle peers in lower buckets (matches same # of bits as target) */
3140   for (bc = lowest_bucket; bc < closest_bucket; bc++)
3141     {
3142       pos = k_buckets[bc].head;
3143       count = 0;
3144       while ((pos != NULL) && (count < bucket_size))
3145         {
3146           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3147             {
3148               count++;
3149               pos = pos->next;
3150               continue; /* Ignore bloomfiltered peers */
3151             }
3152           temp_converge_distance = converge_distance(target, pos, hops);
3153           if ((temp_converge_distance <= ULLONG_MAX) && (total_distance + temp_converge_distance > total_distance)) /* Handle largest case and overflow */
3154             total_distance += temp_converge_distance;
3155           else
3156             break; /* overflow case */
3157           pos = pos->next;
3158           count++;
3159         }
3160     }
3161
3162   /* Now handle all the further away peers */
3163   for (bc = closest_bucket + 1; bc < MAX_BUCKETS; bc++)
3164     {
3165       pos = k_buckets[bc].head;
3166       count = 0;
3167       while ((pos != NULL) && (count < bucket_size))
3168         {
3169           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3170             {
3171               count++;
3172               pos = pos->next;
3173               continue; /* Ignore bloomfiltered peers */
3174             }
3175           temp_converge_distance = converge_distance(target, pos, hops);
3176           if ((temp_converge_distance <= ULLONG_MAX) && (total_distance + temp_converge_distance > total_distance)) /* Handle largest case and overflow */
3177             total_distance += temp_converge_distance;
3178           else
3179             break; /* overflow case */
3180           pos = pos->next;
3181           count++;
3182         }
3183     }
3184
3185   if (total_distance == 0) /* No peers to select from! */
3186     {
3187       increment_stats("# select_peer, total_distance == 0");
3188       return NULL;
3189     }
3190
3191 #if DEBUG_DHT_ROUTING > 1
3192   sum = 0.0;
3193   /* PRINT STATS */
3194   /* Put the sorted closest peers into the possible bins first, in case of overflow. */
3195   stats_total_distance = 0;
3196   for (i = 0; i < offset; i++)
3197     {
3198       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &sorted_closest[i]->id.hashPubKey))
3199         break; /* Ignore bloomfiltered peers */
3200       temp_converge_distance = converge_distance(target, sorted_closest[i], hops);
3201       if ((temp_converge_distance <= ULLONG_MAX) && (stats_total_distance + temp_converge_distance > stats_total_distance)) /* Handle largest case and overflow */
3202         stats_total_distance += temp_converge_distance;
3203       else
3204         break; /* overflow case */
3205       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose %d matching bits (%d bits match me) (%.2f percent) converge ret %llu\n", GNUNET_CRYPTO_hash_matching_bits(&sorted_closest[i]->id.hashPubKey, target), GNUNET_CRYPTO_hash_matching_bits(&sorted_closest[i]->id.hashPubKey, &my_identity.hashPubKey), (temp_converge_distance / (double)total_distance) * 100, temp_converge_distance);
3206     }
3207
3208   /* Now handle peers in lower buckets (matches same # of bits as target) */
3209   for (bc = lowest_bucket; bc < closest_bucket; bc++)
3210     {
3211       pos = k_buckets[bc].head;
3212       count = 0;
3213       while ((pos != NULL) && (count < bucket_size))
3214         {
3215           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3216             {
3217               count++;
3218               pos = pos->next;
3219               continue; /* Ignore bloomfiltered peers */
3220             }
3221           temp_converge_distance = converge_distance(target, pos, hops);
3222           if ((temp_converge_distance <= ULLONG_MAX) && (stats_total_distance + temp_converge_distance > stats_total_distance)) /* Handle largest case and overflow */
3223             stats_total_distance += temp_converge_distance;
3224           else
3225             break; /* overflow case */
3226           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose %d matching bits (%d bits match me) (%.2f percent) converge ret %llu\n", GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey), (temp_converge_distance / (double)total_distance) * 100, temp_converge_distance);
3227           pos = pos->next;
3228           count++;
3229         }
3230     }
3231
3232   /* Now handle all the further away peers */
3233   for (bc = closest_bucket + 1; bc < MAX_BUCKETS; bc++)
3234     {
3235       pos = k_buckets[bc].head;
3236       count = 0;
3237       while ((pos != NULL) && (count < bucket_size))
3238         {
3239           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3240             {
3241               count++;
3242               pos = pos->next;
3243               continue; /* Ignore bloomfiltered peers */
3244             }
3245           temp_converge_distance = converge_distance(target, pos, hops);
3246           if ((temp_converge_distance <= ULLONG_MAX) && (stats_total_distance + temp_converge_distance > stats_total_distance)) /* Handle largest case and overflow */
3247             stats_total_distance += temp_converge_distance;
3248           else
3249             break; /* overflow case */
3250           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose %d matching bits (%d bits match me) (%.2f percent) converge ret %llu\n", GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey),  (temp_converge_distance / (double)total_distance) * 100, temp_converge_distance);
3251           pos = pos->next;
3252           count++;
3253         }
3254     }
3255   /* END PRINT STATS */
3256 #endif
3257
3258   /* Now actually choose a peer */
3259   selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
3260
3261   /* Go over closest sorted peers. */
3262   for (i = 0; i < offset; i++)
3263     {
3264       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &sorted_closest[i]->id.hashPubKey))
3265         break; /* Ignore bloomfiltered peers */
3266       temp_converge_distance = converge_distance(target, sorted_closest[i], hops);
3267       if (temp_converge_distance >= selected)
3268         return sorted_closest[i];
3269       else
3270         selected -= temp_converge_distance;
3271     }
3272
3273   /* Now handle peers in lower buckets (matches same # of bits as target) */
3274   for (bc = lowest_bucket; bc < closest_bucket; bc++)
3275     {
3276       pos = k_buckets[bc].head;
3277       count = 0;
3278       while ((pos != NULL) && (count < bucket_size))
3279         {
3280           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3281             {
3282               count++;
3283               pos = pos->next;
3284               continue; /* Ignore bloomfiltered peers */
3285             }
3286           temp_converge_distance = converge_distance(target, pos, hops);
3287           if (temp_converge_distance >= selected)
3288             return pos;
3289           else
3290             selected -= temp_converge_distance;
3291           pos = pos->next;
3292           count++;
3293         }
3294     }
3295
3296   /* Now handle all the further away peers */
3297   for (bc = closest_bucket + 1; bc < MAX_BUCKETS; bc++)
3298     {
3299       pos = k_buckets[bc].head;
3300       count = 0;
3301       while ((pos != NULL) && (count < bucket_size))
3302         {
3303           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3304             {
3305               count++;
3306               pos = pos->next;
3307               continue; /* Ignore bloomfiltered peers */
3308             }
3309           temp_converge_distance = converge_distance(target, pos, hops);
3310           if (temp_converge_distance >= selected)
3311             return pos;
3312           else
3313             selected -= temp_converge_distance;
3314           pos = pos->next;
3315           count++;
3316         }
3317     }
3318
3319   increment_stats("# failed to select peer");
3320   return NULL;
3321 }
3322
3323
3324 /**
3325  * Task used to remove recent entries, either
3326  * after timeout, when full, or on shutdown.
3327  *
3328  * @param cls the entry to remove
3329  * @param tc context, reason, etc.
3330  */
3331 static void
3332 remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3333 {
3334   struct RecentRequest *req = cls;
3335   static GNUNET_HashCode hash;
3336
3337   GNUNET_assert(req != NULL);
3338   hash_from_uid(req->uid, &hash);
3339   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req));
3340   GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node);
3341   GNUNET_CONTAINER_bloomfilter_free(req->bloom);
3342   GNUNET_free(req);
3343
3344   /*
3345   if ((tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
3346   {
3347     GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
3348     GNUNET_CONTAINER_heap_destroy(recent.minHeap);
3349   }
3350   */
3351 }
3352
3353
3354 /**
3355  * Task used to remove forwarding entries, either
3356  * after timeout, when full, or on shutdown.
3357  *
3358  * @param cls the entry to remove
3359  * @param tc context, reason, etc.
3360  */
3361 static void
3362 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3363 {
3364   struct DHTRouteSource *source_info = cls;
3365   struct DHTQueryRecord *record;
3366   source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
3367   record = source_info->record;
3368   GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
3369
3370   if (record->head == NULL) /* No more entries in DLL */
3371     {
3372       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap, &record->key, record));
3373       GNUNET_free(record);
3374     }
3375   if (source_info->find_peers_responded != NULL)
3376     GNUNET_CONTAINER_bloomfilter_free(source_info->find_peers_responded);
3377   GNUNET_free(source_info);
3378 }
3379
3380 /**
3381  * Remember this routing request so that if a reply is
3382  * received we can either forward it to the correct peer
3383  * or return the result locally.
3384  *
3385  * @param msg_ctx Context of the route request
3386  *
3387  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
3388  */
3389 static int cache_response(struct DHT_MessageContext *msg_ctx)
3390 {
3391   struct DHTQueryRecord *record;
3392   struct DHTRouteSource *source_info;
3393   struct DHTRouteSource *pos;
3394   struct GNUNET_TIME_Absolute now;
3395   unsigned int current_size;
3396
3397   current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
3398
3399 #if DELETE_WHEN_FULL
3400   while (current_size >= MAX_OUTSTANDING_FORWARDS)
3401     {
3402       source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
3403       GNUNET_assert(source_info != NULL);
3404       record = source_info->record;
3405       GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
3406       if (record->head == NULL) /* No more entries in DLL */
3407         {
3408           GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
3409           GNUNET_free(record);
3410         }
3411       if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
3412         GNUNET_SCHEDULER_cancel(source_info->delete_task);
3413       if (source_info->find_peers_responded != NULL)
3414         GNUNET_CONTAINER_bloomfilter_free(source_info->find_peers_responded);
3415       GNUNET_free(source_info);
3416       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
3417     }
3418 #endif
3419   /** Non-local request and have too many outstanding forwards, discard! */
3420   if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
3421     return GNUNET_NO;
3422
3423   now = GNUNET_TIME_absolute_get();
3424   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key);
3425   if (record != NULL) /* Already know this request! */
3426     {
3427       pos = record->head;
3428       while (pos != NULL)
3429         {
3430           if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
3431             break; /* Already have this peer in reply list! */
3432           pos = pos->next;
3433         }
3434       if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
3435         {
3436           GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.abs_value);
3437           return GNUNET_NO;
3438         }
3439     }
3440   else
3441     {
3442       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
3443       GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3444       memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
3445     }
3446
3447   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
3448   source_info->record = record;
3449   source_info->delete_task = GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
3450   source_info->find_peers_responded = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3451   memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
3452   GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail, source_info);
3453   if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
3454     {
3455       source_info->client = msg_ctx->client;
3456       now = GNUNET_TIME_absolute_get_forever();
3457     }
3458   source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.abs_value);
3459 #if DEBUG_DHT > 1
3460       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3461                   "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
3462                   "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id);
3463 #endif
3464   return GNUNET_YES;
3465 }
3466
3467
3468 /**
3469  * Main function that handles whether or not to route a message to other
3470  * peers.
3471  *
3472  * @param msg the message to be routed
3473  * @param msg_ctx the context containing all pertinent information about the message
3474  */
3475 static void
3476 route_message(const struct GNUNET_MessageHeader *msg,
3477                struct DHT_MessageContext *msg_ctx)
3478 {
3479   int i;
3480   struct PeerInfo *selected;
3481 #if DEBUG_DHT_ROUTING > 1
3482   struct PeerInfo *nearest;
3483 #endif
3484   unsigned int target_forward_count;
3485   unsigned int forward_count;
3486   struct RecentRequest *recent_req;
3487   GNUNET_HashCode unique_hash;
3488   char *stat_forward_count;
3489   char *temp_stat_str;
3490 #if DEBUG_DHT_ROUTING
3491   int ret;
3492 #endif
3493
3494   if (malicious_dropper == GNUNET_YES)
3495     {
3496 #if DEBUG_DHT_ROUTING
3497       if ((debug_routes_extended) && (dhtlog_handle != NULL))
3498         {
3499           dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3500                                        msg_ctx->hop_count, GNUNET_SYSERR,
3501                                        &my_identity, &msg_ctx->key, msg_ctx->peer,
3502                                        NULL);
3503         }
3504 #endif
3505       if (msg_ctx->bloom != NULL)
3506       {
3507         GNUNET_CONTAINER_bloomfilter_free(msg_ctx->bloom);
3508         msg_ctx->bloom = NULL;
3509       }
3510       return;
3511     }
3512
3513   increment_stats(STAT_ROUTES);
3514   target_forward_count = get_forward_count(msg_ctx->hop_count, msg_ctx->replication);
3515   GNUNET_asprintf(&stat_forward_count, "# forward counts of %d", target_forward_count);
3516   increment_stats(stat_forward_count);
3517   GNUNET_free(stat_forward_count);
3518   if (msg_ctx->bloom == NULL)
3519     msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3520
3521   if ((stop_on_closest == GNUNET_YES) && (msg_ctx->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
3522     target_forward_count = 0;
3523
3524   /**
3525    * NOTICE:  In Kademlia, a find peer request goes no further if the peer doesn't return
3526    * any closer peers (which is being checked for below).  Since we are doing recursive
3527    * routing we have no choice but to stop forwarding in this case.  This means that at
3528    * any given step the request may NOT be forwarded to alpha peers (because routes will
3529    * stop and the parallel route will not be aware of it).  Of course, assuming that we
3530    * have fulfilled the Kademlia requirements for routing table fullness this will never
3531    * ever ever be a problem.
3532    *
3533    * However, is this fair?
3534    *
3535    * Since we use these requests to build our routing tables (and we build them in the
3536    * testing driver) we will ignore this restriction for FIND_PEER messages so that
3537    * routing tables still get constructed.
3538    */
3539   if ((GNUNET_YES == strict_kademlia) && (msg_ctx->closest == GNUNET_YES) && (msg_ctx->hop_count > 0) && (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER))
3540     target_forward_count = 0;
3541
3542
3543   GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom, &my_identity.hashPubKey);
3544   hash_from_uid (msg_ctx->unique_id, &unique_hash);
3545   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent.hashmap, &unique_hash))
3546   {
3547     recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap, &unique_hash);
3548     GNUNET_assert(recent_req != NULL);
3549     if (0 != memcmp(&recent_req->key, &msg_ctx->key, sizeof(GNUNET_HashCode)))
3550       increment_stats(STAT_DUPLICATE_UID);
3551     else
3552       {
3553         increment_stats(STAT_RECENT_SEEN);
3554         GNUNET_CONTAINER_bloomfilter_or2(msg_ctx->bloom, recent_req->bloom, DHT_BLOOM_SIZE);
3555       }
3556     }
3557   else
3558     {
3559       recent_req = GNUNET_malloc(sizeof(struct RecentRequest));
3560       recent_req->uid = msg_ctx->unique_id;
3561       memcpy(&recent_req->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
3562       recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req);
3563       recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap, recent_req, GNUNET_TIME_absolute_get().abs_value);
3564       recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3565       GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3566     }
3567
3568   if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT)
3569     {
3570       recent_req = GNUNET_CONTAINER_heap_peek(recent.minHeap);
3571       GNUNET_assert(recent_req != NULL);
3572       GNUNET_SCHEDULER_cancel(recent_req->remove_task);
3573       GNUNET_SCHEDULER_add_now(&remove_recent, recent_req);
3574     }
3575
3576   forward_count = 0;
3577   for (i = 0; i < target_forward_count; i++)
3578     {
3579       selected = select_peer(&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
3580
3581       if (selected != NULL)
3582         {
3583           forward_count++;
3584           if (GNUNET_CRYPTO_hash_matching_bits(&selected->id.hashPubKey, &msg_ctx->key) >= GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, &msg_ctx->key))
3585             GNUNET_asprintf(&temp_stat_str, "# requests routed to close(r) peer hop %u", msg_ctx->hop_count);
3586           else
3587             GNUNET_asprintf(&temp_stat_str, "# requests routed to less close peer hop %u", msg_ctx->hop_count);
3588           if (temp_stat_str != NULL)
3589             {
3590               increment_stats(temp_stat_str);
3591               GNUNET_free(temp_stat_str);
3592             }
3593           GNUNET_CONTAINER_bloomfilter_add(msg_ctx->bloom, &selected->id.hashPubKey);
3594 #if DEBUG_DHT_ROUTING > 1
3595           nearest = find_closest_peer(&msg_ctx->key);
3596           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
3597           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3598                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
3599                       "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id, GNUNET_i2s(&selected->id), nearest_buf, GNUNET_CRYPTO_hash_matching_bits(&nearest->id.hashPubKey, msg_ctx->key), distance(&nearest->id.hashPubKey, msg_ctx->key));
3600           GNUNET_free(nearest_buf);
3601 #endif
3602 #if DEBUG_DHT_ROUTING
3603           if ((debug_routes_extended) && (dhtlog_handle != NULL))
3604             {
3605               dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3606                                            msg_ctx->hop_count, GNUNET_NO,
3607                                            &my_identity, &msg_ctx->key, msg_ctx->peer,
3608                                            &selected->id);
3609             }
3610 #endif
3611           forward_message (msg, selected, msg_ctx);
3612         }
3613     }
3614
3615   if (msg_ctx->bloom != NULL)
3616     {
3617       GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom, msg_ctx->bloom, DHT_BLOOM_SIZE);
3618       GNUNET_CONTAINER_bloomfilter_free(msg_ctx->bloom);
3619       msg_ctx->bloom = NULL;
3620     }
3621
3622 #if DEBUG_DHT_ROUTING
3623   if (forward_count == 0)
3624     ret = GNUNET_SYSERR;
3625   else
3626     ret = GNUNET_NO;
3627
3628   if ((debug_routes_extended) && (dhtlog_handle != NULL))
3629     {
3630       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3631                                    msg_ctx->hop_count, ret,
3632                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
3633                                    NULL);
3634     }
3635 #endif
3636 }
3637
3638
3639
3640 /**
3641  * Main function that handles whether or not to route a message to other
3642  * peers.
3643  *
3644  * @param msg the message to be routed
3645  * @param msg_ctx the context containing all pertinent information about the message
3646  */
3647 static void
3648 demultiplex_message(const struct GNUNET_MessageHeader *msg,
3649                     struct DHT_MessageContext *msg_ctx)
3650 {
3651   msg_ctx->closest = am_closest_peer(&msg_ctx->key, NULL);
3652   switch (ntohs(msg->type))
3653     {
3654     case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
3655       cache_response (msg_ctx);
3656       handle_dht_get (msg, msg_ctx);
3657       break;
3658     case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. */
3659       increment_stats(STAT_PUTS);
3660       handle_dht_put (msg, msg_ctx);
3661       break;
3662     case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
3663       increment_stats(STAT_FIND_PEER);
3664       if (((msg_ctx->hop_count > 0) && (0 != memcmp(msg_ctx->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))) || (msg_ctx->client != NULL))
3665       {
3666         cache_response (msg_ctx);
3667         if ((msg_ctx->closest == GNUNET_YES) || (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
3668           handle_dht_find_peer (msg, msg_ctx);
3669       }
3670       else
3671         route_message (msg, msg_ctx);
3672 #if DEBUG_DHT_ROUTING
3673       if (msg_ctx->hop_count == 0) /* Locally initiated request */
3674         {
3675           if ((debug_routes) && (dhtlog_handle != NULL))
3676             {
3677               dhtlog_handle->insert_dhtkey(NULL, &msg_ctx->key);
3678               dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_FIND_PEER,
3679                                            msg_ctx->hop_count, GNUNET_NO, &my_identity,
3680                                            &msg_ctx->key);
3681             }
3682         }
3683 #endif
3684       break;
3685     default:
3686       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3687                   "`%s': Message type (%d) not handled, forwarding anyway!\n", "DHT", ntohs(msg->type));
3688       route_message (msg, msg_ctx);
3689     }
3690 }
3691
3692
3693
3694
3695 /**
3696  * Iterator for local get request results,
3697  *
3698  * @param cls closure for iterator, NULL
3699  * @param exp when does this value expire?
3700  * @param key the key this data is stored under
3701  * @param size the size of the data identified by key
3702  * @param data the actual data
3703  * @param type the type of the data
3704  *
3705  * @return GNUNET_OK to continue iteration, anything else
3706  * to stop iteration.
3707  */
3708 static int
3709 republish_content_iterator (void *cls,
3710                             struct GNUNET_TIME_Absolute exp,
3711                             const GNUNET_HashCode * key,
3712                             size_t size, const char *data, uint32_t type)
3713 {
3714
3715   struct DHT_MessageContext *new_msg_ctx;
3716   struct GNUNET_DHT_PutMessage *put_msg;
3717 #if DEBUG_DHT
3718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3719               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
3720 #endif
3721   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
3722
3723   put_msg =
3724     GNUNET_malloc (sizeof (struct GNUNET_DHT_PutMessage) + size);
3725   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
3726   put_msg->header.size = htons (sizeof (struct GNUNET_DHT_PutMessage) + size);
3727   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
3728   put_msg->type = htons (type);
3729   memcpy (&put_msg[1], data, size);
3730   new_msg_ctx->unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3731   new_msg_ctx->replication = ntohl (DEFAULT_PUT_REPLICATION);
3732   new_msg_ctx->msg_options = ntohl (0);
3733   new_msg_ctx->network_size = estimate_diameter();
3734   new_msg_ctx->peer = &my_identity;
3735   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3736   new_msg_ctx->hop_count = 0;
3737   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
3738   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
3739   increment_stats(STAT_PUT_START);
3740   demultiplex_message(&put_msg->header, new_msg_ctx);
3741
3742   GNUNET_free(new_msg_ctx);
3743   GNUNET_free (put_msg);
3744   return GNUNET_OK;
3745 }
3746
3747 /**
3748  * Task used to republish data.
3749  *
3750  * @param cls closure (a struct RepublishContext)
3751  * @param tc runtime context for this task
3752  */
3753 static void
3754 republish_content(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3755 {
3756   struct RepublishContext *put_context = cls;
3757
3758   unsigned int results;
3759
3760   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3761     {
3762       GNUNET_free(put_context);
3763       return;
3764     }
3765
3766   GNUNET_assert (datacache != NULL); /* If we have no datacache we never should have scheduled this! */
3767   results = GNUNET_DATACACHE_get(datacache, &put_context->key, put_context->type, &republish_content_iterator, NULL);
3768   if (results == 0) /* Data must have expired */
3769     GNUNET_free(put_context);
3770   else /* Reschedule task for next time period */
3771     GNUNET_SCHEDULER_add_delayed(dht_republish_frequency, &republish_content, put_context);
3772
3773 }
3774
3775
3776 /**
3777  * Iterator over hash map entries.
3778  *
3779  * @param cls client to search for in source routes
3780  * @param key current key code (ignored)
3781  * @param value value in the hash map, a DHTQueryRecord
3782  * @return GNUNET_YES if we should continue to
3783  *         iterate,
3784  *         GNUNET_NO if not.
3785  */
3786 static int find_client_records (void *cls,
3787                                 const GNUNET_HashCode * key, void *value)
3788 {
3789   struct ClientList *client = cls;
3790   struct DHTQueryRecord *record = value;
3791   struct DHTRouteSource *pos;
3792   pos = record->head;
3793   while (pos != NULL)
3794     {
3795       if (pos->client == client)
3796         break;
3797       pos = pos->next;
3798     }
3799   if (pos != NULL)
3800     {
3801       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, pos);
3802       GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, pos->hnode);
3803       if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
3804         GNUNET_SCHEDULER_cancel(pos->delete_task);
3805
3806       if (pos->find_peers_responded != NULL)
3807         GNUNET_CONTAINER_bloomfilter_free(pos->find_peers_responded);
3808       GNUNET_free(pos);
3809     }
3810   if (record->head == NULL) /* No more entries in DLL */
3811     {
3812       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
3813       GNUNET_free(record);
3814     }
3815   return GNUNET_YES;
3816 }
3817
3818 /**
3819  * Functions with this signature are called whenever a client
3820  * is disconnected on the network level.
3821  *
3822  * @param cls closure (NULL for dht)
3823  * @param client identification of the client; NULL
3824  *        for the last call when the server is destroyed
3825  */
3826 static void handle_client_disconnect (void *cls,
3827                                       struct GNUNET_SERVER_Client* client)
3828 {
3829   struct ClientList *pos = client_list;
3830   struct ClientList *prev;
3831   struct ClientList *found;
3832   struct PendingMessage *reply;
3833
3834   prev = NULL;
3835   found = NULL;
3836   while (pos != NULL)
3837     {
3838       if (pos->client_handle == client)
3839         {
3840           if (prev != NULL)
3841             prev->next = pos->next;
3842           else
3843             client_list = pos->next;
3844           found = pos;
3845           break;
3846         }
3847       prev = pos;
3848       pos = pos->next;
3849     }
3850
3851   if (found != NULL)
3852     {
3853       if (found->transmit_handle != NULL)
3854         GNUNET_CONNECTION_notify_transmit_ready_cancel(found->transmit_handle);
3855
3856       while(NULL != (reply = found->pending_head))
3857         {
3858           GNUNET_CONTAINER_DLL_remove(found->pending_head, found->pending_tail, reply);
3859           GNUNET_free(reply);
3860         }
3861       GNUNET_CONTAINER_multihashmap_iterate(forward_list.hashmap, &find_client_records, found);
3862       GNUNET_free(found);
3863     }
3864 }
3865
3866 /**
3867  * Find a client if it exists, add it otherwise.
3868  *
3869  * @param client the server handle to the client
3870  *
3871  * @return the client if found, a new client otherwise
3872  */
3873 static struct ClientList *
3874 find_active_client (struct GNUNET_SERVER_Client *client)
3875 {
3876   struct ClientList *pos = client_list;
3877   struct ClientList *ret;
3878
3879   while (pos != NULL)
3880     {
3881       if (pos->client_handle == client)
3882         return pos;
3883       pos = pos->next;
3884     }
3885
3886   ret = GNUNET_malloc (sizeof (struct ClientList));
3887   ret->client_handle = client;
3888   ret->next = client_list;
3889   client_list = ret;
3890
3891   return ret;
3892 }
3893
3894 #if HAVE_MALICIOUS
3895 /**
3896  * Task to send a malicious put message across the network.
3897  *
3898  * @param cls closure for this task
3899  * @param tc the context under which the task is running
3900  */
3901 static void
3902 malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3903 {
3904   static struct GNUNET_DHT_PutMessage put_message;
3905   static struct DHT_MessageContext msg_ctx;
3906   static GNUNET_HashCode key;
3907   uint32_t random_key;
3908
3909   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3910     return;
3911   put_message.header.size = htons(sizeof(struct GNUNET_DHT_PutMessage));
3912   put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
3913   put_message.type = htonl(GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE);
3914   put_message.expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever());
3915   memset(&msg_ctx, 0, sizeof(struct DHT_MessageContext));
3916   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
3917   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
3918   memcpy(&msg_ctx.key, &key, sizeof(GNUNET_HashCode));
3919   msg_ctx.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3920   msg_ctx.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3921   msg_ctx.msg_options = ntohl (0);
3922   msg_ctx.network_size = estimate_diameter();
3923   msg_ctx.peer = &my_identity;
3924   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE;
3925   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3926 #if DEBUG_DHT_ROUTING
3927   if (dhtlog_handle != NULL)
3928     dhtlog_handle->insert_dhtkey(NULL, &key);
3929 #endif
3930   increment_stats(STAT_PUT_START);
3931   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious PUT message with hash %s\n", my_short_id, "DHT", GNUNET_h2s(&key));
3932   demultiplex_message(&put_message.header, &msg_ctx);
3933   GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_put_frequency), &malicious_put_task, NULL);
3934 }
3935
3936
3937 /**
3938  * Task to send a malicious put message across the network.
3939  *
3940  * @param cls closure for this task
3941  * @param tc the context under which the task is running
3942  */
3943 static void
3944 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3945 {
3946   static struct GNUNET_DHT_GetMessage get_message;
3947   struct DHT_MessageContext msg_ctx;
3948   static GNUNET_HashCode key;
3949   uint32_t random_key;
3950
3951   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3952     return;
3953
3954   get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
3955   get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
3956   get_message.type = htonl(GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE);
3957   memset(&msg_ctx, 0, sizeof(struct DHT_MessageContext));
3958   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
3959   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
3960   memcpy(&msg_ctx.key, &key, sizeof(GNUNET_HashCode));
3961   msg_ctx.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3962   msg_ctx.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3963   msg_ctx.msg_options = ntohl (0);
3964   msg_ctx.network_size = estimate_diameter();
3965   msg_ctx.peer = &my_identity;
3966   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE;
3967   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3968 #if DEBUG_DHT_ROUTING
3969   if (dhtlog_handle != NULL)
3970     dhtlog_handle->insert_dhtkey(NULL, &key);
3971 #endif
3972   increment_stats(STAT_GET_START);
3973   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message with hash %s\n", my_short_id, "DHT", GNUNET_h2s(&key));
3974   demultiplex_message (&get_message.header, &msg_ctx);
3975   GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_get_frequency), &malicious_get_task, NULL);
3976 }
3977 #endif
3978
3979
3980 /**
3981  * Iterator over hash map entries.
3982  *
3983  * @param cls closure
3984  * @param key current key code
3985  * @param value value in the hash map
3986  * @return GNUNET_YES if we should continue to
3987  *         iterate,
3988  *         GNUNET_NO if not.
3989  */
3990 static int
3991 add_known_to_bloom (void *cls,
3992                     const GNUNET_HashCode * key,
3993                     void *value)
3994 {
3995   struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
3996   GNUNET_CONTAINER_bloomfilter_add (bloom, key);
3997   return GNUNET_YES;
3998 }
3999
4000 /**
4001  * Task to send a find peer message for our own peer identifier
4002  * so that we can find the closest peers in the network to ourselves
4003  * and attempt to connect to them.
4004  *
4005  * @param cls closure for this task
4006  * @param tc the context under which the task is running
4007  */
4008 static void
4009 send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
4010 {
4011   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
4012   struct DHT_MessageContext msg_ctx;
4013   struct GNUNET_TIME_Relative next_send_time;
4014   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
4015 #if COUNT_INTERVAL
4016   struct GNUNET_TIME_Relative time_diff;
4017   struct GNUNET_TIME_Absolute end;
4018   double multiplier;
4019   double count_per_interval;
4020 #endif
4021   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
4022     return;
4023
4024   if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If we are finding peers already, no need to send out our request right now! */
4025     {
4026       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since last find peer message sent!\n", newly_found_peers);
4027       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
4028                                     &send_find_peer_message, NULL);
4029       newly_found_peers = 0;
4030       return;
4031     }
4032     
4033   increment_stats(STAT_FIND_PEER_START);
4034 #if COUNT_INTERVAL
4035   end = GNUNET_TIME_absolute_get();
4036   time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start, end);
4037
4038   if (time_diff.abs_value > FIND_PEER_CALC_INTERVAL.abs_value)
4039     {
4040       multiplier = time_diff.abs_value / FIND_PEER_CALC_INTERVAL.abs_value;
4041       count_per_interval = find_peer_context.count / multiplier;
4042     }
4043   else
4044     {
4045       multiplier = FIND_PEER_CALC_INTERVAL.abs_value / time_diff.abs_value;
4046       count_per_interval = find_peer_context.count * multiplier;
4047     }
4048 #endif
4049
4050 #if FIND_PEER_WITH_HELLO
4051   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage) + GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)my_hello));
4052   find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage) + GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)my_hello));
4053   memcpy(&find_peer_msg[1], my_hello, GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)my_hello));
4054 #else
4055   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage));
4056   find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
4057 #endif
4058   find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
4059   temp_bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
4060   GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom, temp_bloom);
4061   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom, find_peer_msg->bloomfilter, DHT_BLOOM_SIZE));
4062   GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
4063   memset(&msg_ctx, 0, sizeof(struct DHT_MessageContext));
4064   memcpy(&msg_ctx.key, &my_identity.hashPubKey, sizeof(GNUNET_HashCode));
4065   msg_ctx.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
4066   msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
4067   msg_ctx.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
4068   msg_ctx.network_size = estimate_diameter();
4069   msg_ctx.peer = &my_identity;
4070   msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
4071   msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
4072
4073   demultiplex_message(&find_peer_msg->header, &msg_ctx);
4074   GNUNET_free(find_peer_msg);
4075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4076               "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id, "DHT",
4077               "FIND PEER");
4078   if (newly_found_peers < bucket_size)
4079     {
4080       next_send_time.rel_value = (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
4081                               GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
4082                                                        DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
4083     }
4084   else
4085     {
4086       next_send_time.rel_value = DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
4087                              GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
4088                                                       DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value - DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
4089     }
4090
4091   GNUNET_assert (next_send_time.rel_value != 0);
4092   find_peer_context.count = 0;
4093   newly_found_peers = 0;
4094   find_peer_context.start = GNUNET_TIME_absolute_get();
4095   if (GNUNET_YES == do_find_peer)
4096   {
4097     GNUNET_SCHEDULER_add_delayed (next_send_time,
4098                                   &send_find_peer_message, NULL);
4099   }
4100 }
4101
4102 /**
4103  * Handler for any generic DHT messages, calls the appropriate handler
4104  * depending on message type, sends confirmation if responses aren't otherwise
4105  * expected.
4106  *
4107  * @param cls closure for the service
4108  * @param client the client we received this message from
4109  * @param message the actual message received
4110  */
4111 static void
4112 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
4113                                 const struct GNUNET_MessageHeader *message)
4114 {
4115   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
4116   const struct GNUNET_MessageHeader *enc_msg;
4117   struct DHT_MessageContext msg_ctx;
4118
4119   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
4120 #if DEBUG_DHT
4121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4122               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
4123               my_short_id, 
4124               "DHT",
4125               "GENERIC",
4126               ntohs (message->type), 
4127               GNUNET_h2s (&dht_msg->key),
4128               GNUNET_ntohll (dht_msg->unique_id));
4129 #endif
4130 #if DEBUG_DHT_ROUTING
4131   if (dhtlog_handle != NULL)
4132     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
4133 #endif
4134   memset(&msg_ctx, 0, sizeof(struct DHT_MessageContext));
4135   msg_ctx.client = find_active_client (client);
4136   memcpy(&msg_ctx.key, &dht_msg->key, sizeof(GNUNET_HashCode));
4137   msg_ctx.unique_id = GNUNET_ntohll (dht_msg->unique_id);
4138   msg_ctx.replication = ntohl (dht_msg->desired_replication_level);
4139   msg_ctx.msg_options = ntohl (dht_msg->options);
4140   msg_ctx.network_size = estimate_diameter();
4141   msg_ctx.peer = &my_identity;
4142   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 4; /* Make local routing a higher priority */
4143   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
4144   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
4145     increment_stats(STAT_GET_START);
4146   else if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
4147     increment_stats(STAT_PUT_START);
4148   else if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER)
4149     increment_stats(STAT_FIND_PEER_START);
4150
4151   demultiplex_message(enc_msg, &msg_ctx);
4152
4153   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4154
4155 }
4156
4157 /**
4158  * Handler for any locally received DHT control messages,
4159  * sets malicious flags mostly for now.
4160  *
4161  * @param cls closure for the service
4162  * @param client the client we received this message from
4163  * @param message the actual message received
4164  *
4165  */
4166 static void
4167 handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client,
4168                             const struct GNUNET_MessageHeader *message)
4169 {
4170   const struct GNUNET_DHT_ControlMessage *dht_control_msg =
4171       (const struct GNUNET_DHT_ControlMessage *) message;
4172 #if DEBUG_DHT
4173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4174               "`%s:%s': Received `%s' request from client, command %d\n", my_short_id, "DHT",
4175               "CONTROL", ntohs(dht_control_msg->command));
4176 #endif
4177
4178   switch (ntohs(dht_control_msg->command))
4179   {
4180   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
4181     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending self seeking find peer request!\n");
4182     GNUNET_SCHEDULER_add_now(&send_find_peer_message, NULL);
4183     break;
4184 #if HAVE_MALICIOUS
4185   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
4186     if (ntohs(dht_control_msg->variable) > 0)
4187       malicious_get_frequency = ntohs(dht_control_msg->variable);
4188     if (malicious_get_frequency == 0)
4189       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4190     if (malicious_getter != GNUNET_YES)
4191       GNUNET_SCHEDULER_add_now(&malicious_get_task, NULL);
4192     malicious_getter = GNUNET_YES;
4193     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 
4194                "%s:%s Initiating malicious GET behavior, frequency %d\n", my_short_id, "DHT", malicious_get_frequency);
4195     break;
4196   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT:
4197     if (ntohs(dht_control_msg->variable) > 0)
4198       malicious_put_frequency = ntohs(dht_control_msg->variable);
4199     if (malicious_put_frequency == 0)
4200       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
4201     if (malicious_putter != GNUNET_YES)
4202       GNUNET_SCHEDULER_add_now(&malicious_put_task, NULL);
4203     malicious_putter = GNUNET_YES;
4204     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
4205                "%s:%s Initiating malicious PUT behavior, frequency %d\n", my_short_id, "DHT", malicious_put_frequency);
4206     break;
4207   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP:
4208 #if DEBUG_DHT_ROUTING
4209     if ((malicious_dropper != GNUNET_YES) && (dhtlog_handle != NULL))
4210       dhtlog_handle->set_malicious(&my_identity);
4211 #endif
4212     malicious_dropper = GNUNET_YES;
4213     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
4214                "%s:%s Initiating malicious DROP behavior\n", my_short_id, "DHT");
4215     break;
4216 #endif
4217   default:
4218     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 
4219                "%s:%s Unknown control command type `%d'!\n", 
4220                my_short_id, "DHT",
4221                ntohs(dht_control_msg->command));
4222     break;
4223   }
4224
4225   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4226 }
4227
4228 /**
4229  * Handler for any generic DHT stop messages, calls the appropriate handler
4230  * depending on message type (if processed locally)
4231  *
4232  * @param cls closure for the service
4233  * @param client the client we received this message from
4234  * @param message the actual message received
4235  *
4236  */
4237 static void
4238 handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
4239                             const struct GNUNET_MessageHeader *message)
4240 {
4241
4242   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
4243     (const struct GNUNET_DHT_StopMessage *) message;
4244   struct DHTQueryRecord *record;
4245   struct DHTRouteSource *pos;
4246 #if DEBUG_DHT
4247   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4248               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
4249               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
4250 #endif
4251   record = GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &dht_stop_msg->key);
4252   if (record != NULL)
4253     {
4254       pos = record->head;
4255
4256       while (pos != NULL)
4257         {
4258           /* If the client is non-null (local request) and the client matches the requesting client, remove the entry. */
4259           if ((pos->client != NULL) && (pos->client->client_handle == client))
4260             {
4261               GNUNET_SCHEDULER_cancel(pos->delete_task);
4262               pos->delete_task = GNUNET_SCHEDULER_NO_TASK;
4263               GNUNET_SCHEDULER_add_continuation (&remove_forward_entry, pos, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
4264             }
4265           pos = pos->next;
4266         }
4267     }
4268
4269   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4270 }
4271
4272
4273 /**
4274  * Core handler for p2p route requests.
4275  */
4276 static int
4277 handle_dht_p2p_route_request (void *cls,
4278                               const struct GNUNET_PeerIdentity *peer,
4279                               const struct GNUNET_MessageHeader *message,
4280                               const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4281 {
4282 #if DEBUG_DHT
4283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4284               "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
4285 #endif
4286   struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
4287   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
4288   struct DHT_MessageContext *msg_ctx;
4289
4290   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/
4291     {
4292 #if DEBUG_PING
4293       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n", my_short_id, "DHT");
4294 #endif
4295       return GNUNET_YES;
4296     }
4297
4298   if (ntohs(enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4299     {
4300       GNUNET_break_op(0);
4301       return GNUNET_YES;
4302     }
4303
4304   if (get_max_send_delay().rel_value > MAX_REQUEST_TIME.rel_value)
4305   {
4306     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending of previous replies took too long, backing off!\n");
4307     increment_stats("# route requests dropped due to high load");
4308     decrease_max_send_delay(get_max_send_delay());
4309 #if DEBUG_DHT_ROUTING
4310     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4311       {
4312         /** Log routes that die due to high load! */
4313         dhtlog_handle->insert_route (NULL, GNUNET_ntohll(incoming->unique_id), DHTLOG_ROUTE,
4314                                      ntohl(incoming->hop_count), GNUNET_SYSERR,
4315                                      &my_identity, &incoming->key, peer,
4316                                      NULL);
4317       }
4318 #endif
4319     return GNUNET_YES;
4320   }
4321   msg_ctx = GNUNET_malloc(sizeof (struct DHT_MessageContext));
4322   msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
4323   GNUNET_assert(msg_ctx->bloom != NULL);
4324   msg_ctx->hop_count = ntohl(incoming->hop_count);
4325   memcpy(&msg_ctx->key, &incoming->key, sizeof(GNUNET_HashCode));
4326   msg_ctx->replication = ntohl(incoming->desired_replication_level);
4327   msg_ctx->unique_id = GNUNET_ntohll(incoming->unique_id);
4328   msg_ctx->msg_options = ntohl(incoming->options);
4329   msg_ctx->network_size = ntohl(incoming->network_size);
4330   msg_ctx->peer = peer;
4331   msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
4332   msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
4333   demultiplex_message (enc_msg, msg_ctx);
4334   if (msg_ctx->bloom != NULL)
4335   {
4336     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
4337     msg_ctx->bloom = NULL;
4338   }
4339   GNUNET_free(msg_ctx);
4340   return GNUNET_YES;
4341 }
4342
4343
4344 /**
4345  * Core handler for p2p route results.
4346  */
4347 static int
4348 handle_dht_p2p_route_result (void *cls,
4349                              const struct GNUNET_PeerIdentity *peer,
4350                              const struct GNUNET_MessageHeader *message,
4351                              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4352 {
4353 #if DEBUG_DHT
4354   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4355               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
4356 #endif
4357   struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
4358   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
4359   struct DHT_MessageContext msg_ctx;
4360
4361   if (ntohs(enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4362     {
4363       GNUNET_break_op(0);
4364       return GNUNET_YES;
4365     }
4366
4367   memset(&msg_ctx, 0, sizeof(struct DHT_MessageContext));
4368   // FIXME: call GNUNET_BLOCK_evaluate (...) -- instead of doing your own bloomfilter!
4369   msg_ctx.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
4370   GNUNET_assert(msg_ctx.bloom != NULL);
4371   memcpy(&msg_ctx.key, &incoming->key, sizeof(GNUNET_HashCode));
4372   msg_ctx.unique_id = GNUNET_ntohll(incoming->unique_id);
4373   msg_ctx.msg_options = ntohl(incoming->options);
4374   msg_ctx.hop_count = ntohl(incoming->hop_count);
4375   msg_ctx.peer = peer;
4376   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
4377   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
4378   route_result_message(enc_msg, &msg_ctx);
4379   return GNUNET_YES;
4380 }
4381
4382
4383 /**
4384  * Receive the HELLO from transport service,
4385  * free current and replace if necessary.
4386  *
4387  * @param cls NULL
4388  * @param message HELLO message of peer
4389  */
4390 static void
4391 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
4392 {
4393 #if DEBUG_DHT
4394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4395               "Received our `%s' from transport service\n",
4396               "HELLO");
4397 #endif
4398
4399   GNUNET_assert (message != NULL);
4400   GNUNET_free_non_null(my_hello);
4401   my_hello = GNUNET_malloc(ntohs(message->size));
4402   memcpy(my_hello, message, ntohs(message->size));
4403 }
4404
4405
4406 /**
4407  * Task run during shutdown.
4408  *
4409  * @param cls unused
4410  * @param tc unused
4411  */
4412 static void
4413 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
4414 {
4415   int bucket_count;
4416   struct PeerInfo *pos;
4417
4418   if (transport_handle != NULL)
4419     {
4420       GNUNET_free_non_null(my_hello);
4421       GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
4422       GNUNET_TRANSPORT_disconnect(transport_handle);
4423     }
4424   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
4425     {
4426       while (k_buckets[bucket_count].head != NULL)
4427         {
4428           pos = k_buckets[bucket_count].head;
4429 #if DEBUG_DHT
4430           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4431                       "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
4432 #endif
4433           delete_peer(pos, bucket_count);
4434         }
4435     }
4436   if (coreAPI != NULL)
4437     {
4438 #if DEBUG_DHT
4439       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4440                   "%s:%s Disconnecting core!\n", my_short_id, "DHT");
4441 #endif
4442       GNUNET_CORE_disconnect (coreAPI);
4443       coreAPI = NULL;
4444     }
4445   if (datacache != NULL)
4446     {
4447 #if DEBUG_DHT
4448       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4449                   "%s:%s Destroying datacache!\n", my_short_id, "DHT");
4450 #endif
4451       GNUNET_DATACACHE_destroy (datacache);
4452       datacache = NULL;
4453     }
4454   if (stats != NULL)
4455     {
4456       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
4457       stats = NULL;
4458     }
4459   if (dhtlog_handle != NULL)
4460     {
4461       GNUNET_DHTLOG_disconnect(dhtlog_handle);
4462       dhtlog_handle = NULL;
4463     }
4464   if (block_context != NULL)
4465     {
4466       GNUNET_BLOCK_context_destroy (block_context);
4467       block_context = NULL;
4468     }
4469   GNUNET_free_non_null(my_short_id);
4470   my_short_id = NULL;
4471 }
4472
4473
4474 /**
4475  * To be called on core init/fail.
4476  *
4477  * @param cls service closure
4478  * @param server handle to the server for this service
4479  * @param identity the public identity of this peer
4480  * @param publicKey the public key of this peer
4481  */
4482 void
4483 core_init (void *cls,
4484            struct GNUNET_CORE_Handle *server,
4485            const struct GNUNET_PeerIdentity *identity,
4486            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
4487 {
4488
4489   if (server == NULL)
4490     {
4491 #if DEBUG_DHT
4492       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4493                   "%s: Connection to core FAILED!\n", "dht",
4494                   GNUNET_i2s (identity));
4495 #endif
4496       GNUNET_SCHEDULER_cancel (cleanup_task);
4497       GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
4498       return;
4499     }
4500 #if DEBUG_DHT
4501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4502               "%s: Core connection initialized, I am peer: %s\n", "dht",
4503               GNUNET_i2s (identity));
4504 #endif
4505
4506   /* Copy our identity so we can use it */
4507   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
4508   if (my_short_id != NULL)
4509     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n", "DHT SERVICE");
4510   my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
4511   /* Set the server to local variable */
4512   coreAPI = server;
4513
4514   if (dhtlog_handle != NULL)
4515     dhtlog_handle->insert_node (NULL, &my_identity);
4516 }
4517
4518
4519 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
4520   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE, 0},
4521   {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
4522   {&handle_dht_control_message, NULL, GNUNET_MESSAGE_TYPE_DHT_CONTROL, 0},
4523   {NULL, NULL, 0, 0}
4524 };
4525
4526
4527 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
4528   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
4529   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
4530   {NULL, 0, 0}
4531 };
4532
4533
4534 /**
4535  * Method called whenever a peer connects.
4536  *
4537  * @param cls closure
4538  * @param peer peer identity this notification is about
4539  * @param atsi performance data
4540  */
4541 static void 
4542 handle_core_connect (void *cls,
4543                      const struct GNUNET_PeerIdentity * peer,
4544                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4545 {
4546   struct PeerInfo *ret;
4547
4548 #if DEBUG_DHT
4549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4550               "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
4551 #endif
4552
4553   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
4554     {
4555       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received %s message for peer %s, but already have peer in RT!", my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s(peer));
4556       return;
4557     }
4558
4559   if (datacache != NULL)
4560     GNUNET_DATACACHE_put(datacache, &peer->hashPubKey, sizeof(struct GNUNET_PeerIdentity), (const char *)peer, GNUNET_BLOCK_TYPE_DHT_HELLO, GNUNET_TIME_absolute_get_forever());
4561   ret = try_add_peer(peer,
4562                      find_current_bucket(&peer->hashPubKey),
4563                      atsi);
4564   if (ret != NULL)
4565     {
4566       newly_found_peers++;
4567       GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
4568     }
4569 #if DEBUG_DHT
4570     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4571                 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED");
4572 #endif
4573 }
4574
4575
4576 /**
4577  * Method called whenever a peer disconnects.
4578  *
4579  * @param cls closure
4580  * @param peer peer identity this notification is about
4581  */
4582 static void
4583 handle_core_disconnect (void *cls,
4584                         const struct
4585                         GNUNET_PeerIdentity * peer)
4586 {
4587   struct PeerInfo *to_remove;
4588   int current_bucket;
4589
4590   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: Received peer disconnect message for peer `%s' from %s\n", my_short_id, "DHT", GNUNET_i2s(peer), "CORE");
4591
4592   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
4593     {
4594       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: do not have peer `%s' in RT, can't disconnect!\n", my_short_id, "DHT", GNUNET_i2s(peer));
4595       return;
4596     }
4597   increment_stats(STAT_DISCONNECTS);
4598   GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey));
4599   to_remove = GNUNET_CONTAINER_multihashmap_get(all_known_peers, &peer->hashPubKey);
4600   GNUNET_assert (to_remove != NULL);
4601   GNUNET_assert(0 == memcmp(peer, &to_remove->id, sizeof(struct GNUNET_PeerIdentity)));
4602   current_bucket = find_current_bucket(&to_remove->id.hashPubKey);
4603   delete_peer(to_remove, current_bucket);
4604 }
4605
4606
4607 /**
4608  * Process dht requests.
4609  *
4610  * @param cls closure
4611  * @param server the initialized server
4612  * @param c configuration to use
4613  */
4614 static void
4615 run (void *cls,
4616      struct GNUNET_SERVER_Handle *server,
4617      const struct GNUNET_CONFIGURATION_Handle *c)
4618 {
4619   struct GNUNET_TIME_Relative next_send_time;
4620   unsigned long long temp_config_num;
4621   char *converge_modifier_buf;
4622
4623   cfg = c;
4624   datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
4625   GNUNET_SERVER_add_handlers (server, plugin_handlers);
4626   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
4627   coreAPI = GNUNET_CORE_connect (cfg,    /* Main configuration */
4628                                  DEFAULT_CORE_QUEUE_SIZE, /* queue size */
4629                                  NULL,  /* Closure passed to DHT functions */
4630                                  &core_init,    /* Call core_init once connected */
4631                                  &handle_core_connect,  /* Handle connects */
4632                                  &handle_core_disconnect,  /* remove peers on disconnects */
4633                                  NULL,  /* Do we care about "status" updates? */
4634                                  NULL,  /* Don't want notified about all incoming messages */
4635                                  GNUNET_NO,     /* For header only inbound notification */
4636                                  NULL,  /* Don't want notified about all outbound messages */
4637                                  GNUNET_NO,     /* For header only outbound notification */
4638                                  core_handlers);        /* Register these handlers */
4639
4640   if (coreAPI == NULL)
4641     return;
4642   transport_handle = GNUNET_TRANSPORT_connect(cfg,
4643                                               NULL, NULL, NULL, NULL, NULL);
4644   if (transport_handle != NULL)
4645     GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
4646   else
4647     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 
4648                "Failed to connect to transport service!\n");
4649   block_context = GNUNET_BLOCK_context_create (cfg);
4650   lowest_bucket = MAX_BUCKETS - 1;
4651   forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
4652   forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
4653   all_known_peers = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
4654   recent_find_peer_requests = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
4655   GNUNET_assert(all_known_peers != NULL);
4656   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
4657     {
4658       debug_routes = GNUNET_YES;
4659     }
4660
4661   if (GNUNET_YES ==
4662       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4663                                            "strict_kademlia"))
4664     {
4665       strict_kademlia = GNUNET_YES;
4666     }
4667
4668   if (GNUNET_YES ==
4669       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4670                                            "stop_on_closest"))
4671     {
4672       stop_on_closest = GNUNET_YES;
4673     }
4674
4675   if (GNUNET_YES ==
4676       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4677                                            "stop_found"))
4678     {
4679       stop_on_found = GNUNET_YES;
4680     }
4681
4682   if (GNUNET_YES ==
4683       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4684                                            "malicious_getter"))
4685     {
4686       malicious_getter = GNUNET_YES;
4687       if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4688                                             "MALICIOUS_GET_FREQUENCY",
4689                                             &malicious_get_frequency))
4690         malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4691     }
4692
4693   if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4694                                         "MAX_HOPS",
4695                                         &max_hops))
4696     {
4697       max_hops = DEFAULT_MAX_HOPS;
4698     }
4699
4700   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno (cfg, "DHT",
4701                                                           "USE_MAX_HOPS"))
4702     {
4703       use_max_hops = GNUNET_YES;
4704     }
4705
4706   if (GNUNET_YES ==
4707       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4708                                            "malicious_putter"))
4709     {
4710       malicious_putter = GNUNET_YES;
4711       if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4712                                             "MALICIOUS_PUT_FREQUENCY",
4713                                             &malicious_put_frequency))
4714         malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
4715     }
4716
4717   dht_republish_frequency = GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY;
4718   if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "DHT", "REPLICATION_FREQUENCY", &temp_config_num))
4719     {
4720       dht_republish_frequency = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, temp_config_num);
4721     }
4722
4723   if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "DHT", "bucket_size", &temp_config_num))
4724     {
4725       bucket_size = (unsigned int)temp_config_num;
4726     }
4727
4728   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number(cfg, "DHT", "kad_alpha", &kademlia_replication))
4729     {
4730       kademlia_replication = DEFAULT_KADEMLIA_REPLICATION;
4731     }
4732
4733   if (GNUNET_YES ==
4734           GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4735                                                "malicious_dropper"))
4736     {
4737       malicious_dropper = GNUNET_YES;
4738     }
4739
4740   if (GNUNET_YES ==
4741         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4742                                              "republish"))
4743     do_republish = GNUNET_NO;
4744
4745   if (GNUNET_NO ==
4746         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4747                                              "do_find_peer"))
4748     {
4749       do_find_peer = GNUNET_NO;
4750     }
4751   else
4752     do_find_peer = GNUNET_YES;
4753
4754   if (GNUNET_YES ==
4755         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4756                                              "use_real_distance"))
4757     use_real_distance = GNUNET_YES;
4758
4759   if (GNUNET_YES ==
4760       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
4761                                            "mysql_logging_extended"))
4762     {
4763       debug_routes = GNUNET_YES;
4764       debug_routes_extended = GNUNET_YES;
4765     }
4766
4767 #if DEBUG_DHT_ROUTING
4768   if (GNUNET_YES == debug_routes)
4769     {
4770       dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
4771       if (dhtlog_handle == NULL)
4772         {
4773           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4774                       "Could not connect to mysql logging server, logging will not happen!");
4775         }
4776     }
4777 #endif
4778
4779   converge_option = DHT_CONVERGE_SQUARE;
4780   if (GNUNET_YES ==
4781       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4782                                            "converge_linear"))
4783     {
4784       converge_option = DHT_CONVERGE_LINEAR;
4785     }
4786   else if (GNUNET_YES ==
4787         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4788                                              "converge_exponential"))
4789     {
4790       converge_option = DHT_CONVERGE_EXPONENTIAL;
4791     }
4792   else if (GNUNET_YES ==
4793         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4794                                              "converge_random"))
4795     {
4796       converge_option = DHT_CONVERGE_RANDOM;
4797     }
4798   else if (GNUNET_YES ==
4799         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4800                                              "converge_binary"))
4801     {
4802       converge_option = DHT_CONVERGE_BINARY;
4803     }
4804
4805   if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string(cfg, "dht", "converge_modifier", &converge_modifier_buf))
4806     {
4807       if (1 != sscanf(converge_modifier_buf, "%f", &converge_modifier))
4808         {
4809           GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to read decimal value for %s from `%s'\n", "CONVERGE_MODIFIER", converge_modifier_buf);
4810           converge_modifier = 0.0;
4811         }
4812       GNUNET_free(converge_modifier_buf);
4813     }
4814
4815   stats = GNUNET_STATISTICS_create("dht", cfg);
4816
4817   if (stats != NULL)
4818     {
4819       GNUNET_STATISTICS_set(stats, STAT_ROUTES, 0, GNUNET_NO);
4820       GNUNET_STATISTICS_set(stats, STAT_ROUTE_FORWARDS, 0, GNUNET_NO);
4821       GNUNET_STATISTICS_set(stats, STAT_ROUTE_FORWARDS_CLOSEST, 0, GNUNET_NO);
4822       GNUNET_STATISTICS_set(stats, STAT_RESULTS, 0, GNUNET_NO);
4823       GNUNET_STATISTICS_set(stats, STAT_RESULTS_TO_CLIENT, 0, GNUNET_NO);
4824       GNUNET_STATISTICS_set(stats, STAT_RESULT_FORWARDS, 0, GNUNET_NO);
4825       GNUNET_STATISTICS_set(stats, STAT_GETS, 0, GNUNET_NO);
4826       GNUNET_STATISTICS_set(stats, STAT_PUTS, 0, GNUNET_NO);
4827       GNUNET_STATISTICS_set(stats, STAT_PUTS_INSERTED, 0, GNUNET_NO);
4828       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER, 0, GNUNET_NO);
4829       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_START, 0, GNUNET_NO);
4830       GNUNET_STATISTICS_set(stats, STAT_GET_START, 0, GNUNET_NO);
4831       GNUNET_STATISTICS_set(stats, STAT_PUT_START, 0, GNUNET_NO);
4832       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_REPLY, 0, GNUNET_NO);
4833       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_ANSWER, 0, GNUNET_NO);
4834       GNUNET_STATISTICS_set(stats, STAT_BLOOM_FIND_PEER, 0, GNUNET_NO);
4835       GNUNET_STATISTICS_set(stats, STAT_GET_REPLY, 0, GNUNET_NO);
4836       GNUNET_STATISTICS_set(stats, STAT_GET_RESPONSE_START, 0, GNUNET_NO);
4837       GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
4838       GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO);
4839     }
4840   /* FIXME: if there are no recent requests then these never get freed, but alternative is _annoying_! */
4841   recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2);
4842   recent.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
4843   if (GNUNET_YES == do_find_peer)
4844   {
4845     next_send_time.rel_value = DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
4846                            GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
4847                                                     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
4848     find_peer_context.start = GNUNET_TIME_absolute_get();
4849     GNUNET_SCHEDULER_add_delayed (next_send_time,
4850                                   &send_find_peer_message, &find_peer_context);
4851   }
4852
4853   /* Scheduled the task to clean up when shutdown is called */
4854   cleanup_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4855                                                &shutdown_task, NULL);
4856 }
4857
4858 /**
4859  * The main function for the dht service.
4860  *
4861  * @param argc number of arguments from the command line
4862  * @param argv command line arguments
4863  * @return 0 ok, 1 on error
4864  */
4865 int
4866 main (int argc, char *const *argv)
4867 {
4868   int ret;
4869
4870   ret = (GNUNET_OK ==
4871          GNUNET_SERVICE_run (argc,
4872                              argv,
4873                              "dht",
4874                              GNUNET_SERVICE_OPTION_NONE,
4875                              &run, NULL)) ? 0 : 1;
4876   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap));
4877   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap));
4878   GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
4879   GNUNET_CONTAINER_multihashmap_destroy (recent.hashmap);
4880   GNUNET_CONTAINER_heap_destroy (recent.minHeap);
4881   return ret;
4882 }