block library initialization
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief main DHT service shell, building block for DHT implementations
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_client_lib.h"
31 #include "gnunet_getopt_lib.h"
32 #include "gnunet_os_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_service_lib.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_signal_lib.h"
37 #include "gnunet_util_lib.h"
38 #include "gnunet_datacache_lib.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_dht_service.h"
42 #include "gnunet_statistics_service.h"
43 #include "dhtlog.h"
44 #include "dht.h"
45 #include <fenv.h>
46
47 #define PRINT_TABLES GNUNET_NO
48
49 #define REAL_DISTANCE GNUNET_NO
50
51 #define EXTRA_CHECKS GNUNET_NO
52
53 /**
54  * How many buckets will we allow total.
55  */
56 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
57
58 /**
59  * Should the DHT issue FIND_PEER requests to get better routing tables?
60  */
61 #define DEFAULT_DO_FIND_PEER GNUNET_YES
62
63 /**
64  * Defines whether find peer requests send their HELLO's outgoing,
65  * or expect replies to contain hellos.
66  */
67 #define FIND_PEER_WITH_HELLO GNUNET_YES
68
69 /**
70  * What is the maximum number of peers in a given bucket.
71  */
72 #define DEFAULT_BUCKET_SIZE 4
73
74 /**
75  * Minimum number of peers we need for "good" routing,
76  * any less than this and we will allow messages to
77  * travel much further through the network!
78  */
79 #define MINIMUM_PEER_THRESHOLD 20
80
81 #define DHT_MAX_RECENT 1000
82
83 #define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
84
85 /**
86  * Default time to wait to send messages on behalf of other peers.
87  */
88 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
89
90 /**
91  * Default importance for handling messages on behalf of other peers.
92  */
93 #define DHT_DEFAULT_P2P_IMPORTANCE 0
94
95 /**
96  * How long to keep recent requests around by default.
97  */
98 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
99
100 /**
101  * Default time to wait to send find peer messages sent by the dht service.
102  */
103 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
104
105 /**
106  * Default importance for find peer messages sent by the dht service.
107  */
108 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
109
110 /**
111  * Default replication parameter for find peer messages sent by the dht service.
112  */
113 #define DHT_DEFAULT_PUT_REPLICATION 4
114
115 /**
116  * Default replication parameter for find peer messages sent by the dht service.
117  */
118 #define DHT_DEFAULT_FIND_PEER_REPLICATION 4
119
120 /**
121  * Default options for find peer requests sent by the dht service.
122  */
123 #define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
124 /*#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE*/
125
126 /**
127  * How long at least to wait before sending another find peer request.
128  */
129 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
130
131 /**
132  * How long at most to wait before sending another find peer request.
133  */
134 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
135
136 /**
137  * How often to update our preference levels for peers in our routing tables.
138  */
139 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
140
141 /**
142  * How long at most on average will we allow a reply forward to take
143  * (before we quit sending out new requests)
144  */
145 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
146
147 /**
148  * How many initial requests to send out (in true Kademlia fashion)
149  */
150 #define DHT_KADEMLIA_REPLICATION 3
151
152 /*
153  * Default frequency for sending malicious get messages
154  */
155 #define DEFAULT_MALICIOUS_GET_FREQUENCY 1000 /* Number of milliseconds */
156
157 /*
158  * Default frequency for sending malicious put messages
159  */
160 #define DEFAULT_MALICIOUS_PUT_FREQUENCY 1000 /* Default is in milliseconds */
161
162 /**
163  * Type for a malicious request, so we can ignore it during testing
164  */
165 #define DHT_MALICIOUS_MESSAGE_TYPE 42
166
167 #define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
168
169 /**
170  * Real maximum number of hops, at which point we refuse
171  * to forward the message.
172  */
173 #define DEFAULT_MAX_HOPS 10
174
175 /**
176  * How many time differences between requesting a core send and
177  * the actual callback to remember.
178  */
179 #define MAX_REPLY_TIMES 8
180
181 enum ConvergenceOptions
182 {
183    /**
184     * Use the linear method for convergence.
185     */
186    DHT_CONVERGE_LINEAR,
187
188    /**
189     * Converge using a fast converging square
190     * function.
191     */
192    DHT_CONVERGE_SQUARE,
193
194    /**
195     * Converge using a slower exponential
196     * function.
197     */
198    DHT_CONVERGE_EXPONENTIAL,
199
200    /**
201     * Don't do any special convergence, allow
202     * the algorithm to hopefully route to closer
203     * peers more often.
204     */
205    DHT_CONVERGE_RANDOM
206 };
207
208 /**
209  * Linked list of messages to send to clients.
210  */
211 struct P2PPendingMessage
212 {
213   /**
214    * Pointer to next item in the list
215    */
216   struct P2PPendingMessage *next;
217
218   /**
219    * Pointer to previous item in the list
220    */
221   struct P2PPendingMessage *prev;
222
223   /**
224    * Message importance level.
225    */
226   unsigned int importance;
227
228   /**
229    * Time when this request was scheduled to be sent.
230    */
231   struct GNUNET_TIME_Absolute scheduled;
232
233   /**
234    * How long to wait before sending message.
235    */
236   struct GNUNET_TIME_Relative timeout;
237
238   /**
239    * Actual message to be sent; // avoid allocation
240    */
241   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
242
243 };
244
245 /**
246  * Per-peer information.
247  */
248 struct PeerInfo
249 {
250   /**
251    * Next peer entry (DLL)
252    */
253   struct PeerInfo *next;
254
255   /**
256    *  Prev peer entry (DLL)
257    */
258   struct PeerInfo *prev;
259
260   /**
261    * Head of pending messages to be sent to this peer.
262    */
263   struct P2PPendingMessage *head;
264
265   /**
266    * Tail of pending messages to be sent to this peer.
267    */
268   struct P2PPendingMessage *tail;
269
270   /**
271    * Core handle for sending messages to this peer.
272    */
273   struct GNUNET_CORE_TransmitHandle *th;
274
275   /**
276    * Task for scheduling message sends.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier send_task;
279
280   /**
281    * Task for scheduling preference updates
282    */
283   GNUNET_SCHEDULER_TaskIdentifier preference_task;
284
285   /**
286    * Preference update context
287    */
288   struct GNUNET_CORE_InformationRequestContext *info_ctx;
289
290   /**
291    * What is the average latency for replies received?
292    */
293   struct GNUNET_TIME_Relative latency;
294
295   /**
296    * What is the identity of the peer?
297    */
298   struct GNUNET_PeerIdentity id;
299
300   /**
301    * Transport level distance to peer.
302    */
303   unsigned int distance;
304
305   /**
306    * Holds matching bits from peer to current target,
307    * used for distance comparisons between peers. May
308    * be considered a really bad idea.
309    * FIXME: remove this value (create struct which holds
310    *        a single peerinfo and the matching bits, use
311    *        that to pass to comparitor)
312    */
313   unsigned int matching_bits;
314
315   /**
316    * Task for scheduling periodic ping messages for this peer.
317    */
318   GNUNET_SCHEDULER_TaskIdentifier ping_task;
319 };
320
321 /**
322  * Peers are grouped into buckets.
323  */
324 struct PeerBucket
325 {
326   /**
327    * Head of DLL
328    */
329   struct PeerInfo *head;
330
331   /**
332    * Tail of DLL
333    */
334   struct PeerInfo *tail;
335
336   /**
337    * Number of peers in the bucket.
338    */
339   unsigned int peers_size;
340 };
341
342 /**
343  * Linked list of messages to send to clients.
344  */
345 struct PendingMessage
346 {
347   /**
348    * Pointer to next item in the list
349    */
350   struct PendingMessage *next;
351
352   /**
353    * Pointer to previous item in the list
354    */
355   struct PendingMessage *prev;
356
357   /**
358    * Actual message to be sent; // avoid allocation
359    */
360   const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
361
362 };
363
364 /**
365  * Struct containing information about a client,
366  * handle to connect to it, and any pending messages
367  * that need to be sent to it.
368  */
369 struct ClientList
370 {
371   /**
372    * Linked list of active clients
373    */
374   struct ClientList *next;
375
376   /**
377    * The handle to this client
378    */
379   struct GNUNET_SERVER_Client *client_handle;
380
381   /**
382    * Handle to the current transmission request, NULL
383    * if none pending.
384    */
385   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
386
387   /**
388    * Linked list of pending messages for this client
389    */
390   struct PendingMessage *pending_head;
391
392   /**
393    * Tail of linked list of pending messages for this client
394    */
395   struct PendingMessage *pending_tail;
396 };
397
398
399 /**
400  * Context containing information about a DHT message received.
401  */
402 struct DHT_MessageContext
403 {
404   /**
405    * The client this request was received from.
406    * (NULL if received from another peer)
407    */
408   struct ClientList *client;
409
410   /**
411    * The peer this request was received from.
412    * (NULL if received from local client)
413    */
414   const struct GNUNET_PeerIdentity *peer;
415
416   /**
417    * The key this request was about
418    */
419   GNUNET_HashCode key;
420
421   /**
422    * The unique identifier of this request
423    */
424   uint64_t unique_id;
425
426   /**
427    * Desired replication level
428    */
429   uint32_t replication;
430
431   /**
432    * Network size estimate, either ours or the sum of
433    * those routed to thus far. =~ Log of number of peers
434    * chosen from for this request.
435    */
436   uint32_t network_size;
437
438   /**
439    * Any message options for this request
440    */
441   uint32_t msg_options;
442
443   /**
444    * How many hops has the message already traversed?
445    */
446   uint32_t hop_count;
447
448   /**
449    * How important is this message?
450    */
451   unsigned int importance;
452
453   /**
454    * How long should we wait to transmit this request?
455    */
456   struct GNUNET_TIME_Relative timeout;
457
458   /**
459    * Bloomfilter for this routing request.
460    */
461   struct GNUNET_CONTAINER_BloomFilter *bloom;
462
463   /**
464    * Did we forward this message? (may need to remember it!)
465    */
466   int forwarded;
467
468   /**
469    * Are we the closest known peer to this key (out of our neighbors?)
470    */
471   int closest;
472 };
473
474 /**
475  * Record used for remembering what peers are waiting for what
476  * responses (based on search key).
477  */
478 struct DHTRouteSource
479 {
480   /**
481    * This is a DLL.
482    */
483   struct DHTRouteSource *next;
484
485   /**
486    * This is a DLL.
487    */
488   struct DHTRouteSource *prev;
489
490   /**
491    * Source of the request.  Replies should be forwarded to
492    * this peer.
493    */
494   struct GNUNET_PeerIdentity source;
495
496   /**
497    * If this was a local request, remember the client; otherwise NULL.
498    */
499   struct ClientList *client;
500
501   /**
502    * Pointer to this nodes heap location (for removal)
503    */
504   struct GNUNET_CONTAINER_HeapNode *hnode;
505
506   /**
507    * Back pointer to the record storing this information.
508    */
509   struct DHTQueryRecord *record;
510
511   /**
512    * Task to remove this entry on timeout.
513    */
514   GNUNET_SCHEDULER_TaskIdentifier delete_task;
515
516   /**
517    * Bloomfilter of peers we have already sent back as
518    * replies to the initial request.  Allows us to not
519    * forward the same peer multiple times for a find peer
520    * request.
521    */
522   struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
523
524 };
525
526 /**
527  * Entry in the DHT routing table.
528  */
529 struct DHTQueryRecord
530 {
531   /**
532    * Head of DLL for result forwarding.
533    */
534   struct DHTRouteSource *head;
535
536   /**
537    * Tail of DLL for result forwarding.
538    */
539   struct DHTRouteSource *tail;
540
541   /**
542    * Key that the record concerns.
543    */
544   GNUNET_HashCode key;
545
546   /**
547    * GET message of this record (what we already forwarded?).
548    */
549   //DV_DHT_MESSAGE get; Try to get away with not saving this.
550
551   /**
552    * Bloomfilter of the peers we've replied to so far
553    */
554   //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
555
556 };
557
558 /**
559  * Context used to calculate the number of find peer messages
560  * per X time units since our last scheduled find peer message
561  * was sent.  If we have seen too many messages, delay or don't
562  * send our own out.
563  */
564 struct FindPeerMessageContext
565 {
566   unsigned int count;
567
568   struct GNUNET_TIME_Absolute start;
569
570   struct GNUNET_TIME_Absolute end;
571 };
572
573 /**
574  * DHT Routing results structure
575  */
576 struct DHTResults
577 {
578   /*
579    * Min heap for removal upon reaching limit
580    */
581   struct GNUNET_CONTAINER_Heap *minHeap;
582
583   /*
584    * Hashmap for fast key based lookup
585    */
586   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
587
588 };
589
590 /**
591  * DHT structure for recent requests.
592  */
593 struct RecentRequests
594 {
595   /*
596    * Min heap for removal upon reaching limit
597    */
598   struct GNUNET_CONTAINER_Heap *minHeap;
599
600   /*
601    * Hashmap for key based lookup
602    */
603   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
604 };
605
606 struct RecentRequest
607 {
608   /**
609    * Position of this node in the min heap.
610    */
611   struct GNUNET_CONTAINER_HeapNode *heap_node;
612
613   /**
614    * Bloomfilter containing entries for peers
615    * we forwarded this request to.
616    */
617   struct GNUNET_CONTAINER_BloomFilter *bloom;
618
619   /**
620    * Timestamp of this request, for ordering
621    * the min heap.
622    */
623   struct GNUNET_TIME_Absolute timestamp;
624
625   /**
626    * Key of this request.
627    */
628   GNUNET_HashCode key;
629
630   /**
631    * Unique identifier for this request.
632    */
633   uint64_t uid;
634
635   /**
636    * Task to remove this entry on timeout.
637    */
638   GNUNET_SCHEDULER_TaskIdentifier remove_task;
639 };
640
641 struct RepublishContext
642 {
643   /**
644    * Key to republish.
645    */
646   GNUNET_HashCode key;
647
648   /**
649    * Type of the data.
650    */
651   unsigned int type;
652
653 };
654
655 /**
656  * Which kind of convergence will we be using?
657  */
658 static enum ConvergenceOptions converge_option;
659
660 /**
661  * Modifier for the convergence function
662  */
663 static float converge_modifier;
664
665 /**
666  * Recent requests by hash/uid and by time inserted.
667  */
668 static struct RecentRequests recent;
669
670 /**
671  * Context to use to calculate find peer rates.
672  */
673 static struct FindPeerMessageContext find_peer_context;
674
675 /**
676  * Don't use our routing algorithm, always route
677  * to closest peer; initially send requests to 3
678  * peers.
679  */
680 static unsigned int strict_kademlia;
681
682 /**
683  * Routing option to end routing when closest peer found.
684  */
685 static unsigned int stop_on_closest;
686
687 /**
688  * Routing option to end routing when data is found.
689  */
690 static unsigned int stop_on_found;
691
692 /**
693  * Whether DHT needs to manage find peer requests, or
694  * an external force will do it on behalf of the DHT.
695  */
696 static unsigned int do_find_peer;
697
698 /**
699  * Once we have stored an item in the DHT, refresh it
700  * according to our republish interval.
701  */
702 static unsigned int do_republish;
703
704 /**
705  * Use the "real" distance metric when selecting the
706  * next routing hop.  Can be less accurate.
707  */
708 static unsigned int use_real_distance;
709
710 /**
711  * How many peers have we added since we sent out our last
712  * find peer request?
713  */
714 static unsigned int newly_found_peers;
715
716 /**
717  * Container of active queries we should remember
718  */
719 static struct DHTResults forward_list;
720
721 /**
722  * Handle to the datacache service (for inserting/retrieving data)
723  */
724 static struct GNUNET_DATACACHE_Handle *datacache;
725
726 /**
727  * Handle for the statistics service.
728  */
729 struct GNUNET_STATISTICS_Handle *stats;
730
731 /**
732  * The main scheduler to use for the DHT service
733  */
734 static struct GNUNET_SCHEDULER_Handle *sched;
735
736 /**
737  * The configuration the DHT service is running with
738  */
739 static const struct GNUNET_CONFIGURATION_Handle *cfg;
740
741 /**
742  * Handle to the core service
743  */
744 static struct GNUNET_CORE_Handle *coreAPI;
745
746 /**
747  * Handle to the transport service, for getting our hello
748  */
749 static struct GNUNET_TRANSPORT_Handle *transport_handle;
750
751 /**
752  * The identity of our peer.
753  */
754 static struct GNUNET_PeerIdentity my_identity;
755
756 /**
757  * Short id of the peer, for printing
758  */
759 static char *my_short_id;
760
761 /**
762  * Our HELLO
763  */
764 static struct GNUNET_MessageHeader *my_hello;
765
766 /**
767  * Task to run when we shut down, cleaning up all our trash
768  */
769 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
770
771 /**
772  * The lowest currently used bucket.
773  */
774 static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
775
776 /**
777  * The maximum number of hops before we stop routing messages.
778  */
779 static unsigned long long max_hops;
780
781 /**
782  * How often to republish content we have previously stored.
783  */
784 static struct GNUNET_TIME_Relative dht_republish_frequency;
785
786 /**
787  * GNUNET_YES to stop at max_hops, GNUNET_NO to heuristically decide when to stop forwarding.
788  */
789 static int use_max_hops;
790
791 /**
792  * The buckets (Kademlia routing table, complete with growth).
793  * Array of size MAX_BUCKET_SIZE.
794  */
795 static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
796
797 /**
798  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
799  */
800 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
801
802 /**
803  * Recently seen find peer requests.
804  */
805 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
806
807 /**
808  * Maximum size for each bucket.
809  */
810 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
811
812 /**
813  * List of active clients.
814  */
815 static struct ClientList *client_list;
816
817 /**
818  * Handle to the DHT logger.
819  */
820 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
821
822 /*
823  * Whether or not to send routing debugging information
824  * to the dht logging server
825  */
826 static unsigned int debug_routes;
827
828 /*
829  * Whether or not to send FULL route information to
830  * logging server
831  */
832 static unsigned int debug_routes_extended;
833
834 /*
835  * GNUNET_YES or GNUNET_NO, whether or not to act as
836  * a malicious node which drops all messages
837  */
838 static unsigned int malicious_dropper;
839
840 /*
841  * GNUNET_YES or GNUNET_NO, whether or not to act as
842  * a malicious node which sends out lots of GETS
843  */
844 static unsigned int malicious_getter;
845
846 /**
847  * GNUNET_YES or GNUNET_NO, whether or not to act as
848  * a malicious node which sends out lots of PUTS
849  */
850 static unsigned int malicious_putter;
851
852 /**
853  * Frequency for malicious get requests.
854  */
855 static unsigned long long malicious_get_frequency;
856
857 /**
858  * Frequency for malicious put requests.
859  */
860 static unsigned long long malicious_put_frequency;
861
862 /**
863  * Reply times for requests, if we are busy, don't send any
864  * more requests!
865  */
866 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
867
868 /**
869  * Current counter for replies.
870  */
871 static unsigned int reply_counter;
872
873 /**
874  * Our handle to the BLOCK library.
875  */
876 static struct GNUNET_BLOCK_Context *block_context;
877
878
879 /**
880  * Forward declaration.
881  */
882 static size_t 
883 send_generic_reply (void *cls, size_t size, void *buf);
884
885
886 /** Declare here so retry_core_send is aware of it */
887 static size_t 
888 core_transmit_notify (void *cls,
889                       size_t size, void *buf);
890
891 /**
892  * Convert unique ID to hash code.
893  *
894  * @param uid unique ID to convert
895  * @param hash set to uid (extended with zeros)
896  */
897 static void
898 hash_from_uid (uint64_t uid,
899                GNUNET_HashCode *hash)
900 {
901   memset (hash, 0, sizeof(GNUNET_HashCode));
902   *((uint64_t*)hash) = uid;
903 }
904
905 #if AVG
906 /**
907  * Calculate the average send time between messages so that we can
908  * ignore certain requests if we get too busy.
909  *
910  * @return the average time between asking core to send a message
911  *         and when the buffer for copying it is passed
912  */
913 static struct GNUNET_TIME_Relative get_average_send_delay()
914 {
915   unsigned int i;
916   unsigned int divisor;
917   struct GNUNET_TIME_Relative average_time;
918   average_time = GNUNET_TIME_relative_get_zero();
919   divisor = 0;
920   for (i = 0; i < MAX_REPLY_TIMES; i++)
921   {
922     average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]);
923     if (reply_times[i].value == (uint64_t)0)
924       continue;
925     else
926       divisor++;
927   }
928   if (divisor == 0)
929   {
930     return average_time;
931   }
932
933   average_time = GNUNET_TIME_relative_divide(average_time, divisor);
934   fprintf(stderr, "Avg send delay: %u sends is %llu\n", divisor, (long long unsigned int)average_time.value);
935   return average_time;
936 }
937 #endif
938
939 /**
940  * Given the largest send delay, artificially decrease it
941  * so the next time around we may have a chance at sending
942  * again.
943  */
944 static void decrease_max_send_delay(struct GNUNET_TIME_Relative max_time)
945 {
946   unsigned int i;
947   for (i = 0; i < MAX_REPLY_TIMES; i++)
948     {
949       if (reply_times[i].value == max_time.value)
950         {
951           reply_times[i].value = reply_times[i].value / 2;
952           return;
953         }
954     }
955 }
956
957 /**
958  * Find the maximum send time of the recently sent values.
959  *
960  * @return the average time between asking core to send a message
961  *         and when the buffer for copying it is passed
962  */
963 static struct GNUNET_TIME_Relative get_max_send_delay()
964 {
965   unsigned int i;
966   struct GNUNET_TIME_Relative max_time;
967   max_time = GNUNET_TIME_relative_get_zero();
968
969   for (i = 0; i < MAX_REPLY_TIMES; i++)
970   {
971     if (reply_times[i].value > max_time.value)
972       max_time.value = reply_times[i].value;
973   }
974
975   if (max_time.value > MAX_REQUEST_TIME.value)
976     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", (long long unsigned int)max_time.value);
977   return max_time;
978 }
979
980 static void
981 increment_stats(const char *value)
982 {
983   if (stats != NULL)
984     {
985       GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
986     }
987 }
988
989 /**
990  *  Try to send another message from our core send list
991  */
992 static void
993 try_core_send (void *cls,
994                const struct GNUNET_SCHEDULER_TaskContext *tc)
995 {
996   struct PeerInfo *peer = cls;
997   struct P2PPendingMessage *pending;
998   size_t ssize;
999
1000   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
1001
1002   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1003     return;
1004
1005   if (peer->th != NULL)
1006     return; /* Message send already in progress */
1007
1008   pending = peer->head;
1009   if (pending != NULL)
1010     {
1011       ssize = ntohs(pending->msg->size);
1012 #if DEBUG_DHT > 1
1013      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
1015                 "DHT", ssize, GNUNET_i2s(&peer->id));
1016 #endif
1017       pending->scheduled = GNUNET_TIME_absolute_get();
1018       reply_counter++;
1019       if (reply_counter >= MAX_REPLY_TIMES)
1020         reply_counter = 0;
1021       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
1022                                                    pending->timeout, &peer->id,
1023                                                    ssize, &core_transmit_notify, peer);
1024     }
1025 }
1026
1027 /**
1028  * Function called to send a request out to another peer.
1029  * Called both for locally initiated requests and those
1030  * received from other peers.
1031  *
1032  * @param cls DHT service closure argument
1033  * @param msg the encapsulated message
1034  * @param peer the peer to forward the message to
1035  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1036  */
1037 static void 
1038 forward_result_message (void *cls,
1039                         const struct GNUNET_MessageHeader *msg,
1040                         struct PeerInfo *peer,
1041                         struct DHT_MessageContext *msg_ctx)
1042 {
1043   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
1044   struct P2PPendingMessage *pending;
1045   size_t msize;
1046   size_t psize;
1047
1048   increment_stats(STAT_RESULT_FORWARDS);
1049   msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
1050   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1051   psize = sizeof(struct P2PPendingMessage) + msize;
1052   pending = GNUNET_malloc(psize);
1053   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1054   pending->importance = DHT_SEND_PRIORITY;
1055   pending->timeout = GNUNET_TIME_relative_get_forever();
1056   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
1057   result_message->header.size = htons(msize);
1058   result_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
1059   result_message->put_path_length = htons(0); /* FIXME: implement */
1060   result_message->get_path_length = htons(0); /* FIXME: implement */
1061   result_message->options = htonl(msg_ctx->msg_options);
1062   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
1063   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
1064   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1065   memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
1066   memcpy(&result_message[1], msg, ntohs(msg->size));
1067 #if DEBUG_DHT > 1
1068   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1069 #endif
1070   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1071   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1072     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1073 }
1074
1075
1076 /**
1077  * Called when core is ready to send a message we asked for
1078  * out to the destination.
1079  *
1080  * @param cls closure (NULL)
1081  * @param size number of bytes available in buf
1082  * @param buf where the callee should write the message
1083  * @return number of bytes written to buf
1084  */
1085 static size_t 
1086 core_transmit_notify (void *cls,
1087                       size_t size, void *buf)
1088 {
1089   struct PeerInfo *peer = cls;
1090   char *cbuf = buf;
1091   struct P2PPendingMessage *pending;
1092
1093   size_t off;
1094   size_t msize;
1095
1096   if (buf == NULL)
1097     {
1098       /* client disconnected */
1099 #if DEBUG_DHT
1100       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
1101 #endif
1102       return 0;
1103     }
1104
1105   if (peer->head == NULL)
1106     return 0;
1107
1108   peer->th = NULL;
1109   off = 0;
1110   pending = peer->head;
1111   reply_times[reply_counter] = GNUNET_TIME_absolute_get_difference(pending->scheduled, GNUNET_TIME_absolute_get());
1112   msize = ntohs(pending->msg->size);
1113   if (msize <= size)
1114     {
1115       off = msize;
1116       memcpy (cbuf, pending->msg, msize);
1117       GNUNET_CONTAINER_DLL_remove (peer->head,
1118                                    peer->tail,
1119                                    pending);
1120 #if DEBUG_DHT > 1
1121       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Removing pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1122 #endif
1123       GNUNET_free (pending);
1124     }
1125 #if SMART
1126   while (NULL != pending &&
1127           (size - off >= (msize = ntohs (pending->msg->size))))
1128     {
1129 #if DEBUG_DHT_ROUTING
1130       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d\n", my_short_id, "dht service", msize, size);
1131 #endif
1132       memcpy (&cbuf[off], pending->msg, msize);
1133       off += msize;
1134       GNUNET_CONTAINER_DLL_remove (peer->head,
1135                                    peer->tail,
1136                                    pending);
1137       GNUNET_free (pending);
1138       pending = peer->head;
1139     }
1140 #endif
1141   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
1142     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1143 #if DEBUG_DHT > 1
1144   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d, returning %d\n", my_short_id, "dht service", msize, size, off);
1145 #endif
1146   return off;
1147 }
1148
1149
1150 /**
1151  * Compute the distance between have and target as a 32-bit value.
1152  * Differences in the lower bits must count stronger than differences
1153  * in the higher bits.
1154  *
1155  * @return 0 if have==target, otherwise a number
1156  *           that is larger as the distance between
1157  *           the two hash codes increases
1158  */
1159 static unsigned int
1160 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1161 {
1162   unsigned int bucket;
1163   unsigned int msb;
1164   unsigned int lsb;
1165   unsigned int i;
1166
1167   /* We have to represent the distance between two 2^9 (=512)-bit
1168      numbers as a 2^5 (=32)-bit number with "0" being used for the
1169      two numbers being identical; furthermore, we need to
1170      guarantee that a difference in the number of matching
1171      bits is always represented in the result.
1172
1173      We use 2^32/2^9 numerical values to distinguish between
1174      hash codes that have the same LSB bit distance and
1175      use the highest 2^9 bits of the result to signify the
1176      number of (mis)matching LSB bits; if we have 0 matching
1177      and hence 512 mismatching LSB bits we return -1 (since
1178      512 itself cannot be represented with 9 bits) */
1179
1180   /* first, calculate the most significant 9 bits of our
1181      result, aka the number of LSBs */
1182   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1183   /* bucket is now a value between 0 and 512 */
1184   if (bucket == 512)
1185     return 0;                   /* perfect match */
1186   if (bucket == 0)
1187     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1188                                    below, we'd end up with max+1 (overflow)) */
1189
1190   /* calculate the most significant bits of the final result */
1191   msb = (512 - bucket) << (32 - 9);
1192   /* calculate the 32-9 least significant bits of the final result by
1193      looking at the differences in the 32-9 bits following the
1194      mismatching bit at 'bucket' */
1195   lsb = 0;
1196   for (i = bucket + 1;
1197        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1198     {
1199       if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
1200         lsb |= (1 << (bucket + 32 - 9 - i));    /* first bit set will be 10,
1201                                                    last bit set will be 31 -- if
1202                                                    i does not reach 512 first... */
1203     }
1204   return msb | lsb;
1205 }
1206
1207 /**
1208  * Return a number that is larger the closer the
1209  * "have" GNUNET_hash code is to the "target".
1210  *
1211  * @return inverse distance metric, non-zero.
1212  *         Must fudge the value if NO bits match.
1213  */
1214 static unsigned int
1215 inverse_distance (const GNUNET_HashCode * target,
1216                   const GNUNET_HashCode * have)
1217 {
1218   if (GNUNET_CRYPTO_hash_matching_bits(target, have) == 0)
1219     return 1; /* Never return 0! */
1220   return ((unsigned int) -1) - distance (target, have);
1221 }
1222
1223 /**
1224  * Find the optimal bucket for this key, regardless
1225  * of the current number of buckets in use.
1226  *
1227  * @param hc the hashcode to compare our identity to
1228  *
1229  * @return the proper bucket index, or GNUNET_SYSERR
1230  *         on error (same hashcode)
1231  */
1232 static int find_bucket(const GNUNET_HashCode *hc)
1233 {
1234   unsigned int bits;
1235
1236   bits = GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, hc);
1237   if (bits == MAX_BUCKETS)
1238     return GNUNET_SYSERR;
1239   return MAX_BUCKETS - bits - 1;
1240 }
1241
1242 /**
1243  * Find which k-bucket this peer should go into,
1244  * taking into account the size of the k-bucket
1245  * array.  This means that if more bits match than
1246  * there are currently buckets, lowest_bucket will
1247  * be returned.
1248  *
1249  * @param hc GNUNET_HashCode we are finding the bucket for.
1250  *
1251  * @return the proper bucket index for this key,
1252  *         or GNUNET_SYSERR on error (same hashcode)
1253  */
1254 static int find_current_bucket(const GNUNET_HashCode *hc)
1255 {
1256   int actual_bucket;
1257   actual_bucket = find_bucket(hc);
1258
1259   if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
1260     return lowest_bucket;
1261   else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
1262     return lowest_bucket;
1263   else
1264     return actual_bucket;
1265 }
1266
1267 #if EXTRA_CHECKS
1268 /**
1269  * Find a routing table entry from a peer identity
1270  *
1271  * @param peer the peer to look up
1272  *
1273  * @return the bucket number holding the peer, GNUNET_SYSERR if not found
1274  */
1275 static int
1276 find_bucket_by_peer(const struct PeerInfo *peer)
1277 {
1278   int bucket;
1279   struct PeerInfo *pos;
1280
1281   for (bucket = lowest_bucket; bucket < MAX_BUCKETS - 1; bucket++)
1282     {
1283       pos = k_buckets[bucket].head;
1284       while (pos != NULL)
1285         {
1286           if (peer == pos)
1287             return bucket;
1288           pos = pos->next;
1289         }
1290     }
1291
1292   return GNUNET_SYSERR; /* No such peer. */
1293 }
1294 #endif
1295
1296 #if PRINT_TABLES
1297 /**
1298  * Print the complete routing table for this peer.
1299  */
1300 static void
1301 print_routing_table ()
1302 {
1303   int bucket;
1304   struct PeerInfo *pos;
1305   char char_buf[30000];
1306   int char_pos;
1307   memset(char_buf, 0, sizeof(char_buf));
1308   char_pos = 0;
1309   char_pos += sprintf(&char_buf[char_pos], "Printing routing table for peer %s\n", my_short_id);
1310   //fprintf(stderr, "Printing routing table for peer %s\n", my_short_id);
1311   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1312     {
1313       pos = k_buckets[bucket].head;
1314       char_pos += sprintf(&char_buf[char_pos], "Bucket %d:\n", bucket);
1315       //fprintf(stderr, "Bucket %d:\n", bucket);
1316       while (pos != NULL)
1317         {
1318           //fprintf(stderr, "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1319           char_pos += sprintf(&char_buf[char_pos], "\tPeer %s, best bucket %d, %d bits match\n", GNUNET_i2s(&pos->id), find_bucket(&pos->id.hashPubKey), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey));
1320           pos = pos->next;
1321         }
1322     }
1323   fprintf(stderr, "%s", char_buf);
1324   fflush(stderr);
1325 }
1326 #endif
1327
1328 /**
1329  * Find a routing table entry from a peer identity
1330  *
1331  * @param peer the peer identity to look up
1332  *
1333  * @return the routing table entry, or NULL if not found
1334  */
1335 static struct PeerInfo *
1336 find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
1337 {
1338   int bucket;
1339   struct PeerInfo *pos;
1340   bucket = find_current_bucket(&peer->hashPubKey);
1341
1342   if (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity)))
1343     return NULL;
1344
1345   pos = k_buckets[bucket].head;
1346   while (pos != NULL)
1347     {
1348       if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
1349         return pos;
1350       pos = pos->next;
1351     }
1352   return NULL; /* No such peer. */
1353 }
1354
1355 /* Forward declaration */
1356 static void
1357 update_core_preference (void *cls,
1358                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1359 /**
1360  * Function called with statistics about the given peer.
1361  *
1362  * @param cls closure
1363  * @param peer identifies the peer
1364  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1365  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1366  * @param amount set to the amount that was actually reserved or unreserved;
1367  *               either the full requested amount or zero (no partial reservations)
1368  * @param preference current traffic preference for the given peer
1369  */
1370 static void
1371 update_core_preference_finish (void *cls,
1372                                const struct GNUNET_PeerIdentity * peer,
1373                                struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1374                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1375                                int amount, uint64_t preference)
1376 {
1377   struct PeerInfo *peer_info = cls;
1378   peer_info->info_ctx = NULL;
1379   GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PREFERENCE_INTERVAL, &update_core_preference, peer_info);
1380 }
1381
1382 static void
1383 update_core_preference (void *cls,
1384                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1385 {
1386   struct PeerInfo *peer = cls;
1387   uint64_t preference;
1388   unsigned int matching;
1389   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1390     {
1391       return;
1392     }
1393   matching = GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, &peer->id.hashPubKey);
1394   if (matching >= 64)
1395     {
1396       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer identifier matches by %u bits, only shifting as much as we can!\n", matching);
1397       matching = 63;
1398     }
1399   preference = 1LL << matching;
1400   peer->info_ctx = GNUNET_CORE_peer_change_preference (sched, cfg,
1401                                                        &peer->id,
1402                                                        GNUNET_TIME_relative_get_forever(),
1403                                                        GNUNET_BANDWIDTH_value_init (UINT32_MAX),
1404                                                        0,
1405                                                        preference,
1406                                                        &update_core_preference_finish,
1407                                                        peer);
1408 }
1409
1410 /**
1411  * Really add a peer to a bucket (only do assertions
1412  * on size, etc.)
1413  *
1414  * @param peer GNUNET_PeerIdentity of the peer to add
1415  * @param bucket the already figured out bucket to add
1416  *        the peer to
1417  * @param latency the core reported latency of this peer
1418  * @param distance the transport level distance to this peer
1419  *
1420  * @return the newly added PeerInfo
1421  */
1422 static struct PeerInfo *
1423 add_peer(const struct GNUNET_PeerIdentity *peer,
1424          unsigned int bucket,
1425          struct GNUNET_TIME_Relative latency,
1426          unsigned int distance)
1427 {
1428   struct PeerInfo *new_peer;
1429   GNUNET_assert(bucket < MAX_BUCKETS);
1430   GNUNET_assert(peer != NULL);
1431   new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
1432   new_peer->latency = latency;
1433   new_peer->distance = distance;
1434
1435   memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
1436
1437   GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
1438                                     k_buckets[bucket].tail,
1439                                     k_buckets[bucket].tail,
1440                                     new_peer);
1441   k_buckets[bucket].peers_size++;
1442
1443   if ((GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, &peer->hashPubKey) > 0) && (k_buckets[bucket].peers_size <= bucket_size))
1444     {
1445 #if DO_UPDATE_PREFERENCE
1446       new_peer->preference_task = GNUNET_SCHEDULER_add_now(sched, &update_core_preference, new_peer);
1447 #endif
1448     }
1449
1450   return new_peer;
1451 }
1452
1453 /**
1454  * Given a peer and its corresponding bucket,
1455  * remove it from that bucket.  Does not free
1456  * the PeerInfo struct, nor cancel messages
1457  * or free messages waiting to be sent to this
1458  * peer!
1459  *
1460  * @param peer the peer to remove
1461  * @param bucket the bucket the peer belongs to
1462  */
1463 static void remove_peer (struct PeerInfo *peer,
1464                          unsigned int bucket)
1465 {
1466   GNUNET_assert(k_buckets[bucket].peers_size > 0);
1467   GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
1468                               k_buckets[bucket].tail,
1469                               peer);
1470   k_buckets[bucket].peers_size--;
1471 #if CHANGE_LOWEST
1472   if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) && (lowest_bucket < MAX_BUCKETS - 1))
1473     lowest_bucket++;
1474 #endif
1475 }
1476
1477 /**
1478  * Removes peer from a bucket, then frees associated
1479  * resources and frees peer.
1480  *
1481  * @param peer peer to be removed and freed
1482  * @param bucket which bucket this peer belongs to
1483  */
1484 static void delete_peer (struct PeerInfo *peer,
1485                          unsigned int bucket)
1486 {
1487   struct P2PPendingMessage *pos;
1488   struct P2PPendingMessage *next;
1489 #if EXTRA_CHECKS
1490   struct PeerInfo *peer_pos;
1491
1492   peer_pos = k_buckets[bucket].head;
1493   while ((peer_pos != NULL) && (peer_pos != peer))
1494     peer_pos = peer_pos->next;
1495   if (peer_pos == NULL)
1496     {
1497       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Expected peer `%s' in bucket %d\n", my_short_id, "DHT", GNUNET_i2s(&peer->id), bucket);
1498       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s: Lowest bucket: %d, find_current_bucket: %d, peer resides in bucket: %d\n", my_short_id, "DHT", lowest_bucket, find_current_bucket(&peer->id.hashPubKey), find_bucket_by_peer(peer));
1499     }
1500   GNUNET_assert(peer_pos != NULL);
1501 #endif
1502   remove_peer(peer, bucket); /* First remove the peer from its bucket */
1503
1504   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
1505     GNUNET_SCHEDULER_cancel(sched, peer->send_task);
1506   if (peer->th != NULL)
1507     GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
1508
1509   pos = peer->head;
1510   while (pos != NULL) /* Remove any pending messages for this peer */
1511     {
1512       next = pos->next;
1513       GNUNET_free(pos);
1514       pos = next;
1515     }
1516
1517   GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey));
1518   GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer);
1519   GNUNET_free(peer);
1520 }
1521
1522
1523 /**
1524  * Iterator over hash map entries.
1525  *
1526  * @param cls closure
1527  * @param key current key code
1528  * @param value PeerInfo of the peer to move to new lowest bucket
1529  * @return GNUNET_YES if we should continue to
1530  *         iterate,
1531  *         GNUNET_NO if not.
1532  */
1533 static int move_lowest_bucket (void *cls,
1534                                const GNUNET_HashCode * key,
1535                                void *value)
1536 {
1537   struct PeerInfo *peer = value;
1538   int new_bucket;
1539
1540   GNUNET_assert(lowest_bucket > 0);
1541   new_bucket = lowest_bucket - 1;
1542   remove_peer(peer, lowest_bucket);
1543   GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
1544                                     k_buckets[new_bucket].tail,
1545                                     k_buckets[new_bucket].tail,
1546                                     peer);
1547   k_buckets[new_bucket].peers_size++;
1548   return GNUNET_YES;
1549 }
1550
1551
1552 /**
1553  * The current lowest bucket is full, so change the lowest
1554  * bucket to the next lower down, and move any appropriate
1555  * entries in the current lowest bucket to the new bucket.
1556  */
1557 static void enable_next_bucket()
1558 {
1559   struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1560   struct PeerInfo *pos;
1561   GNUNET_assert(lowest_bucket > 0);
1562   to_remove = GNUNET_CONTAINER_multihashmap_create(bucket_size);
1563   pos = k_buckets[lowest_bucket].head;
1564
1565 #if PRINT_TABLES
1566   fprintf(stderr, "Printing RT before new bucket\n");
1567   print_routing_table();
1568 #endif
1569   /* Populate the array of peers which should be in the next lowest bucket */
1570   while (pos != NULL)
1571     {
1572       if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
1573         GNUNET_CONTAINER_multihashmap_put(to_remove, &pos->id.hashPubKey, pos, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1574       pos = pos->next;
1575     }
1576
1577   /* Remove peers from lowest bucket, insert into next lowest bucket */
1578   GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL);
1579   GNUNET_CONTAINER_multihashmap_destroy(to_remove);
1580   lowest_bucket = lowest_bucket - 1;
1581 #if PRINT_TABLES
1582   fprintf(stderr, "Printing RT after new bucket\n");
1583   print_routing_table();
1584 #endif
1585 }
1586
1587 /**
1588  * Find the closest peer in our routing table to the
1589  * given hashcode.
1590  *
1591  * @return The closest peer in our routing table to the
1592  *         key, or NULL on error.
1593  */
1594 static struct PeerInfo *
1595 find_closest_peer (const GNUNET_HashCode *hc)
1596 {
1597   struct PeerInfo *pos;
1598   struct PeerInfo *current_closest;
1599   unsigned int lowest_distance;
1600   unsigned int temp_distance;
1601   int bucket;
1602   int count;
1603
1604   lowest_distance = -1;
1605
1606   if (k_buckets[lowest_bucket].peers_size == 0)
1607     return NULL;
1608
1609   current_closest = NULL;
1610   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1611     {
1612       pos = k_buckets[bucket].head;
1613       count = 0;
1614       while ((pos != NULL) && (count < bucket_size))
1615         {
1616           temp_distance = distance(&pos->id.hashPubKey, hc);
1617           if (temp_distance <= lowest_distance)
1618             {
1619               lowest_distance = temp_distance;
1620               current_closest = pos;
1621             }
1622           pos = pos->next;
1623           count++;
1624         }
1625     }
1626   GNUNET_assert(current_closest != NULL);
1627   return current_closest;
1628 }
1629
1630
1631 /**
1632  * Function called to send a request out to another peer.
1633  * Called both for locally initiated requests and those
1634  * received from other peers.
1635  *
1636  * @param cls DHT service closure argument (unused)
1637  * @param msg the encapsulated message
1638  * @param peer the peer to forward the message to
1639  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1640  */
1641 static void forward_message (void *cls,
1642                              const struct GNUNET_MessageHeader *msg,
1643                              struct PeerInfo *peer,
1644                              struct DHT_MessageContext *msg_ctx)
1645 {
1646   struct GNUNET_DHT_P2PRouteMessage *route_message;
1647   struct P2PPendingMessage *pending;
1648   size_t msize;
1649   size_t psize;
1650
1651   increment_stats(STAT_ROUTE_FORWARDS);
1652
1653   if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(&msg_ctx->key)))
1654     increment_stats(STAT_ROUTE_FORWARDS_CLOSEST);
1655
1656   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1657   GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1658   psize = sizeof(struct P2PPendingMessage) + msize;
1659   pending = GNUNET_malloc(psize);
1660   pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1661   pending->importance = msg_ctx->importance;
1662   pending->timeout = msg_ctx->timeout;
1663   route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1664   route_message->header.size = htons(msize);
1665   route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1666   route_message->options = htonl(msg_ctx->msg_options);
1667   route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1668   route_message->network_size = htonl(msg_ctx->network_size);
1669   route_message->desired_replication_level = htonl(msg_ctx->replication);
1670   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1671   if (msg_ctx->bloom != NULL)
1672     GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1673   memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
1674   memcpy(&route_message[1], msg, ntohs(msg->size));
1675 #if DEBUG_DHT > 1
1676   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1677 #endif
1678   GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1679   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1680     peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1681 }
1682
1683 #if DO_PING
1684 /**
1685  * Task used to send ping messages to peers so that
1686  * they don't get disconnected.
1687  *
1688  * @param cls the peer to send a ping message to
1689  * @param tc context, reason, etc.
1690  */
1691 static void
1692 periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1693 {
1694   struct PeerInfo *peer = cls;
1695   struct GNUNET_MessageHeader ping_message;
1696   struct DHT_MessageContext message_context;
1697
1698   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1699     return;
1700
1701   ping_message.size = htons(sizeof(struct GNUNET_MessageHeader));
1702   ping_message.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_PING);
1703
1704   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
1705 #if DEBUG_PING
1706   GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n", my_short_id, "DHT", GNUNET_i2s(&peer->id));
1707 #endif
1708   forward_message(NULL, &ping_message, peer, &message_context);
1709   peer->ping_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PING_DELAY, &periodic_ping_task, peer);
1710 }
1711
1712 /**
1713  * Schedule PING messages for the top X peers in each
1714  * bucket of the routing table (so core won't disconnect them!)
1715  */
1716 void schedule_ping_messages()
1717 {
1718   unsigned int bucket;
1719   unsigned int count;
1720   struct PeerInfo *pos;
1721   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1722     {
1723       pos = k_buckets[bucket].head;
1724       count = 0;
1725       while (pos != NULL)
1726         {
1727           if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK))
1728             GNUNET_SCHEDULER_add_now(sched, &periodic_ping_task, pos);
1729           else if ((count >= bucket_size) && (pos->ping_task != GNUNET_SCHEDULER_NO_TASK))
1730             {
1731               GNUNET_SCHEDULER_cancel(sched, pos->ping_task);
1732               pos->ping_task = GNUNET_SCHEDULER_NO_TASK;
1733             }
1734           pos = pos->next;
1735           count++;
1736         }
1737     }
1738 }
1739 #endif
1740
1741 /**
1742  * Attempt to add a peer to our k-buckets.
1743  *
1744  * @param peer the peer identity of the peer being added
1745  * @param bucket the bucket that we want this peer to go in
1746  * @param latency transport latency of this peer
1747  * @param distance transport distance to this peer
1748  *
1749  * @return NULL if the peer was not added,
1750  *         pointer to PeerInfo for new peer otherwise
1751  */
1752 static struct PeerInfo *
1753 try_add_peer(const struct GNUNET_PeerIdentity *peer,
1754              unsigned int bucket,
1755              struct GNUNET_TIME_Relative latency,
1756              unsigned int distance)
1757 {
1758   int peer_bucket;
1759   struct PeerInfo *new_peer;
1760
1761   if (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity)))
1762     return NULL;
1763
1764   peer_bucket = find_current_bucket(&peer->hashPubKey);
1765
1766   GNUNET_assert(peer_bucket >= lowest_bucket);
1767   new_peer = add_peer(peer, peer_bucket, latency, distance);
1768
1769   if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
1770     enable_next_bucket();
1771 #if DO_PING
1772   schedule_ping_messages();
1773 #endif
1774   return new_peer;
1775 }
1776
1777
1778 /**
1779  * Task run to check for messages that need to be sent to a client.
1780  *
1781  * @param client a ClientList, containing the client and any messages to be sent to it
1782  */
1783 static void
1784 process_pending_messages (struct ClientList *client)
1785
1786   if (client->pending_head == NULL) 
1787     return;    
1788   if (client->transmit_handle != NULL) 
1789     return;
1790
1791   client->transmit_handle =
1792     GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1793                                          ntohs (client->pending_head->msg->
1794                                                 size),
1795                                          GNUNET_TIME_UNIT_FOREVER_REL,
1796                                          &send_generic_reply, client);
1797 }
1798
1799 /**
1800  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1801  * request.  A ClientList is passed as closure, take the head of the list
1802  * and copy it into buf, which has the result of sending the message to the
1803  * client.
1804  *
1805  * @param cls closure to this call
1806  * @param size maximum number of bytes available to send
1807  * @param buf where to copy the actual message to
1808  *
1809  * @return the number of bytes actually copied, 0 indicates failure
1810  */
1811 static size_t
1812 send_generic_reply (void *cls, size_t size, void *buf)
1813 {
1814   struct ClientList *client = cls;
1815   char *cbuf = buf;
1816   struct PendingMessage *reply;
1817   size_t off;
1818   size_t msize;
1819
1820   client->transmit_handle = NULL;
1821   if (buf == NULL)             
1822     {
1823       /* client disconnected */
1824       return 0;
1825     }
1826   off = 0;
1827   while ( (NULL != (reply = client->pending_head)) &&
1828           (size >= off + (msize = ntohs (reply->msg->size))))
1829     {
1830       GNUNET_CONTAINER_DLL_remove (client->pending_head,
1831                                    client->pending_tail,
1832                                    reply);
1833       memcpy (&cbuf[off], reply->msg, msize);
1834       GNUNET_free (reply);
1835       off += msize;
1836     }
1837   process_pending_messages (client);
1838 #if DEBUG_DHT
1839   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1840               "Transmitted %u bytes of replies to client\n",
1841               (unsigned int) off);
1842 #endif
1843   return off;
1844 }
1845
1846
1847 /**
1848  * Add a PendingMessage to the clients list of messages to be sent
1849  *
1850  * @param client the active client to send the message to
1851  * @param pending_message the actual message to send
1852  */
1853 static void
1854 add_pending_message (struct ClientList *client,
1855                      struct PendingMessage *pending_message)
1856 {
1857   GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
1858                                      client->pending_tail,
1859                                      client->pending_tail,
1860                                      pending_message);
1861   process_pending_messages (client);
1862 }
1863
1864
1865
1866
1867 /**
1868  * Called when a reply needs to be sent to a client, as
1869  * a result it found to a GET or FIND PEER request.
1870  *
1871  * @param client the client to send the reply to
1872  * @param message the encapsulated message to send
1873  * @param uid the unique identifier of this request
1874  */
1875 static void
1876 send_reply_to_client (struct ClientList *client,
1877                       const struct GNUNET_MessageHeader *message,
1878                       unsigned long long uid,
1879                       const GNUNET_HashCode *key)
1880 {
1881   struct GNUNET_DHT_RouteResultMessage *reply;
1882   struct PendingMessage *pending_message;
1883   uint16_t msize;
1884   size_t tsize;
1885 #if DEBUG_DHT
1886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1887               "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
1888 #endif
1889   msize = ntohs (message->size);
1890   tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
1891   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1892     {
1893       GNUNET_break_op (0);
1894       return;
1895     }
1896   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1897   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
1898   reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
1899   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1900   reply->header.size = htons (tsize);
1901   reply->put_path_length = htons(0); /* FIXME: implement */
1902   reply->get_path_length = htons(0); /* FIXME: implement */
1903   reply->unique_id = GNUNET_htonll (uid);
1904   reply->key = *key;
1905   memcpy (&reply[1], message, msize);
1906   add_pending_message (client, pending_message);
1907 }
1908
1909 /**
1910  * Consider whether or not we would like to have this peer added to
1911  * our routing table.  Check whether bucket for this peer is full,
1912  * if so return negative; if not return positive.  Since peers are
1913  * only added on CORE level connect, this doesn't actually add the
1914  * peer to the routing table.
1915  *
1916  * @param peer the peer we are considering adding
1917  *
1918  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1919  *         already full)
1920  */
1921 static int consider_peer (struct GNUNET_PeerIdentity *peer)
1922 {
1923   int bucket;
1924
1925   if ((GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey)) || (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity))))
1926     return GNUNET_NO; /* We already know this peer (are connected even!) */
1927   bucket = find_current_bucket(&peer->hashPubKey);
1928
1929   if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1930     return GNUNET_YES;
1931
1932   return GNUNET_NO;
1933 }
1934
1935 /**
1936  * Main function that handles whether or not to route a result
1937  * message to other peers, or to send to our local client.
1938  *
1939  * @param cls closure (unused, always should be NULL)
1940  * @param msg the result message to be routed
1941  * @param message_context context of the message we are routing
1942  *
1943  * @return the number of peers the message was routed to,
1944  *         GNUNET_SYSERR on failure
1945  */
1946 static int route_result_message(void *cls,
1947                                 struct GNUNET_MessageHeader *msg,
1948                                 struct DHT_MessageContext *message_context)
1949 {
1950   struct GNUNET_PeerIdentity new_peer;
1951   struct DHTQueryRecord *record;
1952   struct DHTRouteSource *pos;
1953   struct PeerInfo *peer_info;
1954   const struct GNUNET_MessageHeader *hello_msg;
1955
1956   increment_stats(STAT_RESULTS);
1957   /**
1958    * If a find peer result message is received and contains a valid
1959    * HELLO for another peer, offer it to the transport service.
1960    */
1961   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1962     {
1963       if (ntohs(msg->size) <= sizeof(struct GNUNET_MessageHeader))
1964         GNUNET_break_op(0);
1965
1966       hello_msg = &msg[1];
1967       if ((ntohs(hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) || (GNUNET_SYSERR == GNUNET_HELLO_get_id((const struct GNUNET_HELLO_Message *)hello_msg, &new_peer)))
1968       {
1969         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Received non-HELLO message type in find peer result message!\n", my_short_id, "DHT");
1970         GNUNET_break_op(0);
1971         return GNUNET_NO;
1972       }
1973       else /* We have a valid hello, and peer id stored in new_peer */
1974       {
1975         find_peer_context.count++;
1976         increment_stats(STAT_FIND_PEER_REPLY);
1977         if (GNUNET_YES == consider_peer(&new_peer))
1978         {
1979           increment_stats(STAT_HELLOS_PROVIDED);
1980           GNUNET_TRANSPORT_offer_hello(transport_handle, hello_msg);
1981           GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5), &new_peer, NULL, NULL);
1982         }
1983       }
1984     }
1985
1986   if (malicious_dropper == GNUNET_YES)
1987     record = NULL;
1988   else
1989     record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &message_context->key);
1990
1991   if (record == NULL) /* No record of this message! */
1992     {
1993 #if DEBUG_DHT
1994     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1995                 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
1996                 "DHT", GNUNET_h2s (&message_context->key), message_context->unique_id);
1997 #endif
1998 #if DEBUG_DHT_ROUTING
1999       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2000         {
2001           dhtlog_handle->insert_route (NULL,
2002                                        message_context->unique_id,
2003                                        DHTLOG_RESULT,
2004                                        message_context->hop_count,
2005                                        GNUNET_SYSERR,
2006                                        &my_identity,
2007                                        &message_context->key,
2008                                        message_context->peer, NULL);
2009         }
2010 #endif
2011       if (message_context->bloom != NULL)
2012         {
2013           GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2014           message_context->bloom = NULL;
2015         }
2016       return 0;
2017     }
2018
2019   pos = record->head;
2020   while (pos != NULL)
2021     {
2022 #if STRICT_FORWARDING
2023       if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) /* If we have already forwarded this peer id, don't do it again! */
2024         {
2025           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pos->find_peers_responded, &new_peer.hashPubKey))
2026           {
2027             increment_stats("# find peer responses NOT forwarded (bloom match)");
2028             pos = pos->next;
2029             continue;
2030           }
2031           else
2032             GNUNET_CONTAINER_bloomfilter_add(pos->find_peers_responded, &new_peer.hashPubKey);
2033         }
2034 #endif
2035
2036       if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
2037         {
2038 #if DEBUG_DHT
2039           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2040                       "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
2041                       "DHT", GNUNET_h2s (&message_context->key), message_context->unique_id);
2042 #endif
2043 #if DEBUG_DHT_ROUTING
2044           if ((debug_routes_extended) && (dhtlog_handle != NULL))
2045             {
2046               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
2047                                            message_context->hop_count,
2048                                            GNUNET_YES, &my_identity, &message_context->key,
2049                                            message_context->peer, NULL);
2050             }
2051 #endif
2052           increment_stats(STAT_RESULTS_TO_CLIENT);
2053           if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
2054             increment_stats(STAT_GET_REPLY);
2055
2056           send_reply_to_client(pos->client, msg, 
2057                                message_context->unique_id,
2058                                &message_context->key);
2059         }
2060       else /* Send to peer */
2061         {
2062           peer_info = find_peer_by_id(&pos->source);
2063           if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
2064             {
2065               pos = pos->next;
2066               continue;
2067             }
2068
2069           if (message_context->bloom == NULL)
2070             message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2071           GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
2072           if ((GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (message_context->bloom, &peer_info->id.hashPubKey)))
2073             {
2074 #if DEBUG_DHT
2075               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2076                           "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
2077                           "DHT", GNUNET_h2s (&message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
2078 #endif
2079 #if DEBUG_DHT_ROUTING
2080               if ((debug_routes_extended) && (dhtlog_handle != NULL))
2081                 {
2082                   dhtlog_handle->insert_route (NULL, message_context->unique_id,
2083                                                DHTLOG_RESULT,
2084                                                message_context->hop_count,
2085                                                GNUNET_NO, &my_identity, &message_context->key,
2086                                                message_context->peer, &pos->source);
2087                 }
2088 #endif
2089               forward_result_message(cls, msg, peer_info, message_context);
2090             }
2091           else
2092             {
2093 #if DEBUG_DHT
2094               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2095                           "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
2096                           "DHT", GNUNET_h2s (&message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
2097 #endif
2098             }
2099         }
2100       pos = pos->next;
2101     }
2102   if (message_context->bloom != NULL)
2103     GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2104   return 0;
2105 }
2106
2107 /**
2108  * Iterator for local get request results,
2109  *
2110  * @param cls closure for iterator, a DatacacheGetContext
2111  * @param exp when does this value expire?
2112  * @param key the key this data is stored under
2113  * @param size the size of the data identified by key
2114  * @param data the actual data
2115  * @param type the type of the data
2116  *
2117  * @return GNUNET_OK to continue iteration, anything else
2118  * to stop iteration.
2119  */
2120 static int
2121 datacache_get_iterator (void *cls,
2122                         struct GNUNET_TIME_Absolute exp,
2123                         const GNUNET_HashCode * key,
2124                         uint32_t size, const char *data, uint32_t type)
2125 {
2126   struct DHT_MessageContext *msg_ctx = cls;
2127   struct DHT_MessageContext *new_msg_ctx;
2128   struct GNUNET_DHT_GetResultMessage *get_result;
2129 #if DEBUG_DHT
2130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2131               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
2132 #endif
2133   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
2134   memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
2135   get_result =
2136     GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
2137   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2138   get_result->header.size =
2139     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
2140   get_result->expiration = GNUNET_TIME_absolute_hton(exp);
2141   get_result->type = htons (type);
2142   memcpy (&get_result[1], data, size);
2143   new_msg_ctx->peer = &my_identity;
2144   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2145   new_msg_ctx->hop_count = 0;
2146   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make result routing a higher priority */
2147   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2148   increment_stats(STAT_GET_RESPONSE_START);
2149   route_result_message(cls, &get_result->header, new_msg_ctx);
2150   GNUNET_free(new_msg_ctx);
2151   //send_reply_to_client (datacache_get_ctx->client, &get_result->header,
2152   //                      datacache_get_ctx->unique_id);
2153   GNUNET_free (get_result);
2154   return GNUNET_OK;
2155 }
2156
2157
2158 /**
2159  * Server handler for all dht get requests, look for data,
2160  * if found, send response either to clients or other peers.
2161  *
2162  * @param cls closure for service
2163  * @param msg the actual get message
2164  * @param message_context struct containing pertinent information about the get request
2165  *
2166  * @return number of items found for GET request
2167  */
2168 static unsigned int
2169 handle_dht_get (void *cls, 
2170                 const struct GNUNET_MessageHeader *msg,
2171                 struct DHT_MessageContext *message_context)
2172 {
2173   const struct GNUNET_DHT_GetMessage *get_msg;
2174   uint16_t get_type;
2175   uint16_t bf_size;
2176   uint16_t msize;
2177   uint16_t xquery_size;
2178   unsigned int results;
2179   struct GNUNET_CONTAINER_BloomFilter *bf;
2180   const void *xquery;
2181   const char *end;
2182
2183   msize = ntohs (msg->size);
2184   if (msize < sizeof (struct GNUNET_DHT_GetMessage))
2185     {
2186       GNUNET_break (0);
2187       return 0;
2188     }
2189   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
2190   bf_size = ntohs (get_msg->bf_size);
2191   xquery_size = ntohs (get_msg->xquery_size);
2192   if (msize != sizeof (struct GNUNET_DHT_GetMessage) + bf_size + xquery_size)
2193     {
2194       GNUNET_break (0);
2195       return 0;
2196     }
2197   end = (const char*) &get_msg[1];
2198   if (xquery_size == 0)
2199     {
2200       xquery = NULL;
2201     }
2202   else
2203     {
2204       xquery = (const void*) end;
2205       end += xquery_size;
2206     }
2207   if (bf_size == 0)
2208     {
2209       bf = NULL;
2210     }
2211   else
2212     {
2213       bf = GNUNET_CONTAINER_bloomfilter_init (end,
2214                                               bf_size,
2215                                               GNUNET_DHT_GET_BLOOMFILTER_K);
2216     }
2217
2218   get_type = ntohs (get_msg->type);
2219 #if DEBUG_DHT
2220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2221               "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
2222               my_short_id,
2223               "DHT", "GET", 
2224               get_type,
2225               GNUNET_h2s (&message_context->key),
2226               message_context->unique_id);
2227 #endif
2228   increment_stats(STAT_GETS);
2229   results = 0;
2230 #if HAVE_MALICIOUS
2231   if (get_type == DHT_MALICIOUS_MESSAGE_TYPE)
2232     {
2233       GNUNET_CONTAINER_bloomfilter_free (bf);
2234       return results;
2235     }
2236 #endif
2237   /* FIXME: put xquery / bf into message_context and use
2238      them for processing! */
2239   if (datacache != NULL)
2240     results
2241       = GNUNET_DATACACHE_get (datacache,
2242                               &message_context->key, get_type,
2243                               &datacache_get_iterator,
2244                               message_context);
2245
2246   if (results >= 1)
2247     {
2248 #if DEBUG_DHT
2249       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2250                   "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
2251                   results, "GET", message_context->unique_id);
2252 #endif
2253 #if DEBUG_DHT_ROUTING
2254       if ((debug_routes) && (dhtlog_handle != NULL))
2255         {
2256           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
2257                                 message_context->hop_count, GNUNET_YES, &my_identity,
2258                                 &message_context->key);
2259         }
2260
2261       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2262         {
2263           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2264                                        message_context->hop_count, GNUNET_YES,
2265                                        &my_identity, &message_context->key, message_context->peer,
2266                                        NULL);
2267         }
2268 #endif
2269     }
2270
2271   if (message_context->hop_count == 0) /* Locally initiated request */
2272     {
2273 #if DEBUG_DHT_ROUTING
2274     if ((debug_routes) && (dhtlog_handle != NULL))
2275       {
2276         dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
2277                                       message_context->hop_count, GNUNET_NO, &my_identity,
2278                                       &message_context->key);
2279       }
2280 #endif
2281     }
2282   GNUNET_CONTAINER_bloomfilter_free (bf);
2283   return results;
2284 }
2285
2286 static void
2287 remove_recent_find_peer(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2288 {
2289   GNUNET_HashCode *key = cls;
2290   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent_find_peer_requests, key, key))
2291     {
2292       GNUNET_free(key);
2293     }
2294 }
2295
2296 /**
2297  * Server handler for initiating local dht find peer requests
2298  *
2299  * @param cls closure for service
2300  * @param find_msg the actual find peer message
2301  * @param message_context struct containing pertinent information about the request
2302  *
2303  */
2304 static void
2305 handle_dht_find_peer (void *cls,
2306                       const struct GNUNET_MessageHeader *find_msg,
2307                       struct DHT_MessageContext *message_context)
2308 {
2309   struct GNUNET_MessageHeader *find_peer_result;
2310   struct GNUNET_DHT_FindPeerMessage *find_peer_message;
2311   struct DHT_MessageContext *new_msg_ctx;
2312   struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
2313   size_t hello_size;
2314   size_t tsize;
2315   GNUNET_HashCode *recent_hash;
2316   struct GNUNET_MessageHeader *other_hello;
2317   size_t other_hello_size;
2318   struct GNUNET_PeerIdentity peer_id;
2319
2320   find_peer_message = (struct GNUNET_DHT_FindPeerMessage *)find_msg;
2321   GNUNET_break_op(ntohs(find_msg->size) >= (sizeof(struct GNUNET_DHT_FindPeerMessage)));
2322   if (ntohs(find_msg->size) < sizeof(struct GNUNET_DHT_FindPeerMessage))
2323     return;
2324   other_hello = NULL;
2325   other_hello_size = 0;
2326   if (ntohs(find_msg->size) > sizeof(struct GNUNET_DHT_FindPeerMessage))
2327     {
2328       other_hello_size = ntohs(find_msg->size) - sizeof(struct GNUNET_DHT_FindPeerMessage);
2329       other_hello = GNUNET_malloc(other_hello_size);
2330       memcpy(other_hello, &find_peer_message[1], other_hello_size);
2331       if ((GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)other_hello) == 0) || (GNUNET_OK != GNUNET_HELLO_get_id((struct GNUNET_HELLO_Message *)other_hello, &peer_id)))
2332         {
2333           GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Received invalid HELLO message in find peer request!\n");
2334           GNUNET_free(other_hello);
2335           return;
2336         }
2337 #if FIND_PEER_WITH_HELLO
2338       if (GNUNET_YES == consider_peer(&peer_id))
2339         {
2340           increment_stats(STAT_HELLOS_PROVIDED);
2341           GNUNET_TRANSPORT_offer_hello(transport_handle, other_hello);
2342           GNUNET_CORE_peer_request_connect(sched, cfg, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5), &peer_id, NULL, NULL);
2343           return;
2344         }
2345       else /* We don't want this peer! */
2346         return;
2347 #endif
2348     }
2349
2350 #if DEBUG_DHT
2351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
2353               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&message_context->key),
2354               ntohs (find_msg->size),
2355               sizeof (struct GNUNET_MessageHeader));
2356 #endif
2357   if (my_hello == NULL)
2358   {
2359 #if DEBUG_DHT
2360     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2361                 "`%s': Our HELLO is null, can't return.\n",
2362                 "DHT");
2363 #endif
2364     GNUNET_free_non_null(other_hello);
2365     return;
2366   }
2367
2368   incoming_bloom = GNUNET_CONTAINER_bloomfilter_init(find_peer_message->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2369   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(incoming_bloom, &my_identity.hashPubKey))
2370     {
2371       increment_stats(STAT_BLOOM_FIND_PEER);
2372       GNUNET_CONTAINER_bloomfilter_free(incoming_bloom);
2373       GNUNET_free_non_null(other_hello);
2374       return; /* We match the bloomfilter, do not send a response to this peer (they likely already know us!)*/
2375     }
2376   GNUNET_CONTAINER_bloomfilter_free(incoming_bloom);
2377
2378 #if RESTRICT_FIND_PEER
2379
2380   /**
2381    * Ignore any find peer requests from a peer we have seen very recently.
2382    */
2383   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent_find_peer_requests, &message_context->key)) /* We have recently responded to a find peer request for this peer! */
2384   {
2385     increment_stats("# dht find peer requests ignored (recently seen!)");
2386     GNUNET_free_non_null(other_hello);
2387     return;
2388   }
2389
2390   /**
2391    * Use this check to only allow the peer to respond to find peer requests if
2392    * it would be beneficial to have the requesting peer in this peers routing
2393    * table.  Can be used to thwart peers flooding the network with find peer
2394    * requests that we don't care about.  However, if a new peer is joining
2395    * the network and has no other peers this is a problem (assume all buckets
2396    * full, no one will respond!).
2397    */
2398   memcpy(&peer_id.hashPubKey, &message_context->key, sizeof(GNUNET_HashCode));
2399   if (GNUNET_NO == consider_peer(&peer_id))
2400     {
2401       increment_stats("# dht find peer requests ignored (do not need!)");
2402       GNUNET_free_non_null(other_hello);
2403       return;
2404     }
2405 #endif
2406
2407   recent_hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
2408   memcpy(recent_hash, &message_context->key, sizeof(GNUNET_HashCode));
2409   GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests, &message_context->key, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2410   GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30), &remove_recent_find_peer, &recent_hash);
2411
2412   /* Simplistic find_peer functionality, always return our hello */
2413   hello_size = ntohs(my_hello->size);
2414   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
2415
2416   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2417     {
2418       GNUNET_break_op (0);
2419       GNUNET_free_non_null(other_hello);
2420       return;
2421     }
2422
2423   find_peer_result = GNUNET_malloc (tsize);
2424   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
2425   find_peer_result->size = htons (tsize);
2426   memcpy (&find_peer_result[1], my_hello, hello_size);
2427
2428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2429               "`%s': Sending hello size %d to requesting peer.\n",
2430               "DHT", hello_size);
2431
2432   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
2433   memcpy(new_msg_ctx, message_context, sizeof(struct DHT_MessageContext));
2434   new_msg_ctx->peer = &my_identity;
2435   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2436   new_msg_ctx->hop_count = 0;
2437   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make find peer requests a higher priority */
2438   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2439   increment_stats(STAT_FIND_PEER_ANSWER);
2440   route_result_message(cls, find_peer_result, new_msg_ctx);
2441   GNUNET_free(new_msg_ctx);
2442 #if DEBUG_DHT_ROUTING
2443   if ((debug_routes) && (dhtlog_handle != NULL))
2444     {
2445       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
2446                                    message_context->hop_count, GNUNET_YES, &my_identity,
2447                                    &message_context->key);
2448     }
2449 #endif
2450   GNUNET_free_non_null(other_hello);
2451   GNUNET_free(find_peer_result);
2452 }
2453
2454 /**
2455  * Task used to republish data.
2456  * Forward declaration; function call loop.
2457  *
2458  * @param cls closure (a struct RepublishContext)
2459  * @param tc runtime context for this task
2460  */
2461 static void
2462 republish_content(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
2463
2464 /**
2465  * Server handler for initiating local dht put requests
2466  *
2467  * @param cls closure for service
2468  * @param msg the actual put message
2469  * @param message_context struct containing pertinent information about the request
2470  */
2471 static void
2472 handle_dht_put (void *cls,
2473                 const struct GNUNET_MessageHeader *msg,
2474                 struct DHT_MessageContext *message_context)
2475 {
2476   struct GNUNET_DHT_PutMessage *put_msg;
2477   size_t put_type;
2478   size_t data_size;
2479   int ret;
2480   struct RepublishContext *put_context;
2481
2482   GNUNET_assert (ntohs (msg->size) >=
2483                  sizeof (struct GNUNET_DHT_PutMessage));
2484
2485
2486   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
2487   put_type = ntohs (put_msg->type);
2488
2489   if (put_type == DHT_MALICIOUS_MESSAGE_TYPE)
2490     return;
2491
2492   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
2493 #if DEBUG_DHT
2494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2495               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2496               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&message_context->key), message_context->unique_id);
2497 #endif
2498 #if DEBUG_DHT_ROUTING
2499   if (message_context->hop_count == 0) /* Locally initiated request */
2500     {
2501       if ((debug_routes) && (dhtlog_handle != NULL))
2502         {
2503           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
2504                                        message_context->hop_count, GNUNET_NO, &my_identity,
2505                                        &message_context->key);
2506         }
2507     }
2508 #endif
2509
2510   if (message_context->closest != GNUNET_YES)
2511     return;
2512
2513 #if DEBUG_DHT_ROUTING
2514   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2515     {
2516       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2517                                    message_context->hop_count, GNUNET_YES,
2518                                    &my_identity, &message_context->key, message_context->peer,
2519                                    NULL);
2520     }
2521
2522   if ((debug_routes) && (dhtlog_handle != NULL))
2523     {
2524       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
2525                                    message_context->hop_count, GNUNET_YES, &my_identity,
2526                                    &message_context->key);
2527     }
2528 #endif
2529
2530   increment_stats(STAT_PUTS_INSERTED);
2531   if (datacache != NULL)
2532     {
2533       ret = GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
2534                                   (char *) &put_msg[1], put_type,
2535                                   GNUNET_TIME_absolute_ntoh(put_msg->expiration));
2536
2537       if ((ret == GNUNET_YES) && (do_republish == GNUNET_YES))
2538         {
2539           put_context = GNUNET_malloc(sizeof(struct RepublishContext));
2540           memcpy(&put_context->key, &message_context->key, sizeof(GNUNET_HashCode));
2541           put_context->type = put_type;
2542           GNUNET_SCHEDULER_add_delayed (sched, dht_republish_frequency, &republish_content, put_context);
2543         }
2544     }
2545   else
2546     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2547                 "`%s:%s': %s request received, but have no datacache!\n",
2548                 my_short_id, "DHT", "PUT");
2549 }
2550
2551 /**
2552  * Estimate the diameter of the network based
2553  * on how many buckets are currently in use.
2554  * Concept here is that the diameter of the network
2555  * is roughly the distance a message must travel in
2556  * order to reach its intended destination.  Since
2557  * at each hop we expect to get one bit closer, and
2558  * we have one bit per bucket, the number of buckets
2559  * in use should be the largest number of hops for
2560  * a sucessful message. (of course, this assumes we
2561  * know all peers in the network!)
2562  *
2563  * @return ballpark diameter figure
2564  */
2565 static unsigned int estimate_diameter()
2566 {
2567   return MAX_BUCKETS - lowest_bucket;
2568 }
2569
2570 /**
2571  * To how many peers should we (on average)
2572  * forward the request to obtain the desired
2573  * target_replication count (on average).
2574  *
2575  * Always 0, 1 or 2 (don't send, send once, split)
2576  */
2577 static unsigned int
2578 get_forward_count (unsigned int hop_count, size_t target_replication)
2579 {
2580 #if DOUBLE
2581   double target_count;
2582   double random_probability;
2583 #else
2584   uint32_t random_value;
2585 #endif
2586   unsigned int target_value;
2587   unsigned int diameter;
2588
2589   /**
2590    * If we are behaving in strict kademlia mode, send multiple initial requests,
2591    * but then only send to 1 or 0 peers based strictly on the number of hops.
2592    */
2593   if (strict_kademlia == GNUNET_YES)
2594     {
2595       if (hop_count == 0)
2596         return DHT_KADEMLIA_REPLICATION;
2597       else if (hop_count < max_hops)
2598         return 1;
2599       else
2600         return 0;
2601     }
2602
2603   /* FIXME: the smaller we think the network is the more lenient we should be for
2604    * routing right?  The estimation below only works if we think we have reasonably
2605    * full routing tables, which for our RR topologies may not be the case!
2606    */
2607   diameter = estimate_diameter ();
2608   if ((hop_count > (diameter + 1) * 2) && (MINIMUM_PEER_THRESHOLD < estimate_diameter() * bucket_size))
2609     {
2610 #if DEBUG_DHT
2611       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2612                   "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
2613                   "DHT", estimate_diameter(), lowest_bucket);
2614 #endif
2615       return 0;
2616     }
2617   else if (hop_count > max_hops)
2618     {
2619 #if DEBUG_DHT
2620       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2621                   "`%s:%s': Hop count too high (greater than max)\n", my_short_id,
2622                   "DHT");
2623 #endif
2624       return 0;
2625     }
2626
2627 #if DOUBLE
2628   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Replication %d, hop_count %u, diameter %u\n", target_replication, hop_count, diameter);
2629   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Numerator %f, denominator %f\n", (double)target_replication, ((double)target_replication * (hop_count + 1) + diameter));
2630   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
2631     (double)target_replication / ((double)target_replication * (hop_count + 1) + diameter);
2632   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Target count is %f\n", target_count);
2633   random_probability = ((double)GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2634       RAND_MAX)) / RAND_MAX;
2635   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Random is %f\n", random_probability);
2636
2637   target_value = 0;
2638   //while (target_value < target_count)
2639   if (target_value < target_count)
2640     target_value++; /* target_value is ALWAYS 1 after this "loop", right?  Because target_count is always > 0, right?  Or does it become 0.00000... at some point because the hop count is so high? */
2641
2642
2643   //if ((target_count + 1 - (double)target_value) > random_probability)
2644   if ((target_count) > random_probability)
2645     target_value++;
2646 #endif
2647
2648   random_value = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_STRONG, target_replication * (hop_count + 1) + diameter) + 1;
2649   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "replication %u, at hop %d, will split with probability %f\n", target_replication, hop_count, target_replication / (double)((target_replication * (hop_count + 1) + diameter) + 1));
2650   target_value = 1;
2651   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "random %u, target %u, max %u\n", random_value, target_replication, target_replication * (hop_count + 1) + diameter);
2652   if (random_value < target_replication)
2653     target_value++;
2654
2655   return target_value;
2656 }
2657
2658 /*
2659  * Check whether my identity is closer than any known peers.
2660  * If a non-null bloomfilter is given, check if this is the closest
2661  * peer that hasn't already been routed to.
2662  *
2663  * @param target hash code to check closeness to
2664  * @param bloom bloomfilter, exclude these entries from the decision
2665  *
2666  * Return GNUNET_YES if node location is closest, GNUNET_NO
2667  * otherwise.
2668  */
2669 int
2670 am_closest_peer (const GNUNET_HashCode * target, struct GNUNET_CONTAINER_BloomFilter *bloom)
2671 {
2672   int bits;
2673   int other_bits;
2674   int bucket_num;
2675   int count;
2676   struct PeerInfo *pos;
2677   unsigned int my_distance;
2678
2679   if (0 == memcmp(&my_identity.hashPubKey, target, sizeof(GNUNET_HashCode)))
2680     return GNUNET_YES;
2681
2682   bucket_num = find_current_bucket(target);
2683
2684   bits = GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, target);
2685   my_distance = distance(&my_identity.hashPubKey, target);
2686   pos = k_buckets[bucket_num].head;
2687   count = 0;
2688   while ((pos != NULL) && (count < bucket_size))
2689     {
2690       if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey)))
2691         {
2692           pos = pos->next;
2693           continue; /* Skip already checked entries */
2694         }
2695
2696       other_bits = GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target);
2697       if (other_bits > bits)
2698         return GNUNET_NO;
2699       else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
2700         {
2701           if (strict_kademlia != GNUNET_YES) /* Return that we at as close as any other peer */
2702             return GNUNET_YES;
2703           else if (distance(&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
2704             return GNUNET_NO;
2705         }
2706       pos = pos->next;
2707     }
2708
2709 #if DEBUG_TABLE
2710   GNUNET_GE_LOG (coreAPI->ectx,
2711                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2712                  GNUNET_GE_BULK, "closest peer\n");
2713   printPeerBits (&closest);
2714   GNUNET_GE_LOG (coreAPI->ectx,
2715                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2716                  GNUNET_GE_BULK, "me\n");
2717   printPeerBits (coreAPI->my_identity);
2718   GNUNET_GE_LOG (coreAPI->ectx,
2719                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2720                  GNUNET_GE_BULK, "key\n");
2721   printKeyBits (target);
2722   GNUNET_GE_LOG (coreAPI->ectx,
2723                  GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
2724                  GNUNET_GE_BULK,
2725                  "closest peer inverse distance is %u, mine is %u\n",
2726                  inverse_distance (target, &closest.hashPubKey),
2727                  inverse_distance (target,
2728                                    &coreAPI->my_identity->hashPubKey));
2729 #endif
2730
2731   /* No peers closer, we are the closest! */
2732   return GNUNET_YES;
2733 }
2734
2735
2736 /**
2737  * Return this peers adjusted value based on the convergence
2738  * function chosen.  This is the key function for randomized
2739  * routing decisions.
2740  *
2741  * @param target the key of the request
2742  * @param peer the peer we would like the value of
2743  * @param hops number of hops this message has already traveled
2744  *
2745  * @return bit distance from target to peer raised to an exponent
2746  *         adjusted based on the current routing convergence algorithm
2747  *
2748  */
2749 unsigned long long
2750 converge_distance (const GNUNET_HashCode *target,
2751                    struct PeerInfo *peer,
2752                    unsigned int hops)
2753 {
2754   unsigned long long ret;
2755   unsigned int other_matching_bits;
2756   double converge_modifier = 0.0;
2757   double base_converge_modifier = .1;
2758   double calc_value;
2759   double exponent;
2760   int curr_max_hops;
2761
2762   if (use_max_hops)
2763     curr_max_hops = max_hops;
2764   else
2765     curr_max_hops = (estimate_diameter() + 1) * 2;
2766
2767   if (converge_modifier > 0)
2768     converge_modifier = converge_modifier * base_converge_modifier;
2769   else
2770     {
2771       converge_modifier = base_converge_modifier;
2772       base_converge_modifier = 0.0;
2773     }
2774
2775   GNUNET_assert(converge_modifier > 0);
2776
2777   other_matching_bits = GNUNET_CRYPTO_hash_matching_bits(target, &peer->id.hashPubKey);
2778
2779   switch (converge_option)
2780     {
2781       case DHT_CONVERGE_RANDOM:
2782         return 1; /* Always return 1, choose equally among all peers */
2783       case DHT_CONVERGE_LINEAR:
2784         calc_value = hops * curr_max_hops * converge_modifier;
2785         break;
2786       case DHT_CONVERGE_SQUARE:
2787         /**
2788          * Simple square based curve.
2789          */
2790         calc_value = (sqrt(hops) / sqrt(curr_max_hops)) * (curr_max_hops / (curr_max_hops * converge_modifier));
2791         break;
2792       case DHT_CONVERGE_EXPONENTIAL:
2793         /**
2794          * Simple exponential curve.
2795          */
2796         if (base_converge_modifier > 0)
2797           calc_value = (converge_modifier * hops * hops) / curr_max_hops;
2798         else
2799           calc_value = (hops * hops) / curr_max_hops;
2800         break;
2801       default:
2802         return 1;
2803     }
2804
2805   /* Take the log (base e) of the number of bits matching the other peer */
2806   exponent = log(other_matching_bits);
2807
2808   /* Check if we would overflow; our largest possible value is 2^64 = e^44.361419555836498 */
2809   if (exponent * calc_value >= 44.361419555836498)
2810     return ULLONG_MAX;
2811
2812   /* Clear errno and all math exceptions */
2813   errno = 0;
2814   feclearexcept(FE_ALL_EXCEPT);
2815   ret = (unsigned long long)pow(other_matching_bits, calc_value);
2816   if ((errno != 0) || fetestexcept(FE_INVALID | FE_DIVBYZERO | FE_OVERFLOW |
2817       FE_UNDERFLOW))
2818     {
2819       if (0 != fetestexcept(FE_OVERFLOW))
2820         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "FE_OVERFLOW\n");
2821       if (0 != fetestexcept(FE_INVALID))
2822         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "FE_INVALID\n");
2823       if (0 != fetestexcept(FE_UNDERFLOW))
2824         GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "FE_UNDERFLOW\n");
2825       return 0;
2826     }
2827   else
2828     return ret;
2829 }
2830
2831 /**
2832  * Comparison function for two struct PeerInfo's
2833  * which have already had their matching bits to
2834  * some target calculated.
2835  *
2836  * @param p1 a pointer pointer to a struct PeerInfo
2837  * @param p2 a pointer pointer to a struct PeerInfo
2838  *
2839  * @return 0 if equidistant to target,
2840  *        -1 if p1 is closer,
2841  *         1 if p2 is closer
2842  */
2843 static int
2844 compare_peers (const void *p1, const void *p2)
2845 {
2846   struct PeerInfo **first = (struct PeerInfo **)p1;
2847   struct PeerInfo **second = (struct PeerInfo **)p2;
2848
2849   if ((*first)->matching_bits > (*second)->matching_bits)
2850     return -1;
2851   if ((*first)->matching_bits < (*second)->matching_bits)
2852     return 1;
2853   else
2854     return 0;
2855 }
2856
2857
2858 /**
2859  * Select a peer from the routing table that would be a good routing
2860  * destination for sending a message for "target".  The resulting peer
2861  * must not be in the set of blocked peers.<p>
2862  *
2863  * Note that we should not ALWAYS select the closest peer to the
2864  * target, peers further away from the target should be chosen with
2865  * exponentially declining probability.
2866  *
2867  * @param target the key we are selecting a peer to route to
2868  * @param bloom a bloomfilter containing entries this request has seen already
2869  *
2870  * @return Peer to route to, or NULL on error
2871  */
2872 static struct PeerInfo *
2873 select_peer (const GNUNET_HashCode * target,
2874              struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
2875 {
2876   unsigned int bc;
2877   unsigned int i;
2878   unsigned int count;
2879   unsigned int offset;
2880   unsigned int my_matching_bits;
2881   int closest_bucket;
2882   struct PeerInfo *pos;
2883   struct PeerInfo *sorted_closest[bucket_size];
2884   unsigned long long temp_converge_distance;
2885   unsigned long long total_distance;
2886   unsigned long long selected;
2887 #if DEBUG_DHT > 1
2888   unsigned long long stats_total_distance;
2889   double sum;
2890 #endif
2891   /* For kademlia */
2892   unsigned int distance;
2893   unsigned int largest_distance;
2894   struct PeerInfo *chosen;
2895
2896   my_matching_bits = GNUNET_CRYPTO_hash_matching_bits(target, &my_identity.hashPubKey);
2897
2898   total_distance = 0;
2899   if (strict_kademlia == GNUNET_YES)
2900     {
2901       largest_distance = 0;
2902       chosen = NULL;
2903       for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2904         {
2905           pos = k_buckets[bc].head;
2906           count = 0;
2907           while ((pos != NULL) && (count < bucket_size))
2908             {
2909               /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
2910               if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2911                 {
2912                   distance = inverse_distance (target, &pos->id.hashPubKey);
2913                   if (distance > largest_distance)
2914                     {
2915                       chosen = pos;
2916                       largest_distance = distance;
2917                     }
2918                 }
2919               count++;
2920               pos = pos->next;
2921             }
2922         }
2923
2924       if ((largest_distance > 0) && (chosen != NULL))
2925         {
2926           GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
2927           return chosen;
2928         }
2929       else
2930         {
2931           return NULL;
2932         }
2933     }
2934
2935   /* GNUnet-style */
2936   total_distance = 0;
2937   /* Three steps: order peers in closest bucket (most matching bits).
2938    * Then go over all LOWER buckets (matching same bits we do)
2939    * Then go over all HIGHER buckets (matching less then we do)
2940    */
2941
2942   closest_bucket = find_current_bucket(target);
2943   GNUNET_assert(closest_bucket >= lowest_bucket);
2944   pos = k_buckets[closest_bucket].head;
2945   count = 0;
2946   offset = 0; /* Need offset as well as count in case peers are bloomfiltered */
2947   memset(sorted_closest, 0, sizeof(sorted_closest));
2948   /* Put any peers in the closest bucket in the sorting array */
2949   while ((pos != NULL) && (count < bucket_size))
2950     {
2951       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2952         {
2953           count++;
2954           pos = pos->next;
2955           continue; /* Ignore bloomfiltered peers */
2956         }
2957       pos->matching_bits = GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target);
2958       sorted_closest[offset] = pos;
2959       pos = pos->next;
2960       offset++;
2961       count++;
2962     }
2963
2964   /* Sort the peers in descending order */
2965   qsort(&sorted_closest[0], offset, sizeof(struct PeerInfo *), &compare_peers);
2966
2967   /* Put the sorted closest peers into the possible bins first, in case of overflow. */
2968   for (i = 0; i < offset; i++)
2969     {
2970       temp_converge_distance = converge_distance(target, sorted_closest[i], hops);
2971       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &sorted_closest[i]->id.hashPubKey))
2972         break; /* Ignore bloomfiltered peers */
2973       if ((temp_converge_distance <= ULLONG_MAX) && (total_distance + temp_converge_distance > total_distance)) /* Handle largest case and overflow */
2974         total_distance += temp_converge_distance;
2975       else
2976         break; /* overflow case */
2977     }
2978
2979   /* Now handle peers in lower buckets (matches same # of bits as target) */
2980   for (bc = lowest_bucket; bc < closest_bucket; bc++)
2981     {
2982       pos = k_buckets[bc].head;
2983       count = 0;
2984       while ((pos != NULL) && (count < bucket_size))
2985         {
2986           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2987             {
2988               count++;
2989               pos = pos->next;
2990               continue; /* Ignore bloomfiltered peers */
2991             }
2992           temp_converge_distance = converge_distance(target, pos, hops);
2993           if ((temp_converge_distance <= ULLONG_MAX) && (total_distance + temp_converge_distance > total_distance)) /* Handle largest case and overflow */
2994             total_distance += temp_converge_distance;
2995           else
2996             break; /* overflow case */
2997           pos = pos->next;
2998           count++;
2999         }
3000     }
3001
3002   /* Now handle all the further away peers */
3003   for (bc = closest_bucket + 1; bc < MAX_BUCKETS; bc++)
3004     {
3005       pos = k_buckets[bc].head;
3006       count = 0;
3007       while ((pos != NULL) && (count < bucket_size))
3008         {
3009           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3010             {
3011               count++;
3012               pos = pos->next;
3013               continue; /* Ignore bloomfiltered peers */
3014             }
3015           temp_converge_distance = converge_distance(target, pos, hops);
3016           if ((temp_converge_distance <= ULLONG_MAX) && (total_distance + temp_converge_distance > total_distance)) /* Handle largest case and overflow */
3017             total_distance += temp_converge_distance;
3018           else
3019             break; /* overflow case */
3020           pos = pos->next;
3021           count++;
3022         }
3023     }
3024
3025   if (total_distance == 0) /* No peers to select from! */
3026     {
3027       increment_stats("# select_peer, total_distance == 0");
3028       return NULL;
3029     }
3030
3031 #if DEBUG_DHT_ROUTING > 1
3032   sum = 0.0;
3033   /* PRINT STATS */
3034   /* Put the sorted closest peers into the possible bins first, in case of overflow. */
3035   stats_total_distance = 0;
3036   for (i = 0; i < offset; i++)
3037     {
3038       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &sorted_closest[i]->id.hashPubKey))
3039         break; /* Ignore bloomfiltered peers */
3040       temp_converge_distance = converge_distance(target, sorted_closest[i], hops);
3041       if ((temp_converge_distance <= ULLONG_MAX) && (stats_total_distance + temp_converge_distance > stats_total_distance)) /* Handle largest case and overflow */
3042         stats_total_distance += temp_converge_distance;
3043       else
3044         break; /* overflow case */
3045       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose %d matching bits (%d bits match me) (%.2f percent) converge ret %llu\n", GNUNET_CRYPTO_hash_matching_bits(&sorted_closest[i]->id.hashPubKey, target), GNUNET_CRYPTO_hash_matching_bits(&sorted_closest[i]->id.hashPubKey, &my_identity.hashPubKey), (temp_converge_distance / (double)total_distance) * 100, temp_converge_distance);
3046     }
3047
3048   /* Now handle peers in lower buckets (matches same # of bits as target) */
3049   for (bc = lowest_bucket; bc < closest_bucket; bc++)
3050     {
3051       pos = k_buckets[bc].head;
3052       count = 0;
3053       while ((pos != NULL) && (count < bucket_size))
3054         {
3055           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3056             {
3057               count++;
3058               pos = pos->next;
3059               continue; /* Ignore bloomfiltered peers */
3060             }
3061           temp_converge_distance = converge_distance(target, pos, hops);
3062           if ((temp_converge_distance <= ULLONG_MAX) && (stats_total_distance + temp_converge_distance > stats_total_distance)) /* Handle largest case and overflow */
3063             stats_total_distance += temp_converge_distance;
3064           else
3065             break; /* overflow case */
3066           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose %d matching bits (%d bits match me) (%.2f percent) converge ret %llu\n", GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey), (temp_converge_distance / (double)total_distance) * 100, temp_converge_distance);
3067           pos = pos->next;
3068           count++;
3069         }
3070     }
3071
3072   /* Now handle all the further away peers */
3073   for (bc = closest_bucket + 1; bc < MAX_BUCKETS; bc++)
3074     {
3075       pos = k_buckets[bc].head;
3076       count = 0;
3077       while ((pos != NULL) && (count < bucket_size))
3078         {
3079           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3080             {
3081               count++;
3082               pos = pos->next;
3083               continue; /* Ignore bloomfiltered peers */
3084             }
3085           temp_converge_distance = converge_distance(target, pos, hops);
3086           if ((temp_converge_distance <= ULLONG_MAX) && (stats_total_distance + temp_converge_distance > stats_total_distance)) /* Handle largest case and overflow */
3087             stats_total_distance += temp_converge_distance;
3088           else
3089             break; /* overflow case */
3090           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose %d matching bits (%d bits match me) (%.2f percent) converge ret %llu\n", GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, target), GNUNET_CRYPTO_hash_matching_bits(&pos->id.hashPubKey, &my_identity.hashPubKey),  (temp_converge_distance / (double)total_distance) * 100, temp_converge_distance);
3091           pos = pos->next;
3092           count++;
3093         }
3094     }
3095   /* END PRINT STATS */
3096 #endif
3097
3098   /* Now actually choose a peer */
3099   selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
3100
3101   /* Put the sorted closest peers into the possible bins first, in case of overflow. */
3102   for (i = 0; i < offset; i++)
3103     {
3104       if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &sorted_closest[i]->id.hashPubKey))
3105         break; /* Ignore bloomfiltered peers */
3106       temp_converge_distance = converge_distance(target, sorted_closest[i], hops);
3107       if (temp_converge_distance >= selected)
3108         return sorted_closest[i];
3109       else
3110         selected -= temp_converge_distance;
3111     }
3112
3113   /* Now handle peers in lower buckets (matches same # of bits as target) */
3114   for (bc = lowest_bucket; bc < closest_bucket; bc++)
3115     {
3116       pos = k_buckets[bc].head;
3117       count = 0;
3118       while ((pos != NULL) && (count < bucket_size))
3119         {
3120           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3121             {
3122               count++;
3123               pos = pos->next;
3124               continue; /* Ignore bloomfiltered peers */
3125             }
3126           temp_converge_distance = converge_distance(target, pos, hops);
3127           if (temp_converge_distance >= selected)
3128             return pos;
3129           else
3130             selected -= temp_converge_distance;
3131           pos = pos->next;
3132           count++;
3133         }
3134     }
3135
3136   /* Now handle all the further away peers */
3137   for (bc = closest_bucket + 1; bc < MAX_BUCKETS; bc++)
3138     {
3139       pos = k_buckets[bc].head;
3140       count = 0;
3141       while ((pos != NULL) && (count < bucket_size))
3142         {
3143           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3144             {
3145               count++;
3146               pos = pos->next;
3147               continue; /* Ignore bloomfiltered peers */
3148             }
3149           temp_converge_distance = converge_distance(target, pos, hops);
3150           if (temp_converge_distance >= selected)
3151             return pos;
3152           else
3153             selected -= temp_converge_distance;
3154           pos = pos->next;
3155           count++;
3156         }
3157     }
3158
3159   increment_stats("# failed to select peer");
3160   return NULL;
3161 }
3162
3163
3164 /**
3165  * Task used to remove recent entries, either
3166  * after timeout, when full, or on shutdown.
3167  *
3168  * @param cls the entry to remove
3169  * @param tc context, reason, etc.
3170  */
3171 static void
3172 remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3173 {
3174   struct RecentRequest *req = cls;
3175   static GNUNET_HashCode hash;
3176
3177   GNUNET_assert(req != NULL);
3178   hash_from_uid(req->uid, &hash);
3179   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req));
3180   GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node);
3181   GNUNET_CONTAINER_bloomfilter_free(req->bloom);
3182   GNUNET_free(req);
3183
3184   if ((tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
3185   {
3186     GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
3187     GNUNET_CONTAINER_heap_destroy(recent.minHeap);
3188   }
3189 }
3190
3191
3192 /**
3193  * Task used to remove forwarding entries, either
3194  * after timeout, when full, or on shutdown.
3195  *
3196  * @param cls the entry to remove
3197  * @param tc context, reason, etc.
3198  */
3199 static void
3200 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3201 {
3202   struct DHTRouteSource *source_info = cls;
3203   struct DHTQueryRecord *record;
3204   source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
3205   record = source_info->record;
3206   GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
3207
3208   if (record->head == NULL) /* No more entries in DLL */
3209     {
3210       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
3211       GNUNET_free(record);
3212     }
3213   if (source_info->find_peers_responded != NULL)
3214     GNUNET_CONTAINER_bloomfilter_free(source_info->find_peers_responded);
3215   GNUNET_free(source_info);
3216 }
3217
3218 /**
3219  * Remember this routing request so that if a reply is
3220  * received we can either forward it to the correct peer
3221  * or return the result locally.
3222  *
3223  * @param cls DHT service closure
3224  * @param msg_ctx Context of the route request
3225  *
3226  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
3227  */
3228 static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
3229 {
3230   struct DHTQueryRecord *record;
3231   struct DHTRouteSource *source_info;
3232   struct DHTRouteSource *pos;
3233   struct GNUNET_TIME_Absolute now;
3234   unsigned int current_size;
3235
3236   current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
3237   while (current_size >= MAX_OUTSTANDING_FORWARDS)
3238     {
3239       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
3240       GNUNET_assert(source_info != NULL);
3241       record = source_info->record;
3242       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
3243       if (record->head == NULL) /* No more entries in DLL */
3244         {
3245           GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
3246           GNUNET_free(record);
3247         }
3248       GNUNET_SCHEDULER_cancel(sched, source_info->delete_task);
3249       if (source_info->find_peers_responded != NULL)
3250         GNUNET_CONTAINER_bloomfilter_free(source_info->find_peers_responded);
3251       GNUNET_free(source_info);
3252       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
3253     }
3254   now = GNUNET_TIME_absolute_get();
3255   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key);
3256   if (record != NULL) /* Already know this request! */
3257     {
3258       pos = record->head;
3259       while (pos != NULL)
3260         {
3261           if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
3262             break; /* Already have this peer in reply list! */
3263           pos = pos->next;
3264         }
3265       if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
3266         {
3267           GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.value);
3268           return GNUNET_NO;
3269         }
3270     }
3271   else
3272     {
3273       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
3274       GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3275       memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
3276     }
3277
3278   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
3279   source_info->record = record;
3280   source_info->delete_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
3281   source_info->find_peers_responded = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3282   memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
3283   GNUNET_CONTAINER_DLL_insert_after(record->head, record->tail, record->tail, source_info);
3284   if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
3285     {
3286       source_info->client = msg_ctx->client;
3287       now = GNUNET_TIME_absolute_get_forever();
3288     }
3289   source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.value);
3290 #if DEBUG_DHT > 1
3291       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3292                   "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
3293                   "DHT", GNUNET_h2s (&msg_ctx->key), msg_ctx->unique_id);
3294 #endif
3295   return GNUNET_YES;
3296 }
3297
3298
3299 /**
3300  * Main function that handles whether or not to route a message to other
3301  * peers.
3302  *
3303  * @param cls closure for dht service (NULL)
3304  * @param msg the message to be routed
3305  * @param message_context the context containing all pertinent information about the message
3306  *
3307  * @return the number of peers the message was routed to,
3308  *         GNUNET_SYSERR on failure
3309  */
3310 static int route_message(void *cls,
3311                          const struct GNUNET_MessageHeader *msg,
3312                          struct DHT_MessageContext *message_context)
3313 {
3314   int i;
3315   int global_closest;
3316   struct PeerInfo *selected;
3317 #if DEBUG_DHT_ROUTING > 1
3318   struct PeerInfo *nearest;
3319 #endif
3320   unsigned int forward_count;
3321   struct RecentRequest *recent_req;
3322   GNUNET_HashCode unique_hash;
3323   char *stat_forward_count;
3324   char *temp_stat_str;
3325 #if DEBUG_DHT_ROUTING
3326   int ret;
3327 #endif
3328
3329   if (malicious_dropper == GNUNET_YES)
3330     {
3331 #if DEBUG_DHT_ROUTING
3332       if ((debug_routes_extended) && (dhtlog_handle != NULL))
3333         {
3334           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
3335                                        message_context->hop_count, GNUNET_SYSERR,
3336                                        &my_identity, &message_context->key, message_context->peer,
3337                                        NULL);
3338         }
3339 #endif
3340       if (message_context->bloom != NULL)
3341         GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
3342       return 0;
3343     }
3344
3345   increment_stats(STAT_ROUTES);
3346   /* Semantics of this call means we find whether we are the closest peer out of those already
3347    * routed to on this messages path.
3348    */
3349   global_closest = am_closest_peer(&message_context->key, NULL);
3350   message_context->closest = am_closest_peer(&message_context->key, message_context->bloom);
3351   forward_count = get_forward_count(message_context->hop_count, message_context->replication);
3352   GNUNET_asprintf(&stat_forward_count, "# forward counts of %d", forward_count);
3353   increment_stats(stat_forward_count);
3354   GNUNET_free(stat_forward_count);
3355   if (message_context->bloom == NULL)
3356     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3357
3358   if ((stop_on_closest == GNUNET_YES) && (global_closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
3359     forward_count = 0;
3360
3361   /**
3362    * NOTICE:  In Kademlia, a find peer request goes no further if the peer doesn't return
3363    * any closer peers (which is being checked for below).  Since we are doing recursive
3364    * routing we have no choice but to stop forwarding in this case.  This means that at
3365    * any given step the request may NOT be forwarded to alpha peers (because routes will
3366    * stop and the parallel route will not be aware of it).  Of course, assuming that we
3367    * have fulfilled the Kademlia requirements for routing table fullness this will never
3368    * ever ever be a problem.
3369    *
3370    * However, is this fair?
3371    *
3372    * Since we use these requests to build our routing tables (and we build them in the
3373    * testing driver) we will ignore this restriction for FIND_PEER messages so that
3374    * routing tables still get constructed.
3375    */
3376   if ((GNUNET_YES == strict_kademlia) && (global_closest == GNUNET_YES) && (message_context->hop_count > 0) && (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER))
3377     forward_count = 0;
3378
3379 #if DEBUG_DHT_ROUTING
3380   if (forward_count == 0)
3381     ret = GNUNET_SYSERR;
3382   else
3383     ret = GNUNET_NO;
3384
3385   if ((debug_routes_extended) && (dhtlog_handle != NULL))
3386     {
3387       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
3388                                    message_context->hop_count, ret,
3389                                    &my_identity, &message_context->key, message_context->peer,
3390                                    NULL);
3391     }
3392 #endif
3393
3394   switch (ntohs(msg->type))
3395     {
3396     case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
3397       cache_response (cls, message_context);
3398       if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
3399         forward_count = 0;
3400       break;
3401     case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding to reduce complexity?*/
3402       increment_stats(STAT_PUTS);
3403       message_context->closest = global_closest;
3404       handle_dht_put (cls, msg, message_context);
3405       break;
3406     case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
3407       increment_stats(STAT_FIND_PEER);
3408       if (((message_context->hop_count > 0) && (0 != memcmp(message_context->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)))) || (message_context->client != NULL))
3409       {
3410         cache_response (cls, message_context);
3411         if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
3412           handle_dht_find_peer (cls, msg, message_context);
3413       }
3414 #if DEBUG_DHT_ROUTING
3415       if (message_context->hop_count == 0) /* Locally initiated request */
3416         {
3417           if ((debug_routes) && (dhtlog_handle != NULL))
3418             {
3419               dhtlog_handle->insert_dhtkey(NULL, &message_context->key);
3420               dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
3421                                            message_context->hop_count, GNUNET_NO, &my_identity,
3422                                            &message_context->key);
3423             }
3424         }
3425 #endif
3426       break;
3427     default:
3428       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3429                   "`%s': Message type (%d) not handled\n", "DHT", ntohs(msg->type));
3430     }
3431
3432   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
3433   hash_from_uid (message_context->unique_id, &unique_hash);
3434   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent.hashmap, &unique_hash))
3435   {
3436     recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap, &unique_hash);
3437     GNUNET_assert(recent_req != NULL);
3438     if (0 != memcmp(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode)))
3439       increment_stats(STAT_DUPLICATE_UID);
3440     else
3441       {
3442         increment_stats(STAT_RECENT_SEEN);
3443         GNUNET_CONTAINER_bloomfilter_or2(message_context->bloom, recent_req->bloom, DHT_BLOOM_SIZE);
3444       }
3445     }
3446   else
3447     {
3448       recent_req = GNUNET_malloc(sizeof(struct RecentRequest));
3449       recent_req->uid = message_context->unique_id;
3450       memcpy(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode));
3451       recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req);
3452       recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap, recent_req, GNUNET_TIME_absolute_get().value);
3453       recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3454       GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3455     }
3456
3457   if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT)
3458     {
3459       recent_req = GNUNET_CONTAINER_heap_peek(recent.minHeap);
3460       GNUNET_assert(recent_req != NULL);
3461       GNUNET_SCHEDULER_cancel(sched, recent_req->remove_task);
3462       GNUNET_SCHEDULER_add_now(sched, &remove_recent, recent_req);
3463     }
3464
3465   for (i = 0; i < forward_count; i++)
3466     {
3467       selected = select_peer(&message_context->key, message_context->bloom, message_context->hop_count);
3468
3469       if (selected != NULL)
3470         {
3471           if (GNUNET_CRYPTO_hash_matching_bits(&selected->id.hashPubKey, &message_context->key) >= GNUNET_CRYPTO_hash_matching_bits(&my_identity.hashPubKey, &message_context->key))
3472             GNUNET_asprintf(&temp_stat_str, "# requests routed to close(r) peer hop %u", message_context->hop_count);
3473           else
3474             GNUNET_asprintf(&temp_stat_str, "# requests routed to less close peer hop %u", message_context->hop_count);
3475           if (temp_stat_str != NULL)
3476             {
3477               increment_stats(temp_stat_str);
3478               GNUNET_free(temp_stat_str);
3479             }
3480           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
3481 #if DEBUG_DHT_ROUTING > 1
3482           nearest = find_closest_peer(&message_context->key);
3483           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
3484           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3485                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
3486                       "DHT", GNUNET_h2s (&message_context->key), message_context->unique_id, GNUNET_i2s(&selected->id), nearest_buf, GNUNET_CRYPTO_hash_matching_bits(&nearest->id.hashPubKey, message_context->key), distance(&nearest->id.hashPubKey, message_context->key));
3487           GNUNET_free(nearest_buf);
3488 #endif
3489 #if DEBUG_DHT_ROUTING
3490           if ((debug_routes_extended) && (dhtlog_handle != NULL))
3491             {
3492               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
3493                                            message_context->hop_count, GNUNET_NO,
3494                                            &my_identity, &message_context->key, message_context->peer,
3495                                            &selected->id);
3496             }
3497 #endif
3498           forward_message(cls, msg, selected, message_context);
3499         }
3500     }
3501
3502   if (message_context->bloom != NULL)
3503     {
3504       GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom, message_context->bloom, DHT_BLOOM_SIZE);
3505       GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
3506     }
3507
3508   return forward_count;
3509 }
3510
3511 /**
3512  * Iterator for local get request results,
3513  *
3514  * @param cls closure for iterator, NULL
3515  * @param exp when does this value expire?
3516  * @param key the key this data is stored under
3517  * @param size the size of the data identified by key
3518  * @param data the actual data
3519  * @param type the type of the data
3520  *
3521  * @return GNUNET_OK to continue iteration, anything else
3522  * to stop iteration.
3523  */
3524 static int
3525 republish_content_iterator (void *cls,
3526                             struct GNUNET_TIME_Absolute exp,
3527                             const GNUNET_HashCode * key,
3528                             uint32_t size, const char *data, uint32_t type)
3529 {
3530
3531   struct DHT_MessageContext *new_msg_ctx;
3532   struct GNUNET_DHT_PutMessage *put_msg;
3533 #if DEBUG_DHT
3534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3535               "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
3536 #endif
3537   new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
3538
3539   put_msg =
3540     GNUNET_malloc (sizeof (struct GNUNET_DHT_PutMessage) + size);
3541   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
3542   put_msg->header.size = htons (sizeof (struct GNUNET_DHT_PutMessage) + size);
3543   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
3544   put_msg->type = htons (type);
3545   memcpy (&put_msg[1], data, size);
3546   new_msg_ctx->unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3547   new_msg_ctx->replication = ntohl (DHT_DEFAULT_PUT_REPLICATION);
3548   new_msg_ctx->msg_options = ntohl (0);
3549   new_msg_ctx->network_size = estimate_diameter();
3550   new_msg_ctx->peer = &my_identity;
3551   new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3552   new_msg_ctx->hop_count = 0;
3553   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
3554   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
3555   increment_stats(STAT_PUT_START);
3556   route_message(cls, &put_msg->header, new_msg_ctx);
3557
3558   GNUNET_free(new_msg_ctx);
3559   GNUNET_free (put_msg);
3560   return GNUNET_OK;
3561 }
3562
3563 /**
3564  * Task used to republish data.
3565  *
3566  * @param cls closure (a struct RepublishContext)
3567  * @param tc runtime context for this task
3568  */
3569 static void
3570 republish_content(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3571 {
3572   struct RepublishContext *put_context = cls;
3573
3574   unsigned int results;
3575
3576   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3577     {
3578       GNUNET_free(put_context);
3579       return;
3580     }
3581
3582   GNUNET_assert (datacache != NULL); /* If we have no datacache we never should have scheduled this! */
3583   results = GNUNET_DATACACHE_get(datacache, &put_context->key, put_context->type, &republish_content_iterator, NULL);
3584   if (results == 0) /* Data must have expired */
3585     GNUNET_free(put_context);
3586   else /* Reschedule task for next time period */
3587     GNUNET_SCHEDULER_add_delayed(sched, dht_republish_frequency, &republish_content, put_context);
3588
3589 }
3590
3591
3592 /**
3593  * Iterator over hash map entries.
3594  *
3595  * @param cls client to search for in source routes
3596  * @param key current key code (ignored)
3597  * @param value value in the hash map, a DHTQueryRecord
3598  * @return GNUNET_YES if we should continue to
3599  *         iterate,
3600  *         GNUNET_NO if not.
3601  */
3602 static int find_client_records (void *cls,
3603                                 const GNUNET_HashCode * key, void *value)
3604 {
3605   struct ClientList *client = cls;
3606   struct DHTQueryRecord *record = value;
3607   struct DHTRouteSource *pos;
3608   pos = record->head;
3609   while (pos != NULL)
3610     {
3611       if (pos->client == client)
3612         break;
3613       pos = pos->next;
3614     }
3615   if (pos != NULL)
3616     {
3617       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, pos);
3618       GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, pos->hnode);
3619       if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
3620         GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
3621
3622       if (pos->find_peers_responded != NULL)
3623         GNUNET_CONTAINER_bloomfilter_free(pos->find_peers_responded);
3624       GNUNET_free(pos);
3625     }
3626   if (record->head == NULL) /* No more entries in DLL */
3627     {
3628       GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
3629       GNUNET_free(record);
3630     }
3631   return GNUNET_YES;
3632 }
3633
3634 /**
3635  * Functions with this signature are called whenever a client
3636  * is disconnected on the network level.
3637  *
3638  * @param cls closure (NULL for dht)
3639  * @param client identification of the client; NULL
3640  *        for the last call when the server is destroyed
3641  */
3642 static void handle_client_disconnect (void *cls,
3643                                       struct GNUNET_SERVER_Client* client)
3644 {
3645   struct ClientList *pos = client_list;
3646   struct ClientList *prev;
3647   struct ClientList *found;
3648   struct PendingMessage *reply;
3649
3650   prev = NULL;
3651   found = NULL;
3652   while (pos != NULL)
3653     {
3654       if (pos->client_handle == client)
3655         {
3656           if (prev != NULL)
3657             prev->next = pos->next;
3658           else
3659             client_list = pos->next;
3660           found = pos;
3661           break;
3662         }
3663       prev = pos;
3664       pos = pos->next;
3665     }
3666
3667   if (found != NULL)
3668     {
3669       while(NULL != (reply = found->pending_head))
3670         {
3671           GNUNET_CONTAINER_DLL_remove(found->pending_head, found->pending_tail, reply);
3672           GNUNET_free(reply);
3673         }
3674       GNUNET_CONTAINER_multihashmap_iterate(forward_list.hashmap, &find_client_records, found);
3675       GNUNET_free(found);
3676     }
3677 }
3678
3679 /**
3680  * Find a client if it exists, add it otherwise.
3681  *
3682  * @param client the server handle to the client
3683  *
3684  * @return the client if found, a new client otherwise
3685  */
3686 static struct ClientList *
3687 find_active_client (struct GNUNET_SERVER_Client *client)
3688 {
3689   struct ClientList *pos = client_list;
3690   struct ClientList *ret;
3691
3692   while (pos != NULL)
3693     {
3694       if (pos->client_handle == client)
3695         return pos;
3696       pos = pos->next;
3697     }
3698
3699   ret = GNUNET_malloc (sizeof (struct ClientList));
3700   ret->client_handle = client;
3701   ret->next = client_list;
3702   client_list = ret;
3703
3704   return ret;
3705 }
3706
3707 /**
3708  * Task to send a malicious put message across the network.
3709  *
3710  * @param cls closure for this task
3711  * @param tc the context under which the task is running
3712  */
3713 static void
3714 malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3715 {
3716   static struct GNUNET_DHT_PutMessage put_message;
3717   static struct DHT_MessageContext message_context;
3718   static GNUNET_HashCode key;
3719   uint32_t random_key;
3720
3721   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3722     return;
3723
3724   put_message.header.size = htons(sizeof(struct GNUNET_DHT_PutMessage));
3725   put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
3726   put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
3727   put_message.expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever());
3728   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3729   message_context.client = NULL;
3730   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
3731   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
3732   memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
3733   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3734   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3735   message_context.msg_options = ntohl (0);
3736   message_context.network_size = estimate_diameter();
3737   message_context.peer = &my_identity;
3738   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE; /* Make result routing a higher priority */
3739   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3740 #if DEBUG_DHT_ROUTING
3741   if (dhtlog_handle != NULL)
3742     dhtlog_handle->insert_dhtkey(NULL, &key);
3743 #endif
3744   increment_stats(STAT_PUT_START);
3745   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious PUT message with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
3746   route_message(NULL, &put_message.header, &message_context);
3747   GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_put_frequency), &malicious_put_task, NULL);
3748
3749 }
3750
3751 /**
3752  * Task to send a malicious put message across the network.
3753  *
3754  * @param cls closure for this task
3755  * @param tc the context under which the task is running
3756  */
3757 static void
3758 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3759 {
3760   static struct GNUNET_DHT_GetMessage get_message;
3761   struct DHT_MessageContext message_context;
3762   static GNUNET_HashCode key;
3763   uint32_t random_key;
3764
3765   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3766     return;
3767
3768   get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
3769   get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
3770   get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
3771   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3772   message_context.client = NULL;
3773   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
3774   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
3775   memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
3776   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
3777   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3778   message_context.msg_options = ntohl (0);
3779   message_context.network_size = estimate_diameter();
3780   message_context.peer = &my_identity;
3781   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE; /* Make result routing a higher priority */
3782   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3783 #if DEBUG_DHT_ROUTING
3784   if (dhtlog_handle != NULL)
3785     dhtlog_handle->insert_dhtkey(NULL, &key);
3786 #endif
3787   increment_stats(STAT_GET_START);
3788   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
3789   route_message (NULL, &get_message.header, &message_context);
3790   GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_get_frequency), &malicious_get_task, NULL);
3791 }
3792
3793 /**
3794  * Iterator over hash map entries.
3795  *
3796  * @param cls closure
3797  * @param key current key code
3798  * @param value value in the hash map
3799  * @return GNUNET_YES if we should continue to
3800  *         iterate,
3801  *         GNUNET_NO if not.
3802  */
3803 static int
3804 add_known_to_bloom (void *cls,
3805                     const GNUNET_HashCode * key,
3806                     void *value)
3807 {
3808   struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
3809   GNUNET_CONTAINER_bloomfilter_add (bloom, key);
3810   return GNUNET_YES;
3811 }
3812
3813 /**
3814  * Task to send a find peer message for our own peer identifier
3815  * so that we can find the closest peers in the network to ourselves
3816  * and attempt to connect to them.
3817  *
3818  * @param cls closure for this task
3819  * @param tc the context under which the task is running
3820  */
3821 static void
3822 send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3823 {
3824   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
3825   struct DHT_MessageContext message_context;
3826   int ret;
3827   struct GNUNET_TIME_Relative next_send_time;
3828   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
3829 #if COUNT_INTERVAL
3830   struct GNUNET_TIME_Relative time_diff;
3831   struct GNUNET_TIME_Absolute end;
3832   double multiplier;
3833   double count_per_interval;
3834 #endif
3835   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
3836     return;
3837
3838   if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If we are finding peers already, no need to send out our request right now! */
3839     {
3840       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since last find peer message sent!\n", newly_found_peers);
3841       GNUNET_SCHEDULER_add_delayed (sched,
3842                                     GNUNET_TIME_UNIT_MINUTES,
3843                                     &send_find_peer_message, NULL);
3844       newly_found_peers = 0;
3845       return;
3846     }
3847     
3848   increment_stats(STAT_FIND_PEER_START);
3849 #if COUNT_INTERVAL
3850   end = GNUNET_TIME_absolute_get();
3851   time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start, end);
3852
3853   if (time_diff.value > FIND_PEER_CALC_INTERVAL.value)
3854     {
3855       multiplier = time_diff.value / FIND_PEER_CALC_INTERVAL.value;
3856       count_per_interval = find_peer_context.count / multiplier;
3857     }
3858   else
3859     {
3860       multiplier = FIND_PEER_CALC_INTERVAL.value / time_diff.value;
3861       count_per_interval = find_peer_context.count * multiplier;
3862     }
3863 #endif
3864
3865 #if FIND_PEER_WITH_HELLO
3866   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage) + GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)my_hello));
3867   find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage) + GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)my_hello));
3868   memcpy(&find_peer_msg[1], my_hello, GNUNET_HELLO_size((struct GNUNET_HELLO_Message *)my_hello));
3869 #else
3870   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage));
3871   find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
3872 #endif
3873   find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
3874   temp_bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3875   GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom, temp_bloom);
3876   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom, find_peer_msg->bloomfilter, DHT_BLOOM_SIZE));
3877   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3878   memcpy(&message_context.key, &my_identity.hashPubKey, sizeof(GNUNET_HashCode));
3879   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
3880   message_context.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
3881   message_context.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
3882   message_context.network_size = estimate_diameter();
3883   message_context.peer = &my_identity;
3884   message_context.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
3885   message_context.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
3886
3887   ret = route_message(NULL, &find_peer_msg->header, &message_context);
3888   GNUNET_free(find_peer_msg);
3889   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3890               "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT",
3891               "FIND PEER", ret);
3892   if (newly_found_peers < bucket_size)
3893     {
3894       next_send_time.value = (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) +
3895                               GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
3896                                                        DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2);
3897     }
3898   else
3899     {
3900       next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
3901                              GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
3902                                                       DHT_MAXIMUM_FIND_PEER_INTERVAL.value - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
3903     }
3904
3905   GNUNET_assert (next_send_time.value != 0);
3906   find_peer_context.count = 0;
3907   newly_found_peers = 0;
3908   find_peer_context.start = GNUNET_TIME_absolute_get();
3909   if (GNUNET_YES == do_find_peer)
3910   {
3911     GNUNET_SCHEDULER_add_delayed (sched,
3912                                   next_send_time,
3913                                   &send_find_peer_message, NULL);
3914   }
3915 }
3916
3917 /**
3918  * Handler for any generic DHT messages, calls the appropriate handler
3919  * depending on message type, sends confirmation if responses aren't otherwise
3920  * expected.
3921  *
3922  * @param cls closure for the service
3923  * @param client the client we received this message from
3924  * @param message the actual message received
3925  */
3926 static void
3927 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
3928                                 const struct GNUNET_MessageHeader *message)
3929 {
3930   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
3931   const struct GNUNET_MessageHeader *enc_msg;
3932   struct DHT_MessageContext message_context;
3933
3934   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
3935 #if DEBUG_DHT
3936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3937               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
3938               my_short_id, 
3939               "DHT",
3940               "GENERIC",
3941               ntohs (message->type), 
3942               GNUNET_h2s (&dht_msg->key),
3943               GNUNET_ntohll (dht_msg->unique_id));
3944 #endif
3945 #if DEBUG_DHT_ROUTING
3946   if (dhtlog_handle != NULL)
3947     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
3948 #endif
3949   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
3950   message_context.client = find_active_client (client);
3951   memcpy(&message_context.key, &dht_msg->key, sizeof(GNUNET_HashCode));
3952   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
3953   message_context.replication = ntohl (dht_msg->desired_replication_level);
3954   message_context.msg_options = ntohl (dht_msg->options);
3955   message_context.network_size = estimate_diameter();
3956   message_context.peer = &my_identity;
3957   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE * 4; /* Make local routing a higher priority */
3958   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3959   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
3960     increment_stats(STAT_GET_START);
3961   else if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
3962     increment_stats(STAT_PUT_START);
3963   else if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER)
3964     increment_stats(STAT_FIND_PEER_START);
3965
3966   route_message(cls, enc_msg, &message_context);
3967
3968   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3969
3970 }
3971
3972 /**
3973  * Handler for any locally received DHT control messages,
3974  * sets malicious flags mostly for now.
3975  *
3976  * @param cls closure for the service
3977  * @param client the client we received this message from
3978  * @param message the actual message received
3979  *
3980  */
3981 static void
3982 handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client,
3983                             const struct GNUNET_MessageHeader *message)
3984 {
3985   const struct GNUNET_DHT_ControlMessage *dht_control_msg =
3986       (const struct GNUNET_DHT_ControlMessage *) message;
3987 #if DEBUG_DHT
3988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3989               "`%s:%s': Received `%s' request from client, command %d\n", my_short_id, "DHT",
3990               "CONTROL", ntohs(dht_control_msg->command));
3991 #endif
3992
3993   switch (ntohs(dht_control_msg->command))
3994   {
3995   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
3996     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending self seeking find peer request!\n");
3997     GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL);
3998     break;
3999   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
4000     if (ntohs(dht_control_msg->variable) > 0)
4001       malicious_get_frequency = ntohs(dht_control_msg->variable);
4002     if (malicious_get_frequency == 0)
4003       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4004     if (malicious_getter != GNUNET_YES)
4005       GNUNET_SCHEDULER_add_now(sched, &malicious_get_task, NULL);
4006     malicious_getter = GNUNET_YES;
4007     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 
4008                "%s:%s Initiating malicious GET behavior, frequency %d\n", my_short_id, "DHT", malicious_get_frequency);
4009     break;
4010   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT:
4011     if (ntohs(dht_control_msg->variable) > 0)
4012       malicious_put_frequency = ntohs(dht_control_msg->variable);
4013     if (malicious_put_frequency == 0)
4014       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
4015     if (malicious_putter != GNUNET_YES)
4016       GNUNET_SCHEDULER_add_now(sched, &malicious_put_task, NULL);
4017     malicious_putter = GNUNET_YES;
4018     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
4019                "%s:%s Initiating malicious PUT behavior, frequency %d\n", my_short_id, "DHT", malicious_put_frequency);
4020     break;
4021   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP:
4022 #if DEBUG_DHT_ROUTING
4023     if ((malicious_dropper != GNUNET_YES) && (dhtlog_handle != NULL))
4024       dhtlog_handle->set_malicious(&my_identity);
4025 #endif
4026     malicious_dropper = GNUNET_YES;
4027     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
4028                "%s:%s Initiating malicious DROP behavior\n", my_short_id, "DHT");
4029     break;
4030   default:
4031     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 
4032                "%s:%s Unknown control command type `%d'!\n", 
4033                my_short_id, "DHT",
4034                ntohs(dht_control_msg->command));
4035     break;
4036   }
4037
4038   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4039 }
4040
4041 /**
4042  * Handler for any generic DHT stop messages, calls the appropriate handler
4043  * depending on message type (if processed locally)
4044  *
4045  * @param cls closure for the service
4046  * @param client the client we received this message from
4047  * @param message the actual message received
4048  *
4049  */
4050 static void
4051 handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
4052                             const struct GNUNET_MessageHeader *message)
4053 {
4054
4055   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
4056     (const struct GNUNET_DHT_StopMessage *) message;
4057   struct DHTQueryRecord *record;
4058   struct DHTRouteSource *pos;
4059 #if DEBUG_DHT
4060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4061               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
4062               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
4063 #endif
4064   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
4065   if (record != NULL)
4066     {
4067       pos = record->head;
4068
4069       while (pos != NULL)
4070         {
4071           if ((pos->client != NULL) && (pos->client->client_handle == client))
4072             {
4073               GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
4074               GNUNET_SCHEDULER_add_now(sched, &remove_forward_entry, pos);
4075             }
4076           pos = pos->next;
4077         }
4078     }
4079
4080   GNUNET_SERVER_receive_done (client, GNUNET_OK);
4081 }
4082
4083
4084 /**
4085  * Core handler for p2p route requests.
4086  */
4087 static int
4088 handle_dht_p2p_route_request (void *cls,
4089                               const struct GNUNET_PeerIdentity *peer,
4090                               const struct GNUNET_MessageHeader *message,
4091                               struct GNUNET_TIME_Relative latency, uint32_t distance)
4092 {
4093 #if DEBUG_DHT
4094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4095               "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
4096 #endif
4097   struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
4098   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
4099   struct DHT_MessageContext *message_context;
4100
4101   if (get_max_send_delay().value > MAX_REQUEST_TIME.value)
4102   {
4103     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending of previous replies took too long, backing off!\n");
4104     increment_stats("# route requests dropped due to high load");
4105     decrease_max_send_delay(get_max_send_delay());
4106     return GNUNET_YES;
4107   }
4108
4109   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/
4110     {
4111 #if DEBUG_PING
4112       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n", my_short_id, "DHT");
4113 #endif
4114       return GNUNET_YES;
4115     }
4116
4117   if (ntohs(enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4118     {
4119       GNUNET_break_op(0);
4120       return GNUNET_YES;
4121     }
4122   message_context = GNUNET_malloc(sizeof (struct DHT_MessageContext));
4123   message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
4124   GNUNET_assert(message_context->bloom != NULL);
4125   message_context->hop_count = ntohl(incoming->hop_count);
4126   memcpy(&message_context->key, &incoming->key, sizeof(GNUNET_HashCode));
4127   message_context->replication = ntohl(incoming->desired_replication_level);
4128   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
4129   message_context->msg_options = ntohl(incoming->options);
4130   message_context->network_size = ntohl(incoming->network_size);
4131   message_context->peer = peer;
4132   message_context->importance = DHT_DEFAULT_P2P_IMPORTANCE;
4133   message_context->timeout = DHT_DEFAULT_P2P_TIMEOUT;
4134   route_message(cls, enc_msg, message_context);
4135   GNUNET_free(message_context);
4136   return GNUNET_YES;
4137 }
4138
4139
4140 /**
4141  * Core handler for p2p route results.
4142  */
4143 static int
4144 handle_dht_p2p_route_result (void *cls,
4145                              const struct GNUNET_PeerIdentity *peer,
4146                              const struct GNUNET_MessageHeader *message,
4147                              struct GNUNET_TIME_Relative latency, uint32_t distance)
4148 {
4149 #if DEBUG_DHT
4150   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4151               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
4152 #endif
4153   struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
4154   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
4155   struct DHT_MessageContext message_context;
4156
4157   if (ntohs(enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4158     {
4159       GNUNET_break_op(0);
4160       return GNUNET_YES;
4161     }
4162
4163   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
4164   // FIXME: call GNUNET_BLOCK_evaluate (...) -- instead of doing your own bloomfilter!
4165   message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
4166   GNUNET_assert(message_context.bloom != NULL);
4167   memcpy(&message_context.key, &incoming->key, sizeof(GNUNET_HashCode));
4168   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
4169   message_context.msg_options = ntohl(incoming->options);
4170   message_context.hop_count = ntohl(incoming->hop_count);
4171   message_context.peer = peer;
4172   message_context.importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make result routing a higher priority */
4173   message_context.timeout = DHT_DEFAULT_P2P_TIMEOUT;
4174   route_result_message(cls, enc_msg, &message_context);
4175   return GNUNET_YES;
4176 }
4177
4178
4179 /**
4180  * Receive the HELLO from transport service,
4181  * free current and replace if necessary.
4182  *
4183  * @param cls NULL
4184  * @param message HELLO message of peer
4185  */
4186 static void
4187 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
4188 {
4189 #if DEBUG_DHT
4190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4191               "Received our `%s' from transport service\n",
4192               "HELLO");
4193 #endif
4194
4195   GNUNET_assert (message != NULL);
4196   GNUNET_free_non_null(my_hello);
4197   my_hello = GNUNET_malloc(ntohs(message->size));
4198   memcpy(my_hello, message, ntohs(message->size));
4199 }
4200
4201
4202 /**
4203  * Task run during shutdown.
4204  *
4205  * @param cls unused
4206  * @param tc unused
4207  */
4208 static void
4209 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
4210 {
4211   int bucket_count;
4212   struct PeerInfo *pos;
4213
4214   if (transport_handle != NULL)
4215     {
4216       GNUNET_free_non_null(my_hello);
4217       GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
4218       GNUNET_TRANSPORT_disconnect(transport_handle);
4219     }
4220   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
4221     {
4222       while (k_buckets[bucket_count].head != NULL)
4223         {
4224           pos = k_buckets[bucket_count].head;
4225 #if DEBUG_DHT
4226           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4227                       "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
4228 #endif
4229           delete_peer(pos, bucket_count);
4230         }
4231     }
4232   if (coreAPI != NULL)
4233     {
4234 #if DEBUG_DHT
4235       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4236                   "%s:%s Disconnecting core!\n", my_short_id, "DHT");
4237 #endif
4238       GNUNET_CORE_disconnect (coreAPI);
4239       coreAPI = NULL;
4240     }
4241   if (datacache != NULL)
4242     {
4243 #if DEBUG_DHT
4244       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4245                   "%s:%s Destroying datacache!\n", my_short_id, "DHT");
4246 #endif
4247       GNUNET_DATACACHE_destroy (datacache);
4248       datacache = NULL;
4249     }
4250   if (stats != NULL)
4251     {
4252       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
4253       stats = NULL;
4254     }
4255   if (dhtlog_handle != NULL)
4256     {
4257       GNUNET_DHTLOG_disconnect(dhtlog_handle);
4258       dhtlog_handle = NULL;
4259     }
4260   if (block_context != NULL)
4261     {
4262       GNUNET_BLOCK_context_destroy (block_context);
4263       block_context = NULL;
4264     }
4265   GNUNET_free_non_null(my_short_id);
4266   my_short_id = NULL;
4267 }
4268
4269
4270 /**
4271  * To be called on core init/fail.
4272  *
4273  * @param cls service closure
4274  * @param server handle to the server for this service
4275  * @param identity the public identity of this peer
4276  * @param publicKey the public key of this peer
4277  */
4278 void
4279 core_init (void *cls,
4280            struct GNUNET_CORE_Handle *server,
4281            const struct GNUNET_PeerIdentity *identity,
4282            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
4283 {
4284
4285   if (server == NULL)
4286     {
4287 #if DEBUG_DHT
4288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4289               "%s: Connection to core FAILED!\n", "dht",
4290               GNUNET_i2s (identity));
4291 #endif
4292       GNUNET_SCHEDULER_cancel (sched, cleanup_task);
4293       GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
4294       return;
4295     }
4296 #if DEBUG_DHT
4297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4298               "%s: Core connection initialized, I am peer: %s\n", "dht",
4299               GNUNET_i2s (identity));
4300 #endif
4301
4302   /* Copy our identity so we can use it */
4303   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
4304   if (my_short_id != NULL)
4305     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n", "DHT SERVICE");
4306   my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
4307   /* Set the server to local variable */
4308   coreAPI = server;
4309
4310   if (dhtlog_handle != NULL)
4311     dhtlog_handle->insert_node (NULL, &my_identity);
4312 }
4313
4314
4315 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
4316   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE, 0},
4317   {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
4318   {&handle_dht_control_message, NULL, GNUNET_MESSAGE_TYPE_DHT_CONTROL, 0},
4319   {NULL, NULL, 0, 0}
4320 };
4321
4322
4323 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
4324   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
4325   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
4326   {NULL, 0, 0}
4327 };
4328
4329
4330 /**
4331  * Method called whenever a peer connects.
4332  *
4333  * @param cls closure
4334  * @param peer peer identity this notification is about
4335  * @param latency reported latency of the connection with peer
4336  * @param distance reported distance (DV) to peer
4337  */
4338 void handle_core_connect (void *cls,
4339                           const struct GNUNET_PeerIdentity * peer,
4340                           struct GNUNET_TIME_Relative latency,
4341                           uint32_t distance)
4342 {
4343   struct PeerInfo *ret;
4344
4345 #if DEBUG_DHT
4346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4347               "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
4348 #endif
4349
4350   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
4351     {
4352       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received %s message for peer %s, but already have peer in RT!", my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s(peer));
4353       return;
4354     }
4355
4356   if (datacache != NULL)
4357     GNUNET_DATACACHE_put(datacache, &peer->hashPubKey, sizeof(struct GNUNET_PeerIdentity), (const char *)peer, 0, GNUNET_TIME_absolute_get_forever());
4358   ret = try_add_peer(peer,
4359                      find_current_bucket(&peer->hashPubKey),
4360                      latency,
4361                      distance);
4362   if (ret != NULL)
4363     {
4364       newly_found_peers++;
4365       GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
4366     }
4367 #if DEBUG_DHT
4368     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4369                 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED");
4370 #endif
4371 }
4372
4373
4374 /**
4375  * Method called whenever a peer disconnects.
4376  *
4377  * @param cls closure
4378  * @param peer peer identity this notification is about
4379  */
4380 void handle_core_disconnect (void *cls,
4381                              const struct
4382                              GNUNET_PeerIdentity * peer)
4383 {
4384   struct PeerInfo *to_remove;
4385   int current_bucket;
4386
4387   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: Received peer disconnect message for peer `%s' from %s\n", my_short_id, "DHT", GNUNET_i2s(peer), "CORE");
4388
4389   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
4390     {
4391       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s: do not have peer `%s' in RT, can't disconnect!\n", my_short_id, "DHT", GNUNET_i2s(peer));
4392       return;
4393     }
4394   increment_stats(STAT_DISCONNECTS);
4395   GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey));
4396   to_remove = GNUNET_CONTAINER_multihashmap_get(all_known_peers, &peer->hashPubKey);
4397   GNUNET_assert(0 == memcmp(peer, &to_remove->id, sizeof(struct GNUNET_PeerIdentity)));
4398   current_bucket = find_current_bucket(&to_remove->id.hashPubKey);
4399   delete_peer(to_remove, current_bucket);
4400 }
4401
4402
4403 /**
4404  * Process dht requests.
4405  *
4406  * @param cls closure
4407  * @param scheduler scheduler to use
4408  * @param server the initialized server
4409  * @param c configuration to use
4410  */
4411 static void
4412 run (void *cls,
4413      struct GNUNET_SCHEDULER_Handle *scheduler,
4414      struct GNUNET_SERVER_Handle *server,
4415      const struct GNUNET_CONFIGURATION_Handle *c)
4416 {
4417   struct GNUNET_TIME_Relative next_send_time;
4418   unsigned long long temp_config_num;
4419   char *converge_modifier_buf;
4420
4421   sched = scheduler;
4422   cfg = c;
4423   datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
4424   GNUNET_SERVER_add_handlers (server, plugin_handlers);
4425   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
4426   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
4427                                  cfg,   /* Main configuration */
4428                                  GNUNET_TIME_UNIT_FOREVER_REL,
4429                                  NULL,  /* Closure passed to DHT functions */
4430                                  &core_init,    /* Call core_init once connected */
4431                                  &handle_core_connect,  /* Handle connects */
4432                                  &handle_core_disconnect,  /* remove peers on disconnects */
4433                                  NULL,  /* Do we care about "status" updates? */
4434                                  NULL,  /* Don't want notified about all incoming messages */
4435                                  GNUNET_NO,     /* For header only inbound notification */
4436                                  NULL,  /* Don't want notified about all outbound messages */
4437                                  GNUNET_NO,     /* For header only outbound notification */
4438                                  core_handlers);        /* Register these handlers */
4439
4440   if (coreAPI == NULL)
4441     return;
4442   transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, 
4443                                               NULL, NULL, NULL, NULL, NULL);
4444   if (transport_handle != NULL)
4445     GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
4446   else
4447     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 
4448                "Failed to connect to transport service!\n");
4449   block_context = GNUNET_BLOCK_context_create (cfg);
4450   lowest_bucket = MAX_BUCKETS - 1;
4451   forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
4452   forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
4453   all_known_peers = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
4454   recent_find_peer_requests = GNUNET_CONTAINER_multihashmap_create(MAX_BUCKETS / 8);
4455   GNUNET_assert(all_known_peers != NULL);
4456   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
4457     {
4458       debug_routes = GNUNET_YES;
4459     }
4460
4461   if (GNUNET_YES ==
4462       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4463                                            "strict_kademlia"))
4464     {
4465       strict_kademlia = GNUNET_YES;
4466     }
4467
4468   if (GNUNET_YES ==
4469       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4470                                            "stop_on_closest"))
4471     {
4472       stop_on_closest = GNUNET_YES;
4473     }
4474
4475   if (GNUNET_YES ==
4476       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4477                                            "stop_found"))
4478     {
4479       stop_on_found = GNUNET_YES;
4480     }
4481
4482   if (GNUNET_YES ==
4483       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4484                                            "malicious_getter"))
4485     {
4486       malicious_getter = GNUNET_YES;
4487       if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4488                                             "MALICIOUS_GET_FREQUENCY",
4489                                             &malicious_get_frequency))
4490         malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4491     }
4492
4493   if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4494                                         "MAX_HOPS",
4495                                         &max_hops))
4496     {
4497       max_hops = DEFAULT_MAX_HOPS;
4498     }
4499
4500   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno (cfg, "DHT",
4501                                                           "USE_MAX_HOPS"))
4502     {
4503       use_max_hops = GNUNET_YES;
4504     }
4505
4506   if (GNUNET_YES ==
4507       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4508                                            "malicious_putter"))
4509     {
4510       malicious_putter = GNUNET_YES;
4511       if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT",
4512                                             "MALICIOUS_PUT_FREQUENCY",
4513                                             &malicious_put_frequency))
4514         malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
4515     }
4516
4517   dht_republish_frequency = DEFAULT_DHT_REPUBLISH_FREQUENCY;
4518   if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "DHT", "REPLICATION_FREQUENCY", &temp_config_num))
4519     {
4520       dht_republish_frequency = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, temp_config_num);
4521     }
4522
4523   if (GNUNET_YES ==
4524           GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4525                                                "malicious_dropper"))
4526     {
4527       malicious_dropper = GNUNET_YES;
4528     }
4529
4530   if (GNUNET_YES ==
4531         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4532                                              "republish"))
4533     do_republish = GNUNET_NO;
4534
4535   if (GNUNET_NO ==
4536         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4537                                              "do_find_peer"))
4538     {
4539       do_find_peer = GNUNET_NO;
4540     }
4541   else
4542     do_find_peer = GNUNET_YES;
4543
4544   if (GNUNET_YES ==
4545         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4546                                              "use_real_distance"))
4547     use_real_distance = GNUNET_YES;
4548
4549   if (GNUNET_YES ==
4550       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
4551                                            "mysql_logging_extended"))
4552     {
4553       debug_routes = GNUNET_YES;
4554       debug_routes_extended = GNUNET_YES;
4555     }
4556
4557 #if DEBUG_DHT_ROUTING
4558   if (GNUNET_YES == debug_routes)
4559     {
4560       dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
4561       if (dhtlog_handle == NULL)
4562         {
4563           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4564                       "Could not connect to mysql logging server, logging will not happen!");
4565         }
4566     }
4567 #endif
4568
4569   converge_option = DHT_CONVERGE_SQUARE;
4570   if (GNUNET_YES ==
4571       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4572                                            "converge_linear"))
4573     {
4574       converge_option = DHT_CONVERGE_LINEAR;
4575     }
4576   else if (GNUNET_YES ==
4577         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4578                                              "converge_exponential"))
4579     {
4580       converge_option = DHT_CONVERGE_EXPONENTIAL;
4581     }
4582   else if (GNUNET_YES ==
4583         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
4584                                              "converge_random"))
4585     {
4586       converge_option = DHT_CONVERGE_RANDOM;
4587     }
4588
4589   if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string(cfg, "dht_testing", "converge_modifier", &converge_modifier_buf))
4590     {
4591       if (1 != sscanf(converge_modifier_buf, "%f", &converge_modifier))
4592         {
4593           GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to read decimal value for %s from `%s'\n", "CONVERGE_MODIFIER", converge_modifier_buf);
4594           converge_modifier = 0.0;
4595         }
4596       GNUNET_free(converge_modifier_buf);
4597     }
4598
4599   stats = GNUNET_STATISTICS_create(sched, "dht", cfg);
4600
4601   if (stats != NULL)
4602     {
4603       GNUNET_STATISTICS_set(stats, STAT_ROUTES, 0, GNUNET_NO);
4604       GNUNET_STATISTICS_set(stats, STAT_ROUTE_FORWARDS, 0, GNUNET_NO);
4605       GNUNET_STATISTICS_set(stats, STAT_ROUTE_FORWARDS_CLOSEST, 0, GNUNET_NO);
4606       GNUNET_STATISTICS_set(stats, STAT_RESULTS, 0, GNUNET_NO);
4607       GNUNET_STATISTICS_set(stats, STAT_RESULTS_TO_CLIENT, 0, GNUNET_NO);
4608       GNUNET_STATISTICS_set(stats, STAT_RESULT_FORWARDS, 0, GNUNET_NO);
4609       GNUNET_STATISTICS_set(stats, STAT_GETS, 0, GNUNET_NO);
4610       GNUNET_STATISTICS_set(stats, STAT_PUTS, 0, GNUNET_NO);
4611       GNUNET_STATISTICS_set(stats, STAT_PUTS_INSERTED, 0, GNUNET_NO);
4612       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER, 0, GNUNET_NO);
4613       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_START, 0, GNUNET_NO);
4614       GNUNET_STATISTICS_set(stats, STAT_GET_START, 0, GNUNET_NO);
4615       GNUNET_STATISTICS_set(stats, STAT_PUT_START, 0, GNUNET_NO);
4616       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_REPLY, 0, GNUNET_NO);
4617       GNUNET_STATISTICS_set(stats, STAT_FIND_PEER_ANSWER, 0, GNUNET_NO);
4618       GNUNET_STATISTICS_set(stats, STAT_BLOOM_FIND_PEER, 0, GNUNET_NO);
4619       GNUNET_STATISTICS_set(stats, STAT_GET_REPLY, 0, GNUNET_NO);
4620       GNUNET_STATISTICS_set(stats, STAT_GET_RESPONSE_START, 0, GNUNET_NO);
4621       GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
4622       GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO);
4623     }
4624   /* FIXME: if there are no recent requests then these never get freed, but alternative is _annoying_! */
4625   recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2);
4626   recent.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
4627   if (GNUNET_YES == do_find_peer)
4628   {
4629     next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
4630                            GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
4631                                                     (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
4632     find_peer_context.start = GNUNET_TIME_absolute_get();
4633     GNUNET_SCHEDULER_add_delayed (sched,
4634                                   next_send_time,
4635                                   &send_find_peer_message, &find_peer_context);
4636   }
4637
4638   /* Scheduled the task to clean up when shutdown is called */
4639   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
4640                                                GNUNET_TIME_UNIT_FOREVER_REL,
4641                                                &shutdown_task, NULL);
4642 }
4643
4644 /**
4645  * The main function for the dht service.
4646  *
4647  * @param argc number of arguments from the command line
4648  * @param argv command line arguments
4649  * @return 0 ok, 1 on error
4650  */
4651 int
4652 main (int argc, char *const *argv)
4653 {
4654   return (GNUNET_OK ==
4655           GNUNET_SERVICE_run (argc,
4656                               argv,
4657                               "dht",
4658                               GNUNET_SERVICE_OPTION_NONE,
4659                               &run, NULL)) ? 0 : 1;
4660 }