options should be plain 'int'
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief GNUnet DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_client_lib.h"
31 #include "gnunet_getopt_lib.h"
32 #include "gnunet_os_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_service_lib.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_signal_lib.h"
37 #include "gnunet_util_lib.h"
38 #include "gnunet_datacache_lib.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_dht_service.h"
42 #include "gnunet_statistics_service.h"
43 #include "dhtlog.h"
44 #include "dht.h"
45 #include <fenv.h>
46
47 #define PRINT_TABLES GNUNET_NO
48
49 #define REAL_DISTANCE GNUNET_NO
50
51 #define EXTRA_CHECKS GNUNET_NO
52
53 /**
54  * How many buckets will we allow total.
55  */
56 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
57
58 /**
59  * Should the DHT issue FIND_PEER requests to get better routing tables?
60  */
61 #define DEFAULT_DO_FIND_PEER GNUNET_YES
62
63 /**
64  * Defines whether find peer requests send their HELLO's outgoing,
65  * or expect replies to contain hellos.
66  */
67 #define FIND_PEER_WITH_HELLO GNUNET_YES
68
69 /**
70  * What is the maximum number of peers in a given bucket.
71  */
72 #define DEFAULT_BUCKET_SIZE 4
73
74 #define DEFAULT_CORE_QUEUE_SIZE 32
75
76 /**
77  * Minimum number of peers we need for "good" routing,
78  * any less than this and we will allow messages to
79  * travel much further through the network!
80  */
81 #define MINIMUM_PEER_THRESHOLD 20
82
83 #define DHT_MAX_RECENT 1000
84
85 #define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
86
87 /**
88  * Default time to wait to send messages on behalf of other peers.
89  */
90 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
91
92 /**
93  * Default importance for handling messages on behalf of other peers.
94  */
95 #define DHT_DEFAULT_P2P_IMPORTANCE 0
96
97 /**
98  * How long to keep recent requests around by default.
99  */
100 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
101
102 /**
103  * Default time to wait to send find peer messages sent by the dht service.
104  */
105 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
106
107 /**
108  * Default importance for find peer messages sent by the dht service.
109  */
110 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
111
112 /**
113  * Default replication parameter for find peer messages sent by the dht service.
114  */
115 #define DHT_DEFAULT_FIND_PEER_REPLICATION 4
116
117 /**
118  * Default options for find peer requests sent by the dht service.
119  */
120 #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
121 /*#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE*/
122
123 /**
124  * How long at least to wait before sending another find peer request.
125  */
126 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
127
128 /**
129  * How long at most to wait before sending another find peer request.
130  */
131 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
132
133 /**
134  * How often to update our preference levels for peers in our routing tables.
135  */
136 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
137
138 /**
139  * How long at most on average will we allow a reply forward to take
140  * (before we quit sending out new requests)
141  */
142 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
143
144 /**
145  * How many initial requests to send out (in true Kademlia fashion)
146  */
147 #define DEFAULT_KADEMLIA_REPLICATION 3
148
149 /*
150  * Default frequency for sending malicious get messages
151  */
152 #define DEFAULT_MALICIOUS_GET_FREQUENCY 1000    /* Number of milliseconds */
153
154 /*
155  * Default frequency for sending malicious put messages
156  */
157 #define DEFAULT_MALICIOUS_PUT_FREQUENCY 1000    /* Default is in milliseconds */
158
159
160 #define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
161
162 /**
163  * Real maximum number of hops, at which point we refuse
164  * to forward the message.
165  */
166 #define DEFAULT_MAX_HOPS 10
167
168 /**
169  * How many time differences between requesting a core send and
170  * the actual callback to remember.
171  */
172 #define MAX_REPLY_TIMES 8
173
174
175 /**
176  * Linked list of messages to send to clients.
177  */
178 struct P2PPendingMessage
179 {
180   /**
181    * Pointer to next item in the list
182    */
183   struct P2PPendingMessage *next;
184
185   /**
186    * Pointer to previous item in the list
187    */
188   struct P2PPendingMessage *prev;
189
190   /**
191    * Message importance level.
192    */
193   unsigned int importance;
194
195   /**
196    * Time when this request was scheduled to be sent.
197    */
198   struct GNUNET_TIME_Absolute scheduled;
199
200   /**
201    * How long to wait before sending message.
202    */
203   struct GNUNET_TIME_Relative timeout;
204
205   /**
206    * Actual message to be sent; // avoid allocation
207    */
208   const struct GNUNET_MessageHeader *msg;       // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
209
210 };
211
212 /**
213  * Per-peer information.
214  */
215 struct PeerInfo
216 {
217   /**
218    * Next peer entry (DLL)
219    */
220   struct PeerInfo *next;
221
222   /**
223    *  Prev peer entry (DLL)
224    */
225   struct PeerInfo *prev;
226
227   /**
228    * Count of outstanding messages for peer.
229    */
230   unsigned int pending_count;
231
232   /**
233    * Head of pending messages to be sent to this peer.
234    */
235   struct P2PPendingMessage *head;
236
237   /**
238    * Tail of pending messages to be sent to this peer.
239    */
240   struct P2PPendingMessage *tail;
241
242   /**
243    * Core handle for sending messages to this peer.
244    */
245   struct GNUNET_CORE_TransmitHandle *th;
246
247   /**
248    * Task for scheduling message sends.
249    */
250   GNUNET_SCHEDULER_TaskIdentifier send_task;
251
252   /**
253    * Task for scheduling preference updates
254    */
255   GNUNET_SCHEDULER_TaskIdentifier preference_task;
256
257   /**
258    * Preference update context
259    */
260   struct GNUNET_CORE_InformationRequestContext *info_ctx;
261
262   /**
263    * What is the identity of the peer?
264    */
265   struct GNUNET_PeerIdentity id;
266
267 #if 0
268   /**
269    * What is the average latency for replies received?
270    */
271   struct GNUNET_TIME_Relative latency;
272
273   /**
274    * Transport level distance to peer.
275    */
276   unsigned int distance;
277 #endif
278
279   /**
280    * Holds matching bits from peer to current target,
281    * used for distance comparisons between peers. May
282    * be considered a really bad idea.
283    * FIXME: remove this value (create struct which holds
284    *        a single peerinfo and the matching bits, use
285    *        that to pass to comparator)
286    */
287   unsigned int matching_bits;
288
289   /**
290    * Task for scheduling periodic ping messages for this peer.
291    */
292   GNUNET_SCHEDULER_TaskIdentifier ping_task;
293 };
294
295 /**
296  * Peers are grouped into buckets.
297  */
298 struct PeerBucket
299 {
300   /**
301    * Head of DLL
302    */
303   struct PeerInfo *head;
304
305   /**
306    * Tail of DLL
307    */
308   struct PeerInfo *tail;
309
310   /**
311    * Number of peers in the bucket.
312    */
313   unsigned int peers_size;
314 };
315
316 /**
317  * Linked list of messages to send to clients.
318  */
319 struct PendingMessage
320 {
321   /**
322    * Pointer to next item in the list
323    */
324   struct PendingMessage *next;
325
326   /**
327    * Pointer to previous item in the list
328    */
329   struct PendingMessage *prev;
330
331   /**
332    * Actual message to be sent; // avoid allocation
333    */
334   const struct GNUNET_MessageHeader *msg;       // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
335
336 };
337
338 /**
339  * Struct containing information about a client,
340  * handle to connect to it, and any pending messages
341  * that need to be sent to it.
342  */
343 struct ClientList
344 {
345   /**
346    * Linked list of active clients
347    */
348   struct ClientList *next;
349
350   /**
351    * The handle to this client
352    */
353   struct GNUNET_SERVER_Client *client_handle;
354
355   /**
356    * Handle to the current transmission request, NULL
357    * if none pending.
358    */
359   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
360
361   /**
362    * Linked list of pending messages for this client
363    */
364   struct PendingMessage *pending_head;
365
366   /**
367    * Tail of linked list of pending messages for this client
368    */
369   struct PendingMessage *pending_tail;
370 };
371
372
373 /**
374  * Context containing information about a DHT message received.
375  */
376 struct DHT_MessageContext
377 {
378   /**
379    * The client this request was received from.
380    * (NULL if received from another peer)
381    */
382   struct ClientList *client;
383
384   /**
385    * The peer this request was received from.
386    * (NULL if received from local client)
387    */
388   const struct GNUNET_PeerIdentity *peer;
389
390   /**
391    * Bloomfilter for this routing request.
392    */
393   struct GNUNET_CONTAINER_BloomFilter *bloom;
394
395   /**
396    * extended query (see gnunet_block_lib.h).
397    */
398   const void *xquery;
399
400   /**
401    * Bloomfilter to filter out duplicate replies.
402    */
403   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
404
405   /**
406    * The key this request was about
407    */
408   GNUNET_HashCode key;
409
410   /**
411    * How long should we wait to transmit this request?
412    */
413   struct GNUNET_TIME_Relative timeout;
414
415   /**
416    * The unique identifier of this request
417    */
418   uint64_t unique_id;
419
420   /**
421    * Number of bytes in xquery.
422    */
423   size_t xquery_size;
424
425   /**
426    * Mutator value for the reply_bf, see gnunet_block_lib.h
427    */
428   uint32_t reply_bf_mutator;
429
430   /**
431    * Desired replication level
432    */
433   uint32_t replication;
434
435   /**
436    * Network size estimate, either ours or the sum of
437    * those routed to thus far. =~ Log of number of peers
438    * chosen from for this request.
439    */
440   uint32_t network_size;
441
442   /**
443    * Any message options for this request
444    */
445   uint32_t msg_options;
446
447   /**
448    * How many hops has the message already traversed?
449    */
450   uint32_t hop_count;
451
452   /**
453    * How many peer identities are present in the path history?
454    */
455   uint32_t path_history_len;
456
457   /**
458    * Path history.
459    */
460   char *path_history;
461
462   /**
463    * How important is this message?
464    */
465   unsigned int importance;
466
467   /**
468    * Should we (still) forward the request on to other peers?
469    */
470   int do_forward;
471
472   /**
473    * Did we forward this message? (may need to remember it!)
474    */
475   int forwarded;
476
477   /**
478    * Are we the closest known peer to this key (out of our neighbors?)
479    */
480   int closest;
481 };
482
483 /**
484  * Record used for remembering what peers are waiting for what
485  * responses (based on search key).
486  */
487 struct DHTRouteSource
488 {
489   /**
490    * This is a DLL.
491    */
492   struct DHTRouteSource *next;
493
494   /**
495    * This is a DLL.
496    */
497   struct DHTRouteSource *prev;
498
499   /**
500    * Source of the request.  Replies should be forwarded to
501    * this peer.
502    */
503   struct GNUNET_PeerIdentity source;
504
505   /**
506    * If this was a local request, remember the client; otherwise NULL.
507    */
508   struct ClientList *client;
509
510   /**
511    * Pointer to this nodes heap location (for removal)
512    */
513   struct GNUNET_CONTAINER_HeapNode *hnode;
514
515   /**
516    * Back pointer to the record storing this information.
517    */
518   struct DHTQueryRecord *record;
519
520   /**
521    * Task to remove this entry on timeout.
522    */
523   GNUNET_SCHEDULER_TaskIdentifier delete_task;
524
525   /**
526    * Bloomfilter of peers we have already sent back as
527    * replies to the initial request.  Allows us to not
528    * forward the same peer multiple times for a find peer
529    * request.
530    */
531   struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
532
533 };
534
535 /**
536  * Entry in the DHT routing table.
537  */
538 struct DHTQueryRecord
539 {
540   /**
541    * Head of DLL for result forwarding.
542    */
543   struct DHTRouteSource *head;
544
545   /**
546    * Tail of DLL for result forwarding.
547    */
548   struct DHTRouteSource *tail;
549
550   /**
551    * Key that the record concerns.
552    */
553   GNUNET_HashCode key;
554
555   /**
556    * GET message of this record (what we already forwarded?).
557    */
558   //DV_DHT_MESSAGE get; Try to get away with not saving this.
559
560   /**
561    * Bloomfilter of the peers we've replied to so far
562    */
563   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
564
565 };
566
567 /**
568  * Context used to calculate the number of find peer messages
569  * per X time units since our last scheduled find peer message
570  * was sent.  If we have seen too many messages, delay or don't
571  * send our own out.
572  */
573 struct FindPeerMessageContext
574 {
575   unsigned int count;
576
577   struct GNUNET_TIME_Absolute start;
578
579   struct GNUNET_TIME_Absolute end;
580 };
581
582 /**
583  * DHT Routing results structure
584  */
585 struct DHTResults
586 {
587   /*
588    * Min heap for removal upon reaching limit
589    */
590   struct GNUNET_CONTAINER_Heap *minHeap;
591
592   /*
593    * Hashmap for fast key based lookup
594    */
595   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
596
597 };
598
599 /**
600  * DHT structure for recent requests.
601  */
602 struct RecentRequests
603 {
604   /*
605    * Min heap for removal upon reaching limit
606    */
607   struct GNUNET_CONTAINER_Heap *minHeap;
608
609   /*
610    * Hashmap for key based lookup
611    */
612   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
613 };
614
615 struct RecentRequest
616 {
617   /**
618    * Position of this node in the min heap.
619    */
620   struct GNUNET_CONTAINER_HeapNode *heap_node;
621
622   /**
623    * Bloomfilter containing entries for peers
624    * we forwarded this request to.
625    */
626   struct GNUNET_CONTAINER_BloomFilter *bloom;
627
628   /**
629    * Timestamp of this request, for ordering
630    * the min heap.
631    */
632   struct GNUNET_TIME_Absolute timestamp;
633
634   /**
635    * Key of this request.
636    */
637   GNUNET_HashCode key;
638
639   /**
640    * Unique identifier for this request.
641    */
642   uint64_t uid;
643
644   /**
645    * Task to remove this entry on timeout.
646    */
647   GNUNET_SCHEDULER_TaskIdentifier remove_task;
648 };
649
650 struct RepublishContext
651 {
652   /**
653    * Key to republish.
654    */
655   GNUNET_HashCode key;
656
657   /**
658    * Type of the data.
659    */
660   unsigned int type;
661
662 };
663
664
665 /**
666  * Modifier for the convergence function
667  */
668 static float converge_modifier;
669
670 /**
671  * Recent requests by hash/uid and by time inserted.
672  */
673 static struct RecentRequests recent;
674
675 /**
676  * Context to use to calculate find peer rates.
677  */
678 static struct FindPeerMessageContext find_peer_context;
679
680 /**
681  * Don't use our routing algorithm, always route
682  * to closest peer; initially send requests to 3
683  * peers.
684  */
685 static int strict_kademlia;
686
687 /**
688  * Routing option to end routing when closest peer found.
689  */
690 static int stop_on_closest;
691
692 /**
693  * Routing option to end routing when data is found.
694  */
695 static int stop_on_found;
696
697 /**
698  * Whether DHT needs to manage find peer requests, or
699  * an external force will do it on behalf of the DHT.
700  */
701 static int do_find_peer;
702
703 /**
704  * Once we have stored an item in the DHT, refresh it
705  * according to our republish interval.
706  */
707 static int do_republish;
708
709 /**
710  * Use exactly the forwarding formula as described in
711  * the paper if set to GNUNET_YES, otherwise use the
712  * slightly modified version.
713  */
714 static int paper_forwarding;
715
716 /**
717  * PUT Peer Identities of peers we know about into
718  * the datacache.
719  */
720 static int put_peer_identities;
721
722 /**
723  * Use the "real" distance metric when selecting the
724  * next routing hop.  Can be less accurate.
725  */
726 static int use_real_distance;
727
728 /**
729  * How many peers have we added since we sent out our last
730  * find peer request?
731  */
732 static unsigned int newly_found_peers;
733
734 /**
735  * Container of active queries we should remember
736  */
737 static struct DHTResults forward_list;
738
739 /**
740  * Handle to the datacache service (for inserting/retrieving data)
741  */
742 static struct GNUNET_DATACACHE_Handle *datacache;
743
744 /**
745  * Handle for the statistics service.
746  */
747 struct GNUNET_STATISTICS_Handle *stats;
748
749 /**
750  * Handle to get our current HELLO.
751  */
752 static struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
753
754 /**
755  * The configuration the DHT service is running with
756  */
757 static const struct GNUNET_CONFIGURATION_Handle *cfg;
758
759 /**
760  * Handle to the core service
761  */
762 static struct GNUNET_CORE_Handle *coreAPI;
763
764 /**
765  * Handle to the transport service, for getting our hello
766  */
767 static struct GNUNET_TRANSPORT_Handle *transport_handle;
768
769 /**
770  * The identity of our peer.
771  */
772 static struct GNUNET_PeerIdentity my_identity;
773
774 /**
775  * Short id of the peer, for printing
776  */
777 static char *my_short_id;
778
779 /**
780  * Our HELLO
781  */
782 static struct GNUNET_MessageHeader *my_hello;
783
784 /**
785  * Task to run when we shut down, cleaning up all our trash
786  */
787 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
788
789 /**
790  * The lowest currently used bucket.
791  */
792 static unsigned int lowest_bucket;      /* Initially equal to MAX_BUCKETS - 1 */
793
794 /**
795  * The maximum number of hops before we stop routing messages.
796  */
797 static unsigned long long max_hops;
798
799 /**
800  * How often to republish content we have previously stored.
801  */
802 static struct GNUNET_TIME_Relative dht_republish_frequency;
803
804 /**
805  * GNUNET_YES to stop at max_hops, GNUNET_NO to heuristically decide when to stop forwarding.
806  */
807 static int use_max_hops;
808
809 /**
810  * The buckets (Kademlia routing table, complete with growth).
811  * Array of size MAX_BUCKET_SIZE.
812  */
813 static struct PeerBucket k_buckets[MAX_BUCKETS];        /* From 0 to MAX_BUCKETS - 1 */
814
815 /**
816  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
817  */
818 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
819
820 /**
821  * Recently seen find peer requests.
822  */
823 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
824
825 /**
826  * Maximum size for each bucket.
827  */
828 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;  /* Initially equal to DEFAULT_BUCKET_SIZE */
829
830 /**
831  * List of active clients.
832  */
833 static struct ClientList *client_list;
834
835 /**
836  * Handle to the DHT logger.
837  */
838 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
839
840 /*
841  * Whether or not to send routing debugging information
842  * to the dht logging server
843  */
844 static unsigned int debug_routes;
845
846 /*
847  * Whether or not to send FULL route information to
848  * logging server
849  */
850 static unsigned int debug_routes_extended;
851
852 /*
853  * GNUNET_YES or GNUNET_NO, whether or not to act as
854  * a malicious node which drops all messages
855  */
856 static unsigned int malicious_dropper;
857
858 /*
859  * GNUNET_YES or GNUNET_NO, whether or not to act as
860  * a malicious node which sends out lots of GETS
861  */
862 static unsigned int malicious_getter;
863
864 /**
865  * GNUNET_YES or GNUNET_NO, whether or not to act as
866  * a malicious node which sends out lots of PUTS
867  */
868 static unsigned int malicious_putter;
869
870 /**
871  * Frequency for malicious get requests.
872  */
873 static unsigned long long malicious_get_frequency;
874
875 /**
876  * Frequency for malicious put requests.
877  */
878 static unsigned long long malicious_put_frequency;
879
880 /**
881  * Kademlia replication
882  */
883 static unsigned long long kademlia_replication;
884
885 /**
886  * Reply times for requests, if we are busy, don't send any
887  * more requests!
888  */
889 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
890
891 /**
892  * Current counter for replies.
893  */
894 static unsigned int reply_counter;
895
896 /**
897  * Our handle to the BLOCK library.
898  */
899 static struct GNUNET_BLOCK_Context *block_context;
900
901
902 /**
903  * Forward declaration.
904  */
905 static size_t
906 send_generic_reply (void *cls, size_t size, void *buf);
907
908
909 /** Declare here so retry_core_send is aware of it */
910 static size_t
911 core_transmit_notify (void *cls, size_t size, void *buf);
912
913 /**
914  * Convert unique ID to hash code.
915  *
916  * @param uid unique ID to convert
917  * @param hash set to uid (extended with zeros)
918  */
919 static void
920 hash_from_uid (uint64_t uid, GNUNET_HashCode * hash)
921 {
922   memset (hash, 0, sizeof (GNUNET_HashCode));
923   *((uint64_t *) hash) = uid;
924 }
925
926 #if AVG
927 /**
928  * Calculate the average send time between messages so that we can
929  * ignore certain requests if we get too busy.
930  *
931  * @return the average time between asking core to send a message
932  *         and when the buffer for copying it is passed
933  */
934 static struct GNUNET_TIME_Relative
935 get_average_send_delay ()
936 {
937   unsigned int i;
938   unsigned int divisor;
939   struct GNUNET_TIME_Relative average_time;
940
941   average_time = GNUNET_TIME_relative_get_zero ();
942   divisor = 0;
943   for (i = 0; i < MAX_REPLY_TIMES; i++)
944   {
945     average_time = GNUNET_TIME_relative_add (average_time, reply_times[i]);
946     if (reply_times[i].abs_value == (uint64_t) 0)
947       continue;
948     else
949       divisor++;
950   }
951   if (divisor == 0)
952   {
953     return average_time;
954   }
955
956   average_time = GNUNET_TIME_relative_divide (average_time, divisor);
957   fprintf (stderr, "Avg send delay: %u sends is %llu\n", divisor,
958            (unsigned long long) average_time.abs_value);
959   return average_time;
960 }
961 #endif
962
963 /**
964  * Given the largest send delay, artificially decrease it
965  * so the next time around we may have a chance at sending
966  * again.
967  */
968 static void
969 decrease_max_send_delay (struct GNUNET_TIME_Relative max_time)
970 {
971   unsigned int i;
972
973   for (i = 0; i < MAX_REPLY_TIMES; i++)
974   {
975     if (reply_times[i].rel_value == max_time.rel_value)
976     {
977       reply_times[i].rel_value = reply_times[i].rel_value / 2;
978       return;
979     }
980   }
981 }
982
983 /**
984  * Find the maximum send time of the recently sent values.
985  *
986  * @return the average time between asking core to send a message
987  *         and when the buffer for copying it is passed
988  */
989 static struct GNUNET_TIME_Relative
990 get_max_send_delay ()
991 {
992   unsigned int i;
993   struct GNUNET_TIME_Relative max_time;
994
995   max_time = GNUNET_TIME_relative_get_zero ();
996
997   for (i = 0; i < MAX_REPLY_TIMES; i++)
998   {
999     if (reply_times[i].rel_value > max_time.rel_value)
1000       max_time.rel_value = reply_times[i].rel_value;
1001   }
1002 #if DEBUG_DHT
1003   if (max_time.rel_value > MAX_REQUEST_TIME.rel_value)
1004     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n",
1005                 (unsigned long long) max_time.rel_value);
1006 #endif
1007   return max_time;
1008 }
1009
1010 static void
1011 increment_stats (const char *value)
1012 {
1013   if (stats != NULL)
1014   {
1015     GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
1016   }
1017 }
1018
1019 static void
1020 decrement_stats (const char *value)
1021 {
1022   if (stats != NULL)
1023   {
1024     GNUNET_STATISTICS_update (stats, value, -1, GNUNET_NO);
1025   }
1026 }
1027
1028 /**
1029  *  Try to send another message from our core send list
1030  */
1031 static void
1032 try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1033 {
1034   struct PeerInfo *peer = cls;
1035   struct P2PPendingMessage *pending;
1036   size_t ssize;
1037
1038   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
1039
1040   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1041     return;
1042
1043   if (peer->th != NULL)
1044     return;                     /* Message send already in progress */
1045
1046   pending = peer->head;
1047   if (pending != NULL)
1048   {
1049     ssize = ntohs (pending->msg->size);
1050 #if DEBUG_DHT > 1
1051     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1052                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n",
1053                 my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id));
1054 #endif
1055     pending->scheduled = GNUNET_TIME_absolute_get ();
1056     reply_counter++;
1057     if (reply_counter >= MAX_REPLY_TIMES)
1058       reply_counter = 0;
1059     peer->th =
1060         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
1061                                            pending->importance,
1062                                            pending->timeout, &peer->id, ssize,
1063                                            &core_transmit_notify, peer);
1064     if (peer->th == NULL)
1065       increment_stats ("# notify transmit ready failed");
1066   }
1067 }
1068
1069 /**
1070  * Function called to send a request out to another peer.
1071  * Called both for locally initiated requests and those
1072  * received from other peers.
1073  *
1074  * @param msg the encapsulated message
1075  * @param peer the peer to forward the message to
1076  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1077  */
1078 static void
1079 forward_result_message (const struct GNUNET_MessageHeader *msg,
1080                         struct PeerInfo *peer,
1081                         struct DHT_MessageContext *msg_ctx)
1082 {
1083   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
1084   struct P2PPendingMessage *pending;
1085   size_t msize;
1086   size_t psize;
1087   char *path_start;
1088   char *path_offset;
1089
1090 #if DEBUG_PATH
1091   unsigned int i;
1092 #endif
1093
1094   increment_stats (STAT_RESULT_FORWARDS);
1095   msize =
1096       sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) +
1097       (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len);
1098   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1099   psize = sizeof (struct P2PPendingMessage) + msize;
1100   pending = GNUNET_malloc (psize);
1101   pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
1102   pending->importance = DHT_SEND_PRIORITY;
1103   pending->timeout = GNUNET_TIME_relative_get_forever ();
1104   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg;
1105   result_message->header.size = htons (msize);
1106   result_message->header.type =
1107       htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
1108   result_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
1109   if (msg_ctx->path_history_len > 0)
1110   {
1111     /* End of pending is where enc_msg starts */
1112     path_start = (char *) &pending[1];
1113     /* Offset by the size of the enc_msg */
1114     path_start += ntohs (msg->size);
1115     memcpy (path_start, msg_ctx->path_history,
1116             msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
1117 #if DEBUG_PATH
1118     for (i = 0; i < msg_ctx->path_history_len; i++)
1119     {
1120       path_offset =
1121           &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1122       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123                   "(forward_result) Key %s Found peer %d:%s\n",
1124                   GNUNET_h2s (&msg_ctx->key), i,
1125                   GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1126     }
1127 #endif
1128   }
1129   result_message->options = htonl (msg_ctx->msg_options);
1130   result_message->hop_count = htonl (msg_ctx->hop_count + 1);
1131   GNUNET_assert (GNUNET_OK ==
1132                  GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
1133                                                             result_message->
1134                                                             bloomfilter,
1135                                                             DHT_BLOOM_SIZE));
1136   result_message->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1137   memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1138   /* Copy the enc_msg, then the path history as well! */
1139   memcpy (&result_message[1], msg, ntohs (msg->size));
1140   path_offset = (char *) &result_message[1];
1141   path_offset += ntohs (msg->size);
1142   /* If we have path history, copy it to the end of the whole thing */
1143   if (msg_ctx->path_history_len > 0)
1144     memcpy (path_offset, msg_ctx->path_history,
1145             msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
1146 #if DEBUG_DHT > 1
1147   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1148               "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1149               "DHT", msize, GNUNET_i2s (&peer->id));
1150 #endif
1151   peer->pending_count++;
1152   increment_stats ("# pending messages scheduled");
1153   GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1154                                      pending);
1155   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1156     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1157 }
1158
1159
1160 /**
1161  * Called when core is ready to send a message we asked for
1162  * out to the destination.
1163  *
1164  * @param cls closure (NULL)
1165  * @param size number of bytes available in buf
1166  * @param buf where the callee should write the message
1167  * @return number of bytes written to buf
1168  */
1169 static size_t
1170 core_transmit_notify (void *cls, size_t size, void *buf)
1171 {
1172   struct PeerInfo *peer = cls;
1173   char *cbuf = buf;
1174   struct P2PPendingMessage *pending;
1175
1176   size_t off;
1177   size_t msize;
1178
1179   peer->th = NULL;
1180   if (buf == NULL)
1181   {
1182     /* client disconnected */
1183 #if DEBUG_DHT
1184     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n",
1185                 my_short_id, "DHT");
1186 #endif
1187     return 0;
1188   }
1189
1190   if (peer->head == NULL)
1191     return 0;
1192
1193   off = 0;
1194   pending = peer->head;
1195 #if DUMB
1196   reply_times[reply_counter] =
1197       GNUNET_TIME_absolute_get_difference (pending->scheduled,
1198                                            GNUNET_TIME_absolute_get ());
1199   msize = ntohs (pending->msg->size);
1200   if (msize <= size)
1201   {
1202     off = msize;
1203     memcpy (cbuf, pending->msg, msize);
1204     peer->pending_count--;
1205     increment_stats ("# pending messages sent");
1206     GNUNET_assert (peer->pending_count >= 0);
1207     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
1208     GNUNET_free (pending);
1209   }
1210 #else
1211   while (NULL != pending &&
1212          (size - off >= (msize = ntohs (pending->msg->size))))
1213   {
1214     memcpy (&cbuf[off], pending->msg, msize);
1215     off += msize;
1216     peer->pending_count--;
1217     increment_stats ("# pending messages sent");
1218     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
1219     GNUNET_free (pending);
1220     pending = peer->head;
1221   }
1222 #endif
1223   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
1224     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1225
1226   return off;
1227 }
1228
1229
1230 /**
1231  * Compute the distance between have and target as a 32-bit value.
1232  * Differences in the lower bits must count stronger than differences
1233  * in the higher bits.
1234  *
1235  * @return 0 if have==target, otherwise a number
1236  *           that is larger as the distance between
1237  *           the two hash codes increases
1238  */
1239 static unsigned int
1240 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1241 {
1242   unsigned int bucket;
1243   unsigned int msb;
1244   unsigned int lsb;
1245   unsigned int i;
1246
1247   /* We have to represent the distance between two 2^9 (=512)-bit
1248    * numbers as a 2^5 (=32)-bit number with "0" being used for the
1249    * two numbers being identical; furthermore, we need to
1250    * guarantee that a difference in the number of matching
1251    * bits is always represented in the result.
1252    *
1253    * We use 2^32/2^9 numerical values to distinguish between
1254    * hash codes that have the same LSB bit distance and
1255    * use the highest 2^9 bits of the result to signify the
1256    * number of (mis)matching LSB bits; if we have 0 matching
1257    * and hence 512 mismatching LSB bits we return -1 (since
1258    * 512 itself cannot be represented with 9 bits) */
1259
1260   /* first, calculate the most significant 9 bits of our
1261    * result, aka the number of LSBs */
1262   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1263   /* bucket is now a value between 0 and 512 */
1264   if (bucket == 512)
1265     return 0;                   /* perfect match */
1266   if (bucket == 0)
1267     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1268                                  * below, we'd end up with max+1 (overflow)) */
1269
1270   /* calculate the most significant bits of the final result */
1271   msb = (512 - bucket) << (32 - 9);
1272   /* calculate the 32-9 least significant bits of the final result by
1273    * looking at the differences in the 32-9 bits following the
1274    * mismatching bit at 'bucket' */
1275   lsb = 0;
1276   for (i = bucket + 1;
1277        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1278   {
1279     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
1280         GNUNET_CRYPTO_hash_get_bit (have, i))
1281       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
1282                                                  * last bit set will be 31 -- if
1283                                                  * i does not reach 512 first... */
1284   }
1285   return msb | lsb;
1286 }
1287
1288 /**
1289  * Return a number that is larger the closer the
1290  * "have" GNUNET_hash code is to the "target".
1291  *
1292  * @return inverse distance metric, non-zero.
1293  *         Must fudge the value if NO bits match.
1294  */
1295 static unsigned int
1296 inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1297 {
1298   if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
1299     return 1;                   /* Never return 0! */
1300   return ((unsigned int) -1) - distance (target, have);
1301 }
1302
1303
1304 /**
1305  * Find the optimal bucket for this key, regardless
1306  * of the current number of buckets in use.
1307  *
1308  * @param hc the hashcode to compare our identity to
1309  *
1310  * @return the proper bucket index, or GNUNET_SYSERR
1311  *         on error (same hashcode)
1312  */
1313 static int
1314 find_bucket (const GNUNET_HashCode * hc)
1315 {
1316   unsigned int bits;
1317
1318   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
1319   if (bits == MAX_BUCKETS)
1320     return GNUNET_SYSERR;
1321   return MAX_BUCKETS - bits - 1;
1322 }
1323
1324
1325 /**
1326  * Find which k-bucket this peer should go into,
1327  * taking into account the size of the k-bucket
1328  * array.  This means that if more bits match than
1329  * there are currently buckets, lowest_bucket will
1330  * be returned.
1331  *
1332  * @param hc GNUNET_HashCode we are finding the bucket for.
1333  *
1334  * @return the proper bucket index for this key,
1335  *         or GNUNET_SYSERR on error (same hashcode)
1336  */
1337 static int
1338 find_current_bucket (const GNUNET_HashCode * hc)
1339 {
1340   int actual_bucket;
1341
1342   actual_bucket = find_bucket (hc);
1343   if (actual_bucket == GNUNET_SYSERR)   /* hc and our peer identity match! */
1344     return lowest_bucket;
1345   if (actual_bucket < lowest_bucket)    /* actual_bucket not yet used */
1346     return lowest_bucket;
1347   return actual_bucket;
1348 }
1349
1350 #if EXTRA_CHECKS
1351 /**
1352  * Find a routing table entry from a peer identity
1353  *
1354  * @param peer the peer to look up
1355  *
1356  * @return the bucket number holding the peer, GNUNET_SYSERR if not found
1357  */
1358 static int
1359 find_bucket_by_peer (const struct PeerInfo *peer)
1360 {
1361   int bucket;
1362   struct PeerInfo *pos;
1363
1364   for (bucket = lowest_bucket; bucket < MAX_BUCKETS - 1; bucket++)
1365   {
1366     pos = k_buckets[bucket].head;
1367     while (pos != NULL)
1368     {
1369       if (peer == pos)
1370         return bucket;
1371       pos = pos->next;
1372     }
1373   }
1374
1375   return GNUNET_SYSERR;         /* No such peer. */
1376 }
1377 #endif
1378
1379 #if PRINT_TABLES
1380 /**
1381  * Print the complete routing table for this peer.
1382  */
1383 static void
1384 print_routing_table ()
1385 {
1386   int bucket;
1387   struct PeerInfo *pos;
1388   char char_buf[30000];
1389   int char_pos;
1390
1391   memset (char_buf, 0, sizeof (char_buf));
1392   char_pos = 0;
1393   char_pos +=
1394       sprintf (&char_buf[char_pos], "Printing routing table for peer %s\n",
1395                my_short_id);
1396   //fprintf(stderr, "Printing routing table for peer %s\n", my_short_id);
1397   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1398   {
1399     pos = k_buckets[bucket].head;
1400     char_pos += sprintf (&char_buf[char_pos], "Bucket %d:\n", bucket);
1401     //fprintf(stderr, "Bucket %d:\n", bucket);
1402     while (pos != NULL)
1403     {
1404       //fprintf(stderr, "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1405       char_pos +=
1406           sprintf (&char_buf[char_pos],
1407                    "\tPeer %s, best bucket %d, %d bits match\n",
1408                    GNUNET_i2s (&pos->id), find_bucket (&pos->id.hashPubKey),
1409                    GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey,
1410                                                      &my_identity.hashPubKey));
1411       pos = pos->next;
1412     }
1413   }
1414   fprintf (stderr, "%s", char_buf);
1415   fflush (stderr);
1416 }
1417 #endif
1418
1419 /**
1420  * Find a routing table entry from a peer identity
1421  *
1422  * @param peer the peer identity to look up
1423  *
1424  * @return the routing table entry, or NULL if not found
1425  */
1426 static struct PeerInfo *
1427 find_peer_by_id (const struct GNUNET_PeerIdentity *peer)
1428 {
1429   int bucket;
1430   struct PeerInfo *pos;
1431
1432   bucket = find_current_bucket (&peer->hashPubKey);
1433
1434   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
1435     return NULL;
1436
1437   pos = k_buckets[bucket].head;
1438   while (pos != NULL)
1439   {
1440     if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity)))
1441       return pos;
1442     pos = pos->next;
1443   }
1444   return NULL;                  /* No such peer. */
1445 }
1446
1447 /* Forward declaration */
1448 static void
1449 update_core_preference (void *cls,
1450                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1451 /**
1452  * Function called with statistics about the given peer.
1453  *
1454  * @param cls closure
1455  * @param peer identifies the peer
1456  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1457  * @param amount set to the amount that was actually reserved or unreserved;
1458  *               either the full requested amount or zero (no partial reservations)
1459  * @param res_delay if the reservation could not be satisfied (amount was 0), how
1460  *        long should the client wait until re-trying?
1461  * @param preference current traffic preference for the given peer
1462  */
1463 static void
1464 update_core_preference_finish (void *cls,
1465                                const struct GNUNET_PeerIdentity *peer,
1466                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1467                                int32_t amount,
1468                                struct GNUNET_TIME_Relative res_delay,
1469                                uint64_t preference)
1470 {
1471   struct PeerInfo *peer_info = cls;
1472
1473   peer_info->info_ctx = NULL;
1474   GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
1475                                 &update_core_preference, peer_info);
1476 }
1477
1478 static void
1479 update_core_preference (void *cls,
1480                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1481 {
1482   struct PeerInfo *peer = cls;
1483   uint64_t preference;
1484   unsigned int matching;
1485
1486   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1487   {
1488     return;
1489   }
1490   matching =
1491       GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
1492                                         &peer->id.hashPubKey);
1493   if (matching >= 64)
1494   {
1495 #if DEBUG_DHT
1496     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1497                 "Peer identifier matches by %u bits, only shifting as much as we can!\n",
1498                 matching);
1499 #endif
1500     matching = 63;
1501   }
1502   preference = 1LL << matching;
1503   peer->info_ctx =
1504       GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
1505                                           GNUNET_TIME_UNIT_FOREVER_REL,
1506                                           GNUNET_BANDWIDTH_VALUE_MAX, 0,
1507                                           preference,
1508                                           &update_core_preference_finish, peer);
1509 }
1510
1511
1512 /**
1513  * Given a peer and its corresponding bucket,
1514  * remove it from that bucket.  Does not free
1515  * the PeerInfo struct, nor cancel messages
1516  * or free messages waiting to be sent to this
1517  * peer!
1518  *
1519  * @param peer the peer to remove
1520  * @param bucket the bucket the peer belongs to
1521  */
1522 static void
1523 remove_peer (struct PeerInfo *peer, unsigned int bucket)
1524 {
1525   GNUNET_assert (k_buckets[bucket].peers_size > 0);
1526   GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail,
1527                                peer);
1528   k_buckets[bucket].peers_size--;
1529 #if CHANGE_LOWEST
1530   if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) &&
1531       (lowest_bucket < MAX_BUCKETS - 1))
1532     lowest_bucket++;
1533 #endif
1534 }
1535
1536 /**
1537  * Removes peer from a bucket, then frees associated
1538  * resources and frees peer.
1539  *
1540  * @param peer peer to be removed and freed
1541  * @param bucket which bucket this peer belongs to
1542  */
1543 static void
1544 delete_peer (struct PeerInfo *peer, unsigned int bucket)
1545 {
1546   struct P2PPendingMessage *pos;
1547   struct P2PPendingMessage *next;
1548
1549 #if EXTRA_CHECKS
1550   struct PeerInfo *peer_pos;
1551
1552   peer_pos = k_buckets[bucket].head;
1553   while ((peer_pos != NULL) && (peer_pos != peer))
1554     peer_pos = peer_pos->next;
1555   if (peer_pos == NULL)
1556   {
1557     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1558                 "%s:%s: Expected peer `%s' in bucket %d\n", my_short_id, "DHT",
1559                 GNUNET_i2s (&peer->id), bucket);
1560     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1561                 "%s:%s: Lowest bucket: %d, find_current_bucket: %d, peer resides in bucket: %d\n",
1562                 my_short_id, "DHT", lowest_bucket,
1563                 find_current_bucket (&peer->id.hashPubKey),
1564                 find_bucket_by_peer (peer));
1565   }
1566   GNUNET_assert (peer_pos != NULL);
1567 #endif
1568   remove_peer (peer, bucket);   /* First remove the peer from its bucket */
1569
1570   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
1571     GNUNET_SCHEDULER_cancel (peer->send_task);
1572   if ((peer->th != NULL) && (coreAPI != NULL))
1573     GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
1574
1575   pos = peer->head;
1576   while (pos != NULL)           /* Remove any pending messages for this peer */
1577   {
1578     increment_stats
1579         ("# dht pending messages discarded (due to disconnect/shutdown)");
1580     next = pos->next;
1581     GNUNET_free (pos);
1582     pos = next;
1583   }
1584
1585   GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
1586                  (all_known_peers, &peer->id.hashPubKey));
1587   GNUNET_assert (GNUNET_YES ==
1588                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
1589                                                        &peer->id.hashPubKey,
1590                                                        peer));
1591   GNUNET_free (peer);
1592   decrement_stats (STAT_PEERS_KNOWN);
1593 }
1594
1595
1596 /**
1597  * Iterator over hash map entries.
1598  *
1599  * @param cls closure
1600  * @param key current key code
1601  * @param value PeerInfo of the peer to move to new lowest bucket
1602  * @return GNUNET_YES if we should continue to
1603  *         iterate,
1604  *         GNUNET_NO if not.
1605  */
1606 static int
1607 move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value)
1608 {
1609   struct PeerInfo *peer = value;
1610   int new_bucket;
1611
1612   GNUNET_assert (lowest_bucket > 0);
1613   new_bucket = lowest_bucket - 1;
1614   remove_peer (peer, lowest_bucket);
1615   GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head,
1616                                      k_buckets[new_bucket].tail,
1617                                      k_buckets[new_bucket].tail, peer);
1618   k_buckets[new_bucket].peers_size++;
1619   return GNUNET_YES;
1620 }
1621
1622
1623 /**
1624  * The current lowest bucket is full, so change the lowest
1625  * bucket to the next lower down, and move any appropriate
1626  * entries in the current lowest bucket to the new bucket.
1627  */
1628 static void
1629 enable_next_bucket ()
1630 {
1631   struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1632   struct PeerInfo *pos;
1633
1634   GNUNET_assert (lowest_bucket > 0);
1635   to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size);
1636   pos = k_buckets[lowest_bucket].head;
1637
1638 #if PRINT_TABLES
1639   fprintf (stderr, "Printing RT before new bucket\n");
1640   print_routing_table ();
1641 #endif
1642   /* Populate the array of peers which should be in the next lowest bucket */
1643   while (pos != NULL)
1644   {
1645     if (find_bucket (&pos->id.hashPubKey) < lowest_bucket)
1646       GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos,
1647                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1648     pos = pos->next;
1649   }
1650
1651   /* Remove peers from lowest bucket, insert into next lowest bucket */
1652   GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL);
1653   GNUNET_CONTAINER_multihashmap_destroy (to_remove);
1654   lowest_bucket = lowest_bucket - 1;
1655 #if PRINT_TABLES
1656   fprintf (stderr, "Printing RT after new bucket\n");
1657   print_routing_table ();
1658 #endif
1659 }
1660
1661 /**
1662  * Find the closest peer in our routing table to the
1663  * given hashcode.
1664  *
1665  * @return The closest peer in our routing table to the
1666  *         key, or NULL on error.
1667  */
1668 static struct PeerInfo *
1669 find_closest_peer (const GNUNET_HashCode * hc)
1670 {
1671   struct PeerInfo *pos;
1672   struct PeerInfo *current_closest;
1673   unsigned int lowest_distance;
1674   unsigned int temp_distance;
1675   int bucket;
1676   int count;
1677
1678   lowest_distance = -1;
1679
1680   if (k_buckets[lowest_bucket].peers_size == 0)
1681     return NULL;
1682
1683   current_closest = NULL;
1684   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1685   {
1686     pos = k_buckets[bucket].head;
1687     count = 0;
1688     while ((pos != NULL) && (count < bucket_size))
1689     {
1690       temp_distance = distance (&pos->id.hashPubKey, hc);
1691       if (temp_distance <= lowest_distance)
1692       {
1693         lowest_distance = temp_distance;
1694         current_closest = pos;
1695       }
1696       pos = pos->next;
1697       count++;
1698     }
1699   }
1700   GNUNET_assert (current_closest != NULL);
1701   return current_closest;
1702 }
1703
1704
1705 /**
1706  * Function called to send a request out to another peer.
1707  * Called both for locally initiated requests and those
1708  * received from other peers.
1709  *
1710  * @param msg the encapsulated message
1711  * @param peer the peer to forward the message to
1712  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1713  */
1714 static void
1715 forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer,
1716                  struct DHT_MessageContext *msg_ctx)
1717 {
1718   struct GNUNET_DHT_P2PRouteMessage *route_message;
1719   struct P2PPendingMessage *pending;
1720   size_t msize;
1721   size_t psize;
1722   char *route_path;
1723
1724   increment_stats (STAT_ROUTE_FORWARDS);
1725   GNUNET_assert (peer != NULL);
1726   if ((msg_ctx->closest != GNUNET_YES) &&
1727       (peer == find_closest_peer (&msg_ctx->key)))
1728     increment_stats (STAT_ROUTE_FORWARDS_CLOSEST);
1729
1730   msize =
1731       sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) +
1732       (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1733   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1734   psize = sizeof (struct P2PPendingMessage) + msize;
1735   pending = GNUNET_malloc (psize);
1736   pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
1737   pending->importance = msg_ctx->importance;
1738   pending->timeout = msg_ctx->timeout;
1739   route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg;
1740   route_message->header.size = htons (msize);
1741   route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1742   route_message->options = htonl (msg_ctx->msg_options);
1743   route_message->hop_count = htonl (msg_ctx->hop_count + 1);
1744   route_message->network_size = htonl (msg_ctx->network_size);
1745   route_message->desired_replication_level = htonl (msg_ctx->replication);
1746   route_message->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1747   if (msg_ctx->bloom != NULL)
1748     GNUNET_assert (GNUNET_OK ==
1749                    GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
1750                                                               route_message->
1751                                                               bloomfilter,
1752                                                               DHT_BLOOM_SIZE));
1753   memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1754   memcpy (&route_message[1], msg, ntohs (msg->size));
1755   if (GNUNET_DHT_RO_RECORD_ROUTE ==
1756       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1757   {
1758     route_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
1759     /* Set pointer to start of enc_msg */
1760     route_path = (char *) &route_message[1];
1761     /* Offset to the end of the enc_msg */
1762     route_path += ntohs (msg->size);
1763     /* Copy the route_path after enc_msg */
1764     memcpy (route_path, msg_ctx->path_history,
1765             msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1766   }
1767 #if DEBUG_DHT > 1
1768   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1769               "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1770               "DHT", msize, GNUNET_i2s (&peer->id));
1771 #endif
1772   peer->pending_count++;
1773   increment_stats ("# pending messages scheduled");
1774   GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1775                                      pending);
1776   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1777     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1778 }
1779
1780 #if DO_PING
1781 /**
1782  * Task used to send ping messages to peers so that
1783  * they don't get disconnected.
1784  *
1785  * @param cls the peer to send a ping message to
1786  * @param tc context, reason, etc.
1787  */
1788 static void
1789 periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1790 {
1791   struct PeerInfo *peer = cls;
1792   struct GNUNET_MessageHeader ping_message;
1793   struct DHT_MessageContext msg_ctx;
1794
1795   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1796     return;
1797
1798   ping_message.size = htons (sizeof (struct GNUNET_MessageHeader));
1799   ping_message.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PING);
1800
1801   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
1802 #if DEBUG_PING
1803   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n",
1804               my_short_id, "DHT", GNUNET_i2s (&peer->id));
1805 #endif
1806   forward_message (&ping_message, peer, &msg_ctx);
1807   peer->ping_task =
1808       GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PING_DELAY, &periodic_ping_task,
1809                                     peer);
1810 }
1811
1812 /**
1813  * Schedule PING messages for the top X peers in each
1814  * bucket of the routing table (so core won't disconnect them!)
1815  */
1816 void
1817 schedule_ping_messages ()
1818 {
1819   unsigned int bucket;
1820   unsigned int count;
1821   struct PeerInfo *pos;
1822
1823   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1824   {
1825     pos = k_buckets[bucket].head;
1826     count = 0;
1827     while (pos != NULL)
1828     {
1829       if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK))
1830         GNUNET_SCHEDULER_add_now (&periodic_ping_task, pos);
1831       else if ((count >= bucket_size) &&
1832                (pos->ping_task != GNUNET_SCHEDULER_NO_TASK))
1833       {
1834         GNUNET_SCHEDULER_cancel (pos->ping_task);
1835         pos->ping_task = GNUNET_SCHEDULER_NO_TASK;
1836       }
1837       pos = pos->next;
1838       count++;
1839     }
1840   }
1841 }
1842 #endif
1843
1844
1845 /**
1846  * Task run to check for messages that need to be sent to a client.
1847  *
1848  * @param client a ClientList, containing the client and any messages to be sent to it
1849  */
1850 static void
1851 process_pending_messages (struct ClientList *client)
1852 {
1853   if (client->pending_head == NULL)
1854     return;
1855   if (client->transmit_handle != NULL)
1856     return;
1857
1858   client->transmit_handle =
1859       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1860                                            ntohs (client->pending_head->
1861                                                   msg->size),
1862                                            GNUNET_TIME_UNIT_FOREVER_REL,
1863                                            &send_generic_reply, client);
1864 }
1865
1866 /**
1867  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1868  * request.  A ClientList is passed as closure, take the head of the list
1869  * and copy it into buf, which has the result of sending the message to the
1870  * client.
1871  *
1872  * @param cls closure to this call
1873  * @param size maximum number of bytes available to send
1874  * @param buf where to copy the actual message to
1875  *
1876  * @return the number of bytes actually copied, 0 indicates failure
1877  */
1878 static size_t
1879 send_generic_reply (void *cls, size_t size, void *buf)
1880 {
1881   struct ClientList *client = cls;
1882   char *cbuf = buf;
1883   struct PendingMessage *reply;
1884   size_t off;
1885   size_t msize;
1886
1887   client->transmit_handle = NULL;
1888   if (buf == NULL)
1889   {
1890     /* client disconnected */
1891     return 0;
1892   }
1893   off = 0;
1894   while ((NULL != (reply = client->pending_head)) &&
1895          (size >= off + (msize = ntohs (reply->msg->size))))
1896   {
1897     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
1898                                  reply);
1899     memcpy (&cbuf[off], reply->msg, msize);
1900     GNUNET_free (reply);
1901     off += msize;
1902   }
1903   process_pending_messages (client);
1904 #if DEBUG_DHT
1905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906               "Transmitted %u bytes of replies to client\n",
1907               (unsigned int) off);
1908 #endif
1909   return off;
1910 }
1911
1912
1913 /**
1914  * Add a PendingMessage to the clients list of messages to be sent
1915  *
1916  * @param client the active client to send the message to
1917  * @param pending_message the actual message to send
1918  */
1919 static void
1920 add_pending_message (struct ClientList *client,
1921                      struct PendingMessage *pending_message)
1922 {
1923   GNUNET_CONTAINER_DLL_insert_after (client->pending_head, client->pending_tail,
1924                                      client->pending_tail, pending_message);
1925   process_pending_messages (client);
1926 }
1927
1928
1929 /**
1930  * Called when a reply needs to be sent to a client, as
1931  * a result it found to a GET or FIND PEER request.
1932  *
1933  * @param client the client to send the reply to
1934  * @param message the encapsulated message to send
1935  * @param msg_ctx the context of the received message
1936  */
1937 static void
1938 send_reply_to_client (struct ClientList *client,
1939                       const struct GNUNET_MessageHeader *message,
1940                       struct DHT_MessageContext *msg_ctx)
1941 {
1942   struct GNUNET_DHT_RouteResultMessage *reply;
1943   struct PendingMessage *pending_message;
1944   uint16_t msize;
1945   size_t tsize;
1946   char *reply_offset;
1947
1948 #if DEBUG_PATH
1949   char *path_offset;
1950   unsigned int i;
1951 #endif
1952 #if DEBUG_DHT
1953   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n",
1954               my_short_id, "DHT");
1955 #endif
1956   msize = ntohs (message->size);
1957   tsize =
1958       sizeof (struct GNUNET_DHT_RouteResultMessage) + msize +
1959       (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1960   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1961   {
1962     GNUNET_break_op (0);
1963     return;
1964   }
1965   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1966   pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1];
1967   reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1];
1968   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1969   reply->header.size = htons (tsize);
1970   reply->outgoing_path_length = htonl (msg_ctx->path_history_len);
1971   reply->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1972   memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1973   reply_offset = (char *) &reply[1];
1974   memcpy (&reply[1], message, msize);
1975   if (msg_ctx->path_history_len > 0)
1976   {
1977     reply_offset += msize;
1978     memcpy (reply_offset, msg_ctx->path_history,
1979             msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1980   }
1981 #if DEBUG_PATH
1982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1983               "Returning message with outgoing path length %d\n",
1984               msg_ctx->path_history_len);
1985   for (i = 0; i < msg_ctx->path_history_len; i++)
1986   {
1987     path_offset =
1988         &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1989     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found peer %d:%s\n", i,
1990                 GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1991   }
1992 #endif
1993   add_pending_message (client, pending_message);
1994 }
1995
1996 /**
1997  * Consider whether or not we would like to have this peer added to
1998  * our routing table.  Check whether bucket for this peer is full,
1999  * if so return negative; if not return positive.  Since peers are
2000  * only added on CORE level connect, this doesn't actually add the
2001  * peer to the routing table.
2002  *
2003  * @param peer the peer we are considering adding
2004  *
2005  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
2006  *         already full)
2007  */
2008 static int
2009 consider_peer (struct GNUNET_PeerIdentity *peer)
2010 {
2011   int bucket;
2012
2013   if ((GNUNET_YES ==
2014        GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
2015                                                &peer->hashPubKey)) ||
2016       (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))))
2017     return GNUNET_NO;           /* We already know this peer (are connected even!) */
2018   bucket = find_current_bucket (&peer->hashPubKey);
2019
2020   if ((k_buckets[bucket].peers_size < bucket_size) ||
2021       ((bucket == lowest_bucket) && (lowest_bucket > 0)))
2022     return GNUNET_YES;
2023
2024   return GNUNET_NO;
2025 }
2026
2027
2028 /**
2029  * Task used to remove forwarding entries, either
2030  * after timeout, when full, or on shutdown.
2031  *
2032  * @param cls the entry to remove
2033  * @param tc context, reason, etc.
2034  */
2035 static void
2036 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2037 {
2038   struct DHTRouteSource *source_info = cls;
2039   struct DHTQueryRecord *record;
2040
2041   source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode);
2042   record = source_info->record;
2043   GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
2044
2045   if (record->head == NULL)     /* No more entries in DLL */
2046   {
2047     GNUNET_assert (GNUNET_YES ==
2048                    GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
2049                                                          &record->key, record));
2050     GNUNET_free (record);
2051   }
2052   if (source_info->find_peers_responded != NULL)
2053     GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
2054   GNUNET_free (source_info);
2055 }
2056
2057 /**
2058  * Main function that handles whether or not to route a result
2059  * message to other peers, or to send to our local client.
2060  *
2061  * @param msg the result message to be routed
2062  * @param msg_ctx context of the message we are routing
2063  *
2064  * @return the number of peers the message was routed to,
2065  *         GNUNET_SYSERR on failure
2066  */
2067 static int
2068 route_result_message (struct GNUNET_MessageHeader *msg,
2069                       struct DHT_MessageContext *msg_ctx)
2070 {
2071   struct GNUNET_PeerIdentity new_peer;
2072   struct DHTQueryRecord *record;
2073   struct DHTRouteSource *pos;
2074   struct PeerInfo *peer_info;
2075   const struct GNUNET_MessageHeader *hello_msg;
2076
2077 #if DEBUG_DHT > 1
2078   unsigned int i;
2079 #endif
2080
2081   increment_stats (STAT_RESULTS);
2082   /**
2083    * If a find peer result message is received and contains a valid
2084    * HELLO for another peer, offer it to the transport service.
2085    */
2086   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
2087   {
2088     if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader))
2089       GNUNET_break_op (0);
2090
2091     hello_msg = &msg[1];
2092     if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
2093         (GNUNET_SYSERR ==
2094          GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg,
2095                               &new_peer)))
2096     {
2097       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2098                   "%s:%s Received non-HELLO message type in find peer result message!\n",
2099                   my_short_id, "DHT");
2100       GNUNET_break_op (0);
2101       return GNUNET_NO;
2102     }
2103     else                        /* We have a valid hello, and peer id stored in new_peer */
2104     {
2105       find_peer_context.count++;
2106       increment_stats (STAT_FIND_PEER_REPLY);
2107       if (GNUNET_YES == consider_peer (&new_peer))
2108       {
2109         increment_stats (STAT_HELLOS_PROVIDED);
2110         GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL);
2111         GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL);
2112       }
2113     }
2114   }
2115
2116   if (malicious_dropper == GNUNET_YES)
2117     record = NULL;
2118   else
2119     record =
2120         GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
2121
2122   if (record == NULL)           /* No record of this message! */
2123   {
2124 #if DEBUG_DHT
2125     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2126                 "`%s:%s': Have no record of response key %s uid %llu\n",
2127                 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2128                 msg_ctx->unique_id);
2129 #endif
2130 #if DEBUG_DHT_ROUTING
2131     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2132     {
2133       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
2134                                    msg_ctx->hop_count, GNUNET_SYSERR,
2135                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
2136                                    NULL);
2137     }
2138 #endif
2139     if (msg_ctx->bloom != NULL)
2140     {
2141       GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2142       msg_ctx->bloom = NULL;
2143     }
2144     return 0;
2145   }
2146
2147   pos = record->head;
2148   while (pos != NULL)
2149   {
2150 #if STRICT_FORWARDING
2151     if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)  /* If we have already forwarded this peer id, don't do it again! */
2152     {
2153       if (GNUNET_YES ==
2154           GNUNET_CONTAINER_bloomfilter_test (pos->find_peers_responded,
2155                                              &new_peer.hashPubKey))
2156       {
2157         increment_stats ("# find peer responses NOT forwarded (bloom match)");
2158         pos = pos->next;
2159         continue;
2160       }
2161       else
2162         GNUNET_CONTAINER_bloomfilter_add (pos->find_peers_responded,
2163                                           &new_peer.hashPubKey);
2164     }
2165 #endif
2166
2167     if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity)))  /* Local client (or DHT) initiated request! */
2168     {
2169 #if DEBUG_DHT
2170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2171                   "`%s:%s': Sending response key %s uid %llu to client\n",
2172                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2173                   msg_ctx->unique_id);
2174 #endif
2175 #if DEBUG_DHT_ROUTING
2176       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2177       {
2178         dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
2179                                      msg_ctx->hop_count, GNUNET_YES,
2180                                      &my_identity, &msg_ctx->key, msg_ctx->peer,
2181                                      NULL);
2182       }
2183 #endif
2184       increment_stats (STAT_RESULTS_TO_CLIENT);
2185       if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
2186         increment_stats (STAT_GET_REPLY);
2187 #if DEBUG_DHT > 1
2188       for (i = 0; i < msg_ctx->path_history_len; i++)
2189       {
2190         char *path_offset;
2191
2192         path_offset =
2193             &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
2194         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2195                     "(before client) Key %s Found peer %d:%s\n",
2196                     GNUNET_h2s (&msg_ctx->key), i,
2197                     GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
2198       }
2199 #endif
2200       send_reply_to_client (pos->client, msg, msg_ctx);
2201     }
2202     else                        /* Send to peer */
2203     {
2204       peer_info = find_peer_by_id (&pos->source);
2205       if (peer_info == NULL)    /* Didn't find the peer in our routing table, perhaps peer disconnected! */
2206       {
2207         pos = pos->next;
2208         continue;
2209       }
2210
2211       if (msg_ctx->bloom == NULL)
2212         msg_ctx->bloom =
2213             GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
2214                                                DHT_BLOOM_K);
2215       GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
2216                                         &my_identity.hashPubKey);
2217       if ((GNUNET_NO ==
2218            GNUNET_CONTAINER_bloomfilter_test (msg_ctx->bloom,
2219                                               &peer_info->id.hashPubKey)))
2220       {
2221 #if DEBUG_DHT
2222         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2223                     "`%s:%s': Forwarding response key %s uid %llu to peer %s\n",
2224                     my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2225                     msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
2226 #endif
2227 #if DEBUG_DHT_ROUTING
2228         if ((debug_routes_extended) && (dhtlog_handle != NULL))
2229         {
2230           dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
2231                                        msg_ctx->hop_count, GNUNET_NO,
2232                                        &my_identity, &msg_ctx->key,
2233                                        msg_ctx->peer, &pos->source);
2234         }
2235 #endif
2236         forward_result_message (msg, peer_info, msg_ctx);
2237         /* Try removing forward entries after sending once, only allows ONE response per request */
2238         if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
2239         {
2240           GNUNET_SCHEDULER_cancel (pos->delete_task);
2241           pos->delete_task =
2242               GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);
2243         }
2244       }
2245       else
2246       {
2247 #if DEBUG_DHT
2248         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249                     "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n",
2250                     my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2251                     msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
2252 #endif
2253       }
2254     }
2255     pos = pos->next;
2256   }
2257   if (msg_ctx->bloom != NULL)
2258   {
2259     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2260     msg_ctx->bloom = NULL;
2261   }
2262   return 0;
2263 }
2264
2265
2266 /**
2267  * Iterator for local get request results,
2268  *
2269  * @param cls closure for iterator, a DatacacheGetContext
2270  * @param exp when does this value expire?
2271  * @param key the key this data is stored under
2272  * @param size the size of the data identified by key
2273  * @param data the actual data
2274  * @param type the type of the data
2275  *
2276  * @return GNUNET_OK to continue iteration, anything else
2277  * to stop iteration.
2278  */
2279 static int
2280 datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
2281                         const GNUNET_HashCode * key, size_t size,
2282                         const char *data, enum GNUNET_BLOCK_Type type)
2283 {
2284   struct DHT_MessageContext *msg_ctx = cls;
2285   struct DHT_MessageContext *new_msg_ctx;
2286   struct GNUNET_DHT_GetResultMessage *get_result;
2287   enum GNUNET_BLOCK_EvaluationResult eval;
2288   const struct DHTPutEntry *put_entry;
2289   int get_size;
2290   char *path_offset;
2291
2292 #if DEBUG_PATH
2293   unsigned int i;
2294 #endif
2295
2296 #if DEBUG_DHT
2297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2298               "`%s:%s': Received `%s' response from datacache\n", my_short_id,
2299               "DHT", "GET");
2300 #endif
2301
2302   put_entry = (const struct DHTPutEntry *) data;
2303
2304   if (size !=
2305       sizeof (struct DHTPutEntry) + put_entry->data_size +
2306       (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)))
2307   {
2308     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2309                 "Path + data size doesn't add up for data inserted into datacache!\nData size %d, path length %d, expected %d, got %d\n",
2310                 put_entry->data_size, put_entry->path_length,
2311                 sizeof (struct DHTPutEntry) + put_entry->data_size +
2312                 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)),
2313                 size);
2314     msg_ctx->do_forward = GNUNET_NO;
2315     return GNUNET_OK;
2316   }
2317
2318   eval =
2319       GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf,
2320                              msg_ctx->reply_bf_mutator, msg_ctx->xquery,
2321                              msg_ctx->xquery_size, &put_entry[1],
2322                              put_entry->data_size);
2323
2324   switch (eval)
2325   {
2326   case GNUNET_BLOCK_EVALUATION_OK_LAST:
2327     msg_ctx->do_forward = GNUNET_NO;
2328   case GNUNET_BLOCK_EVALUATION_OK_MORE:
2329     new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
2330     memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
2331     if (GNUNET_DHT_RO_RECORD_ROUTE ==
2332         (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2333     {
2334       new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
2335       new_msg_ctx->path_history_len = msg_ctx->path_history_len;
2336       /* Assign to previous msg_ctx path history, caller should free after our return */
2337       new_msg_ctx->path_history = msg_ctx->path_history;
2338 #if DEBUG_PATH
2339       for (i = 0; i < new_msg_ctx->path_history_len; i++)
2340       {
2341         path_offset =
2342             &new_msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
2343         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2344                     "(get_iterator) Key %s Found peer %d:%s\n",
2345                     GNUNET_h2s (&msg_ctx->key), i,
2346                     GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
2347       }
2348 #endif
2349     }
2350
2351     get_size =
2352         sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size +
2353         (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity));
2354     get_result = GNUNET_malloc (get_size);
2355     get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2356     get_result->header.size = htons (get_size);
2357     get_result->expiration = GNUNET_TIME_absolute_hton (exp);
2358     get_result->type = htons (type);
2359     get_result->put_path_length = htons (put_entry->path_length);
2360     path_offset = (char *) &put_entry[1];
2361     path_offset += put_entry->data_size;
2362 #if DEBUG_PATH
2363     for (i = 0; i < put_entry->path_length; i++)
2364     {
2365       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366                   "(get_iterator PUT path) Key %s Found peer %d:%s\n",
2367                   GNUNET_h2s (&msg_ctx->key), i,
2368                   GNUNET_i2s ((struct GNUNET_PeerIdentity *)
2369                               &path_offset[i *
2370                                            sizeof (struct
2371                                                    GNUNET_PeerIdentity)]));
2372     }
2373 #endif
2374     /* Copy the actual data and the path_history to the end of the get result */
2375     memcpy (&get_result[1], &put_entry[1],
2376             put_entry->data_size +
2377             (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)));
2378     new_msg_ctx->peer = &my_identity;
2379     new_msg_ctx->bloom =
2380         GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2381     new_msg_ctx->hop_count = 0;
2382     new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;   /* Make result routing a higher priority */
2383     new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2384     increment_stats (STAT_GET_RESPONSE_START);
2385     route_result_message (&get_result->header, new_msg_ctx);
2386     GNUNET_free (new_msg_ctx);
2387     GNUNET_free (get_result);
2388     break;
2389   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2390 #if DEBUG_DHT
2391     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n",
2392                 my_short_id, "DHT");
2393 #endif
2394     break;
2395   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2396 #if DEBUG_DHT
2397     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n",
2398                 my_short_id, "DHT");
2399 #endif
2400     break;
2401   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2402 #if DEBUG_DHT
2403     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2404                 "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
2405 #endif
2406     GNUNET_break (0);
2407     break;
2408   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2409     GNUNET_break_op (0);
2410     msg_ctx->do_forward = GNUNET_NO;
2411     break;
2412   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2413 #if DEBUG_DHT
2414     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2415                 "`%s:%s': Unsupported block type (%u) in response!\n",
2416                 my_short_id, "DHT", type);
2417 #endif
2418     /* msg_ctx->do_forward = GNUNET_NO;  // not sure... */
2419     break;
2420   }
2421   return GNUNET_OK;
2422 }
2423
2424
2425 /**
2426  * Main function that handles whether or not to route a message to other
2427  * peers.
2428  *
2429  * @param msg the message to be routed
2430  * @param msg_ctx the context containing all pertinent information about the message
2431  */
2432 static void
2433 route_message (const struct GNUNET_MessageHeader *msg,
2434                struct DHT_MessageContext *msg_ctx);
2435
2436
2437 /**
2438  * Server handler for all dht get requests, look for data,
2439  * if found, send response either to clients or other peers.
2440  *
2441  * @param msg the actual get message
2442  * @param msg_ctx struct containing pertinent information about the get request
2443  *
2444  * @return number of items found for GET request
2445  */
2446 static unsigned int
2447 handle_dht_get (const struct GNUNET_MessageHeader *msg,
2448                 struct DHT_MessageContext *msg_ctx)
2449 {
2450   const struct GNUNET_DHT_GetMessage *get_msg;
2451   uint16_t msize;
2452   uint16_t bf_size;
2453   unsigned int results;
2454   const char *end;
2455   enum GNUNET_BLOCK_Type type;
2456
2457   msize = ntohs (msg->size);
2458   if (msize < sizeof (struct GNUNET_DHT_GetMessage))
2459   {
2460     GNUNET_break (0);
2461     return 0;
2462   }
2463   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
2464   bf_size = ntohs (get_msg->bf_size);
2465   msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
2466   msg_ctx->reply_bf_mutator = get_msg->bf_mutator;
2467   if (msize !=
2468       sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
2469   {
2470     GNUNET_break (0);
2471     return 0;
2472   }
2473   end = (const char *) &get_msg[1];
2474   if (msg_ctx->xquery_size == 0)
2475   {
2476     msg_ctx->xquery = NULL;
2477   }
2478   else
2479   {
2480     msg_ctx->xquery = (const void *) end;
2481     end += msg_ctx->xquery_size;
2482   }
2483   if (bf_size == 0)
2484   {
2485     msg_ctx->reply_bf = NULL;
2486   }
2487   else
2488   {
2489     msg_ctx->reply_bf =
2490         GNUNET_CONTAINER_bloomfilter_init (end, bf_size,
2491                                            GNUNET_DHT_GET_BLOOMFILTER_K);
2492   }
2493   type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
2494 #if DEBUG_DHT
2495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2496               "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
2497               my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key),
2498               msg_ctx->unique_id);
2499 #endif
2500   increment_stats (STAT_GETS);
2501   results = 0;
2502 #if HAVE_MALICIOUS
2503   if (type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE)
2504   {
2505     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
2506     return results;
2507   }
2508 #endif
2509   msg_ctx->do_forward = GNUNET_YES;
2510   if (datacache != NULL)
2511     results =
2512         GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
2513                               &datacache_get_iterator, msg_ctx);
2514 #if DEBUG_DHT
2515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2516               "`%s:%s': Found %d results for `%s' request uid %llu\n",
2517               my_short_id, "DHT", results, "GET", msg_ctx->unique_id);
2518 #endif
2519   if (results >= 1)
2520   {
2521 #if DEBUG_DHT_ROUTING
2522     if ((debug_routes) && (dhtlog_handle != NULL))
2523     {
2524       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_GET,
2525                                    msg_ctx->hop_count, GNUNET_YES, &my_identity,
2526                                    &msg_ctx->key);
2527     }
2528
2529     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2530     {
2531       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2532                                    msg_ctx->hop_count, GNUNET_YES, &my_identity,
2533                                    &msg_ctx->key, msg_ctx->peer, NULL);
2534     }
2535 #endif
2536   }
2537   else
2538   {
2539     /* check query valid */
2540     if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID ==
2541         GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key,
2542                                &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator,
2543                                msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0))
2544     {
2545       GNUNET_break_op (0);
2546       msg_ctx->do_forward = GNUNET_NO;
2547     }
2548   }
2549
2550   if (msg_ctx->hop_count == 0)  /* Locally initiated request */
2551   {
2552 #if DEBUG_DHT_ROUTING
2553     if ((debug_routes) && (dhtlog_handle != NULL))
2554     {
2555       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_GET,
2556                                    msg_ctx->hop_count, GNUNET_NO, &my_identity,
2557                                    &msg_ctx->key);
2558     }
2559 #endif
2560   }
2561   if (msg_ctx->do_forward == GNUNET_YES)
2562     route_message (msg, msg_ctx);
2563   GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
2564   return results;
2565 }
2566
2567 static void
2568 remove_recent_find_peer (void *cls,
2569                          const struct GNUNET_SCHEDULER_TaskContext *tc)
2570 {
2571   GNUNET_HashCode *key = cls;
2572
2573   GNUNET_assert (GNUNET_YES ==
2574                  GNUNET_CONTAINER_multihashmap_remove
2575                  (recent_find_peer_requests, key, NULL));
2576   GNUNET_free (key);
2577 }
2578
2579 /**
2580  * Server handler for initiating local dht find peer requests
2581  *
2582  * @param find_msg the actual find peer message
2583  * @param msg_ctx struct containing pertinent information about the request
2584  *
2585  */
2586 static void
2587 handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
2588                       struct DHT_MessageContext *msg_ctx)
2589 {
2590   struct GNUNET_MessageHeader *find_peer_result;
2591   struct GNUNET_DHT_FindPeerMessage *find_peer_message;
2592   struct DHT_MessageContext *new_msg_ctx;
2593   struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
2594   size_t hello_size;
2595   size_t tsize;
2596   GNUNET_HashCode *recent_hash;
2597   struct GNUNET_MessageHeader *other_hello;
2598   size_t other_hello_size;
2599   struct GNUNET_PeerIdentity peer_id;
2600
2601   find_peer_message = (struct GNUNET_DHT_FindPeerMessage *) find_msg;
2602   GNUNET_break_op (ntohs (find_msg->size) >=
2603                    (sizeof (struct GNUNET_DHT_FindPeerMessage)));
2604   if (ntohs (find_msg->size) < sizeof (struct GNUNET_DHT_FindPeerMessage))
2605     return;
2606   other_hello = NULL;
2607   other_hello_size = 0;
2608   if (ntohs (find_msg->size) > sizeof (struct GNUNET_DHT_FindPeerMessage))
2609   {
2610     other_hello_size =
2611         ntohs (find_msg->size) - sizeof (struct GNUNET_DHT_FindPeerMessage);
2612     other_hello = GNUNET_malloc (other_hello_size);
2613     memcpy (other_hello, &find_peer_message[1], other_hello_size);
2614     if ((GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) other_hello) == 0)
2615         || (GNUNET_OK !=
2616             GNUNET_HELLO_get_id ((struct GNUNET_HELLO_Message *) other_hello,
2617                                  &peer_id)))
2618     {
2619       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2620                   "Received invalid HELLO message in find peer request!\n");
2621       GNUNET_free (other_hello);
2622       return;
2623     }
2624 #if FIND_PEER_WITH_HELLO
2625     if (GNUNET_YES == consider_peer (&peer_id))
2626     {
2627       increment_stats (STAT_HELLOS_PROVIDED);
2628       GNUNET_TRANSPORT_offer_hello (transport_handle, other_hello, NULL, NULL);
2629       GNUNET_CORE_peer_request_connect (coreAPI, &peer_id, NULL, NULL);
2630       route_message (find_msg, msg_ctx);
2631       GNUNET_free (other_hello);
2632       return;
2633     }
2634     else                        /* We don't want this peer! */
2635     {
2636       route_message (find_msg, msg_ctx);
2637       GNUNET_free (other_hello);
2638       return;
2639     }
2640 #endif
2641   }
2642
2643 #if DEBUG_DHT
2644   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2645               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
2646               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&msg_ctx->key),
2647               ntohs (find_msg->size), sizeof (struct GNUNET_MessageHeader));
2648 #endif
2649   if (my_hello == NULL)
2650   {
2651 #if DEBUG_DHT
2652     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2653                 "`%s': Our HELLO is null, can't return.\n", "DHT");
2654 #endif
2655     GNUNET_free_non_null (other_hello);
2656     route_message (find_msg, msg_ctx);
2657     return;
2658   }
2659
2660   incoming_bloom =
2661       GNUNET_CONTAINER_bloomfilter_init (find_peer_message->bloomfilter,
2662                                          DHT_BLOOM_SIZE, DHT_BLOOM_K);
2663   if (GNUNET_YES ==
2664       GNUNET_CONTAINER_bloomfilter_test (incoming_bloom,
2665                                          &my_identity.hashPubKey))
2666   {
2667     increment_stats (STAT_BLOOM_FIND_PEER);
2668     GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
2669     GNUNET_free_non_null (other_hello);
2670     route_message (find_msg, msg_ctx);
2671     return;                     /* We match the bloomfilter, do not send a response to this peer (they likely already know us!) */
2672   }
2673   GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
2674
2675 #if RESTRICT_FIND_PEER
2676
2677   /**
2678    * Ignore any find peer requests from a peer we have seen very recently.
2679    */
2680   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent_find_peer_requests, &msg_ctx->key))  /* We have recently responded to a find peer request for this peer! */
2681   {
2682     increment_stats ("# dht find peer requests ignored (recently seen!)");
2683     GNUNET_free_non_null (other_hello);
2684     return;
2685   }
2686
2687   /**
2688    * Use this check to only allow the peer to respond to find peer requests if
2689    * it would be beneficial to have the requesting peer in this peers routing
2690    * table.  Can be used to thwart peers flooding the network with find peer
2691    * requests that we don't care about.  However, if a new peer is joining
2692    * the network and has no other peers this is a problem (assume all buckets
2693    * full, no one will respond!).
2694    */
2695   memcpy (&peer_id.hashPubKey, &msg_ctx->key, sizeof (GNUNET_HashCode));
2696   if (GNUNET_NO == consider_peer (&peer_id))
2697   {
2698     increment_stats ("# dht find peer requests ignored (do not need!)");
2699     GNUNET_free_non_null (other_hello);
2700     route_message (find_msg, msg_ctx);
2701     return;
2702   }
2703 #endif
2704
2705   recent_hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
2706   memcpy (recent_hash, &msg_ctx->key, sizeof (GNUNET_HashCode));
2707   if (GNUNET_SYSERR !=
2708       GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests,
2709                                          &msg_ctx->key, NULL,
2710                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
2711   {
2712 #if DEBUG_DHT
2713     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2714                 "Adding recent remove task for key `%s`!\n",
2715                 GNUNET_h2s (&msg_ctx->key));
2716 #endif
2717     /* Only add a task if there wasn't one for this key already! */
2718     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
2719                                   (GNUNET_TIME_UNIT_SECONDS, 30),
2720                                   &remove_recent_find_peer, recent_hash);
2721   }
2722   else
2723   {
2724     GNUNET_free (recent_hash);
2725 #if DEBUG_DHT
2726     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2727                 "Received duplicate find peer request too soon!\n");
2728 #endif
2729   }
2730
2731   /* Simplistic find_peer functionality, always return our hello */
2732   hello_size = ntohs (my_hello->size);
2733   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
2734
2735   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2736   {
2737     GNUNET_break_op (0);
2738     GNUNET_free_non_null (other_hello);
2739     return;
2740   }
2741
2742   find_peer_result = GNUNET_malloc (tsize);
2743   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
2744   find_peer_result->size = htons (tsize);
2745   memcpy (&find_peer_result[1], my_hello, hello_size);
2746 #if DEBUG_DHT
2747   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2748               "`%s': Sending hello size %d to requesting peer.\n", "DHT",
2749               hello_size);
2750 #endif
2751
2752   new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
2753   memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
2754   new_msg_ctx->peer = &my_identity;
2755   new_msg_ctx->bloom =
2756       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2757   new_msg_ctx->hop_count = 0;
2758   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;     /* Make find peer requests a higher priority */
2759   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2760   increment_stats (STAT_FIND_PEER_ANSWER);
2761   if (GNUNET_DHT_RO_RECORD_ROUTE ==
2762       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2763   {
2764     new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
2765     new_msg_ctx->path_history_len = msg_ctx->path_history_len;
2766     /* Assign to previous msg_ctx path history, caller should free after our return */
2767     new_msg_ctx->path_history = msg_ctx->path_history;
2768   }
2769   route_result_message (find_peer_result, new_msg_ctx);
2770   GNUNET_free (new_msg_ctx);
2771 #if DEBUG_DHT_ROUTING
2772   if ((debug_routes) && (dhtlog_handle != NULL))
2773   {
2774     dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_FIND_PEER,
2775                                  msg_ctx->hop_count, GNUNET_YES, &my_identity,
2776                                  &msg_ctx->key);
2777   }
2778 #endif
2779   GNUNET_free_non_null (other_hello);
2780   GNUNET_free (find_peer_result);
2781   route_message (find_msg, msg_ctx);
2782 }
2783
2784 /**
2785  * Task used to republish data.
2786  * Forward declaration; function call loop.
2787  *
2788  * @param cls closure (a struct RepublishContext)
2789  * @param tc runtime context for this task
2790  */
2791 static void
2792 republish_content (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
2793
2794 /**
2795  * Server handler for initiating local dht put requests
2796  *
2797  * @param msg the actual put message
2798  * @param msg_ctx struct containing pertinent information about the request
2799  */
2800 static void
2801 handle_dht_put (const struct GNUNET_MessageHeader *msg,
2802                 struct DHT_MessageContext *msg_ctx)
2803 {
2804   const struct GNUNET_DHT_PutMessage *put_msg;
2805   struct DHTPutEntry *put_entry;
2806   unsigned int put_size;
2807   char *path_offset;
2808   enum GNUNET_BLOCK_Type put_type;
2809   size_t data_size;
2810   int ret;
2811   struct RepublishContext *put_context;
2812   GNUNET_HashCode key;
2813   struct DHTQueryRecord *record;
2814
2815   GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage));
2816
2817   put_msg = (const struct GNUNET_DHT_PutMessage *) msg;
2818   put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
2819 #if HAVE_MALICIOUS
2820   if (put_type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE)
2821   {
2822 #if DEBUG_DHT_ROUTING
2823     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2824     {
2825           /** Log routes that die due to high load! */
2826       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2827                                    msg_ctx->hop_count, GNUNET_SYSERR,
2828                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
2829                                    NULL);
2830     }
2831 #endif
2832     return;
2833   }
2834 #endif
2835   data_size =
2836       ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
2837   ret =
2838       GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size,
2839                             &key);
2840   if (GNUNET_NO == ret)
2841   {
2842 #if DEBUG_DHT_ROUTING
2843     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2844     {
2845       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2846                                    msg_ctx->hop_count, GNUNET_SYSERR,
2847                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
2848                                    NULL);
2849     }
2850 #endif
2851     /* invalid reply */
2852     GNUNET_break_op (0);
2853     return;
2854   }
2855   if ((GNUNET_YES == ret) &&
2856       (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode))))
2857   {
2858 #if DEBUG_DHT_ROUTING
2859     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2860     {
2861       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2862                                    msg_ctx->hop_count, GNUNET_SYSERR,
2863                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
2864                                    NULL);
2865     }
2866 #endif
2867     /* invalid wrapper: key mismatch! */
2868     GNUNET_break_op (0);
2869     return;
2870   }
2871   /* ret == GNUNET_SYSERR means that there is no known relationship between
2872    * data and the key, so we cannot check it */
2873 #if DEBUG_DHT
2874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2875               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2876               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
2877               msg_ctx->unique_id);
2878 #endif
2879 #if DEBUG_DHT_ROUTING
2880   if (msg_ctx->hop_count == 0)  /* Locally initiated request */
2881   {
2882     if ((debug_routes) && (dhtlog_handle != NULL))
2883     {
2884       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_PUT,
2885                                    msg_ctx->hop_count, GNUNET_NO, &my_identity,
2886                                    &msg_ctx->key);
2887     }
2888   }
2889 #endif
2890
2891 //   GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "******************************************************** PUT 1\n");
2892   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key);
2893   if (NULL != record)
2894   {
2895     struct DHTRouteSource *pos;
2896     struct GNUNET_DHT_GetMessage *gmsg;
2897     size_t gsize;
2898
2899 //     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "******************************************************** PUT 2\n");
2900     pos = record->head;
2901     while (pos != NULL)
2902     {
2903       /* TODO: do only for local started requests? or also for remote peers? */
2904       /* TODO: include this in statistics? under what? */
2905       if (NULL == pos->client)
2906         continue;
2907
2908       gsize = data_size + sizeof(struct GNUNET_DHT_GetMessage);
2909       gmsg = GNUNET_malloc(gsize);
2910       gmsg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2911       gmsg->header.size = htons(gsize);
2912       gmsg->type = put_msg->type;
2913       memcpy(&gmsg[1], &put_msg[1], data_size);
2914
2915       /* TODO: duplicate and reverse order of path_history? */
2916       send_reply_to_client (pos->client, &gmsg->header, msg_ctx);
2917       GNUNET_free(gmsg);
2918     }
2919   }
2920 //   GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "******************************************************** PUT END\n");
2921
2922   if (msg_ctx->closest != GNUNET_YES)
2923   {
2924     route_message (msg, msg_ctx);
2925     return;
2926   }
2927
2928 #if DEBUG_DHT
2929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2930               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2931               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
2932               msg_ctx->unique_id);
2933 #endif
2934
2935 #if DEBUG_DHT_ROUTING
2936   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2937   {
2938     dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2939                                  msg_ctx->hop_count, GNUNET_YES, &my_identity,
2940                                  &msg_ctx->key, msg_ctx->peer, NULL);
2941   }
2942
2943   if ((debug_routes) && (dhtlog_handle != NULL))
2944   {
2945     dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_PUT,
2946                                  msg_ctx->hop_count, GNUNET_YES, &my_identity,
2947                                  &msg_ctx->key);
2948   }
2949 #endif
2950
2951   increment_stats (STAT_PUTS_INSERTED);
2952   if (datacache != NULL)
2953   {
2954     /* Put size is actual data size plus struct overhead plus path length (if any) */
2955     put_size =
2956         data_size + sizeof (struct DHTPutEntry) +
2957         (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
2958     put_entry = GNUNET_malloc (put_size);
2959     put_entry->data_size = data_size;
2960     put_entry->path_length = msg_ctx->path_history_len;
2961     /* Copy data to end of put entry */
2962     memcpy (&put_entry[1], &put_msg[1], data_size);
2963     if (msg_ctx->path_history_len > 0)
2964     {
2965       /* Copy path after data */
2966       path_offset = (char *) &put_entry[1];
2967       path_offset += data_size;
2968       memcpy (path_offset, msg_ctx->path_history,
2969               msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
2970     }
2971
2972     ret =
2973         GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size,
2974                               (const char *) put_entry, put_type,
2975                               GNUNET_TIME_absolute_ntoh (put_msg->expiration));
2976     GNUNET_free (put_entry);
2977
2978     if ((ret == GNUNET_YES) && (do_republish == GNUNET_YES))
2979     {
2980       put_context = GNUNET_malloc (sizeof (struct RepublishContext));
2981       memcpy (&put_context->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
2982       put_context->type = put_type;
2983       GNUNET_SCHEDULER_add_delayed (dht_republish_frequency, &republish_content,
2984                                     put_context);
2985     }
2986   }
2987   else
2988     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2989                 "`%s:%s': %s request received, but have no datacache!\n",
2990                 my_short_id, "DHT", "PUT");
2991
2992   if (stop_on_closest == GNUNET_NO)
2993     route_message (msg, msg_ctx);
2994 }
2995
2996 /**
2997  * Estimate the diameter of the network based
2998  * on how many buckets are currently in use.
2999  * Concept here is that the diameter of the network
3000  * is roughly the distance a message must travel in
3001  * order to reach its intended destination.  Since
3002  * at each hop we expect to get one bit closer, and
3003  * we have one bit per bucket, the number of buckets
3004  * in use should be the largest number of hops for
3005  * a successful message. (of course, this assumes we
3006  * know all peers in the network!)
3007  *
3008  * @return ballpark diameter figure
3009  */
3010 static unsigned int
3011 estimate_diameter ()
3012 {
3013   return MAX_BUCKETS - lowest_bucket;
3014 }
3015
3016 /**
3017  * To how many peers should we (on average)
3018  * forward the request to obtain the desired
3019  * target_replication count (on average).
3020  *
3021  * returns: target_replication / (est. hops) + (target_replication * hop_count)
3022  * where est. hops is typically 2 * the routing table depth
3023  *
3024  * @param hop_count number of hops the message has traversed
3025  * @param target_replication the number of total paths desired
3026  *
3027  * @return Some number of peers to forward the message to
3028  */
3029 static unsigned int
3030 get_forward_count (unsigned int hop_count, size_t target_replication)
3031 {
3032   uint32_t random_value;
3033   unsigned int forward_count;
3034   float target_value;
3035   unsigned int diameter;
3036
3037   diameter = estimate_diameter ();
3038
3039   if (GNUNET_NO == use_max_hops)
3040     max_hops = (diameter + 1) * 2;
3041
3042   /**
3043    * If we are behaving in strict kademlia mode, send multiple initial requests,
3044    * but then only send to 1 or 0 peers based strictly on the number of hops.
3045    */
3046   if (strict_kademlia == GNUNET_YES)
3047   {
3048     if (hop_count == 0)
3049       return kademlia_replication;
3050      if (hop_count < max_hops)
3051       return 1;
3052     return 0;
3053   }
3054
3055   /* FIXME: the smaller we think the network is the more lenient we should be for
3056    * routing right?  The estimation below only works if we think we have reasonably
3057    * full routing tables, which for our RR topologies may not be the case!
3058    */
3059   if (hop_count > max_hops)
3060   {
3061 #if DEBUG_DHT
3062     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3063                 "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n",
3064                 my_short_id, "DHT", estimate_diameter (), lowest_bucket);
3065 #endif
3066     /* FIXME: does this work as intended, isn't the decision to forward or not made based on closeness as well? */
3067     if (GNUNET_YES == paper_forwarding) /* Once we have reached our ideal number of hops, don't stop forwarding! */
3068     {
3069       return 1;
3070     }
3071
3072     return 0;
3073   }
3074
3075   if (GNUNET_YES == paper_forwarding)
3076   {
3077     /* FIXME: re-run replication trials with this formula */
3078     target_value =
3079         1 + (target_replication - 1.0) / (diameter +
3080                                           ((float) (target_replication - 1.0) *
3081                                            hop_count));
3082     /* Set forward count to floor of target_value */
3083     forward_count = (unsigned int) target_value;
3084     /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
3085     target_value = target_value - forward_count;
3086     random_value =
3087         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
3088
3089     if (random_value < (target_value * UINT32_MAX))
3090       forward_count += 1;
3091   }
3092   else
3093   {
3094     random_value = 0;
3095     forward_count = 1;
3096     target_value =
3097         target_replication / (diameter +
3098                               ((float) target_replication * hop_count));
3099     if (target_value > 1)
3100     {
3101       /* Set forward count to floor of target_value */
3102       forward_count = (unsigned int) target_value;
3103       /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
3104       target_value = target_value - forward_count;
3105     }
3106     else
3107       random_value =
3108           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
3109
3110     if (random_value < (target_value * UINT32_MAX))
3111       forward_count += 1;
3112   }
3113
3114   return forward_count;
3115 }
3116
3117 /*
3118  * Check whether my identity is closer than any known peers.
3119  * If a non-null bloomfilter is given, check if this is the closest
3120  * peer that hasn't already been routed to.
3121  *
3122  * @param target hash code to check closeness to
3123  * @param bloom bloomfilter, exclude these entries from the decision
3124  *
3125  * Return GNUNET_YES if node location is closest, GNUNET_NO
3126  * otherwise.
3127  */
3128 int
3129 am_closest_peer (const GNUNET_HashCode * target,
3130                  struct GNUNET_CONTAINER_BloomFilter *bloom)
3131 {
3132   int bits;
3133   int other_bits;
3134   int bucket_num;
3135   int count;
3136   struct PeerInfo *pos;
3137   unsigned int my_distance;
3138
3139   if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode)))
3140     return GNUNET_YES;
3141
3142   bucket_num = find_current_bucket (target);
3143
3144   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target);
3145   my_distance = distance (&my_identity.hashPubKey, target);
3146   pos = k_buckets[bucket_num].head;
3147   count = 0;
3148   while ((pos != NULL) && (count < bucket_size))
3149   {
3150     if ((bloom != NULL) &&
3151         (GNUNET_YES ==
3152          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
3153     {
3154       pos = pos->next;
3155       continue;                 /* Skip already checked entries */
3156     }
3157
3158     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target);
3159     if (other_bits > bits)
3160       return GNUNET_NO;
3161     else if (other_bits == bits)        /* We match the same number of bits */
3162     {
3163       if (strict_kademlia != GNUNET_YES)        /* Return that we at as close as any other peer */
3164         return GNUNET_YES;
3165       else if (distance (&pos->id.hashPubKey, target) < my_distance)    /* Check all known peers, only return if we are the true closest */
3166         return GNUNET_NO;
3167     }
3168     pos = pos->next;
3169   }
3170
3171   /* No peers closer, we are the closest! */
3172   return GNUNET_YES;
3173 }
3174
3175
3176 /**
3177  * Select a peer from the routing table that would be a good routing
3178  * destination for sending a message for "target".  The resulting peer
3179  * must not be in the set of blocked peers.<p>
3180  *
3181  * Note that we should not ALWAYS select the closest peer to the
3182  * target, peers further away from the target should be chosen with
3183  * exponentially declining probability.
3184  *
3185  * @param target the key we are selecting a peer to route to
3186  * @param bloom a bloomfilter containing entries this request has seen already
3187  * @param hops how many hops has this message traversed thus far
3188  *
3189  * @return Peer to route to, or NULL on error
3190  */
3191 static struct PeerInfo *
3192 select_peer (const GNUNET_HashCode * target,
3193              struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
3194 {
3195   unsigned int bc;
3196   unsigned int count;
3197   unsigned int selected;
3198   struct PeerInfo *pos;
3199   unsigned int distance;
3200   unsigned int largest_distance;
3201   struct PeerInfo *chosen;
3202
3203   /** If we are doing kademlia routing (saves some cycles) */
3204   if ( (strict_kademlia == GNUNET_YES) ||
3205        (hops >= converge_modifier) )
3206   {
3207     /* greedy selection (closest peer that is not in bloomfilter) */
3208     largest_distance = 0;
3209     chosen = NULL;
3210     for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
3211     {
3212       pos = k_buckets[bc].head;
3213       count = 0;
3214       while ((pos != NULL) && (count < bucket_size))
3215       {
3216         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
3217         if (GNUNET_NO ==
3218             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3219         {
3220           distance = inverse_distance (target, &pos->id.hashPubKey);
3221           if (distance > largest_distance)
3222           {
3223             chosen = pos;
3224             largest_distance = distance;
3225           }
3226         }
3227         count++;
3228         pos = pos->next;
3229       }
3230     }
3231     if ((largest_distance > 0) && (chosen != NULL))
3232     {
3233       GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey);
3234       return chosen;
3235     }
3236     return NULL; /* no peer available or we are the closest */
3237   }
3238
3239
3240   /* select "random" peer */
3241   /* count number of peers that are available and not filtered */
3242   count = 0;
3243   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
3244   {
3245     pos = k_buckets[bc].head;
3246     while ((pos != NULL) && (count < bucket_size))
3247     {
3248       if (GNUNET_YES ==
3249           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3250       {
3251         pos = pos->next;
3252         continue;               /* Ignore bloomfiltered peers */
3253       }
3254       count++;
3255       pos = pos->next;
3256     }
3257   }
3258   if (count == 0)      /* No peers to select from! */
3259   {
3260     increment_stats ("# failed to select peer");
3261     return NULL;
3262   }
3263   /* Now actually choose a peer */
3264   selected =
3265       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
3266   count = 0;
3267   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
3268   {
3269     pos = k_buckets[bc].head;
3270     while ((pos != NULL) && (count < bucket_size))
3271     {
3272       if (GNUNET_YES ==
3273           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3274       {
3275         pos = pos->next;
3276         continue;               /* Ignore bloomfiltered peers */
3277       }
3278       if (0 == selected)
3279         return pos;     
3280       pos = pos->next;
3281     }
3282   }
3283   GNUNET_break (0);
3284   return NULL;
3285 }
3286
3287
3288 /**
3289  * Task used to remove recent entries, either
3290  * after timeout, when full, or on shutdown.
3291  *
3292  * @param cls the entry to remove
3293  * @param tc context, reason, etc.
3294  */
3295 static void
3296 remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3297 {
3298   struct RecentRequest *req = cls;
3299   static GNUNET_HashCode hash;
3300
3301   GNUNET_assert (req != NULL);
3302   hash_from_uid (req->uid, &hash);
3303   GNUNET_assert (GNUNET_YES ==
3304                  GNUNET_CONTAINER_multihashmap_remove (recent.hashmap, &hash,
3305                                                        req));
3306   GNUNET_CONTAINER_heap_remove_node (req->heap_node);
3307   GNUNET_CONTAINER_bloomfilter_free (req->bloom);
3308   GNUNET_free (req);
3309
3310   /*
3311    * if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
3312    * {
3313    * GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
3314    * GNUNET_CONTAINER_heap_destroy(recent.minHeap);
3315    * }
3316    */
3317 }
3318
3319 /**
3320  * Remember this routing request so that if a reply is
3321  * received we can either forward it to the correct peer
3322  * or return the result locally.
3323  *
3324  * @param msg_ctx Context of the route request
3325  *
3326  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
3327  */
3328 static int
3329 cache_response (struct DHT_MessageContext *msg_ctx)
3330 {
3331   struct DHTQueryRecord *record;
3332   struct DHTRouteSource *source_info;
3333   struct DHTRouteSource *pos;
3334   struct GNUNET_TIME_Absolute now;
3335   unsigned int current_size;
3336
3337   current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
3338
3339 #if DELETE_WHEN_FULL
3340   while (current_size >= MAX_OUTSTANDING_FORWARDS)
3341   {
3342     source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
3343     GNUNET_assert (source_info != NULL);
3344     record = source_info->record;
3345     GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
3346     if (record->head == NULL)   /* No more entries in DLL */
3347     {
3348       GNUNET_assert (GNUNET_YES ==
3349                      GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
3350                                                            &record->key,
3351                                                            record));
3352       GNUNET_free (record);
3353     }
3354     if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
3355     {
3356       GNUNET_SCHEDULER_cancel (source_info->delete_task);
3357       source_info->delete_task = GNUNET_SCHEDULER_NO_TASK;
3358     }
3359     if (source_info->find_peers_responded != NULL)
3360       GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
3361     GNUNET_free (source_info);
3362     current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
3363   }
3364 #endif
3365   /** Non-local request and have too many outstanding forwards, discard! */
3366   if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
3367     return GNUNET_NO;
3368
3369   now = GNUNET_TIME_absolute_get ();
3370   record =
3371       GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
3372   if (record != NULL)           /* Already know this request! */
3373   {
3374     pos = record->head;
3375     while (pos != NULL)
3376     {
3377       if ( (NULL != msg_ctx->peer) &&
3378            (0 ==
3379             memcmp (msg_ctx->peer, &pos->source,
3380                     sizeof (struct GNUNET_PeerIdentity))) )
3381         break;                  /* Already have this peer in reply list! */
3382       pos = pos->next;
3383     }
3384     if ((pos != NULL) && (pos->client == msg_ctx->client))      /* Seen this already */
3385     {
3386       GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode,
3387                                          now.abs_value);
3388       return GNUNET_NO;
3389     }
3390   }
3391   else
3392   {
3393     record = GNUNET_malloc (sizeof (struct DHTQueryRecord));
3394     GNUNET_assert (GNUNET_OK ==
3395                    GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap,
3396                                                       &msg_ctx->key, record,
3397                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3398     memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
3399   }
3400
3401   source_info = GNUNET_malloc (sizeof (struct DHTRouteSource));
3402   source_info->record = record;
3403   source_info->delete_task =
3404       GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry,
3405                                     source_info);
3406   source_info->find_peers_responded =
3407       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3408   memcpy (&source_info->source, msg_ctx->peer,
3409           sizeof (struct GNUNET_PeerIdentity));
3410   GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail,
3411                                      source_info);
3412   if (msg_ctx->client != NULL)  /* For local request, set timeout so high it effectively never gets pushed out */
3413   {
3414     source_info->client = msg_ctx->client;
3415     now = GNUNET_TIME_absolute_get_forever ();
3416   }
3417   source_info->hnode =
3418       GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info,
3419                                     now.abs_value);
3420 #if DEBUG_DHT > 1
3421   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3422               "`%s:%s': Created new forward source info for %s uid %llu\n",
3423               my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
3424               msg_ctx->unique_id);
3425 #endif
3426   return GNUNET_YES;
3427 }
3428
3429
3430 /**
3431  * Main function that handles whether or not to route a message to other
3432  * peers.
3433  *
3434  * @param msg the message to be routed
3435  * @param msg_ctx the context containing all pertinent information about the message
3436  */
3437 static void
3438 route_message (const struct GNUNET_MessageHeader *msg,
3439                struct DHT_MessageContext *msg_ctx)
3440 {
3441   int i;
3442   struct PeerInfo *selected;
3443
3444 #if DEBUG_DHT_ROUTING > 1
3445   struct PeerInfo *nearest;
3446 #endif
3447   unsigned int target_forward_count;
3448   unsigned int forward_count;
3449   struct RecentRequest *recent_req;
3450   GNUNET_HashCode unique_hash;
3451   char *stat_forward_count;
3452   char *temp_stat_str;
3453
3454 #if DEBUG_DHT_ROUTING
3455   int ret;
3456 #endif
3457
3458   if (malicious_dropper == GNUNET_YES)
3459   {
3460 #if DEBUG_DHT_ROUTING
3461     if ((debug_routes_extended) && (dhtlog_handle != NULL))
3462     {
3463       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3464                                    msg_ctx->hop_count, GNUNET_SYSERR,
3465                                    &my_identity, &msg_ctx->key, msg_ctx->peer,
3466                                    NULL);
3467     }
3468 #endif
3469     if (msg_ctx->bloom != NULL)
3470     {
3471       GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
3472       msg_ctx->bloom = NULL;
3473     }
3474     return;
3475   }
3476
3477   increment_stats (STAT_ROUTES);
3478   target_forward_count =
3479       get_forward_count (msg_ctx->hop_count, msg_ctx->replication);
3480   GNUNET_asprintf (&stat_forward_count, "# forward counts of %d",
3481                    target_forward_count);
3482   increment_stats (stat_forward_count);
3483   GNUNET_free (stat_forward_count);
3484   if (msg_ctx->bloom == NULL)
3485     msg_ctx->bloom =
3486         GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3487
3488   if ((stop_on_closest == GNUNET_YES) && (msg_ctx->closest == GNUNET_YES) &&
3489       (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
3490     target_forward_count = 0;
3491
3492   /**
3493    * NOTICE:  In Kademlia, a find peer request goes no further if the peer doesn't return
3494    * any closer peers (which is being checked for below).  Since we are doing recursive
3495    * routing we have no choice but to stop forwarding in this case.  This means that at
3496    * any given step the request may NOT be forwarded to alpha peers (because routes will
3497    * stop and the parallel route will not be aware of it).  Of course, assuming that we
3498    * have fulfilled the Kademlia requirements for routing table fullness this will never
3499    * ever ever be a problem.
3500    *
3501    * However, is this fair?
3502    *
3503    * Since we use these requests to build our routing tables (and we build them in the
3504    * testing driver) we will ignore this restriction for FIND_PEER messages so that
3505    * routing tables still get constructed.
3506    */
3507   if ((GNUNET_YES == strict_kademlia) && (msg_ctx->closest == GNUNET_YES) &&
3508       (msg_ctx->hop_count > 0) &&
3509       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER))
3510     target_forward_count = 0;
3511
3512
3513   GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom, &my_identity.hashPubKey);
3514   hash_from_uid (msg_ctx->unique_id, &unique_hash);
3515   if (GNUNET_YES ==
3516       GNUNET_CONTAINER_multihashmap_contains (recent.hashmap, &unique_hash))
3517   {
3518     recent_req =
3519         GNUNET_CONTAINER_multihashmap_get (recent.hashmap, &unique_hash);
3520     GNUNET_assert (recent_req != NULL);
3521     if (0 != memcmp (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode)))
3522       increment_stats (STAT_DUPLICATE_UID);
3523     else
3524     {
3525       increment_stats (STAT_RECENT_SEEN);
3526       GNUNET_CONTAINER_bloomfilter_or2 (msg_ctx->bloom, recent_req->bloom,
3527                                         DHT_BLOOM_SIZE);
3528     }
3529   }
3530   else
3531   {
3532     recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
3533     recent_req->uid = msg_ctx->unique_id;
3534     memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
3535     recent_req->remove_task =
3536         GNUNET_SCHEDULER_add_delayed (DEFAULT_RECENT_REMOVAL, &remove_recent,
3537                                       recent_req);
3538     recent_req->heap_node =
3539         GNUNET_CONTAINER_heap_insert (recent.minHeap, recent_req,
3540                                       GNUNET_TIME_absolute_get ().abs_value);
3541     recent_req->bloom =
3542         GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3543     GNUNET_CONTAINER_multihashmap_put (recent.hashmap, &unique_hash, recent_req,
3544                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3545   }
3546
3547   if (GNUNET_CONTAINER_multihashmap_size (recent.hashmap) > DHT_MAX_RECENT)
3548   {
3549     recent_req = GNUNET_CONTAINER_heap_peek (recent.minHeap);
3550     GNUNET_assert (recent_req != NULL);
3551     GNUNET_SCHEDULER_cancel (recent_req->remove_task);
3552     GNUNET_SCHEDULER_add_now (&remove_recent, recent_req);
3553   }
3554
3555   forward_count = 0;
3556   for (i = 0; i < target_forward_count; i++)
3557   {
3558     selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
3559
3560     if (selected != NULL)
3561     {
3562       forward_count++;
3563       if (GNUNET_CRYPTO_hash_matching_bits
3564           (&selected->id.hashPubKey,
3565            &msg_ctx->key) >=
3566           GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
3567                                             &msg_ctx->key))
3568         GNUNET_asprintf (&temp_stat_str,
3569                          "# requests routed to close(r) peer hop %u",
3570                          msg_ctx->hop_count);
3571       else
3572         GNUNET_asprintf (&temp_stat_str,
3573                          "# requests routed to less close peer hop %u",
3574                          msg_ctx->hop_count);
3575       if (temp_stat_str != NULL)
3576       {
3577         increment_stats (temp_stat_str);
3578         GNUNET_free (temp_stat_str);
3579       }
3580       GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
3581                                         &selected->id.hashPubKey);
3582 #if DEBUG_DHT_ROUTING > 1
3583       nearest = find_closest_peer (&msg_ctx->key);
3584       nearest_buf = GNUNET_strdup (GNUNET_i2s (&nearest->id));
3585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3586                   "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n",
3587                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
3588                   msg_ctx->unique_id, GNUNET_i2s (&selected->id), nearest_buf,
3589                   GNUNET_CRYPTO_hash_matching_bits (&nearest->id.hashPubKey,
3590                                                     msg_ctx->key),
3591                   distance (&nearest->id.hashPubKey, msg_ctx->key));
3592       GNUNET_free (nearest_buf);
3593 #endif
3594 #if DEBUG_DHT_ROUTING
3595       if ((debug_routes_extended) && (dhtlog_handle != NULL))
3596       {
3597         dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3598                                      msg_ctx->hop_count, GNUNET_NO,
3599                                      &my_identity, &msg_ctx->key, msg_ctx->peer,
3600                                      &selected->id);
3601       }
3602 #endif
3603       forward_message (msg, selected, msg_ctx);
3604     }
3605   }
3606
3607   if (msg_ctx->bloom != NULL)
3608   {
3609     GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom,
3610                                       DHT_BLOOM_SIZE);
3611     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
3612     msg_ctx->bloom = NULL;
3613   }
3614
3615 #if DEBUG_DHT_ROUTING
3616   if (forward_count == 0)
3617     ret = GNUNET_SYSERR;
3618   else
3619     ret = GNUNET_NO;
3620
3621   if ((debug_routes_extended) && (dhtlog_handle != NULL))
3622   {
3623     dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3624                                  msg_ctx->hop_count, ret, &my_identity,
3625                                  &msg_ctx->key, msg_ctx->peer, NULL);
3626   }
3627 #endif
3628 }
3629
3630
3631
3632 /**
3633  * Main function that handles whether or not to route a message to other
3634  * peers.
3635  *
3636  * @param msg the message to be routed
3637  * @param msg_ctx the context containing all pertinent information about the message
3638  */
3639 static void
3640 demultiplex_message (const struct GNUNET_MessageHeader *msg,
3641                      struct DHT_MessageContext *msg_ctx)
3642 {
3643   /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */
3644   msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom);
3645
3646   switch (ntohs (msg->type))
3647   {
3648   case GNUNET_MESSAGE_TYPE_DHT_GET:    /* Add to hashmap of requests seen, search for data (always) */
3649     cache_response (msg_ctx);
3650     handle_dht_get (msg, msg_ctx);
3651     break;
3652   case GNUNET_MESSAGE_TYPE_DHT_PUT:    /* Check if closest, if so insert data. */
3653     increment_stats (STAT_PUTS);
3654     handle_dht_put (msg, msg_ctx);
3655     break;
3656   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:      /* Check if closest and not started by us, check options, add to requests seen */
3657     increment_stats (STAT_FIND_PEER);
3658     if (((msg_ctx->hop_count > 0) &&
3659          (0 !=
3660           memcmp (msg_ctx->peer, &my_identity,
3661                   sizeof (struct GNUNET_PeerIdentity)))) ||
3662         (msg_ctx->client != NULL))
3663     {
3664       cache_response (msg_ctx);
3665       if ((msg_ctx->closest == GNUNET_YES) ||
3666           (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
3667         handle_dht_find_peer (msg, msg_ctx);
3668     }
3669     else
3670       route_message (msg, msg_ctx);
3671 #if DEBUG_DHT_ROUTING
3672     if (msg_ctx->hop_count == 0)        /* Locally initiated request */
3673     {
3674       if ((debug_routes) && (dhtlog_handle != NULL))
3675       {
3676         dhtlog_handle->insert_dhtkey (NULL, &msg_ctx->key);
3677         dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_FIND_PEER,
3678                                      msg_ctx->hop_count, GNUNET_NO,
3679                                      &my_identity, &msg_ctx->key);
3680       }
3681     }
3682 #endif
3683     break;
3684   default:
3685     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3686                 "`%s': Message type (%d) not handled, forwarding anyway!\n",
3687                 "DHT", ntohs (msg->type));
3688     route_message (msg, msg_ctx);
3689   }
3690 }
3691
3692
3693
3694
3695 /**
3696  * Iterator for local get request results,
3697  *
3698  * @param cls closure for iterator, NULL
3699  * @param exp when does this value expire?
3700  * @param key the key this data is stored under
3701  * @param size the size of the data identified by key
3702  * @param data the actual data
3703  * @param type the type of the data
3704  *
3705  * @return GNUNET_OK to continue iteration, anything else
3706  * to stop iteration.
3707  */
3708 static int
3709 republish_content_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
3710                             const GNUNET_HashCode * key, size_t size,
3711                             const char *data, uint32_t type)
3712 {
3713
3714   struct DHT_MessageContext *new_msg_ctx;
3715   struct GNUNET_DHT_PutMessage *put_msg;
3716
3717 #if DEBUG_DHT
3718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3719               "`%s:%s': Received `%s' response from datacache\n", my_short_id,
3720               "DHT", "GET");
3721 #endif
3722   new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
3723
3724   put_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutMessage) + size);
3725   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
3726   put_msg->header.size = htons (sizeof (struct GNUNET_DHT_PutMessage) + size);
3727   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
3728   put_msg->type = htons (type);
3729   memcpy (&put_msg[1], data, size);
3730   new_msg_ctx->unique_id =
3731       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
3732                      (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX));
3733   new_msg_ctx->replication = ntohl (DEFAULT_PUT_REPLICATION);
3734   new_msg_ctx->msg_options = ntohl (0);
3735   new_msg_ctx->network_size = estimate_diameter ();
3736   new_msg_ctx->peer = &my_identity;
3737   new_msg_ctx->bloom =
3738       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3739   new_msg_ctx->hop_count = 0;
3740   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
3741   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
3742   increment_stats (STAT_PUT_START);
3743   demultiplex_message (&put_msg->header, new_msg_ctx);
3744
3745   GNUNET_free (new_msg_ctx);
3746   GNUNET_free (put_msg);
3747   return GNUNET_OK;
3748 }
3749
3750 /**
3751  * Task used to republish data.
3752  *
3753  * @param cls closure (a struct RepublishContext)
3754  * @param tc runtime context for this task
3755  */
3756 static void
3757 republish_content (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3758 {
3759   struct RepublishContext *put_context = cls;
3760
3761   unsigned int results;
3762
3763   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
3764   {
3765     GNUNET_free (put_context);
3766     return;
3767   }
3768
3769   GNUNET_assert (datacache != NULL);    /* If we have no datacache we never should have scheduled this! */
3770   results =
3771       GNUNET_DATACACHE_get (datacache, &put_context->key, put_context->type,
3772                             &republish_content_iterator, NULL);
3773   if (results == 0)             /* Data must have expired */
3774     GNUNET_free (put_context);
3775   else                          /* Reschedule task for next time period */
3776     GNUNET_SCHEDULER_add_delayed (dht_republish_frequency, &republish_content,
3777                                   put_context);
3778
3779 }
3780
3781
3782 /**
3783  * Iterator over hash map entries.
3784  *
3785  * @param cls client to search for in source routes
3786  * @param key current key code (ignored)
3787  * @param value value in the hash map, a DHTQueryRecord
3788  * @return GNUNET_YES if we should continue to
3789  *         iterate,
3790  *         GNUNET_NO if not.
3791  */
3792 static int
3793 find_client_records (void *cls, const GNUNET_HashCode * key, void *value)
3794 {
3795   struct ClientList *client = cls;
3796   struct DHTQueryRecord *record = value;
3797   struct DHTRouteSource *pos;
3798
3799   pos = record->head;
3800   while (pos != NULL)
3801   {
3802     if (pos->client == client)
3803       break;
3804     pos = pos->next;
3805   }
3806   if (pos != NULL)
3807   {
3808     GNUNET_CONTAINER_DLL_remove (record->head, record->tail, pos);
3809     GNUNET_CONTAINER_heap_remove_node (pos->hnode);
3810     if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
3811     {
3812       GNUNET_SCHEDULER_cancel (pos->delete_task);
3813       pos->delete_task = GNUNET_SCHEDULER_NO_TASK;
3814     }
3815     if (pos->find_peers_responded != NULL)
3816       GNUNET_CONTAINER_bloomfilter_free (pos->find_peers_responded);
3817     GNUNET_free (pos);
3818   }
3819   if (record->head == NULL)     /* No more entries in DLL */
3820   {
3821     GNUNET_assert (GNUNET_YES ==
3822                    GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
3823                                                          &record->key, record));
3824     GNUNET_free (record);
3825   }
3826   return GNUNET_YES;
3827 }
3828
3829 /**
3830  * Functions with this signature are called whenever a client
3831  * is disconnected on the network level.
3832  *
3833  * @param cls closure (NULL for dht)
3834  * @param client identification of the client; NULL
3835  *        for the last call when the server is destroyed
3836  */
3837 static void
3838 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3839 {
3840   struct ClientList *pos = client_list;
3841   struct ClientList *prev;
3842   struct ClientList *found;
3843   struct PendingMessage *reply;
3844
3845   prev = NULL;
3846   found = NULL;
3847   while (pos != NULL)
3848   {
3849     if (pos->client_handle == client)
3850     {
3851       if (prev != NULL)
3852         prev->next = pos->next;
3853       else
3854         client_list = pos->next;
3855       found = pos;
3856       break;
3857     }
3858     prev = pos;
3859     pos = pos->next;
3860   }
3861
3862   if (found != NULL)
3863   {
3864     if (found->transmit_handle != NULL)
3865       GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
3866
3867     while (NULL != (reply = found->pending_head))
3868     {
3869       GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
3870                                    reply);
3871       GNUNET_free (reply);
3872     }
3873     GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
3874                                            &find_client_records, found);
3875     GNUNET_free (found);
3876   }
3877 }
3878
3879 /**
3880  * Find a client if it exists, add it otherwise.
3881  *
3882  * @param client the server handle to the client
3883  *
3884  * @return the client if found, a new client otherwise
3885  */
3886 static struct ClientList *
3887 find_active_client (struct GNUNET_SERVER_Client *client)
3888 {
3889   struct ClientList *pos = client_list;
3890   struct ClientList *ret;
3891
3892   while (pos != NULL)
3893   {
3894     if (pos->client_handle == client)
3895       return pos;
3896     pos = pos->next;
3897   }
3898
3899   ret = GNUNET_malloc (sizeof (struct ClientList));
3900   ret->client_handle = client;
3901   ret->next = client_list;
3902   client_list = ret;
3903
3904   return ret;
3905 }
3906
3907 #if HAVE_MALICIOUS
3908 /**
3909  * Task to send a malicious put message across the network.
3910  *
3911  * @param cls closure for this task
3912  * @param tc the context under which the task is running
3913  */
3914 static void
3915 malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3916 {
3917   static struct GNUNET_DHT_PutMessage put_message;
3918   static struct DHT_MessageContext msg_ctx;
3919   static GNUNET_HashCode key;
3920   uint32_t random_key;
3921
3922   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
3923     return;
3924   put_message.header.size = htons (sizeof (struct GNUNET_DHT_PutMessage));
3925   put_message.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
3926   put_message.type = htonl (GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE);
3927   put_message.expiration =
3928       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_forever ());
3929   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
3930   random_key =
3931       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
3932   GNUNET_CRYPTO_hash (&random_key, sizeof (uint32_t), &key);
3933   memcpy (&msg_ctx.key, &key, sizeof (GNUNET_HashCode));
3934   msg_ctx.unique_id =
3935       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
3936                      (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX));
3937   msg_ctx.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3938   msg_ctx.msg_options = ntohl (0);
3939   msg_ctx.network_size = estimate_diameter ();
3940   msg_ctx.peer = &my_identity;
3941   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE;
3942   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3943 #if DEBUG_DHT_ROUTING
3944   if (dhtlog_handle != NULL)
3945     dhtlog_handle->insert_dhtkey (NULL, &key);
3946 #endif
3947   increment_stats (STAT_PUT_START);
3948   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3949               "%s:%s Sending malicious PUT message with hash %s\n", my_short_id,
3950               "DHT", GNUNET_h2s (&key));
3951   demultiplex_message (&put_message.header, &msg_ctx);
3952   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
3953                                 (GNUNET_TIME_UNIT_MILLISECONDS,
3954                                  malicious_put_frequency), &malicious_put_task,
3955                                 NULL);
3956 }
3957
3958
3959 /**
3960  * Task to send a malicious put message across the network.
3961  *
3962  * @param cls closure for this task
3963  * @param tc the context under which the task is running
3964  */
3965 static void
3966 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3967 {
3968   static struct GNUNET_DHT_GetMessage get_message;
3969   struct DHT_MessageContext msg_ctx;
3970   static GNUNET_HashCode key;
3971   uint32_t random_key;
3972
3973   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
3974     return;
3975
3976   get_message.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
3977   get_message.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
3978   get_message.type = htonl (GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE);
3979   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
3980   random_key =
3981       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
3982   GNUNET_CRYPTO_hash (&random_key, sizeof (uint32_t), &key);
3983   memcpy (&msg_ctx.key, &key, sizeof (GNUNET_HashCode));
3984   msg_ctx.unique_id =
3985       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
3986                      (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX));
3987   msg_ctx.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3988   msg_ctx.msg_options = ntohl (0);
3989   msg_ctx.network_size = estimate_diameter ();
3990   msg_ctx.peer = &my_identity;
3991   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE;
3992   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3993 #if DEBUG_DHT_ROUTING
3994   if (dhtlog_handle != NULL)
3995     dhtlog_handle->insert_dhtkey (NULL, &key);
3996 #endif
3997   increment_stats (STAT_GET_START);
3998   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3999               "%s:%s Sending malicious GET message with hash %s\n", my_short_id,
4000               "DHT", GNUNET_h2s (&key));
4001   demultiplex_message (&get_message.header, &msg_ctx);
4002   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
4003                                 (GNUNET_TIME_UNIT_MILLISECONDS,
4004                                  malicious_get_frequency), &malicious_get_task,
4005                                 NULL);
4006 }
4007 #endif
4008
4009
4010 /**
4011  * Iterator over hash map entries.
4012  *
4013  * @param cls closure
4014  * @param key current key code
4015  * @param value value in the hash map
4016  * @return GNUNET_YES if we should continue to
4017  *         iterate,
4018  *         GNUNET_NO if not.
4019  */
4020 static int
4021 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
4022 {
4023   struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
4024
4025   GNUNET_CONTAINER_bloomfilter_add (bloom, key);
4026   return GNUNET_YES;
4027 }
4028
4029 /**
4030  * Task to send a find peer message for our own peer identifier
4031  * so that we can find the closest peers in the network to ourselves
4032  * and attempt to connect to them.
4033  *
4034  * @param cls closure for this task
4035  * @param tc the context under which the task is running
4036  */
4037 static void
4038 send_find_peer_message (void *cls,
4039                         const struct GNUNET_SCHEDULER_TaskContext *tc)
4040 {
4041   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
4042   struct DHT_MessageContext msg_ctx;
4043   struct GNUNET_TIME_Relative next_send_time;
4044   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
4045
4046 #if COUNT_INTERVAL
4047   struct GNUNET_TIME_Relative time_diff;
4048   struct GNUNET_TIME_Absolute end;
4049   double multiplier;
4050   double count_per_interval;
4051 #endif
4052   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
4053     return;
4054
4055   if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer))        /* If we are finding peers already, no need to send out our request right now! */
4056   {
4057     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4058                 "Have %d newly found peers since last find peer message sent!\n",
4059                 newly_found_peers);
4060     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
4061                                   &send_find_peer_message, NULL);
4062     newly_found_peers = 0;
4063     return;
4064   }
4065
4066   increment_stats (STAT_FIND_PEER_START);
4067 #if COUNT_INTERVAL
4068   end = GNUNET_TIME_absolute_get ();
4069   time_diff =
4070       GNUNET_TIME_absolute_get_difference (find_peer_context.start, end);
4071
4072   if (time_diff.abs_value > FIND_PEER_CALC_INTERVAL.abs_value)
4073   {
4074     multiplier = time_diff.abs_value / FIND_PEER_CALC_INTERVAL.abs_value;
4075     count_per_interval = find_peer_context.count / multiplier;
4076   }
4077   else
4078   {
4079     multiplier = FIND_PEER_CALC_INTERVAL.abs_value / time_diff.abs_value;
4080     count_per_interval = find_peer_context.count * multiplier;
4081   }
4082 #endif
4083
4084 #if FIND_PEER_WITH_HELLO
4085   find_peer_msg =
4086       GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) +
4087                      GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *)
4088                                         my_hello));
4089   find_peer_msg->header.size =
4090       htons (sizeof (struct GNUNET_DHT_FindPeerMessage) +
4091              GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
4092   memcpy (&find_peer_msg[1], my_hello,
4093           GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
4094 #else
4095   find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
4096   find_peer_msg->header.size =
4097       htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
4098 #endif
4099   find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
4100   temp_bloom =
4101       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
4102   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
4103                                          temp_bloom);
4104   GNUNET_assert (GNUNET_OK ==
4105                  GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
4106                                                             find_peer_msg->
4107                                                             bloomfilter,
4108                                                             DHT_BLOOM_SIZE));
4109   GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
4110   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
4111   memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
4112   msg_ctx.unique_id =
4113       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
4114                      (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
4115   msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
4116   msg_ctx.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
4117   msg_ctx.network_size = estimate_diameter ();
4118   msg_ctx.peer = &my_identity;
4119   msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
4120   msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
4121
4122   demultiplex_message (&find_peer_msg->header, &msg_ctx);
4123   GNUNET_free (find_peer_msg);
4124   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4125               "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id,
4126               "DHT", "FIND PEER");
4127   if (newly_found_peers < bucket_size)
4128   {
4129     next_send_time.rel_value =
4130         (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
4131         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
4132                                   DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
4133   }
4134   else
4135   {
4136     next_send_time.rel_value =
4137         DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
4138         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
4139                                   DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value -
4140                                   DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
4141   }
4142
4143   GNUNET_assert (next_send_time.rel_value != 0);
4144   find_peer_context.count = 0;
4145   newly_found_peers = 0;
4146   find_peer_context.start = GNUNET_TIME_absolute_get ();
4147   if (GNUNET_YES == do_find_peer)
4148   {
4149     GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
4150                                   NULL);
4151   }
4152 }
4153
4154 /**
4155  * Handler for any generic DHT messages, calls the appropriate handler
4156  * depending on message type, sends confirmation if responses aren't otherwise
4157  * expected.
4158  *
4159  * @param cls closure for the service
4160  * @param client the client we received this message from
4161  * @param message the actual message received
4162  */
4163 static void
4164 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
4165                                 const struct GNUNET_MessageHeader *message)
4166 {
4167   const struct GNUNET_DHT_RouteMessage *dht_msg =
4168       (const struct GNUNET_DHT_RouteMessage *) message;
4169   const struct GNUNET_MessageHeader *enc_msg;
4170   struct DHT_MessageContext msg_ctx;
4171
4172   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
4173 #if DEBUG_DHT
4174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4175               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
4176               my_short_id, "DHT", "GENERIC", ntohs (message->type),
4177               GNUNET_h2s (&dht_msg->key), GNUNET_ntohll (dht_msg->unique_id));
4178 #endif
4179 #if DEBUG_DHT_ROUTING
4180   if (dhtlog_handle != NULL)
4181     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
4182 #endif
4183
4184   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
4185   msg_ctx.client = find_active_client (client);
4186   memcpy (&msg_ctx.key, &dht_msg->key, sizeof (GNUNET_HashCode));
4187   msg_ctx.unique_id = GNUNET_ntohll (dht_msg->unique_id);
4188   msg_ctx.replication = ntohl (dht_msg->desired_replication_level);
4189   msg_ctx.msg_options = ntohl (dht_msg->options);
4190   if (GNUNET_DHT_RO_RECORD_ROUTE ==
4191       (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
4192   {
4193     msg_ctx.path_history = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
4194     memcpy (msg_ctx.path_history, &my_identity,
4195             sizeof (struct GNUNET_PeerIdentity));
4196     msg_ctx.path_history_len = 1;
4197   }
4198   msg_ctx.network_size = estimate_diameter ();
4199   msg_ctx.peer = &my_identity; /* FIXME bart NULL? Fix doxygen? */
4200   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 4;  /* Make local routing a higher priority */
4201   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
4202
4203   if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
4204     increment_stats (STAT_GET_START);
4205   else if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
4206     increment_stats (STAT_PUT_START);
4207   else if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER)
4208     increment_stats (STAT_FIND_PEER_START);
4209
4210   if (GNUNET_YES == malicious_dropper)
4211   {
4212     if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
4213     {
4214 #if DEBUG_DHT_ROUTING
4215       if ((debug_routes) && (dhtlog_handle != NULL))
4216       {
4217         dhtlog_handle->insert_query (NULL, msg_ctx.unique_id, DHTLOG_GET,
4218                                      msg_ctx.hop_count, GNUNET_NO, &my_identity,
4219                                      &msg_ctx.key);
4220       }
4221 #endif
4222     }
4223     else if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
4224     {
4225 #if DEBUG_DHT_ROUTING
4226       if ((debug_routes) && (dhtlog_handle != NULL))
4227       {
4228         dhtlog_handle->insert_query (NULL, msg_ctx.unique_id, DHTLOG_PUT,
4229                                      msg_ctx.hop_count, GNUNET_NO, &my_identity,
4230                                      &msg_ctx.key);
4231       }
4232 #endif
4233     }
4234     GNUNET_SERVER_receive_done (client, GNUNET_OK);
4235     GNUNET_free_non_null (msg_ctx.path_history);
4236     return;
4237   }
4238
4239   demultiplex_message (enc_msg, &msg_ctx);
4240   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4241
4242 }
4243
4244 /**
4245  * Handler for any locally received DHT control messages,
4246  * sets malicious flags mostly for now.
4247  *
4248  * @param cls closure for the service
4249  * @param client the client we received this message from
4250  * @param message the actual message received
4251  *
4252  */
4253 static void
4254 handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client,
4255                             const struct GNUNET_MessageHeader *message)
4256 {
4257   const struct GNUNET_DHT_ControlMessage *dht_control_msg =
4258       (const struct GNUNET_DHT_ControlMessage *) message;
4259
4260 #if DEBUG_DHT
4261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4262               "`%s:%s': Received `%s' request from client, command %d\n",
4263               my_short_id, "DHT", "CONTROL", ntohs (dht_control_msg->command));
4264 #endif
4265
4266   switch (ntohs (dht_control_msg->command))
4267   {
4268   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
4269     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4270                 "Sending self seeking find peer request!\n");
4271     GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL);
4272     break;
4273 #if HAVE_MALICIOUS
4274   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
4275     if (ntohs (dht_control_msg->variable) > 0)
4276       malicious_get_frequency = ntohs (dht_control_msg->variable);
4277     if (malicious_get_frequency == 0)
4278       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4279     if (malicious_getter != GNUNET_YES)
4280       GNUNET_SCHEDULER_add_now (&malicious_get_task, NULL);
4281     malicious_getter = GNUNET_YES;
4282     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4283                 "%s:%s Initiating malicious GET behavior, frequency %d\n",
4284                 my_short_id, "DHT", malicious_get_frequency);
4285     break;
4286   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT:
4287     if (ntohs (dht_control_msg->variable) > 0)
4288       malicious_put_frequency = ntohs (dht_control_msg->variable);
4289     if (malicious_put_frequency == 0)
4290       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
4291     if (malicious_putter != GNUNET_YES)
4292       GNUNET_SCHEDULER_add_now (&malicious_put_task, NULL);
4293     malicious_putter = GNUNET_YES;
4294     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4295                 "%s:%s Initiating malicious PUT behavior, frequency %d\n",
4296                 my_short_id, "DHT", malicious_put_frequency);
4297     break;
4298   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP:
4299 #if DEBUG_DHT_ROUTING
4300     if ((malicious_dropper != GNUNET_YES) && (dhtlog_handle != NULL))
4301       dhtlog_handle->set_malicious (&my_identity);
4302 #endif
4303     malicious_dropper = GNUNET_YES;
4304     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4305                 "%s:%s Initiating malicious DROP behavior\n", my_short_id,
4306                 "DHT");
4307     break;
4308 #endif
4309   default:
4310     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4311                 "%s:%s Unknown control command type `%d'!\n", my_short_id,
4312                 "DHT", ntohs (dht_control_msg->command));
4313     break;
4314   }
4315
4316   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4317 }
4318
4319 /**
4320  * Handler for any generic DHT stop messages, calls the appropriate handler
4321  * depending on message type (if processed locally)
4322  *
4323  * @param cls closure for the service
4324  * @param client the client we received this message from
4325  * @param message the actual message received
4326  *
4327  */
4328 static void
4329 handle_dht_local_route_stop (void *cls, struct GNUNET_SERVER_Client *client,
4330                              const struct GNUNET_MessageHeader *message)
4331 {
4332
4333   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
4334       (const struct GNUNET_DHT_StopMessage *) message;
4335   struct DHTQueryRecord *record;
4336   struct DHTRouteSource *pos;
4337
4338 #if DEBUG_DHT
4339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4340               "`%s:%s': Received `%s' request from client, uid %llu\n",
4341               my_short_id, "DHT", "GENERIC STOP",
4342               GNUNET_ntohll (dht_stop_msg->unique_id));
4343 #endif
4344   record =
4345       GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap,
4346                                          &dht_stop_msg->key);
4347   if (record != NULL)
4348   {
4349     pos = record->head;
4350
4351     while (pos != NULL)
4352     {
4353       /* If the client is non-null (local request) and the client matches the requesting client, remove the entry. */
4354       if ((pos->client != NULL) && (pos->client->client_handle == client))
4355       {
4356         if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
4357           GNUNET_SCHEDULER_cancel (pos->delete_task);
4358         pos->delete_task =
4359             GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);
4360       }
4361       pos = pos->next;
4362     }
4363   }
4364
4365   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4366 }
4367
4368
4369 /**
4370  * Core handler for p2p route requests.
4371  *
4372  * @param cls closure
4373  * @param message message
4374  * @param peer peer identity this notification is about
4375  * @param atsi performance data
4376  * @return GNUNET_OK to keep the connection open,
4377  *         GNUNET_SYSERR to close it (signal serious error)
4378  */
4379 static int
4380 handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer,
4381                               const struct GNUNET_MessageHeader *message,
4382                               const struct GNUNET_TRANSPORT_ATS_Information
4383                               *atsi)
4384 {
4385 #if DEBUG_DHT
4386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4387               "`%s:%s': Received P2P request from peer %s\n", my_short_id,
4388               "DHT", GNUNET_i2s (peer));
4389 #endif
4390   struct GNUNET_DHT_P2PRouteMessage *incoming =
4391       (struct GNUNET_DHT_P2PRouteMessage *) message;
4392   struct GNUNET_MessageHeader *enc_msg =
4393       (struct GNUNET_MessageHeader *) &incoming[1];
4394   struct DHT_MessageContext *msg_ctx;
4395   char *route_path;
4396   int path_size;
4397
4398   if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING)        
4399   {
4400     /* Throw these away. FIXME: Don't throw these away? (reply) */
4401 #if DEBUG_PING
4402     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n",
4403                 my_short_id, "DHT");
4404 #endif
4405     return GNUNET_YES;
4406   }
4407
4408   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4409   {
4410     GNUNET_break_op (0);
4411     return GNUNET_YES;
4412   }
4413
4414   if (malicious_dropper == GNUNET_YES)
4415   {
4416 #if DEBUG_DHT_ROUTING
4417     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4418     {
4419           /** Log routes that die due to high load! */
4420       dhtlog_handle->insert_route (NULL, GNUNET_ntohll (incoming->unique_id),
4421                                    DHTLOG_ROUTE, ntohl (incoming->hop_count),
4422                                    GNUNET_SYSERR, &my_identity, &incoming->key,
4423                                    peer, NULL);
4424     }
4425 #endif
4426     return GNUNET_YES;
4427   }
4428
4429   if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
4430   {
4431     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4432                 "Sending of previous replies took too long, backing off!\n");
4433     increment_stats ("# route requests dropped due to high load");
4434     decrease_max_send_delay (get_max_send_delay ());
4435 #if DEBUG_DHT_ROUTING
4436     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4437     {
4438         /** Log routes that die due to high load! */
4439       dhtlog_handle->insert_route (NULL, GNUNET_ntohll (incoming->unique_id),
4440                                    DHTLOG_ROUTE, ntohl (incoming->hop_count),
4441                                    GNUNET_SYSERR, &my_identity, &incoming->key,
4442                                    peer, NULL);
4443     }
4444 #endif
4445     return GNUNET_YES;
4446   }
4447   msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
4448   msg_ctx->bloom =
4449       GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
4450                                          DHT_BLOOM_K);
4451   GNUNET_assert (msg_ctx->bloom != NULL);
4452   msg_ctx->hop_count = ntohl (incoming->hop_count);
4453   memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
4454   msg_ctx->replication = ntohl (incoming->desired_replication_level);
4455   msg_ctx->unique_id = GNUNET_ntohll (incoming->unique_id);
4456   msg_ctx->msg_options = ntohl (incoming->options);
4457   if (GNUNET_DHT_RO_RECORD_ROUTE ==
4458       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
4459   {
4460     path_size =
4461         ntohl (incoming->outgoing_path_length) *
4462         sizeof (struct GNUNET_PeerIdentity);
4463     if (ntohs (message->size) !=
4464           (sizeof (struct GNUNET_DHT_P2PRouteMessage) +
4465           ntohs (enc_msg->size) + path_size))
4466     {
4467       GNUNET_break_op(0);
4468       return GNUNET_YES;
4469     }
4470     route_path = (char *) &incoming[1];
4471     route_path = route_path + ntohs (enc_msg->size);
4472     msg_ctx->path_history =
4473         GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
4474     memcpy (msg_ctx->path_history, route_path, path_size);
4475     memcpy (&msg_ctx->path_history[path_size], &my_identity,
4476             sizeof (struct GNUNET_PeerIdentity));
4477     msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
4478   }
4479   msg_ctx->network_size = ntohl (incoming->network_size);
4480   msg_ctx->peer = peer;
4481   msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
4482   msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
4483   demultiplex_message (enc_msg, msg_ctx);
4484   if (msg_ctx->bloom != NULL)
4485   {
4486     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
4487     msg_ctx->bloom = NULL;
4488   }
4489   GNUNET_free (msg_ctx);
4490   return GNUNET_YES;
4491 }
4492
4493
4494 /**
4495  * Core handler for p2p route results.
4496  *
4497  * @param cls closure
4498  * @param message message
4499  * @param peer peer identity this notification is about
4500  * @param atsi performance data
4501  *
4502  */
4503 static int
4504 handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer,
4505                              const struct GNUNET_MessageHeader *message,
4506                              const struct GNUNET_TRANSPORT_ATS_Information
4507                              *atsi)
4508 {
4509 #if DEBUG_DHT
4510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4511               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT",
4512               GNUNET_i2s (peer));
4513 #endif
4514   struct GNUNET_DHT_P2PRouteResultMessage *incoming =
4515       (struct GNUNET_DHT_P2PRouteResultMessage *) message;
4516   struct GNUNET_MessageHeader *enc_msg =
4517       (struct GNUNET_MessageHeader *) &incoming[1];
4518   struct DHT_MessageContext msg_ctx;
4519
4520 #if DEBUG_PATH
4521   char *path_offset;
4522   unsigned int i;
4523 #endif
4524   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4525   {
4526     GNUNET_break_op (0);
4527     return GNUNET_YES;
4528   }
4529
4530   if (malicious_dropper == GNUNET_YES)
4531   {
4532 #if DEBUG_DHT_ROUTING
4533     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4534     {
4535           /** Log routes that die due to high load! */
4536       dhtlog_handle->insert_route (NULL, GNUNET_ntohll (incoming->unique_id),
4537                                    DHTLOG_ROUTE, ntohl (incoming->hop_count),
4538                                    GNUNET_SYSERR, &my_identity, &incoming->key,
4539                                    peer, NULL);
4540     }
4541 #endif
4542     return GNUNET_YES;
4543   }
4544
4545   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
4546   // FIXME: call GNUNET_BLOCK_evaluate (...) -- instead of doing your own bloomfilter!
4547   memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
4548   msg_ctx.unique_id = GNUNET_ntohll (incoming->unique_id);
4549   msg_ctx.msg_options = ntohl (incoming->options);
4550   msg_ctx.hop_count = ntohl (incoming->hop_count);
4551   msg_ctx.peer = peer;
4552   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;  /* Make result routing a higher priority */
4553   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
4554   if ((GNUNET_DHT_RO_RECORD_ROUTE ==
4555        (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
4556       (ntohl (incoming->outgoing_path_length) > 0))
4557   {
4558     if (ntohs (message->size) -
4559         sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
4560         ntohs (enc_msg->size) !=
4561         ntohl (incoming->outgoing_path_length) *
4562         sizeof (struct GNUNET_PeerIdentity))
4563     {
4564 #if DEBUG_DHT
4565       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4566                   "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n",
4567                   ntohs (message->size), ntohs (enc_msg->size),
4568                   ntohs (message->size) -
4569                   sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
4570                   ntohs (enc_msg->size),
4571                   ntohl (incoming->outgoing_path_length) *
4572                   sizeof (struct GNUNET_PeerIdentity));
4573 #endif
4574       GNUNET_break_op (0);
4575       return GNUNET_NO;
4576     }
4577     msg_ctx.path_history = (char *) &incoming[1];
4578     msg_ctx.path_history += ntohs (enc_msg->size);
4579     msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
4580 #if DEBUG_PATH
4581     for (i = 0; i < msg_ctx.path_history_len; i++)
4582     {
4583       path_offset =
4584           &msg_ctx.path_history[i * sizeof (struct GNUNET_PeerIdentity)];
4585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4586                   "(handle_p2p_route_result) Key %s Found peer %d:%s\n",
4587                   GNUNET_h2s (&msg_ctx.key), i,
4588                   GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
4589     }
4590 #endif
4591   }
4592   msg_ctx.bloom =
4593       GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
4594                                          DHT_BLOOM_K);
4595   GNUNET_assert (msg_ctx.bloom != NULL);
4596   route_result_message (enc_msg, &msg_ctx);
4597   return GNUNET_YES;
4598 }
4599
4600
4601 /**
4602  * Receive the HELLO from transport service,
4603  * free current and replace if necessary.
4604  *
4605  * @param cls NULL
4606  * @param message HELLO message of peer
4607  */
4608 static void
4609 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
4610 {
4611 #if DEBUG_DHT
4612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4613               "Received our `%s' from transport service\n", "HELLO");
4614 #endif
4615
4616   GNUNET_assert (message != NULL);
4617   GNUNET_free_non_null (my_hello);
4618   my_hello = GNUNET_malloc (ntohs (message->size));
4619   memcpy (my_hello, message, ntohs (message->size));
4620 }
4621
4622
4623 /**
4624  * Task run during shutdown.
4625  *
4626  * @param cls unused
4627  * @param tc unused
4628  */
4629 static void
4630 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
4631 {
4632   int bucket_count;
4633   struct PeerInfo *pos;
4634
4635   if (NULL != ghh)
4636   {
4637     GNUNET_TRANSPORT_get_hello_cancel (ghh);
4638     ghh = NULL;
4639   }
4640   if (transport_handle != NULL)
4641   {
4642     GNUNET_free_non_null (my_hello);
4643     GNUNET_TRANSPORT_disconnect (transport_handle);
4644     transport_handle = NULL;
4645   }
4646   if (coreAPI != NULL)
4647   {
4648 #if DEBUG_DHT
4649     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n",
4650                 my_short_id, "DHT");
4651 #endif
4652     GNUNET_CORE_disconnect (coreAPI);
4653     coreAPI = NULL;
4654   }
4655   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
4656   {
4657     while (k_buckets[bucket_count].head != NULL)
4658     {
4659       pos = k_buckets[bucket_count].head;
4660 #if DEBUG_DHT
4661       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4662                   "%s:%s Removing peer %s from bucket %d!\n", my_short_id,
4663                   "DHT", GNUNET_i2s (&pos->id), bucket_count);
4664 #endif
4665       delete_peer (pos, bucket_count);
4666     }
4667   }
4668   if (datacache != NULL)
4669   {
4670 #if DEBUG_DHT
4671     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n",
4672                 my_short_id, "DHT");
4673 #endif
4674     GNUNET_DATACACHE_destroy (datacache);
4675     datacache = NULL;
4676   }
4677   if (stats != NULL)
4678   {
4679     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
4680     stats = NULL;
4681   }
4682   if (dhtlog_handle != NULL)
4683   {
4684     GNUNET_DHTLOG_disconnect (dhtlog_handle);
4685     dhtlog_handle = NULL;
4686   }
4687   if (block_context != NULL)
4688   {
4689     GNUNET_BLOCK_context_destroy (block_context);
4690     block_context = NULL;
4691   }
4692   GNUNET_free_non_null (my_short_id);
4693   my_short_id = NULL;
4694 }
4695
4696
4697 /**
4698  * To be called on core init/fail.
4699  *
4700  * @param cls service closure
4701  * @param server handle to the server for this service
4702  * @param identity the public identity of this peer
4703  * @param publicKey the public key of this peer
4704  */
4705 void
4706 core_init (void *cls, struct GNUNET_CORE_Handle *server,
4707            const struct GNUNET_PeerIdentity *identity,
4708            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
4709 {
4710
4711   if (server == NULL)
4712   {
4713 #if DEBUG_DHT
4714     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n",
4715                 "dht", GNUNET_i2s (identity));
4716 #endif
4717     GNUNET_SCHEDULER_cancel (cleanup_task);
4718     GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
4719     return;
4720   }
4721 #if DEBUG_DHT
4722   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4723               "%s: Core connection initialized, I am peer: %s\n", "dht",
4724               GNUNET_i2s (identity));
4725 #endif
4726
4727   /* Copy our identity so we can use it */
4728   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
4729   if (my_short_id != NULL)
4730     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4731                 "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n",
4732                 "DHT SERVICE");
4733   my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity));
4734   /* Set the server to local variable */
4735   coreAPI = server;
4736
4737   if (dhtlog_handle != NULL)
4738     dhtlog_handle->insert_node (NULL, &my_identity);
4739 }
4740
4741
4742 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
4743   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE,
4744    0},
4745   {&handle_dht_local_route_stop, NULL,
4746    GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
4747   {&handle_dht_control_message, NULL, GNUNET_MESSAGE_TYPE_DHT_CONTROL, 0},
4748   {NULL, NULL, 0, 0}
4749 };
4750
4751
4752 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
4753   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
4754   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
4755   {NULL, 0, 0}
4756 };
4757
4758
4759 /**
4760  * Method called whenever a peer connects.
4761  *
4762  * @param cls closure
4763  * @param peer peer identity this notification is about
4764  * @param atsi performance data
4765  */
4766 static void
4767 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
4768                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4769 {
4770   struct PeerInfo *ret;
4771   struct DHTPutEntry *put_entry;
4772   int peer_bucket;
4773
4774   /* Check for connect to self message */
4775   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
4776     return;
4777
4778 #if DEBUG_DHT
4779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4780               "%s:%s Receives core connect message for peer %s distance %d!\n",
4781               my_short_id, "dht", GNUNET_i2s (peer), distance);
4782 #endif
4783
4784   if (GNUNET_YES ==
4785       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
4786                                               &peer->hashPubKey))
4787   {
4788 #if DEBUG_DHT
4789     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4790                 "%s:%s Received %s message for peer %s, but already have peer in RT!",
4791                 my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
4792 #endif
4793     GNUNET_break (0);
4794     return;
4795   }
4796
4797   if ((datacache != NULL) && (GNUNET_YES == put_peer_identities))
4798   {
4799     put_entry =
4800         GNUNET_malloc (sizeof (struct DHTPutEntry) +
4801                        sizeof (struct GNUNET_PeerIdentity));
4802     put_entry->path_length = 0;
4803     put_entry->data_size = sizeof (struct GNUNET_PeerIdentity);
4804     memcpy (&put_entry[1], peer, sizeof (struct GNUNET_PeerIdentity));
4805     GNUNET_DATACACHE_put (datacache, &peer->hashPubKey,
4806                           sizeof (struct DHTPutEntry) +
4807                           sizeof (struct GNUNET_PeerIdentity),
4808                           (char *) put_entry, GNUNET_BLOCK_TYPE_DHT_HELLO,
4809                           GNUNET_TIME_absolute_get_forever ());
4810     GNUNET_free (put_entry);
4811   }
4812   else if (datacache == NULL)
4813     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4814                 "DHT has no connection to datacache!\n");
4815
4816   peer_bucket = find_current_bucket (&peer->hashPubKey);
4817   GNUNET_assert (peer_bucket >= lowest_bucket);
4818   GNUNET_assert (peer_bucket < MAX_BUCKETS);
4819   ret = GNUNET_malloc (sizeof (struct PeerInfo));
4820 #if 0
4821   ret->latency = latency;
4822   ret->distance = distance;
4823 #endif
4824   ret->id = *peer;
4825   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
4826                                      k_buckets[peer_bucket].tail,
4827                                      k_buckets[peer_bucket].tail, ret);
4828   k_buckets[peer_bucket].peers_size++;
4829 #if DO_UPDATE_PREFERENCE
4830   if ((GNUNET_CRYPTO_hash_matching_bits
4831        (&my_identity.hashPubKey, &peer->hashPubKey) > 0) &&
4832       (k_buckets[peer_bucket].peers_size <= bucket_size))
4833     ret->preference_task =
4834         GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
4835 #endif
4836   if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
4837     enable_next_bucket ();
4838 #if DO_PING
4839   schedule_ping_messages ();
4840 #endif
4841   newly_found_peers++;
4842   GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret,
4843                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
4844   increment_stats (STAT_PEERS_KNOWN);
4845
4846 #if DEBUG_DHT
4847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4848               "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
4849               ret == NULL ? "NOT ADDED" : "PEER ADDED");
4850 #endif
4851 }
4852
4853
4854 /**
4855  * Method called whenever a peer disconnects.
4856  *
4857  * @param cls closure
4858  * @param peer peer identity this notification is about
4859  */
4860 static void
4861 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
4862 {
4863   struct PeerInfo *to_remove;
4864   int current_bucket;
4865
4866   /* Check for disconnect from self message */
4867   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
4868     return;
4869 #if DEBUG_DHT
4870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4871               "%s:%s: Received peer disconnect message for peer `%s' from %s\n",
4872               my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
4873 #endif
4874
4875   if (GNUNET_YES !=
4876       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
4877                                               &peer->hashPubKey))
4878   {
4879     GNUNET_break (0);
4880 #if DEBUG_DHT
4881     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4882                 "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
4883                 my_short_id, "DHT", GNUNET_i2s (peer));
4884 #endif
4885     return;
4886   }
4887   increment_stats (STAT_DISCONNECTS);
4888   GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
4889                  (all_known_peers, &peer->hashPubKey));
4890   to_remove =
4891       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
4892   GNUNET_assert (to_remove != NULL);
4893   if (NULL != to_remove->info_ctx)
4894   {
4895     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
4896     to_remove->info_ctx = NULL;
4897   }
4898   GNUNET_assert (0 ==
4899                  memcmp (peer, &to_remove->id,
4900                          sizeof (struct GNUNET_PeerIdentity)));
4901   current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
4902   delete_peer (to_remove, current_bucket);
4903 }
4904
4905
4906 /**
4907  * Process dht requests.
4908  *
4909  * @param cls closure
4910  * @param server the initialized server
4911  * @param c configuration to use
4912  */
4913 static void
4914 run (void *cls, struct GNUNET_SERVER_Handle *server,
4915      const struct GNUNET_CONFIGURATION_Handle *c)
4916 {
4917   struct GNUNET_TIME_Relative next_send_time;
4918   unsigned long long temp_config_num;
4919   char *converge_modifier_buf;
4920
4921   cfg = c;
4922   datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
4923   GNUNET_SERVER_add_handlers (server, plugin_handlers);
4924   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
4925   coreAPI = GNUNET_CORE_connect (cfg,   /* Main configuration */
4926                                  DEFAULT_CORE_QUEUE_SIZE,       /* queue size */
4927                                  NULL,  /* Closure passed to DHT functions */
4928                                  &core_init,    /* Call core_init once connected */
4929                                  &handle_core_connect,  /* Handle connects */
4930                                  &handle_core_disconnect,       /* remove peers on disconnects */
4931                                  NULL,  /* Do we care about "status" updates? */
4932                                  NULL,  /* Don't want notified about all incoming messages */
4933                                  GNUNET_NO,     /* For header only inbound notification */
4934                                  NULL,  /* Don't want notified about all outbound messages */
4935                                  GNUNET_NO,     /* For header only outbound notification */
4936                                  core_handlers);        /* Register these handlers */
4937
4938   if (coreAPI == NULL)
4939     return;
4940   transport_handle =
4941       GNUNET_TRANSPORT_connect (cfg, NULL, NULL, NULL, NULL, NULL);
4942   if (transport_handle != NULL)
4943     ghh = GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
4944   else
4945     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4946                 "Failed to connect to transport service!\n");
4947   block_context = GNUNET_BLOCK_context_create (cfg);
4948   lowest_bucket = MAX_BUCKETS - 1;
4949   forward_list.hashmap =
4950       GNUNET_CONTAINER_multihashmap_create (MAX_OUTSTANDING_FORWARDS / 10);
4951   forward_list.minHeap =
4952       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4953   all_known_peers = GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
4954   recent_find_peer_requests =
4955       GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
4956   GNUNET_assert (all_known_peers != NULL);
4957   if (GNUNET_YES ==
4958       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht_testing",
4959                                             "mysql_logging"))
4960   {
4961     debug_routes = GNUNET_YES;
4962   }
4963
4964   if (GNUNET_YES ==
4965       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "strict_kademlia"))
4966   {
4967     strict_kademlia = GNUNET_YES;
4968   }
4969
4970   if (GNUNET_YES ==
4971       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "stop_on_closest"))
4972   {
4973     stop_on_closest = GNUNET_YES;
4974   }
4975
4976   if (GNUNET_YES ==
4977       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "stop_found"))
4978   {
4979     stop_on_found = GNUNET_YES;
4980   }
4981
4982   if (GNUNET_YES ==
4983       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "malicious_getter"))
4984   {
4985     malicious_getter = GNUNET_YES;
4986     if (GNUNET_NO ==
4987         GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4988                                                "MALICIOUS_GET_FREQUENCY",
4989                                                &malicious_get_frequency))
4990       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4991   }
4992
4993   if (GNUNET_YES !=
4994       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "MAX_HOPS", &max_hops))
4995   {
4996     max_hops = DEFAULT_MAX_HOPS;
4997   }
4998
4999   if (GNUNET_YES ==
5000       GNUNET_CONFIGURATION_get_value_yesno (cfg, "DHT", "USE_MAX_HOPS"))
5001   {
5002     use_max_hops = GNUNET_YES;
5003   }
5004
5005   if (GNUNET_YES ==
5006       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "malicious_putter"))
5007   {
5008     malicious_putter = GNUNET_YES;
5009     if (GNUNET_NO ==
5010         GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
5011                                                "MALICIOUS_PUT_FREQUENCY",
5012                                                &malicious_put_frequency))
5013       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
5014   }
5015
5016   dht_republish_frequency = GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY;
5017   if (GNUNET_OK ==
5018       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
5019                                              "REPLICATION_FREQUENCY",
5020                                              &temp_config_num))
5021   {
5022     dht_republish_frequency =
5023         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES,
5024                                        temp_config_num);
5025   }
5026
5027   if (GNUNET_OK ==
5028       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
5029                                              &temp_config_num))
5030   {
5031     bucket_size = (unsigned int) temp_config_num;
5032   }
5033
5034   if (GNUNET_OK !=
5035       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "kad_alpha",
5036                                              &kademlia_replication))
5037   {
5038     kademlia_replication = DEFAULT_KADEMLIA_REPLICATION;
5039   }
5040
5041   if (GNUNET_YES ==
5042       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "malicious_dropper"))
5043   {
5044     malicious_dropper = GNUNET_YES;
5045   }
5046
5047   if (GNUNET_YES ==
5048       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "republish"))
5049     do_republish = GNUNET_NO;
5050
5051   if (GNUNET_NO ==
5052       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "do_find_peer"))
5053   {
5054     do_find_peer = GNUNET_NO;
5055   }
5056   else
5057     do_find_peer = GNUNET_YES;
5058
5059   if (GNUNET_YES ==
5060       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "use_real_distance"))
5061     use_real_distance = GNUNET_YES;
5062
5063   if (GNUNET_YES ==
5064       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht_testing",
5065                                             "mysql_logging_extended"))
5066   {
5067     debug_routes = GNUNET_YES;
5068     debug_routes_extended = GNUNET_YES;
5069   }
5070
5071 #if DEBUG_DHT_ROUTING
5072   if (GNUNET_YES == debug_routes)
5073   {
5074     dhtlog_handle = GNUNET_DHTLOG_connect (cfg);
5075     if (dhtlog_handle == NULL)
5076     {
5077       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
5078                   "Could not connect to mysql logging server, logging will not happen!");
5079     }
5080   }
5081 #endif
5082
5083   converge_modifier = 4.0;
5084   if (GNUNET_OK ==
5085       GNUNET_CONFIGURATION_get_value_string (cfg, "dht", "converge_modifier",
5086                                              &converge_modifier_buf))
5087   {
5088     if (1 != sscanf (converge_modifier_buf, "%f", &converge_modifier))
5089     {
5090       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
5091                   "Failed to read decimal value for %s from `%s'\n",
5092                   "CONVERGE_MODIFIER", converge_modifier_buf);
5093       converge_modifier = 0.0;
5094     }
5095     GNUNET_free (converge_modifier_buf);
5096   }
5097
5098   if (GNUNET_YES ==
5099       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "paper_forwarding"))
5100     paper_forwarding = GNUNET_YES;
5101
5102   if (GNUNET_YES ==
5103       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "put_peer_identities"))
5104     put_peer_identities = GNUNET_YES;
5105
5106   stats = GNUNET_STATISTICS_create ("dht", cfg);
5107
5108   if (stats != NULL)
5109   {
5110     GNUNET_STATISTICS_set (stats, STAT_ROUTES, 0, GNUNET_NO);
5111     GNUNET_STATISTICS_set (stats, STAT_ROUTE_FORWARDS, 0, GNUNET_NO);
5112     GNUNET_STATISTICS_set (stats, STAT_ROUTE_FORWARDS_CLOSEST, 0, GNUNET_NO);
5113     GNUNET_STATISTICS_set (stats, STAT_RESULTS, 0, GNUNET_NO);
5114     GNUNET_STATISTICS_set (stats, STAT_RESULTS_TO_CLIENT, 0, GNUNET_NO);
5115     GNUNET_STATISTICS_set (stats, STAT_RESULT_FORWARDS, 0, GNUNET_NO);
5116     GNUNET_STATISTICS_set (stats, STAT_GETS, 0, GNUNET_NO);
5117     GNUNET_STATISTICS_set (stats, STAT_PUTS, 0, GNUNET_NO);
5118     GNUNET_STATISTICS_set (stats, STAT_PUTS_INSERTED, 0, GNUNET_NO);
5119     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER, 0, GNUNET_NO);
5120     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER_START, 0, GNUNET_NO);
5121     GNUNET_STATISTICS_set (stats, STAT_GET_START, 0, GNUNET_NO);
5122     GNUNET_STATISTICS_set (stats, STAT_PUT_START, 0, GNUNET_NO);
5123     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER_REPLY, 0, GNUNET_NO);
5124     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER_ANSWER, 0, GNUNET_NO);
5125     GNUNET_STATISTICS_set (stats, STAT_BLOOM_FIND_PEER, 0, GNUNET_NO);
5126     GNUNET_STATISTICS_set (stats, STAT_GET_REPLY, 0, GNUNET_NO);
5127     GNUNET_STATISTICS_set (stats, STAT_GET_RESPONSE_START, 0, GNUNET_NO);
5128     GNUNET_STATISTICS_set (stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
5129     GNUNET_STATISTICS_set (stats, STAT_DISCONNECTS, 0, GNUNET_NO);
5130   }
5131   /* FIXME: if there are no recent requests then these never get freed, but alternative is _annoying_! */
5132   recent.hashmap = GNUNET_CONTAINER_multihashmap_create (DHT_MAX_RECENT / 2);
5133   recent.minHeap =
5134       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
5135   if (GNUNET_YES == do_find_peer)
5136   {
5137     next_send_time.rel_value =
5138         DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
5139         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
5140                                   (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
5141                                    2) -
5142                                   DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
5143     find_peer_context.start = GNUNET_TIME_absolute_get ();
5144     GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
5145                                   &find_peer_context);
5146   }
5147
5148   /* Scheduled the task to clean up when shutdown is called */
5149   cleanup_task =
5150       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
5151                                     &shutdown_task, NULL);
5152 }
5153
5154 /**
5155  * The main function for the dht service.
5156  *
5157  * @param argc number of arguments from the command line
5158  * @param argv command line arguments
5159  * @return 0 ok, 1 on error
5160  */
5161 int
5162 main (int argc, char *const *argv)
5163 {
5164   int ret;
5165
5166   ret =
5167       (GNUNET_OK ==
5168        GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
5169                            NULL)) ? 0 : 1;
5170   if (NULL != recent.hashmap)
5171   {
5172     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (recent.hashmap));
5173     GNUNET_CONTAINER_multihashmap_destroy (recent.hashmap);
5174     recent.hashmap = NULL;
5175   }
5176   if (NULL != recent.minHeap)
5177   {
5178     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (recent.minHeap));
5179     GNUNET_CONTAINER_heap_destroy (recent.minHeap);
5180     recent.minHeap = NULL;
5181   }
5182   if (NULL != recent_find_peer_requests)
5183   {
5184     GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
5185     recent_find_peer_requests = NULL;
5186   }
5187   return ret;
5188 }