fix
[oweals/gnunet.git] / src / dht / gnunet-service-dht.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht.c
23  * @brief GNUnet DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO:
28  * - decide which 'benchmark'/test functions to keep (malicious code, kademlia, etc.)
29  * - decide on 'stop_on_closest', 'stop_on_found', 'do_find_peer', 'paper_forwarding'
30  * - use OPTION_MULTIPLE instead of linked list ofr the forward_list.hashmap
31  * - use different 'struct DHT_MessageContext' for the different types of
32  *   messages (currently rather confusing, especially with things like
33  *   peer bloom filters occuring when processing replies).
34  * - why do we have request UIDs again?
35  */
36
37 #include "platform.h"
38 #include "gnunet_block_lib.h"
39 #include "gnunet_client_lib.h"
40 #include "gnunet_getopt_lib.h"
41 #include "gnunet_os_lib.h"
42 #include "gnunet_protocols.h"
43 #include "gnunet_service_lib.h"
44 #include "gnunet_nse_service.h"
45 #include "gnunet_core_service.h"
46 #include "gnunet_signal_lib.h"
47 #include "gnunet_util_lib.h"
48 #include "gnunet_datacache_lib.h"
49 #include "gnunet_transport_service.h"
50 #include "gnunet_hello_lib.h"
51 #include "gnunet_dht_service.h"
52 #include "gnunet_statistics_service.h"
53 #include "dhtlog.h"
54 #include "dht.h"
55 #include <fenv.h>
56
57
58 /**
59  * How many buckets will we allow total.
60  */
61 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
62
63 /**
64  * Should the DHT issue FIND_PEER requests to get better routing tables?
65  */
66 #define DEFAULT_DO_FIND_PEER GNUNET_YES
67
68 /**
69  * Defines whether find peer requests send their HELLO's outgoing,
70  * or expect replies to contain hellos.
71  */
72 #define FIND_PEER_WITH_HELLO GNUNET_YES
73
74 /**
75  * What is the maximum number of peers in a given bucket.
76  */
77 #define DEFAULT_BUCKET_SIZE 4
78
79 #define DEFAULT_CORE_QUEUE_SIZE 32
80
81 /**
82  * Minimum number of peers we need for "good" routing,
83  * any less than this and we will allow messages to
84  * travel much further through the network!
85  */
86 #define MINIMUM_PEER_THRESHOLD 20
87
88 /**
89  * Number of requests we track at most (for routing replies).
90  */
91 #define DHT_MAX_RECENT (1024 * 16)
92
93 /**
94  * How long do we wait at most when queueing messages with core
95  * that we are sending on behalf of other peers.
96  */
97 #define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
98
99 /**
100  * Default importance for handling messages on behalf of other peers.
101  */
102 #define DHT_DEFAULT_P2P_IMPORTANCE 0
103
104 /**
105  * How long to keep recent requests around by default.
106  */
107 #define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
108
109 /**
110  * Default time to wait to send find peer messages sent by the dht service.
111  */
112 #define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
113
114 /**
115  * Default importance for find peer messages sent by the dht service.
116  */
117 #define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
118
119 /**
120  * Default replication parameter for find peer messages sent by the dht service.
121  */
122 #define DHT_DEFAULT_FIND_PEER_REPLICATION 4
123
124 /**
125  * How long at least to wait before sending another find peer request.
126  */
127 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
128
129 /**
130  * How long at most to wait before sending another find peer request.
131  */
132 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
133
134 /**
135  * How often to update our preference levels for peers in our routing tables.
136  */
137 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
138
139 /**
140  * How long at most on average will we allow a reply forward to take
141  * (before we quit sending out new requests)
142  */
143 #define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
144
145 /**
146  * How many initial requests to send out (in true Kademlia fashion)
147  */
148 #define DEFAULT_KADEMLIA_REPLICATION 3
149
150 /**
151  * Default frequency for sending malicious get messages
152  */
153 #define DEFAULT_MALICIOUS_GET_FREQUENCY GNUNET_TIME_UNIT_SECONDS
154
155 /**
156  * Default frequency for sending malicious put messages
157  */
158 #define DEFAULT_MALICIOUS_PUT_FREQUENCY GNUNET_TIME_UNIT_SECONDS
159
160 /**
161  * How many time differences between requesting a core send and
162  * the actual callback to remember.
163  */
164 #define MAX_REPLY_TIMES 8
165
166
167 /**
168  * Linked list of messages to send to clients.
169  */
170 struct P2PPendingMessage
171 {
172   /**
173    * Pointer to next item in the list
174    */
175   struct P2PPendingMessage *next;
176
177   /**
178    * Pointer to previous item in the list
179    */
180   struct P2PPendingMessage *prev;
181
182   /**
183    * Message importance level.
184    */
185   unsigned int importance;
186
187   /**
188    * Time when this request was scheduled to be sent.
189    */
190   struct GNUNET_TIME_Absolute scheduled;
191
192   /**
193    * How long to wait before sending message.
194    */
195   struct GNUNET_TIME_Relative timeout;
196
197   /**
198    * Actual message to be sent; // avoid allocation
199    */
200   const struct GNUNET_MessageHeader *msg;       // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
201
202 };
203
204
205 /**
206  * Per-peer information.
207  */
208 struct PeerInfo
209 {
210   /**
211    * Next peer entry (DLL)
212    */
213   struct PeerInfo *next;
214
215   /**
216    *  Prev peer entry (DLL)
217    */
218   struct PeerInfo *prev;
219
220   /**
221    * Count of outstanding messages for peer.
222    */
223   unsigned int pending_count;
224
225   /**
226    * Head of pending messages to be sent to this peer.
227    */
228   struct P2PPendingMessage *head;
229
230   /**
231    * Tail of pending messages to be sent to this peer.
232    */
233   struct P2PPendingMessage *tail;
234
235   /**
236    * Core handle for sending messages to this peer.
237    */
238   struct GNUNET_CORE_TransmitHandle *th;
239
240   /**
241    * Task for scheduling message sends.
242    */
243   GNUNET_SCHEDULER_TaskIdentifier send_task;
244
245   /**
246    * Task for scheduling preference updates
247    */
248   GNUNET_SCHEDULER_TaskIdentifier preference_task;
249
250   /**
251    * Preference update context
252    */
253   struct GNUNET_CORE_InformationRequestContext *info_ctx;
254
255   /**
256    * What is the identity of the peer?
257    */
258   struct GNUNET_PeerIdentity id;
259
260 #if 0
261   /**
262    * What is the average latency for replies received?
263    */
264   struct GNUNET_TIME_Relative latency;
265
266   /**
267    * Transport level distance to peer.
268    */
269   unsigned int distance;
270 #endif
271
272   /**
273    * Task for scheduling periodic ping messages for this peer.
274    */
275   GNUNET_SCHEDULER_TaskIdentifier ping_task;
276 };
277
278
279 /**
280  * Peers are grouped into buckets.
281  */
282 struct PeerBucket
283 {
284   /**
285    * Head of DLL
286    */
287   struct PeerInfo *head;
288
289   /**
290    * Tail of DLL
291    */
292   struct PeerInfo *tail;
293
294   /**
295    * Number of peers in the bucket.
296    */
297   unsigned int peers_size;
298 };
299
300
301 /**
302  * Linked list of messages to send to clients.
303  */
304 struct PendingMessage
305 {
306   /**
307    * Pointer to next item in the list
308    */
309   struct PendingMessage *next;
310
311   /**
312    * Pointer to previous item in the list
313    */
314   struct PendingMessage *prev;
315
316   /**
317    * Actual message to be sent; // avoid allocation
318    */
319   const struct GNUNET_MessageHeader *msg;       // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
320
321 };
322
323
324 /**
325  * Struct containing information about a client,
326  * handle to connect to it, and any pending messages
327  * that need to be sent to it.
328  */
329 struct ClientList
330 {
331   /**
332    * Linked list of active clients
333    */
334   struct ClientList *next;
335
336   /**
337    * The handle to this client
338    */
339   struct GNUNET_SERVER_Client *client_handle;
340
341   /**
342    * Handle to the current transmission request, NULL
343    * if none pending.
344    */
345   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
346
347   /**
348    * Linked list of pending messages for this client
349    */
350   struct PendingMessage *pending_head;
351
352   /**
353    * Tail of linked list of pending messages for this client
354    */
355   struct PendingMessage *pending_tail;
356 };
357
358
359 /**
360  * Context containing information about a DHT message received.
361  */
362 struct DHT_MessageContext
363 {
364   /**
365    * The client this request was received from.
366    * (NULL if received from another peer)
367    */
368   struct ClientList *client;
369
370   /**
371    * The peer this request was received from.
372    */
373   struct GNUNET_PeerIdentity peer;
374
375   /**
376    * Bloomfilter for this routing request.
377    */
378   struct GNUNET_CONTAINER_BloomFilter *bloom;
379
380   /**
381    * extended query (see gnunet_block_lib.h).
382    */
383   const void *xquery;
384
385   /**
386    * Bloomfilter to filter out duplicate replies.
387    */
388   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
389
390   /**
391    * The key this request was about
392    */
393   GNUNET_HashCode key;
394
395   /**
396    * How long should we wait to transmit this request?
397    */
398   struct GNUNET_TIME_Relative timeout;
399
400   /**
401    * The unique identifier of this request
402    */
403   uint64_t unique_id;
404
405   /**
406    * Number of bytes in xquery.
407    */
408   size_t xquery_size;
409
410   /**
411    * Mutator value for the reply_bf, see gnunet_block_lib.h
412    */
413   uint32_t reply_bf_mutator;
414
415   /**
416    * Desired replication level
417    */
418   uint32_t replication;
419
420   /**
421    * Network size estimate, either ours or the sum of
422    * those routed to thus far. =~ Log of number of peers
423    * chosen from for this request.
424    */
425   uint32_t network_size;
426
427   /**
428    * Any message options for this request
429    */
430   uint32_t msg_options;
431
432   /**
433    * How many hops has the message already traversed?
434    */
435   uint32_t hop_count;
436
437   /**
438    * How many peer identities are present in the path history?
439    */
440   uint32_t path_history_len;
441
442   /**
443    * Path history.
444    */
445   char *path_history;
446
447   /**
448    * How important is this message?
449    */
450   unsigned int importance;
451
452   /**
453    * Should we (still) forward the request on to other peers?
454    */
455   int do_forward;
456
457   /**
458    * Did we forward this message? (may need to remember it!)
459    */
460   int forwarded;
461
462   /**
463    * Are we the closest known peer to this key (out of our neighbors?)
464    */
465   int closest;
466 };
467
468
469 /**
470  * Record used for remembering what peers are waiting for what
471  * responses (based on search key).
472  */
473 struct DHTRouteSource
474 {
475   /**
476    * This is a DLL.
477    */
478   struct DHTRouteSource *next;
479
480   /**
481    * This is a DLL.
482    */
483   struct DHTRouteSource *prev;
484
485   /**
486    * UID of the request, 0 if from another peer.
487    */
488   uint64_t uid;
489
490   /**
491    * Source of the request.  Replies should be forwarded to
492    * this peer.
493    */
494   struct GNUNET_PeerIdentity source;
495
496   /**
497    * If this was a local request, remember the client; otherwise NULL.
498    */
499   struct ClientList *client;
500
501   /**
502    * Pointer to this nodes heap location (for removal)
503    */
504   struct GNUNET_CONTAINER_HeapNode *hnode;
505
506   /**
507    * Back pointer to the record storing this information.
508    */
509   struct DHTQueryRecord *record;
510
511   /**
512    * Task to remove this entry on timeout.
513    */
514   GNUNET_SCHEDULER_TaskIdentifier delete_task;
515
516   /**
517    * Bloomfilter of peers we have already sent back as
518    * replies to the initial request.  Allows us to not
519    * forward the same peer multiple times for a find peer
520    * request.
521    */
522   struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
523
524 };
525
526
527 /**
528  * Entry in the DHT routing table.
529  */
530 struct DHTQueryRecord
531 {
532   /**
533    * Head of DLL for result forwarding.
534    */
535   struct DHTRouteSource *head;
536
537   /**
538    * Tail of DLL for result forwarding.
539    */
540   struct DHTRouteSource *tail;
541
542   /**
543    * Key that the record concerns.
544    */
545   GNUNET_HashCode key;
546
547 };
548
549
550 /**
551  * Context used to calculate the number of find peer messages
552  * per X time units since our last scheduled find peer message
553  * was sent.  If we have seen too many messages, delay or don't
554  * send our own out.
555  */
556 struct FindPeerMessageContext
557 {
558   unsigned int count;
559
560   struct GNUNET_TIME_Absolute start;
561
562   struct GNUNET_TIME_Absolute end;
563 };
564
565
566 /**
567  * DHT Routing results structure
568  */
569 struct DHTResults
570 {
571   /**
572    * Min heap for removal upon reaching limit
573    */
574   struct GNUNET_CONTAINER_Heap *minHeap;
575
576
577   /**
578    * Hashmap for fast key based lookup
579    */
580   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
581 };
582
583
584 /**
585  * DHT structure for recent requests.
586  */
587 struct RecentRequests
588 {
589   /**
590    * Min heap for removal upon reaching limit
591    */
592   struct GNUNET_CONTAINER_Heap *minHeap;
593
594 #if HAVE_UID_FOR_TESTING > 1
595   /**
596    * Hashmap for key based lookup
597    */
598   struct GNUNET_CONTAINER_MultiHashMap *hashmap;
599 #endif
600
601 };
602
603
604 struct RecentRequest
605 {
606   /**
607    * Position of this node in the min heap.
608    */
609   struct GNUNET_CONTAINER_HeapNode *heap_node;
610
611   /**
612    * Bloomfilter containing entries for peers
613    * we forwarded this request to.
614    */
615   struct GNUNET_CONTAINER_BloomFilter *bloom;
616
617   /**
618    * Timestamp of this request, for ordering
619    * the min heap.
620    */
621   struct GNUNET_TIME_Absolute timestamp;
622
623   /**
624    * Key of this request.
625    */
626   GNUNET_HashCode key;
627
628   /**
629    * Unique identifier for this request, 0 if from another peer.
630    */
631   uint64_t uid;
632
633   /**
634    * Task to remove this entry on timeout.
635    */
636   GNUNET_SCHEDULER_TaskIdentifier remove_task;
637 };
638
639
640 /**
641  * log of the current network size estimate, used as the point where
642  * we switch between random and deterministic routing.  Default
643  * value of 4.0 is used if NSE module is not available (i.e. not
644  * configured).
645  */
646 static double log_of_network_size_estimate = 4.0;
647
648 /**
649  * Recent requests by hash/uid and by time inserted.
650  */
651 static struct RecentRequests recent;
652
653 /**
654  * Context to use to calculate find peer rates.
655  */
656 static struct FindPeerMessageContext find_peer_context;
657
658 /**
659  * Don't use our routing algorithm, always route
660  * to closest peer; initially send requests to 3
661  * peers.
662  */
663 static int strict_kademlia;
664
665 /**
666  * Routing option to end routing when closest peer found.
667  */
668 static int stop_on_closest;
669
670 /**
671  * Routing option to end routing when data is found.
672  */
673 static int stop_on_found;
674
675 /**
676  * Whether DHT needs to manage find peer requests, or
677  * an external force will do it on behalf of the DHT.
678  */
679 static int do_find_peer;
680
681 /**
682  * Use exactly the forwarding formula as described in
683  * the paper if set to GNUNET_YES, otherwise use the
684  * slightly modified version.
685  */
686 static int paper_forwarding;
687
688 /**
689  * PUT Peer Identities of peers we know about into
690  * the datacache.
691  */
692 static int put_peer_identities;
693
694 /**
695  * Use the "real" distance metric when selecting the
696  * next routing hop.  Can be less accurate.
697  */
698 static int use_real_distance;
699
700 /**
701  * How many peers have we added since we sent out our last
702  * find peer request?
703  */
704 static unsigned int newly_found_peers;
705
706 /**
707  * Container of active queries we should remember
708  */
709 static struct DHTResults forward_list;
710
711 /**
712  * Handle to the datacache service (for inserting/retrieving data)
713  */
714 static struct GNUNET_DATACACHE_Handle *datacache;
715
716 /**
717  * Handle for the statistics service.
718  */
719 struct GNUNET_STATISTICS_Handle *stats;
720
721 /**
722  * Handle to get our current HELLO.
723  */
724 static struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
725
726 /**
727  * The configuration the DHT service is running with
728  */
729 static const struct GNUNET_CONFIGURATION_Handle *cfg;
730
731 /**
732  * Handle to the core service
733  */
734 static struct GNUNET_CORE_Handle *coreAPI;
735
736 /**
737  * Handle to the transport service, for getting our hello
738  */
739 static struct GNUNET_TRANSPORT_Handle *transport_handle;
740
741 /**
742  * The identity of our peer.
743  */
744 static struct GNUNET_PeerIdentity my_identity;
745
746 /**
747  * Short id of the peer, for printing
748  */
749 static char *my_short_id;
750
751 /**
752  * Our HELLO
753  */
754 static struct GNUNET_MessageHeader *my_hello;
755
756 /**
757  * Task to run when we shut down, cleaning up all our trash
758  */
759 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
760
761 /**
762  * The lowest currently used bucket.
763  */
764 static unsigned int lowest_bucket;      /* Initially equal to MAX_BUCKETS - 1 */
765
766 /**
767  * The buckets (Kademlia routing table, complete with growth).
768  * Array of size MAX_BUCKET_SIZE.
769  */
770 static struct PeerBucket k_buckets[MAX_BUCKETS];
771
772 /**
773  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
774  */
775 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
776
777 /**
778  * Recently seen find peer requests.
779  */
780 static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
781
782 /**
783  * Maximum size for each bucket.
784  */
785 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
786
787 /**
788  * List of active clients.
789  */
790 static struct ClientList *client_list;
791
792 /**
793  * Handle to the DHT logger.
794  */
795 static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
796
797 /**
798  * Whether or not to send routing debugging information
799  * to the dht logging server
800  */
801 static unsigned int debug_routes;
802
803 /**
804  * Whether or not to send FULL route information to
805  * logging server
806  */
807 static unsigned int debug_routes_extended;
808
809 /**
810  * GNUNET_YES or GNUNET_NO, whether or not to act as
811  * a malicious node which drops all messages
812  */
813 static unsigned int malicious_dropper;
814
815 /**
816  * GNUNET_YES or GNUNET_NO, whether or not to act as
817  * a malicious node which sends out lots of GETS
818  */
819 static unsigned int malicious_getter;
820
821 /**
822  * GNUNET_YES or GNUNET_NO, whether or not to act as
823  * a malicious node which sends out lots of PUTS
824  */
825 static unsigned int malicious_putter;
826
827 /**
828  * Frequency for malicious get requests.
829  */
830 static struct GNUNET_TIME_Relative malicious_get_frequency;
831
832 /**
833  * Frequency for malicious put requests.
834  */
835 static struct GNUNET_TIME_Relative malicious_put_frequency;
836
837 /**
838  * Kademlia replication
839  */
840 static unsigned long long kademlia_replication;
841
842 /**
843  * Reply times for requests, if we are busy, don't send any
844  * more requests!
845  */
846 static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
847
848 /**
849  * Current counter for replies.
850  */
851 static unsigned int reply_counter;
852
853 /**
854  * Our handle to the BLOCK library.
855  */
856 static struct GNUNET_BLOCK_Context *block_context;
857
858 /**
859  * Network size estimation handle.
860  */
861 static struct GNUNET_NSE_Handle *nse;
862
863
864 /**
865  * Callback that is called when network size estimate is updated.
866  *
867  * @param cls closure
868  * @param timestamp time when the estimate was received from the server (or created by the server)
869  * @param logestimate the log(Base 2) value of the current network size estimate
870  * @param std_dev standard deviation for the estimate
871  *
872  */
873 static void
874 update_network_size_estimate (void *cls, struct GNUNET_TIME_Absolute timestamp,
875                               double logestimate, double std_dev)
876 {
877   log_of_network_size_estimate = logestimate;
878 }
879
880
881 /**
882  * Forward declaration.
883  */
884 static size_t
885 send_generic_reply (void *cls, size_t size, void *buf);
886
887
888 /** Declare here so retry_core_send is aware of it */
889 static size_t
890 core_transmit_notify (void *cls, size_t size, void *buf);
891
892
893 /**
894  * Convert unique ID to hash code.
895  *
896  * @param uid unique ID to convert
897  * @param hash set to uid (extended with zeros)
898  */
899 static void
900 hash_from_uid (uint64_t uid, GNUNET_HashCode * hash)
901 {
902   memset (hash, 0, sizeof (GNUNET_HashCode));
903   *((uint64_t *) hash) = uid;
904 }
905
906
907 /**
908  * Given the largest send delay, artificially decrease it
909  * so the next time around we may have a chance at sending
910  * again.
911  */
912 static void
913 decrease_max_send_delay (struct GNUNET_TIME_Relative max_time)
914 {
915   unsigned int i;
916
917   for (i = 0; i < MAX_REPLY_TIMES; i++)
918   {
919     if (reply_times[i].rel_value == max_time.rel_value)
920     {
921       reply_times[i].rel_value = reply_times[i].rel_value / 2;
922       return;
923     }
924   }
925 }
926
927
928 /**
929  * Find the maximum send time of the recently sent values.
930  *
931  * @return the average time between asking core to send a message
932  *         and when the buffer for copying it is passed
933  */
934 static struct GNUNET_TIME_Relative
935 get_max_send_delay ()
936 {
937   unsigned int i;
938   struct GNUNET_TIME_Relative max_time;
939
940   max_time = GNUNET_TIME_relative_get_zero ();
941
942   for (i = 0; i < MAX_REPLY_TIMES; i++)
943   {
944     if (reply_times[i].rel_value > max_time.rel_value)
945       max_time.rel_value = reply_times[i].rel_value;
946   }
947 #if DEBUG_DHT
948   if (max_time.rel_value > MAX_REQUEST_TIME.rel_value)
949     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n",
950                 (unsigned long long) max_time.rel_value);
951 #endif
952   return max_time;
953 }
954
955
956 static void
957 increment_stats (const char *value)
958 {
959   if (stats == NULL)
960     return;
961   GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
962 }
963
964
965 static void
966 decrement_stats (const char *value)
967 {
968   if (stats == NULL)
969     return;
970   GNUNET_STATISTICS_update (stats, value, -1, GNUNET_NO);
971 }
972
973
974 /**
975  *  Try to send another message from our core send list
976  */
977 static void
978 try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
979 {
980   struct PeerInfo *peer = cls;
981   struct P2PPendingMessage *pending;
982   size_t ssize;
983
984   peer->send_task = GNUNET_SCHEDULER_NO_TASK;
985
986   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
987     return;
988
989   if (peer->th != NULL)
990     return;                     /* Message send already in progress */
991
992   pending = peer->head;
993   if (pending != NULL)
994   {
995     ssize = ntohs (pending->msg->size);
996 #if DEBUG_DHT > 1
997     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n",
999                 my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id));
1000 #endif
1001     pending->scheduled = GNUNET_TIME_absolute_get ();
1002     reply_counter++;
1003     if (reply_counter >= MAX_REPLY_TIMES)
1004       reply_counter = 0;
1005     peer->th =
1006         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
1007                                            pending->importance,
1008                                            pending->timeout, &peer->id, ssize,
1009                                            &core_transmit_notify, peer);
1010     if (peer->th == NULL)
1011       increment_stats ("# notify transmit ready failed");
1012   }
1013 }
1014
1015
1016 /**
1017  * Function called to send a request out to another peer.
1018  * Called both for locally initiated requests and those
1019  * received from other peers.
1020  *
1021  * @param msg the encapsulated message
1022  * @param peer the peer to forward the message to
1023  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1024  */
1025 static void
1026 forward_result_message (const struct GNUNET_MessageHeader *msg,
1027                         struct PeerInfo *peer,
1028                         struct DHT_MessageContext *msg_ctx)
1029 {
1030   struct GNUNET_DHT_P2PRouteResultMessage *result_message;
1031   struct P2PPendingMessage *pending;
1032   size_t msize;
1033   size_t psize;
1034   char *path_start;
1035   char *path_offset;
1036
1037 #if DEBUG_PATH
1038   unsigned int i;
1039 #endif
1040
1041   increment_stats (STAT_RESULT_FORWARDS);
1042   msize =
1043       sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) +
1044       (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len);
1045   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1046   psize = sizeof (struct P2PPendingMessage) + msize;
1047   pending = GNUNET_malloc (psize);
1048   pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
1049   pending->importance = DHT_SEND_PRIORITY;
1050   pending->timeout = GNUNET_TIME_relative_get_forever ();
1051   result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg;
1052   result_message->header.size = htons (msize);
1053   result_message->header.type =
1054       htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
1055   result_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
1056   if (msg_ctx->path_history_len > 0)
1057   {
1058     /* End of pending is where enc_msg starts */
1059     path_start = (char *) &pending[1];
1060     /* Offset by the size of the enc_msg */
1061     path_start += ntohs (msg->size);
1062     memcpy (path_start, msg_ctx->path_history,
1063             msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
1064 #if DEBUG_PATH
1065     for (i = 0; i < msg_ctx->path_history_len; i++)
1066     {
1067       path_offset =
1068           &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                   "(forward_result) Key %s Found peer %d:%s\n",
1071                   GNUNET_h2s (&msg_ctx->key), i,
1072                   GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1073     }
1074 #endif
1075   }
1076   result_message->options = htonl (msg_ctx->msg_options);
1077   result_message->hop_count = htonl (msg_ctx->hop_count + 1);
1078 #if HAVE_UID_FOR_TESTING 
1079   result_message->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1080 #endif
1081   memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1082   /* Copy the enc_msg, then the path history as well! */
1083   memcpy (&result_message[1], msg, ntohs (msg->size));
1084   path_offset = (char *) &result_message[1];
1085   path_offset += ntohs (msg->size);
1086   /* If we have path history, copy it to the end of the whole thing */
1087   if (msg_ctx->path_history_len > 0)
1088     memcpy (path_offset, msg_ctx->path_history,
1089             msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
1090 #if DEBUG_DHT > 1
1091   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092               "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1093               "DHT", msize, GNUNET_i2s (&peer->id));
1094 #endif
1095   peer->pending_count++;
1096   increment_stats ("# pending messages scheduled");
1097   GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1098                                      pending);
1099   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1100     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1101 }
1102
1103
1104 /**
1105  * Called when core is ready to send a message we asked for
1106  * out to the destination.
1107  *
1108  * @param cls closure (NULL)
1109  * @param size number of bytes available in buf
1110  * @param buf where the callee should write the message
1111  * @return number of bytes written to buf
1112  */
1113 static size_t
1114 core_transmit_notify (void *cls, size_t size, void *buf)
1115 {
1116   struct PeerInfo *peer = cls;
1117   char *cbuf = buf;
1118   struct P2PPendingMessage *pending;
1119
1120   size_t off;
1121   size_t msize;
1122
1123   peer->th = NULL;
1124   if (buf == NULL)
1125   {
1126     /* client disconnected */
1127 #if DEBUG_DHT
1128     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n",
1129                 my_short_id, "DHT");
1130 #endif
1131     return 0;
1132   }
1133
1134   if (peer->head == NULL)
1135     return 0;
1136
1137   off = 0;
1138   pending = peer->head;
1139 #if DUMB
1140   reply_times[reply_counter] =
1141       GNUNET_TIME_absolute_get_difference (pending->scheduled,
1142                                            GNUNET_TIME_absolute_get ());
1143   msize = ntohs (pending->msg->size);
1144   if (msize <= size)
1145   {
1146     off = msize;
1147     memcpy (cbuf, pending->msg, msize);
1148     peer->pending_count--;
1149     increment_stats ("# pending messages sent");
1150     GNUNET_assert (peer->pending_count >= 0);
1151     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
1152     GNUNET_free (pending);
1153   }
1154 #else
1155   while (NULL != pending &&
1156          (size - off >= (msize = ntohs (pending->msg->size))))
1157   {
1158     memcpy (&cbuf[off], pending->msg, msize);
1159     off += msize;
1160     peer->pending_count--;
1161     increment_stats ("# pending messages sent");
1162     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
1163     GNUNET_free (pending);
1164     pending = peer->head;
1165   }
1166 #endif
1167   if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
1168     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1169
1170   return off;
1171 }
1172
1173
1174 /**
1175  * Compute the distance between have and target as a 32-bit value.
1176  * Differences in the lower bits must count stronger than differences
1177  * in the higher bits.
1178  *
1179  * @return 0 if have==target, otherwise a number
1180  *           that is larger as the distance between
1181  *           the two hash codes increases
1182  */
1183 static unsigned int
1184 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1185 {
1186   unsigned int bucket;
1187   unsigned int msb;
1188   unsigned int lsb;
1189   unsigned int i;
1190
1191   /* We have to represent the distance between two 2^9 (=512)-bit
1192    * numbers as a 2^5 (=32)-bit number with "0" being used for the
1193    * two numbers being identical; furthermore, we need to
1194    * guarantee that a difference in the number of matching
1195    * bits is always represented in the result.
1196    *
1197    * We use 2^32/2^9 numerical values to distinguish between
1198    * hash codes that have the same LSB bit distance and
1199    * use the highest 2^9 bits of the result to signify the
1200    * number of (mis)matching LSB bits; if we have 0 matching
1201    * and hence 512 mismatching LSB bits we return -1 (since
1202    * 512 itself cannot be represented with 9 bits) */
1203
1204   /* first, calculate the most significant 9 bits of our
1205    * result, aka the number of LSBs */
1206   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1207   /* bucket is now a value between 0 and 512 */
1208   if (bucket == 512)
1209     return 0;                   /* perfect match */
1210   if (bucket == 0)
1211     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1212                                  * below, we'd end up with max+1 (overflow)) */
1213
1214   /* calculate the most significant bits of the final result */
1215   msb = (512 - bucket) << (32 - 9);
1216   /* calculate the 32-9 least significant bits of the final result by
1217    * looking at the differences in the 32-9 bits following the
1218    * mismatching bit at 'bucket' */
1219   lsb = 0;
1220   for (i = bucket + 1;
1221        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1222   {
1223     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
1224         GNUNET_CRYPTO_hash_get_bit (have, i))
1225       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
1226                                                  * last bit set will be 31 -- if
1227                                                  * i does not reach 512 first... */
1228   }
1229   return msb | lsb;
1230 }
1231
1232
1233 /**
1234  * Return a number that is larger the closer the
1235  * "have" GNUNET_hash code is to the "target".
1236  *
1237  * @return inverse distance metric, non-zero.
1238  *         Must fudge the value if NO bits match.
1239  */
1240 static unsigned int
1241 inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
1242 {
1243   if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
1244     return 1;                   /* Never return 0! */
1245   return ((unsigned int) -1) - distance (target, have);
1246 }
1247
1248
1249 /**
1250  * Find the optimal bucket for this key, regardless
1251  * of the current number of buckets in use.
1252  *
1253  * @param hc the hashcode to compare our identity to
1254  *
1255  * @return the proper bucket index, or GNUNET_SYSERR
1256  *         on error (same hashcode)
1257  */
1258 static int
1259 find_bucket (const GNUNET_HashCode * hc)
1260 {
1261   unsigned int bits;
1262
1263   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
1264   if (bits == MAX_BUCKETS)
1265     return GNUNET_SYSERR;
1266   return MAX_BUCKETS - bits - 1;
1267 }
1268
1269
1270 /**
1271  * Find which k-bucket this peer should go into,
1272  * taking into account the size of the k-bucket
1273  * array.  This means that if more bits match than
1274  * there are currently buckets, lowest_bucket will
1275  * be returned.
1276  *
1277  * @param hc GNUNET_HashCode we are finding the bucket for.
1278  *
1279  * @return the proper bucket index for this key,
1280  *         or GNUNET_SYSERR on error (same hashcode)
1281  */
1282 static int
1283 find_current_bucket (const GNUNET_HashCode * hc)
1284 {
1285   int actual_bucket;
1286
1287   actual_bucket = find_bucket (hc);
1288   if (actual_bucket == GNUNET_SYSERR)   /* hc and our peer identity match! */
1289     return lowest_bucket;
1290   if (actual_bucket < lowest_bucket)    /* actual_bucket not yet used */
1291     return lowest_bucket;
1292   return actual_bucket;
1293 }
1294
1295
1296 /**
1297  * Find a routing table entry from a peer identity
1298  *
1299  * @param peer the peer identity to look up
1300  *
1301  * @return the routing table entry, or NULL if not found
1302  */
1303 static struct PeerInfo *
1304 find_peer_by_id (const struct GNUNET_PeerIdentity *peer)
1305 {
1306   int bucket;
1307   struct PeerInfo *pos;
1308
1309   bucket = find_current_bucket (&peer->hashPubKey);
1310
1311   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
1312     return NULL;
1313
1314   pos = k_buckets[bucket].head;
1315   while (pos != NULL)
1316   {
1317     if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity)))
1318       return pos;
1319     pos = pos->next;
1320   }
1321   return NULL;                  /* No such peer. */
1322 }
1323
1324 /* Forward declaration */
1325 static void
1326 update_core_preference (void *cls,
1327                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1328
1329
1330 /**
1331  * Function called with statistics about the given peer.
1332  *
1333  * @param cls closure
1334  * @param peer identifies the peer
1335  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1336  * @param amount set to the amount that was actually reserved or unreserved;
1337  *               either the full requested amount or zero (no partial reservations)
1338  * @param res_delay if the reservation could not be satisfied (amount was 0), how
1339  *        long should the client wait until re-trying?
1340  * @param preference current traffic preference for the given peer
1341  */
1342 static void
1343 update_core_preference_finish (void *cls,
1344                                const struct GNUNET_PeerIdentity *peer,
1345                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1346                                int32_t amount,
1347                                struct GNUNET_TIME_Relative res_delay,
1348                                uint64_t preference)
1349 {
1350   struct PeerInfo *peer_info = cls;
1351
1352   peer_info->info_ctx = NULL;
1353   GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
1354                                 &update_core_preference, peer_info);
1355 }
1356
1357 static void
1358 update_core_preference (void *cls,
1359                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1360 {
1361   struct PeerInfo *peer = cls;
1362   uint64_t preference;
1363   unsigned int matching;
1364
1365   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1366   {
1367     return;
1368   }
1369   matching =
1370       GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
1371                                         &peer->id.hashPubKey);
1372   if (matching >= 64)
1373   {
1374 #if DEBUG_DHT
1375     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1376                 "Peer identifier matches by %u bits, only shifting as much as we can!\n",
1377                 matching);
1378 #endif
1379     matching = 63;
1380   }
1381   preference = 1LL << matching;
1382   peer->info_ctx =
1383       GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
1384                                           GNUNET_TIME_UNIT_FOREVER_REL,
1385                                           GNUNET_BANDWIDTH_VALUE_MAX, 0,
1386                                           preference,
1387                                           &update_core_preference_finish, peer);
1388 }
1389
1390
1391 /**
1392  * Given a peer and its corresponding bucket,
1393  * remove it from that bucket.  Does not free
1394  * the PeerInfo struct, nor cancel messages
1395  * or free messages waiting to be sent to this
1396  * peer!
1397  *
1398  * @param peer the peer to remove
1399  * @param bucket the bucket the peer belongs to
1400  */
1401 static void
1402 remove_peer (struct PeerInfo *peer, unsigned int bucket)
1403 {
1404   GNUNET_assert (k_buckets[bucket].peers_size > 0);
1405   GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail,
1406                                peer);
1407   k_buckets[bucket].peers_size--;
1408 #if CHANGE_LOWEST
1409   if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) &&
1410       (lowest_bucket < MAX_BUCKETS - 1))
1411     lowest_bucket++;
1412 #endif
1413 }
1414
1415 /**
1416  * Removes peer from a bucket, then frees associated
1417  * resources and frees peer.
1418  *
1419  * @param peer peer to be removed and freed
1420  * @param bucket which bucket this peer belongs to
1421  */
1422 static void
1423 delete_peer (struct PeerInfo *peer, unsigned int bucket)
1424 {
1425   struct P2PPendingMessage *pos;
1426   struct P2PPendingMessage *next;
1427
1428   remove_peer (peer, bucket);   /* First remove the peer from its bucket */
1429   if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
1430     GNUNET_SCHEDULER_cancel (peer->send_task);
1431   if ((peer->th != NULL) && (coreAPI != NULL))
1432     GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
1433
1434   pos = peer->head;
1435   while (pos != NULL)           /* Remove any pending messages for this peer */
1436   {
1437     increment_stats
1438         ("# dht pending messages discarded (due to disconnect/shutdown)");
1439     next = pos->next;
1440     GNUNET_free (pos);
1441     pos = next;
1442   }
1443
1444   GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
1445                  (all_known_peers, &peer->id.hashPubKey));
1446   GNUNET_assert (GNUNET_YES ==
1447                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
1448                                                        &peer->id.hashPubKey,
1449                                                        peer));
1450   GNUNET_free (peer);
1451   decrement_stats (STAT_PEERS_KNOWN);
1452 }
1453
1454
1455 /**
1456  * Iterator over hash map entries.
1457  *
1458  * @param cls closure
1459  * @param key current key code
1460  * @param value PeerInfo of the peer to move to new lowest bucket
1461  * @return GNUNET_YES if we should continue to
1462  *         iterate,
1463  *         GNUNET_NO if not.
1464  */
1465 static int
1466 move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value)
1467 {
1468   struct PeerInfo *peer = value;
1469   int new_bucket;
1470
1471   GNUNET_assert (lowest_bucket > 0);
1472   new_bucket = lowest_bucket - 1;
1473   remove_peer (peer, lowest_bucket);
1474   GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head,
1475                                      k_buckets[new_bucket].tail,
1476                                      k_buckets[new_bucket].tail, peer);
1477   k_buckets[new_bucket].peers_size++;
1478   return GNUNET_YES;
1479 }
1480
1481
1482 /**
1483  * The current lowest bucket is full, so change the lowest
1484  * bucket to the next lower down, and move any appropriate
1485  * entries in the current lowest bucket to the new bucket.
1486  */
1487 static void
1488 enable_next_bucket ()
1489 {
1490   struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1491   struct PeerInfo *pos;
1492
1493   GNUNET_assert (lowest_bucket > 0);
1494   to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size);
1495   pos = k_buckets[lowest_bucket].head;
1496
1497   /* Populate the array of peers which should be in the next lowest bucket */
1498   while (pos != NULL)
1499   {
1500     if (find_bucket (&pos->id.hashPubKey) < lowest_bucket)
1501       GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos,
1502                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1503     pos = pos->next;
1504   }
1505
1506   /* Remove peers from lowest bucket, insert into next lowest bucket */
1507   GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL);
1508   GNUNET_CONTAINER_multihashmap_destroy (to_remove);
1509   lowest_bucket = lowest_bucket - 1;
1510 }
1511
1512
1513 /**
1514  * Find the closest peer in our routing table to the
1515  * given hashcode.
1516  *
1517  * @return The closest peer in our routing table to the
1518  *         key, or NULL on error.
1519  */
1520 static struct PeerInfo *
1521 find_closest_peer (const GNUNET_HashCode * hc)
1522 {
1523   struct PeerInfo *pos;
1524   struct PeerInfo *current_closest;
1525   unsigned int lowest_distance;
1526   unsigned int temp_distance;
1527   int bucket;
1528   int count;
1529
1530   lowest_distance = -1;
1531
1532   if (k_buckets[lowest_bucket].peers_size == 0)
1533     return NULL;
1534
1535   current_closest = NULL;
1536   for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1537   {
1538     pos = k_buckets[bucket].head;
1539     count = 0;
1540     while ((pos != NULL) && (count < bucket_size))
1541     {
1542       temp_distance = distance (&pos->id.hashPubKey, hc);
1543       if (temp_distance <= lowest_distance)
1544       {
1545         lowest_distance = temp_distance;
1546         current_closest = pos;
1547       }
1548       pos = pos->next;
1549       count++;
1550     }
1551   }
1552   GNUNET_assert (current_closest != NULL);
1553   return current_closest;
1554 }
1555
1556
1557 /**
1558  * Function called to send a request out to another peer.
1559  * Called both for locally initiated requests and those
1560  * received from other peers.
1561  *
1562  * @param msg the encapsulated message
1563  * @param peer the peer to forward the message to
1564  * @param msg_ctx the context of the message (hop count, bloom, etc.)
1565  */
1566 static void
1567 forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer,
1568                  struct DHT_MessageContext *msg_ctx)
1569 {
1570   struct GNUNET_DHT_P2PRouteMessage *route_message;
1571   struct P2PPendingMessage *pending;
1572   size_t msize;
1573   size_t psize;
1574   char *route_path;
1575
1576   increment_stats (STAT_ROUTE_FORWARDS);
1577   GNUNET_assert (peer != NULL);
1578   if ((msg_ctx->closest != GNUNET_YES) &&
1579       (peer == find_closest_peer (&msg_ctx->key)))
1580     increment_stats (STAT_ROUTE_FORWARDS_CLOSEST);
1581
1582   msize =
1583       sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) +
1584       (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1585   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1586   psize = sizeof (struct P2PPendingMessage) + msize;
1587   pending = GNUNET_malloc (psize);
1588   pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
1589   pending->importance = msg_ctx->importance;
1590   pending->timeout = msg_ctx->timeout;
1591   route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg;
1592   route_message->header.size = htons (msize);
1593   route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1594   route_message->options = htonl (msg_ctx->msg_options);
1595   route_message->hop_count = htonl (msg_ctx->hop_count + 1);
1596   route_message->network_size = htonl (msg_ctx->network_size);
1597   route_message->desired_replication_level = htonl (msg_ctx->replication);
1598 #if HAVE_UID_FOR_TESTING 
1599   route_message->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1600 #endif
1601   if (msg_ctx->bloom != NULL)
1602     GNUNET_assert (GNUNET_OK ==
1603                    GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
1604                                                               route_message->
1605                                                               bloomfilter,
1606                                                               DHT_BLOOM_SIZE));
1607   memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1608   memcpy (&route_message[1], msg, ntohs (msg->size));
1609   if (GNUNET_DHT_RO_RECORD_ROUTE ==
1610       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1611   {
1612     route_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
1613     /* Set pointer to start of enc_msg */
1614     route_path = (char *) &route_message[1];
1615     /* Offset to the end of the enc_msg */
1616     route_path += ntohs (msg->size);
1617     /* Copy the route_path after enc_msg */
1618     memcpy (route_path, msg_ctx->path_history,
1619             msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1620   }
1621 #if DEBUG_DHT > 1
1622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1623               "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1624               "DHT", msize, GNUNET_i2s (&peer->id));
1625 #endif
1626   peer->pending_count++;
1627   increment_stats ("# pending messages scheduled");
1628   GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1629                                      pending);
1630   if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1631     peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1632 }
1633
1634
1635 /**
1636  * Task run to check for messages that need to be sent to a client.
1637  *
1638  * @param client a ClientList, containing the client and any messages to be sent to it
1639  */
1640 static void
1641 process_pending_messages (struct ClientList *client)
1642 {
1643   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
1644     return;
1645   client->transmit_handle =
1646       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
1647                                            ntohs (client->pending_head->
1648                                                   msg->size),
1649                                            GNUNET_TIME_UNIT_FOREVER_REL,
1650                                            &send_generic_reply, client);
1651 }
1652
1653 /**
1654  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
1655  * request.  A ClientList is passed as closure, take the head of the list
1656  * and copy it into buf, which has the result of sending the message to the
1657  * client.
1658  *
1659  * @param cls closure to this call
1660  * @param size maximum number of bytes available to send
1661  * @param buf where to copy the actual message to
1662  *
1663  * @return the number of bytes actually copied, 0 indicates failure
1664  */
1665 static size_t
1666 send_generic_reply (void *cls, size_t size, void *buf)
1667 {
1668   struct ClientList *client = cls;
1669   char *cbuf = buf;
1670   struct PendingMessage *reply;
1671   size_t off;
1672   size_t msize;
1673
1674   client->transmit_handle = NULL;
1675   if (buf == NULL)
1676   {
1677     /* client disconnected */
1678     return 0;
1679   }
1680   off = 0;
1681   while ((NULL != (reply = client->pending_head)) &&
1682          (size >= off + (msize = ntohs (reply->msg->size))))
1683   {
1684     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
1685                                  reply);
1686     memcpy (&cbuf[off], reply->msg, msize);
1687     GNUNET_free (reply);
1688     off += msize;
1689   }
1690   process_pending_messages (client);
1691 #if DEBUG_DHT
1692   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1693               "Transmitted %u bytes of replies to client\n",
1694               (unsigned int) off);
1695 #endif
1696   return off;
1697 }
1698
1699
1700 /**
1701  * Add a PendingMessage to the clients list of messages to be sent
1702  *
1703  * @param client the active client to send the message to
1704  * @param pending_message the actual message to send
1705  */
1706 static void
1707 add_pending_message (struct ClientList *client,
1708                      struct PendingMessage *pending_message)
1709 {
1710   GNUNET_CONTAINER_DLL_insert_after (client->pending_head, client->pending_tail,
1711                                      client->pending_tail, pending_message);
1712   process_pending_messages (client);
1713 }
1714
1715
1716 /**
1717  * Called when a reply needs to be sent to a client, as
1718  * a result it found to a GET or FIND PEER request.
1719  *
1720  * @param client the client to send the reply to
1721  * @param message the encapsulated message to send
1722  * @param msg_ctx the context of the received message
1723  */
1724 static void
1725 send_reply_to_client (struct ClientList *client,
1726                       const struct GNUNET_MessageHeader *message,
1727                       struct DHT_MessageContext *msg_ctx)
1728 {
1729   struct GNUNET_DHT_RouteResultMessage *reply;
1730   struct PendingMessage *pending_message;
1731   uint16_t msize;
1732   size_t tsize;
1733   char *reply_offset;
1734
1735 #if DEBUG_PATH
1736   char *path_offset;
1737   unsigned int i;
1738 #endif
1739 #if DEBUG_DHT
1740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n",
1741               my_short_id, "DHT");
1742 #endif
1743   msize = ntohs (message->size);
1744   tsize =
1745       sizeof (struct GNUNET_DHT_RouteResultMessage) + msize +
1746       (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1747   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1748   {
1749     GNUNET_break_op (0);
1750     return;
1751   }
1752   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1753   pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1];
1754   reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1];
1755   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1756   reply->header.size = htons (tsize);
1757   reply->outgoing_path_length = htonl (msg_ctx->path_history_len);
1758   reply->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1759   memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1760   reply_offset = (char *) &reply[1];
1761   memcpy (&reply[1], message, msize);
1762   if (msg_ctx->path_history_len > 0)
1763   {
1764     reply_offset += msize;
1765     memcpy (reply_offset, msg_ctx->path_history,
1766             msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1767   }
1768 #if DEBUG_PATH
1769   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770               "Returning message with outgoing path length %d\n",
1771               msg_ctx->path_history_len);
1772   for (i = 0; i < msg_ctx->path_history_len; i++)
1773   {
1774     path_offset =
1775         &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1776     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found peer %d:%s\n", i,
1777                 GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1778   }
1779 #endif
1780   add_pending_message (client, pending_message);
1781 }
1782
1783 /**
1784  * Consider whether or not we would like to have this peer added to
1785  * our routing table.  Check whether bucket for this peer is full,
1786  * if so return negative; if not return positive.  Since peers are
1787  * only added on CORE level connect, this doesn't actually add the
1788  * peer to the routing table.
1789  *
1790  * @param peer the peer we are considering adding
1791  *
1792  * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1793  *         already full)
1794  */
1795 static int
1796 consider_peer (struct GNUNET_PeerIdentity *peer)
1797 {
1798   int bucket;
1799
1800   if ((GNUNET_YES ==
1801        GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
1802                                                &peer->hashPubKey)) ||
1803       (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))))
1804     return GNUNET_NO;           /* We already know this peer (are connected even!) */
1805   bucket = find_current_bucket (&peer->hashPubKey);
1806
1807   if ((k_buckets[bucket].peers_size < bucket_size) ||
1808       ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1809     return GNUNET_YES;
1810
1811   return GNUNET_NO;
1812 }
1813
1814
1815 /**
1816  * Task used to remove forwarding entries, either
1817  * after timeout, when full, or on shutdown.
1818  *
1819  * @param cls the entry to remove
1820  * @param tc context, reason, etc.
1821  */
1822 static void
1823 remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1824 {
1825   struct DHTRouteSource *source_info = cls;
1826   struct DHTQueryRecord *record;
1827
1828   source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode);
1829   record = source_info->record;
1830   GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1831
1832   if (record->head == NULL)     /* No more entries in DLL */
1833   {
1834     GNUNET_assert (GNUNET_YES ==
1835                    GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1836                                                          &record->key, record));
1837     GNUNET_free (record);
1838   }
1839   if (source_info->find_peers_responded != NULL)
1840     GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1841   GNUNET_free (source_info);
1842 }
1843
1844 /**
1845  * Main function that handles whether or not to route a result
1846  * message to other peers, or to send to our local client.
1847  *
1848  * @param msg the result message to be routed
1849  * @param msg_ctx context of the message we are routing
1850  *
1851  * @return the number of peers the message was routed to,
1852  *         GNUNET_SYSERR on failure
1853  */
1854 static int
1855 route_result_message (struct GNUNET_MessageHeader *msg,
1856                       struct DHT_MessageContext *msg_ctx)
1857 {
1858   struct GNUNET_PeerIdentity new_peer;
1859   struct DHTQueryRecord *record;
1860   struct DHTRouteSource *pos;
1861   struct PeerInfo *peer_info;
1862   const struct GNUNET_MessageHeader *hello_msg;
1863
1864 #if DEBUG_DHT > 1
1865   unsigned int i;
1866 #endif
1867
1868   increment_stats (STAT_RESULTS);
1869   /**
1870    * If a find peer result message is received and contains a valid
1871    * HELLO for another peer, offer it to the transport service.
1872    */
1873   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1874   {
1875     if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader))
1876       GNUNET_break_op (0);
1877
1878     hello_msg = &msg[1];
1879     if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1880         (GNUNET_SYSERR ==
1881          GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg,
1882                               &new_peer)))
1883     {
1884       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1885                   "%s:%s Received non-HELLO message type in find peer result message!\n",
1886                   my_short_id, "DHT");
1887       GNUNET_break_op (0);
1888       return GNUNET_NO;
1889     }
1890     else                        /* We have a valid hello, and peer id stored in new_peer */
1891     {
1892       find_peer_context.count++;
1893       increment_stats (STAT_FIND_PEER_REPLY);
1894       if (GNUNET_YES == consider_peer (&new_peer))
1895       {
1896         increment_stats (STAT_HELLOS_PROVIDED);
1897         GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL);
1898         GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL);
1899       }
1900     }
1901   }
1902
1903   if (malicious_dropper == GNUNET_YES)
1904     record = NULL;
1905   else
1906     record =
1907         GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1908
1909   if (record == NULL)           /* No record of this message! */
1910   {
1911 #if DEBUG_DHT
1912     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1913                 "`%s:%s': Have no record of response key %s uid %llu\n",
1914                 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1915                 msg_ctx->unique_id);
1916 #endif
1917 #if DEBUG_DHT_ROUTING
1918     if ((debug_routes_extended) && (dhtlog_handle != NULL))
1919     {
1920       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
1921                                    msg_ctx->hop_count, GNUNET_SYSERR,
1922                                    &my_identity, &msg_ctx->key, &msg_ctx->peer,
1923                                    NULL);
1924     }
1925 #endif
1926     return 0;
1927   }
1928
1929   pos = record->head;
1930   while (pos != NULL)
1931   {
1932 #if STRICT_FORWARDING
1933     if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)  /* If we have already forwarded this peer id, don't do it again! */
1934     {
1935       if (GNUNET_YES ==
1936           GNUNET_CONTAINER_bloomfilter_test (pos->find_peers_responded,
1937                                              &new_peer.hashPubKey))
1938       {
1939         increment_stats ("# find peer responses NOT forwarded (bloom match)");
1940         pos = pos->next;
1941         continue;
1942       }
1943       else
1944         GNUNET_CONTAINER_bloomfilter_add (pos->find_peers_responded,
1945                                           &new_peer.hashPubKey);
1946     }
1947 #endif
1948
1949     if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity)))  /* Local client (or DHT) initiated request! */
1950     {
1951 #if DEBUG_DHT
1952       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1953                   "`%s:%s': Sending response key %s uid %llu to client\n",
1954                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1955                   msg_ctx->unique_id);
1956 #endif
1957 #if DEBUG_DHT_ROUTING
1958       if ((debug_routes_extended) && (dhtlog_handle != NULL))
1959       {
1960         dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
1961                                      msg_ctx->hop_count, GNUNET_YES,
1962                                      &my_identity, &msg_ctx->key, &msg_ctx->peer,
1963                                      NULL);
1964       }
1965 #endif
1966       increment_stats (STAT_RESULTS_TO_CLIENT);
1967       if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
1968         increment_stats (STAT_GET_REPLY);
1969 #if DEBUG_DHT > 1
1970       for (i = 0; i < msg_ctx->path_history_len; i++)
1971       {
1972         char *path_offset;
1973
1974         path_offset =
1975             &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1976         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1977                     "(before client) Key %s Found peer %d:%s\n",
1978                     GNUNET_h2s (&msg_ctx->key), i,
1979                     GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1980       }
1981 #endif
1982       send_reply_to_client (pos->client, msg, msg_ctx);
1983     }
1984     else                        /* Send to peer */
1985     {
1986       peer_info = find_peer_by_id (&pos->source);
1987       if (peer_info == NULL)    /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1988       {
1989         pos = pos->next;
1990         continue;
1991       }
1992 #if DEBUG_DHT
1993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994                   "`%s:%s': Forwarding response key %s uid %llu to peer %s\n",
1995                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1996                   msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
1997 #endif
1998 #if DEBUG_DHT_ROUTING
1999       if ((debug_routes_extended) && (dhtlog_handle != NULL))
2000       {
2001         dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_RESULT,
2002                                      msg_ctx->hop_count, GNUNET_NO,
2003                                      &my_identity, &msg_ctx->key,
2004                                      &msg_ctx->peer, &pos->source);
2005       }
2006 #endif
2007       forward_result_message (msg, peer_info, msg_ctx);
2008       /* Try removing forward entries after sending once, only allows ONE response per request */
2009       if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
2010       {
2011         GNUNET_SCHEDULER_cancel (pos->delete_task);
2012         pos->delete_task =
2013           GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);        
2014       }
2015     }
2016     pos = pos->next;
2017   }
2018   return 0;
2019 }
2020
2021
2022 /**
2023  * Iterator for local get request results,
2024  *
2025  * @param cls closure for iterator, a DatacacheGetContext
2026  * @param exp when does this value expire?
2027  * @param key the key this data is stored under
2028  * @param size the size of the data identified by key
2029  * @param data the actual data
2030  * @param type the type of the data
2031  *
2032  * @return GNUNET_OK to continue iteration, anything else
2033  * to stop iteration.
2034  */
2035 static int
2036 datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
2037                         const GNUNET_HashCode * key, size_t size,
2038                         const char *data, enum GNUNET_BLOCK_Type type)
2039 {
2040   struct DHT_MessageContext *msg_ctx = cls;
2041   struct DHT_MessageContext new_msg_ctx;
2042   struct GNUNET_DHT_GetResultMessage *get_result;
2043   enum GNUNET_BLOCK_EvaluationResult eval;
2044   const struct DHTPutEntry *put_entry;
2045   int get_size;
2046   char *path_offset;
2047
2048 #if DEBUG_PATH
2049   unsigned int i;
2050 #endif
2051
2052 #if DEBUG_DHT
2053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054               "`%s:%s': Received `%s' response from datacache\n", my_short_id,
2055               "DHT", "GET");
2056 #endif
2057
2058   put_entry = (const struct DHTPutEntry *) data;
2059
2060   if (size !=
2061       sizeof (struct DHTPutEntry) + put_entry->data_size +
2062       (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)))
2063   {
2064     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2065                 "Path + data size doesn't add up for data inserted into datacache!\nData size %d, path length %d, expected %d, got %d\n",
2066                 put_entry->data_size, put_entry->path_length,
2067                 sizeof (struct DHTPutEntry) + put_entry->data_size +
2068                 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)),
2069                 size);
2070     msg_ctx->do_forward = GNUNET_NO;
2071     return GNUNET_OK;
2072   }
2073
2074   eval =
2075       GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf,
2076                              msg_ctx->reply_bf_mutator, msg_ctx->xquery,
2077                              msg_ctx->xquery_size, &put_entry[1],
2078                              put_entry->data_size);
2079
2080   switch (eval)
2081   {
2082   case GNUNET_BLOCK_EVALUATION_OK_LAST:
2083     msg_ctx->do_forward = GNUNET_NO;
2084   case GNUNET_BLOCK_EVALUATION_OK_MORE:
2085     memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
2086     if (GNUNET_DHT_RO_RECORD_ROUTE ==
2087         (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2088     {
2089       new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
2090 #if DEBUG_PATH
2091       for (i = 0; i < new_msg_ctx.path_history_len; i++)
2092       {
2093         path_offset =
2094             &new_msg_ctx.path_history[i * sizeof (struct GNUNET_PeerIdentity)];
2095         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2096                     "(get_iterator) Key %s Found peer %d:%s\n",
2097                     GNUNET_h2s (&msg_ctx->key), i,
2098                     GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
2099       }
2100 #endif
2101     }
2102
2103     get_size =
2104         sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size +
2105         (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity));
2106     get_result = GNUNET_malloc (get_size);
2107     get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2108     get_result->header.size = htons (get_size);
2109     get_result->expiration = GNUNET_TIME_absolute_hton (exp);
2110     get_result->type = htons (type);
2111     get_result->put_path_length = htons (put_entry->path_length);
2112     path_offset = (char *) &put_entry[1];
2113     path_offset += put_entry->data_size;
2114 #if DEBUG_PATH
2115     for (i = 0; i < put_entry->path_length; i++)
2116     {
2117       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2118                   "(get_iterator PUT path) Key %s Found peer %d:%s\n",
2119                   GNUNET_h2s (&msg_ctx->key), i,
2120                   GNUNET_i2s ((struct GNUNET_PeerIdentity *)
2121                               &path_offset[i *
2122                                            sizeof (struct
2123                                                    GNUNET_PeerIdentity)]));
2124     }
2125 #endif
2126     /* Copy the actual data and the path_history to the end of the get result */
2127     memcpy (&get_result[1], &put_entry[1],
2128             put_entry->data_size +
2129             (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)));
2130     new_msg_ctx.peer = my_identity;
2131     new_msg_ctx.bloom = NULL;
2132     new_msg_ctx.hop_count = 0;
2133     new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;   /* Make result routing a higher priority */
2134     new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
2135     increment_stats (STAT_GET_RESPONSE_START);
2136     route_result_message (&get_result->header, &new_msg_ctx);
2137     GNUNET_free (get_result);
2138     break;
2139   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2140 #if DEBUG_DHT
2141     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n",
2142                 my_short_id, "DHT");
2143 #endif
2144     break;
2145   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2146 #if DEBUG_DHT
2147     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n",
2148                 my_short_id, "DHT");
2149 #endif
2150     break;
2151   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2152 #if DEBUG_DHT
2153     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2154                 "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
2155 #endif
2156     GNUNET_break (0);
2157     break;
2158   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2159     GNUNET_break_op (0);
2160     msg_ctx->do_forward = GNUNET_NO;
2161     break;
2162   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2163 #if DEBUG_DHT
2164     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2165                 "`%s:%s': Unsupported block type (%u) in response!\n",
2166                 my_short_id, "DHT", type);
2167 #endif
2168     /* msg_ctx->do_forward = GNUNET_NO;  // not sure... */
2169     break;
2170   }
2171   return GNUNET_OK;
2172 }
2173
2174
2175 /**
2176  * Main function that handles whether or not to route a message to other
2177  * peers.
2178  *
2179  * @param msg the message to be routed
2180  * @param msg_ctx the context containing all pertinent information about the message
2181  */
2182 static void
2183 route_message (const struct GNUNET_MessageHeader *msg,
2184                struct DHT_MessageContext *msg_ctx);
2185
2186
2187 /**
2188  * Server handler for all dht get requests, look for data,
2189  * if found, send response either to clients or other peers.
2190  *
2191  * @param msg the actual get message
2192  * @param msg_ctx struct containing pertinent information about the get request
2193  *
2194  * @return number of items found for GET request
2195  */
2196 static unsigned int
2197 handle_dht_get (const struct GNUNET_MessageHeader *msg,
2198                 struct DHT_MessageContext *msg_ctx)
2199 {
2200   const struct GNUNET_DHT_GetMessage *get_msg;
2201   uint16_t msize;
2202   uint16_t bf_size;
2203   unsigned int results;
2204   const char *end;
2205   enum GNUNET_BLOCK_Type type;
2206
2207   msize = ntohs (msg->size);
2208   if (msize < sizeof (struct GNUNET_DHT_GetMessage))
2209   {
2210     GNUNET_break (0);
2211     return 0;
2212   }
2213   get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
2214   bf_size = ntohs (get_msg->bf_size);
2215   msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
2216   msg_ctx->reply_bf_mutator = get_msg->bf_mutator;
2217   if (msize !=
2218       sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
2219   {
2220     GNUNET_break_op (0);
2221     return 0;
2222   }
2223   end = (const char *) &get_msg[1];
2224   if (msg_ctx->xquery_size == 0)
2225   {
2226     msg_ctx->xquery = NULL;
2227   }
2228   else
2229   {
2230     msg_ctx->xquery = (const void *) end;
2231     end += msg_ctx->xquery_size;
2232   }
2233   if (bf_size == 0)
2234   {
2235     msg_ctx->reply_bf = NULL;
2236   }
2237   else
2238   {
2239     msg_ctx->reply_bf =
2240         GNUNET_CONTAINER_bloomfilter_init (end, bf_size,
2241                                            GNUNET_DHT_GET_BLOOMFILTER_K);
2242   }
2243   type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
2244 #if DEBUG_DHT
2245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2246               "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
2247               my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key),
2248               msg_ctx->unique_id);
2249 #endif
2250   increment_stats (STAT_GETS);
2251   results = 0;
2252 #if HAVE_MALICIOUS
2253   if (type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE)
2254   {
2255     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
2256     return results;
2257   }
2258 #endif
2259   msg_ctx->do_forward = GNUNET_YES;
2260   if (datacache != NULL)
2261     results =
2262         GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
2263                               &datacache_get_iterator, msg_ctx);
2264 #if DEBUG_DHT
2265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2266               "`%s:%s': Found %d results for `%s' request uid %llu\n",
2267               my_short_id, "DHT", results, "GET", msg_ctx->unique_id);
2268 #endif
2269   if (results >= 1)
2270   {
2271 #if DEBUG_DHT_ROUTING
2272     if ((debug_routes) && (dhtlog_handle != NULL))
2273     {
2274       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_GET,
2275                                    msg_ctx->hop_count, GNUNET_YES, &my_identity,
2276                                    &msg_ctx->key);
2277     }
2278
2279     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2280     {
2281       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2282                                    msg_ctx->hop_count, GNUNET_YES, &my_identity,
2283                                    &msg_ctx->key, &msg_ctx->peer, NULL);
2284     }
2285 #endif
2286   }
2287   else
2288   {
2289     /* check query valid */
2290     if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID ==
2291         GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key,
2292                                &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator,
2293                                msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0))
2294     {
2295       GNUNET_break_op (0);
2296       msg_ctx->do_forward = GNUNET_NO;
2297     }
2298   }
2299
2300   if (msg_ctx->hop_count == 0)  /* Locally initiated request */
2301   {
2302 #if DEBUG_DHT_ROUTING
2303     if ((debug_routes) && (dhtlog_handle != NULL))
2304     {
2305       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_GET,
2306                                    msg_ctx->hop_count, GNUNET_NO, &my_identity,
2307                                    &msg_ctx->key);
2308     }
2309 #endif
2310   }
2311   if (msg_ctx->do_forward == GNUNET_YES)
2312     route_message (msg, msg_ctx);
2313   GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
2314   return results;
2315 }
2316
2317
2318 static void
2319 remove_recent_find_peer (void *cls,
2320                          const struct GNUNET_SCHEDULER_TaskContext *tc)
2321 {
2322   GNUNET_HashCode *key = cls;
2323
2324   GNUNET_assert (GNUNET_YES ==
2325                  GNUNET_CONTAINER_multihashmap_remove
2326                  (recent_find_peer_requests, key, NULL));
2327   GNUNET_free (key);
2328 }
2329
2330
2331 /**
2332  * Server handler for initiating local dht find peer requests
2333  *
2334  * @param find_msg the actual find peer message
2335  * @param msg_ctx struct containing pertinent information about the request
2336  *
2337  */
2338 static void
2339 handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
2340                       struct DHT_MessageContext *msg_ctx)
2341 {
2342   struct GNUNET_MessageHeader *find_peer_result;
2343   struct GNUNET_DHT_FindPeerMessage *find_peer_message;
2344   struct DHT_MessageContext *new_msg_ctx;
2345   struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
2346   size_t hello_size;
2347   size_t tsize;
2348   GNUNET_HashCode *recent_hash;
2349   struct GNUNET_MessageHeader *other_hello;
2350   size_t other_hello_size;
2351   struct GNUNET_PeerIdentity peer_id;
2352
2353   find_peer_message = (struct GNUNET_DHT_FindPeerMessage *) find_msg;
2354   GNUNET_break_op (ntohs (find_msg->size) >=
2355                    (sizeof (struct GNUNET_DHT_FindPeerMessage)));
2356   if (ntohs (find_msg->size) < sizeof (struct GNUNET_DHT_FindPeerMessage))
2357     return;
2358   other_hello = NULL;
2359   other_hello_size = 0;
2360   if (ntohs (find_msg->size) > sizeof (struct GNUNET_DHT_FindPeerMessage))
2361   {
2362     other_hello_size =
2363         ntohs (find_msg->size) - sizeof (struct GNUNET_DHT_FindPeerMessage);
2364     other_hello = GNUNET_malloc (other_hello_size);
2365     memcpy (other_hello, &find_peer_message[1], other_hello_size);
2366     if ((GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) other_hello) == 0)
2367         || (GNUNET_OK !=
2368             GNUNET_HELLO_get_id ((struct GNUNET_HELLO_Message *) other_hello,
2369                                  &peer_id)))
2370     {
2371       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2372                   "Received invalid HELLO message in find peer request!\n");
2373       GNUNET_free (other_hello);
2374       return;
2375     }
2376 #if FIND_PEER_WITH_HELLO
2377     if (GNUNET_YES == consider_peer (&peer_id))
2378     {
2379       increment_stats (STAT_HELLOS_PROVIDED);
2380       GNUNET_TRANSPORT_offer_hello (transport_handle, other_hello, NULL, NULL);
2381       GNUNET_CORE_peer_request_connect (coreAPI, &peer_id, NULL, NULL);
2382       route_message (find_msg, msg_ctx);
2383       GNUNET_free (other_hello);
2384       return;
2385     }
2386     else                        /* We don't want this peer! */
2387     {
2388       route_message (find_msg, msg_ctx);
2389       GNUNET_free (other_hello);
2390       return;
2391     }
2392 #endif
2393   }
2394
2395 #if DEBUG_DHT
2396   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2397               "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
2398               my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&msg_ctx->key),
2399               ntohs (find_msg->size), sizeof (struct GNUNET_MessageHeader));
2400 #endif
2401   if (my_hello == NULL)
2402   {
2403 #if DEBUG_DHT
2404     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405                 "`%s': Our HELLO is null, can't return.\n", "DHT");
2406 #endif
2407     GNUNET_free_non_null (other_hello);
2408     route_message (find_msg, msg_ctx);
2409     return;
2410   }
2411
2412   incoming_bloom =
2413       GNUNET_CONTAINER_bloomfilter_init (find_peer_message->bloomfilter,
2414                                          DHT_BLOOM_SIZE, DHT_BLOOM_K);
2415   if (GNUNET_YES ==
2416       GNUNET_CONTAINER_bloomfilter_test (incoming_bloom,
2417                                          &my_identity.hashPubKey))
2418   {
2419     increment_stats (STAT_BLOOM_FIND_PEER);
2420     GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
2421     GNUNET_free_non_null (other_hello);
2422     route_message (find_msg, msg_ctx);
2423     return;                     /* We match the bloomfilter, do not send a response to this peer (they likely already know us!) */
2424   }
2425   GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
2426
2427 #if RESTRICT_FIND_PEER
2428
2429   /**
2430    * Ignore any find peer requests from a peer we have seen very recently.
2431    */
2432   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent_find_peer_requests, &msg_ctx->key))  /* We have recently responded to a find peer request for this peer! */
2433   {
2434     increment_stats ("# dht find peer requests ignored (recently seen!)");
2435     GNUNET_free_non_null (other_hello);
2436     return;
2437   }
2438
2439   /**
2440    * Use this check to only allow the peer to respond to find peer requests if
2441    * it would be beneficial to have the requesting peer in this peers routing
2442    * table.  Can be used to thwart peers flooding the network with find peer
2443    * requests that we don't care about.  However, if a new peer is joining
2444    * the network and has no other peers this is a problem (assume all buckets
2445    * full, no one will respond!).
2446    */
2447   memcpy (&peer_id.hashPubKey, &msg_ctx->key, sizeof (GNUNET_HashCode));
2448   if (GNUNET_NO == consider_peer (&peer_id))
2449   {
2450     increment_stats ("# dht find peer requests ignored (do not need!)");
2451     GNUNET_free_non_null (other_hello);
2452     route_message (find_msg, msg_ctx);
2453     return;
2454   }
2455 #endif
2456
2457   recent_hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
2458   memcpy (recent_hash, &msg_ctx->key, sizeof (GNUNET_HashCode));
2459   if (GNUNET_SYSERR !=
2460       GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests,
2461                                          &msg_ctx->key, NULL,
2462                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
2463   {
2464 #if DEBUG_DHT
2465     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2466                 "Adding recent remove task for key `%s`!\n",
2467                 GNUNET_h2s (&msg_ctx->key));
2468 #endif
2469     /* Only add a task if there wasn't one for this key already! */
2470     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
2471                                   (GNUNET_TIME_UNIT_SECONDS, 30),
2472                                   &remove_recent_find_peer, recent_hash);
2473   }
2474   else
2475   {
2476     GNUNET_free (recent_hash);
2477 #if DEBUG_DHT
2478     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2479                 "Received duplicate find peer request too soon!\n");
2480 #endif
2481   }
2482
2483   /* Simplistic find_peer functionality, always return our hello */
2484   hello_size = ntohs (my_hello->size);
2485   tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
2486
2487   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2488   {
2489     GNUNET_break_op (0);
2490     GNUNET_free_non_null (other_hello);
2491     return;
2492   }
2493
2494   find_peer_result = GNUNET_malloc (tsize);
2495   find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
2496   find_peer_result->size = htons (tsize);
2497   memcpy (&find_peer_result[1], my_hello, hello_size);
2498 #if DEBUG_DHT
2499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2500               "`%s': Sending hello size %d to requesting peer.\n", "DHT",
2501               hello_size);
2502 #endif
2503
2504   new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
2505   memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
2506   new_msg_ctx->peer = my_identity;
2507   new_msg_ctx->bloom =
2508       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2509   new_msg_ctx->hop_count = 0;
2510   new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;     /* Make find peer requests a higher priority */
2511   new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2512   increment_stats (STAT_FIND_PEER_ANSWER);
2513   if (GNUNET_DHT_RO_RECORD_ROUTE ==
2514       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2515   {
2516     new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
2517     new_msg_ctx->path_history_len = msg_ctx->path_history_len;
2518     /* Assign to previous msg_ctx path history, caller should free after our return */
2519     new_msg_ctx->path_history = msg_ctx->path_history;
2520   }
2521   route_result_message (find_peer_result, new_msg_ctx);
2522   GNUNET_free (new_msg_ctx);
2523 #if DEBUG_DHT_ROUTING
2524   if ((debug_routes) && (dhtlog_handle != NULL))
2525   {
2526     dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_FIND_PEER,
2527                                  msg_ctx->hop_count, GNUNET_YES, &my_identity,
2528                                  &msg_ctx->key);
2529   }
2530 #endif
2531   GNUNET_free_non_null (other_hello);
2532   GNUNET_free (find_peer_result);
2533   route_message (find_msg, msg_ctx);
2534 }
2535
2536
2537 /**
2538  * Server handler for initiating local dht put requests
2539  *
2540  * @param msg the actual put message
2541  * @param msg_ctx struct containing pertinent information about the request
2542  */
2543 static void
2544 handle_dht_put (const struct GNUNET_MessageHeader *msg,
2545                 struct DHT_MessageContext *msg_ctx)
2546 {
2547   const struct GNUNET_DHT_PutMessage *put_msg;
2548   struct DHTPutEntry *put_entry;
2549   unsigned int put_size;
2550   char *path_offset;
2551   enum GNUNET_BLOCK_Type put_type;
2552   size_t data_size;
2553   int ret;
2554   GNUNET_HashCode key;
2555   struct DHTQueryRecord *record;
2556
2557   GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage));
2558
2559   put_msg = (const struct GNUNET_DHT_PutMessage *) msg;
2560   put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
2561 #if HAVE_MALICIOUS
2562   if (put_type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE)
2563   {
2564 #if DEBUG_DHT_ROUTING
2565     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2566     {
2567           /** Log routes that die due to high load! */
2568       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2569                                    msg_ctx->hop_count, GNUNET_SYSERR,
2570                                    &my_identity, &msg_ctx->key, &msg_ctx->peer,
2571                                    NULL);
2572     }
2573 #endif
2574     return;
2575   }
2576 #endif
2577   data_size =
2578       ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
2579   ret =
2580       GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size,
2581                             &key);
2582   if (GNUNET_NO == ret)
2583   {
2584 #if DEBUG_DHT_ROUTING
2585     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2586     {
2587       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2588                                    msg_ctx->hop_count, GNUNET_SYSERR,
2589                                    &my_identity, &msg_ctx->key, &msg_ctx->peer,
2590                                    NULL);
2591     }
2592 #endif
2593     /* invalid reply */
2594     GNUNET_break_op (0);
2595     return;
2596   }
2597   if ((GNUNET_YES == ret) &&
2598       (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode))))
2599   {
2600 #if DEBUG_DHT_ROUTING
2601     if ((debug_routes_extended) && (dhtlog_handle != NULL))
2602     {
2603       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2604                                    msg_ctx->hop_count, GNUNET_SYSERR,
2605                                    &my_identity, &msg_ctx->key, &msg_ctx->peer,
2606                                    NULL);
2607     }
2608 #endif
2609     /* invalid wrapper: key mismatch! */
2610     GNUNET_break_op (0);
2611     return;
2612   }
2613   /* ret == GNUNET_SYSERR means that there is no known relationship between
2614    * data and the key, so we cannot check it */
2615 #if DEBUG_DHT
2616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2617               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2618               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
2619               msg_ctx->unique_id);
2620 #endif
2621 #if DEBUG_DHT_ROUTING
2622   if (msg_ctx->hop_count == 0)  /* Locally initiated request */
2623   {
2624     if ((debug_routes) && (dhtlog_handle != NULL))
2625     {
2626       dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_PUT,
2627                                    msg_ctx->hop_count, GNUNET_NO, &my_identity,
2628                                    &msg_ctx->key);
2629     }
2630   }
2631 #endif
2632
2633   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
2634                                              &msg_ctx->key);
2635   if (NULL != record)
2636   {
2637     struct DHTRouteSource *pos;
2638     struct GNUNET_DHT_GetResultMessage *get_result;
2639     struct DHT_MessageContext new_msg_ctx;
2640     size_t get_size;
2641
2642     pos = record->head;
2643     while (pos != NULL)
2644     {
2645       /* TODO: do only for local started requests? or also for remote peers? */
2646       /* TODO: include this in statistics? under what? */
2647       /* TODO: reverse order of path_history? */
2648       if (NULL == pos->client)
2649       {
2650         pos = pos->next;
2651         continue;
2652       }
2653
2654       memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
2655       if (GNUNET_DHT_RO_RECORD_ROUTE ==
2656           (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2657       {
2658         new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
2659 #if DEBUG_PATH
2660         for (i = 0; i < new_msg_ctx.path_history_len; i++)
2661         {
2662           path_offset =
2663               &new_msg_ctx.path_history[i * sizeof (struct GNUNET_PeerIdentity)];
2664           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2665                       "(put for active get) Key %s Found peer %d:%s\n",
2666                       GNUNET_h2s (&msg_ctx->key), i,
2667                       GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
2668         }
2669 #endif
2670       }
2671
2672       get_size =
2673           sizeof (struct GNUNET_DHT_GetResultMessage) + data_size +
2674           (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
2675       get_result = GNUNET_malloc (get_size);
2676       get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
2677       get_result->header.size = htons (get_size);
2678       get_result->expiration = put_msg->expiration;
2679       get_result->type = put_msg->type;
2680       get_result->put_path_length = htons (msg_ctx->path_history_len);
2681
2682       /* Copy the actual data and the path_history to the end of the get result */
2683       memcpy (&get_result[1], &put_msg[1], data_size);
2684       path_offset = (char *) &get_result[1];
2685       path_offset += data_size;
2686       memcpy (path_offset, msg_ctx->path_history,
2687               msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
2688       new_msg_ctx.peer = my_identity;
2689       new_msg_ctx.bloom = NULL;
2690       new_msg_ctx.hop_count = 0;
2691       /* Make result routing a higher priority */
2692       new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;
2693       new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
2694       new_msg_ctx.unique_id = pos->uid;
2695       send_reply_to_client(pos->client, &get_result->header, &new_msg_ctx);
2696       GNUNET_free (get_result);
2697       pos = pos->next;
2698     }
2699   }
2700
2701   if (msg_ctx->closest != GNUNET_YES)
2702   {
2703     route_message (msg, msg_ctx);
2704     return;
2705   }
2706
2707 #if DEBUG_DHT
2708   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2709               "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
2710               my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
2711               msg_ctx->unique_id);
2712 #endif
2713
2714 #if DEBUG_DHT_ROUTING
2715   if ((debug_routes_extended) && (dhtlog_handle != NULL))
2716   {
2717     dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
2718                                  msg_ctx->hop_count, GNUNET_YES, &my_identity,
2719                                  &msg_ctx->key, &msg_ctx->peer, NULL);
2720   }
2721
2722   if ((debug_routes) && (dhtlog_handle != NULL))
2723   {
2724     dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_PUT,
2725                                  msg_ctx->hop_count, GNUNET_YES, &my_identity,
2726                                  &msg_ctx->key);
2727   }
2728 #endif
2729
2730   increment_stats (STAT_PUTS_INSERTED);
2731   if (datacache != NULL)
2732   {
2733     /* Put size is actual data size plus struct overhead plus path length (if any) */
2734     put_size =
2735         data_size + sizeof (struct DHTPutEntry) +
2736         (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
2737     put_entry = GNUNET_malloc (put_size);
2738     put_entry->data_size = data_size;
2739     put_entry->path_length = msg_ctx->path_history_len;
2740     /* Copy data to end of put entry */
2741     memcpy (&put_entry[1], &put_msg[1], data_size);
2742     if (msg_ctx->path_history_len > 0)
2743     {
2744       /* Copy path after data */
2745       path_offset = (char *) &put_entry[1];
2746       path_offset += data_size;
2747       memcpy (path_offset, msg_ctx->path_history,
2748               msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
2749     }
2750
2751     ret =
2752         GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size,
2753                               (const char *) put_entry, put_type,
2754                               GNUNET_TIME_absolute_ntoh (put_msg->expiration));
2755     GNUNET_free (put_entry);
2756   }
2757   else
2758     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2759                 "`%s:%s': %s request received, but have no datacache!\n",
2760                 my_short_id, "DHT", "PUT");
2761
2762   if (stop_on_closest == GNUNET_NO)
2763     route_message (msg, msg_ctx);
2764 }
2765
2766
2767 /**
2768  * To how many peers should we (on average)
2769  * forward the request to obtain the desired
2770  * target_replication count (on average).
2771  *
2772  * returns: target_replication / (est. hops) + (target_replication * hop_count)
2773  * where est. hops is typically 2 * the routing table depth
2774  *
2775  * @param hop_count number of hops the message has traversed
2776  * @param target_replication the number of total paths desired
2777  *
2778  * @return Some number of peers to forward the message to
2779  */
2780 static unsigned int
2781 get_forward_count (unsigned int hop_count, size_t target_replication)
2782 {
2783   uint32_t random_value;
2784   unsigned int forward_count;
2785   float target_value;
2786
2787   /**
2788    * If we are behaving in strict kademlia mode, send multiple initial requests,
2789    * but then only send to 1 or 0 peers based strictly on the number of hops.
2790    */
2791   if (strict_kademlia == GNUNET_YES)
2792   {
2793     if (hop_count == 0)
2794       return kademlia_replication;
2795     if (hop_count < log_of_network_size_estimate * 2.0)
2796       return 1;
2797     return 0;
2798   }
2799
2800   if (hop_count > log_of_network_size_estimate * 2.0)
2801   {
2802     if (GNUNET_YES == paper_forwarding)
2803     {
2804       /* Once we have reached our ideal number of hops, don't stop forwarding! */
2805       return 1;
2806     }
2807 #if DEBUG_DHT
2808     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2809                 "Hop count too high (est %f, lowest %d), NOT Forwarding request\n",
2810                 log_of_network_size_estimate * 2.0, lowest_bucket);
2811 #endif
2812     return 0;
2813   }
2814
2815   if (GNUNET_YES == paper_forwarding)
2816   {
2817     /* FIXME: re-run replication trials with this formula */
2818     target_value =
2819         1 + (target_replication - 1.0) / (log_of_network_size_estimate +
2820                                           ((float) (target_replication - 1.0) *
2821                                            hop_count));
2822     /* Set forward count to floor of target_value */
2823     forward_count = (unsigned int) target_value;
2824     /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
2825     target_value = target_value - forward_count;
2826     random_value =
2827         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
2828
2829     if (random_value < (target_value * UINT32_MAX))
2830       forward_count += 1;
2831   }
2832   else
2833   {
2834     random_value = 0;
2835     forward_count = 1;
2836     target_value =
2837         target_replication / (log_of_network_size_estimate +
2838                               ((float) target_replication * hop_count));
2839     if (target_value > 1)
2840     {
2841       /* Set forward count to floor of target_value */
2842       forward_count = (unsigned int) target_value;
2843       /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
2844       target_value = target_value - forward_count;
2845     }
2846     else
2847       random_value =
2848           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
2849
2850     if (random_value < (target_value * UINT32_MAX))
2851       forward_count += 1;
2852   }
2853
2854   return forward_count;
2855 }
2856
2857
2858 /**
2859  * Check whether my identity is closer than any known peers.
2860  * If a non-null bloomfilter is given, check if this is the closest
2861  * peer that hasn't already been routed to.
2862  *
2863  * @param target hash code to check closeness to
2864  * @param bloom bloomfilter, exclude these entries from the decision
2865  * @return GNUNET_YES if node location is closest,
2866  *         GNUNET_NO otherwise.
2867  */
2868 static int
2869 am_closest_peer (const GNUNET_HashCode * target,
2870                  struct GNUNET_CONTAINER_BloomFilter *bloom)
2871 {
2872   int bits;
2873   int other_bits;
2874   int bucket_num;
2875   int count;
2876   struct PeerInfo *pos;
2877   unsigned int my_distance;
2878
2879   if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode)))
2880     return GNUNET_YES;
2881
2882   bucket_num = find_current_bucket (target);
2883
2884   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target);
2885   my_distance = distance (&my_identity.hashPubKey, target);
2886   pos = k_buckets[bucket_num].head;
2887   count = 0;
2888   while ((pos != NULL) && (count < bucket_size))
2889   {
2890     if ((bloom != NULL) &&
2891         (GNUNET_YES ==
2892          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
2893     {
2894       pos = pos->next;
2895       continue;                 /* Skip already checked entries */
2896     }
2897
2898     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target);
2899     if (other_bits > bits)
2900       return GNUNET_NO;
2901     else if (other_bits == bits)        /* We match the same number of bits */
2902     {
2903       if (strict_kademlia != GNUNET_YES)        /* Return that we at as close as any other peer */
2904         return GNUNET_YES;
2905       if (distance (&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
2906         return GNUNET_NO;
2907     }
2908     pos = pos->next;
2909   }
2910
2911   /* No peers closer, we are the closest! */
2912   return GNUNET_YES;
2913 }
2914
2915
2916 /**
2917  * Select a peer from the routing table that would be a good routing
2918  * destination for sending a message for "target".  The resulting peer
2919  * must not be in the set of blocked peers.<p>
2920  *
2921  * Note that we should not ALWAYS select the closest peer to the
2922  * target, peers further away from the target should be chosen with
2923  * exponentially declining probability.
2924  *
2925  * @param target the key we are selecting a peer to route to
2926  * @param bloom a bloomfilter containing entries this request has seen already
2927  * @param hops how many hops has this message traversed thus far
2928  *
2929  * @return Peer to route to, or NULL on error
2930  */
2931 static struct PeerInfo *
2932 select_peer (const GNUNET_HashCode * target,
2933              struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
2934 {
2935   unsigned int bc;
2936   unsigned int count;
2937   unsigned int selected;
2938   struct PeerInfo *pos;
2939   unsigned int distance;
2940   unsigned int largest_distance;
2941   struct PeerInfo *chosen;
2942
2943   /** If we are doing kademlia routing (saves some cycles) */
2944   if ((strict_kademlia == GNUNET_YES) || (hops >= log_of_network_size_estimate))
2945   {
2946     /* greedy selection (closest peer that is not in bloomfilter) */
2947     largest_distance = 0;
2948     chosen = NULL;
2949     for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2950     {
2951       pos = k_buckets[bc].head;
2952       count = 0;
2953       while ((pos != NULL) && (count < bucket_size))
2954       {
2955         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
2956         if (GNUNET_NO ==
2957             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2958         {
2959           distance = inverse_distance (target, &pos->id.hashPubKey);
2960           if (distance > largest_distance)
2961           {
2962             chosen = pos;
2963             largest_distance = distance;
2964           }
2965         }
2966         count++;
2967         pos = pos->next;
2968       }
2969     }
2970     if ((largest_distance > 0) && (chosen != NULL))
2971     {
2972       GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey);
2973       return chosen;
2974     }
2975     return NULL;                /* no peer available or we are the closest */
2976   }
2977
2978
2979   /* select "random" peer */
2980   /* count number of peers that are available and not filtered */
2981   count = 0;
2982   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2983   {
2984     pos = k_buckets[bc].head;
2985     while ((pos != NULL) && (count < bucket_size))
2986     {
2987       if (GNUNET_YES ==
2988           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2989       {
2990         pos = pos->next;
2991         continue;               /* Ignore bloomfiltered peers */
2992       }
2993       count++;
2994       pos = pos->next;
2995     }
2996   }
2997   if (count == 0)               /* No peers to select from! */
2998   {
2999     increment_stats ("# failed to select peer");
3000     return NULL;
3001   }
3002   /* Now actually choose a peer */
3003   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
3004   count = 0;
3005   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
3006   {
3007     pos = k_buckets[bc].head;
3008     while ((pos != NULL) && (count < bucket_size))
3009     {
3010       if (GNUNET_YES ==
3011           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
3012       {
3013         pos = pos->next;
3014         continue;               /* Ignore bloomfiltered peers */
3015       }
3016       if (0 == selected--)
3017         return pos;
3018       pos = pos->next;
3019     }
3020   }
3021   GNUNET_break (0);
3022   return NULL;
3023 }
3024
3025
3026 /**
3027  * Task used to remove recent entries, either
3028  * after timeout, when full, or on shutdown.
3029  *
3030  * @param cls the entry to remove
3031  * @param tc context, reason, etc.
3032  */
3033 static void
3034 remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3035 {
3036   struct RecentRequest *req = cls;
3037   static GNUNET_HashCode hash;
3038
3039   GNUNET_assert (req != NULL);
3040   hash_from_uid (req->uid, &hash);
3041 #if HAVE_UID_FOR_TESTING > 1
3042   GNUNET_assert (GNUNET_YES ==
3043                  GNUNET_CONTAINER_multihashmap_remove (recent.hashmap, &hash,
3044                                                        req));
3045 #endif
3046   GNUNET_CONTAINER_heap_remove_node (req->heap_node);
3047   GNUNET_CONTAINER_bloomfilter_free (req->bloom);
3048   GNUNET_free (req);
3049
3050   /*
3051    * if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
3052    * {
3053    * GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
3054    * GNUNET_CONTAINER_heap_destroy(recent.minHeap);
3055    * }
3056    */
3057 }
3058
3059 /**
3060  * Remember this routing request so that if a reply is
3061  * received we can either forward it to the correct peer
3062  * or return the result locally.
3063  *
3064  * @param msg_ctx Context of the route request
3065  *
3066  * @return GNUNET_YES if this response was cached, GNUNET_NO if not
3067  */
3068 static int
3069 cache_response (struct DHT_MessageContext *msg_ctx)
3070 {
3071   struct DHTQueryRecord *record;
3072   struct DHTRouteSource *source_info;
3073   struct DHTRouteSource *pos;
3074   struct GNUNET_TIME_Absolute now;
3075   unsigned int current_size;
3076
3077   current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
3078
3079 #if DELETE_WHEN_FULL
3080   while (current_size >= MAX_OUTSTANDING_FORWARDS)
3081   {
3082     source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
3083     GNUNET_assert (source_info != NULL);
3084     record = source_info->record;
3085     GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
3086     if (record->head == NULL)   /* No more entries in DLL */
3087     {
3088       GNUNET_assert (GNUNET_YES ==
3089                      GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
3090                                                            &record->key,
3091                                                            record));
3092       GNUNET_free (record);
3093     }
3094     if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
3095     {
3096       GNUNET_SCHEDULER_cancel (source_info->delete_task);
3097       source_info->delete_task = GNUNET_SCHEDULER_NO_TASK;
3098     }
3099     if (source_info->find_peers_responded != NULL)
3100       GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
3101     GNUNET_free (source_info);
3102     current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
3103   }
3104 #endif
3105   /** Non-local request and have too many outstanding forwards, discard! */
3106   if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
3107     return GNUNET_NO;
3108
3109   now = GNUNET_TIME_absolute_get ();
3110   record =
3111       GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
3112   if (record != NULL)           /* Already know this request! */
3113   {
3114     pos = record->head;
3115     while (pos != NULL)
3116     {
3117       if (0 ==
3118           memcmp (&msg_ctx->peer, &pos->source,
3119                   sizeof (struct GNUNET_PeerIdentity)))
3120         break;                  /* Already have this peer in reply list! */
3121       pos = pos->next;
3122     }
3123     if ((pos != NULL) && (pos->client == msg_ctx->client))      /* Seen this already */
3124     {
3125       GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode,
3126                                          now.abs_value);
3127       return GNUNET_NO;
3128     }
3129   }
3130   else
3131   {
3132     record = GNUNET_malloc (sizeof (struct DHTQueryRecord));
3133     GNUNET_assert (GNUNET_OK ==
3134                    GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap,
3135                                                       &msg_ctx->key, record,
3136                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3137     memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
3138   }
3139
3140   source_info = GNUNET_malloc (sizeof (struct DHTRouteSource));
3141   source_info->record = record;
3142   source_info->delete_task =
3143       GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry,
3144                                     source_info);
3145   source_info->find_peers_responded =
3146       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3147   source_info->source = msg_ctx->peer;
3148   GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail,
3149                                      source_info);
3150   if (msg_ctx->client != NULL)  /* For local request, set timeout so high it effectively never gets pushed out */
3151   {
3152     source_info->client = msg_ctx->client;
3153     now = GNUNET_TIME_absolute_get_forever ();
3154   }
3155   source_info->hnode =
3156       GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info,
3157                                     now.abs_value);
3158   source_info->uid = msg_ctx->unique_id;
3159 #if DEBUG_DHT > 1
3160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3161               "`%s:%s': Created new forward source info for %s uid %llu\n",
3162               my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
3163               msg_ctx->unique_id);
3164 #endif
3165   return GNUNET_YES;
3166 }
3167
3168
3169 /**
3170  * Main function that handles whether or not to route a message to other
3171  * peers.
3172  *
3173  * @param msg the message to be routed
3174  * @param msg_ctx the context containing all pertinent information about the message
3175  */
3176 static void
3177 route_message (const struct GNUNET_MessageHeader *msg,
3178                struct DHT_MessageContext *msg_ctx)
3179 {
3180   int i;
3181   struct PeerInfo *selected;
3182
3183 #if DEBUG_DHT_ROUTING > 1
3184   struct PeerInfo *nearest;
3185 #endif
3186   unsigned int target_forward_count;
3187   unsigned int forward_count;
3188   struct RecentRequest *recent_req;
3189 #if HAVE_UID_FOR_TESTING > 1
3190   GNUNET_HashCode unique_hash;
3191 #endif
3192   char *stat_forward_count;
3193   char *temp_stat_str;
3194
3195 #if DEBUG_DHT_ROUTING
3196   int ret;
3197 #endif
3198
3199   if (malicious_dropper == GNUNET_YES)
3200   {
3201 #if DEBUG_DHT_ROUTING
3202     if ((debug_routes_extended) && (dhtlog_handle != NULL))
3203     {
3204       dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3205                                    msg_ctx->hop_count, GNUNET_SYSERR,
3206                                    &my_identity, &msg_ctx->key, &msg_ctx->peer,
3207                                    NULL);
3208     }
3209 #endif
3210     if (msg_ctx->bloom != NULL)
3211     {
3212       GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
3213       msg_ctx->bloom = NULL;
3214     }
3215     return;
3216   }
3217
3218   increment_stats (STAT_ROUTES);
3219   target_forward_count =
3220       get_forward_count (msg_ctx->hop_count, msg_ctx->replication);
3221   GNUNET_asprintf (&stat_forward_count, "# forward counts of %d",
3222                    target_forward_count);
3223   increment_stats (stat_forward_count);
3224   GNUNET_free (stat_forward_count);
3225   if (msg_ctx->bloom == NULL)
3226     msg_ctx->bloom =
3227         GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3228
3229   if ((stop_on_closest == GNUNET_YES) && (msg_ctx->closest == GNUNET_YES) &&
3230       (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
3231     target_forward_count = 0;
3232
3233   /**
3234    * NOTICE:  In Kademlia, a find peer request goes no further if the peer doesn't return
3235    * any closer peers (which is being checked for below).  Since we are doing recursive
3236    * routing we have no choice but to stop forwarding in this case.  This means that at
3237    * any given step the request may NOT be forwarded to alpha peers (because routes will
3238    * stop and the parallel route will not be aware of it).  Of course, assuming that we
3239    * have fulfilled the Kademlia requirements for routing table fullness this will never
3240    * ever ever be a problem.
3241    *
3242    * However, is this fair?
3243    *
3244    * Since we use these requests to build our routing tables (and we build them in the
3245    * testing driver) we will ignore this restriction for FIND_PEER messages so that
3246    * routing tables still get constructed.
3247    */
3248   if ((GNUNET_YES == strict_kademlia) && (msg_ctx->closest == GNUNET_YES) &&
3249       (msg_ctx->hop_count > 0) &&
3250       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER))
3251     target_forward_count = 0;
3252
3253
3254   GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom, &my_identity.hashPubKey);
3255 #if HAVE_UID_FOR_TESTING > 1
3256   /* BUG HERE: recent uses unique_id! So if all unique-IDs are 0, we get
3257      easily into trouble!!! Also, this should not even be necessary... */
3258   hash_from_uid (msg_ctx->unique_id, &unique_hash);
3259   if (GNUNET_YES ==
3260       GNUNET_CONTAINER_multihashmap_contains (recent.hashmap, &unique_hash))
3261   {
3262     recent_req =
3263         GNUNET_CONTAINER_multihashmap_get (recent.hashmap, &unique_hash);
3264     GNUNET_assert (recent_req != NULL);
3265     if (0 != memcmp (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode)))
3266       increment_stats (STAT_DUPLICATE_UID);
3267     else
3268     {
3269       increment_stats (STAT_RECENT_SEEN);
3270       GNUNET_CONTAINER_bloomfilter_or2 (msg_ctx->bloom, recent_req->bloom,
3271                                         DHT_BLOOM_SIZE);
3272     }
3273   }
3274   else
3275 #endif
3276   {
3277     recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
3278     recent_req->uid = msg_ctx->unique_id;
3279     memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
3280     recent_req->remove_task =
3281         GNUNET_SCHEDULER_add_delayed (DEFAULT_RECENT_REMOVAL, &remove_recent,
3282                                       recent_req);
3283     recent_req->heap_node =
3284         GNUNET_CONTAINER_heap_insert (recent.minHeap, recent_req,
3285                                       GNUNET_TIME_absolute_get ().abs_value);
3286     recent_req->bloom =
3287         GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3288 #if HAVE_UID_FOR_TESTING > 1
3289     GNUNET_CONTAINER_multihashmap_put (recent.hashmap, &unique_hash, recent_req,
3290                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3291 #endif
3292   }
3293
3294   if (GNUNET_CONTAINER_heap_get_size (recent.minHeap) > DHT_MAX_RECENT)
3295   {
3296     recent_req = GNUNET_CONTAINER_heap_peek (recent.minHeap);
3297     GNUNET_assert (recent_req != NULL);
3298     GNUNET_SCHEDULER_cancel (recent_req->remove_task);
3299     recent_req->remove_task =
3300         GNUNET_SCHEDULER_add_now (&remove_recent, recent_req);
3301   }
3302
3303   forward_count = 0;
3304   for (i = 0; i < target_forward_count; i++)
3305   {
3306     selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
3307
3308     if (selected != NULL)
3309     {
3310       forward_count++;
3311       if (GNUNET_CRYPTO_hash_matching_bits
3312           (&selected->id.hashPubKey,
3313            &msg_ctx->key) >=
3314           GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
3315                                             &msg_ctx->key))
3316         GNUNET_asprintf (&temp_stat_str,
3317                          "# requests routed to close(r) peer hop %u",
3318                          msg_ctx->hop_count);
3319       else
3320         GNUNET_asprintf (&temp_stat_str,
3321                          "# requests routed to less close peer hop %u",
3322                          msg_ctx->hop_count);
3323       if (temp_stat_str != NULL)
3324       {
3325         increment_stats (temp_stat_str);
3326         GNUNET_free (temp_stat_str);
3327       }
3328       GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
3329                                         &selected->id.hashPubKey);
3330 #if DEBUG_DHT_ROUTING > 1
3331       nearest = find_closest_peer (&msg_ctx->key);
3332       nearest_buf = GNUNET_strdup (GNUNET_i2s (&nearest->id));
3333       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3334                   "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n",
3335                   my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
3336                   msg_ctx->unique_id, GNUNET_i2s (&selected->id), nearest_buf,
3337                   GNUNET_CRYPTO_hash_matching_bits (&nearest->id.hashPubKey,
3338                                                     msg_ctx->key),
3339                   distance (&nearest->id.hashPubKey, msg_ctx->key));
3340       GNUNET_free (nearest_buf);
3341 #endif
3342 #if DEBUG_DHT_ROUTING
3343       if ((debug_routes_extended) && (dhtlog_handle != NULL))
3344       {
3345         dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3346                                      msg_ctx->hop_count, GNUNET_NO,
3347                                      &my_identity, &msg_ctx->key, &msg_ctx->peer,
3348                                      &selected->id);
3349       }
3350 #endif
3351       forward_message (msg, selected, msg_ctx);
3352     }
3353   }
3354
3355   if (msg_ctx->bloom != NULL)
3356   {
3357     GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom,
3358                                       DHT_BLOOM_SIZE);
3359     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
3360     msg_ctx->bloom = NULL;
3361   }
3362
3363 #if DEBUG_DHT_ROUTING
3364   if (forward_count == 0)
3365     ret = GNUNET_SYSERR;
3366   else
3367     ret = GNUNET_NO;
3368
3369   if ((debug_routes_extended) && (dhtlog_handle != NULL))
3370   {
3371     dhtlog_handle->insert_route (NULL, msg_ctx->unique_id, DHTLOG_ROUTE,
3372                                  msg_ctx->hop_count, ret, &my_identity,
3373                                  &msg_ctx->key, &msg_ctx->peer, NULL);
3374   }
3375 #endif
3376 }
3377
3378
3379 /**
3380  * Main function that handles whether or not to route a message to other
3381  * peers.
3382  *
3383  * @param msg the message to be routed
3384  * @param msg_ctx the context containing all pertinent information about the message
3385  */
3386 static void
3387 demultiplex_message (const struct GNUNET_MessageHeader *msg,
3388                      struct DHT_MessageContext *msg_ctx)
3389 {
3390   /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */
3391   msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom);
3392
3393   switch (ntohs (msg->type))
3394   {
3395   case GNUNET_MESSAGE_TYPE_DHT_GET:    /* Add to hashmap of requests seen, search for data (always) */
3396     cache_response (msg_ctx);
3397     handle_dht_get (msg, msg_ctx);
3398     break;
3399   case GNUNET_MESSAGE_TYPE_DHT_PUT:    /* Check if closest, if so insert data. */
3400     increment_stats (STAT_PUTS);
3401     handle_dht_put (msg, msg_ctx);
3402     break;
3403   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:      /* Check if closest and not started by us, check options, add to requests seen */
3404     increment_stats (STAT_FIND_PEER);
3405     if (((msg_ctx->hop_count > 0) &&
3406          (0 !=
3407           memcmp (&msg_ctx->peer, &my_identity,
3408                   sizeof (struct GNUNET_PeerIdentity)))) ||
3409         (msg_ctx->client != NULL))
3410     {
3411       cache_response (msg_ctx);
3412       if ((msg_ctx->closest == GNUNET_YES) ||
3413           (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
3414         handle_dht_find_peer (msg, msg_ctx);
3415     }
3416     else
3417       route_message (msg, msg_ctx);
3418 #if DEBUG_DHT_ROUTING
3419     if (msg_ctx->hop_count == 0)        /* Locally initiated request */
3420     {
3421       if ((debug_routes) && (dhtlog_handle != NULL))
3422       {
3423         dhtlog_handle->insert_dhtkey (NULL, &msg_ctx->key);
3424         dhtlog_handle->insert_query (NULL, msg_ctx->unique_id, DHTLOG_FIND_PEER,
3425                                      msg_ctx->hop_count, GNUNET_NO,
3426                                      &my_identity, &msg_ctx->key);
3427       }
3428     }
3429 #endif
3430     break;
3431   default:
3432     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3433                 "`%s': Message type (%d) not handled, forwarding anyway!\n",
3434                 "DHT", ntohs (msg->type));
3435     route_message (msg, msg_ctx);
3436   }
3437 }
3438
3439
3440 /**
3441  * Iterator over hash map entries.
3442  *
3443  * @param cls client to search for in source routes
3444  * @param key current key code (ignored)
3445  * @param value value in the hash map, a DHTQueryRecord
3446  * @return GNUNET_YES if we should continue to
3447  *         iterate,
3448  *         GNUNET_NO if not.
3449  */
3450 static int
3451 find_client_records (void *cls, const GNUNET_HashCode * key, void *value)
3452 {
3453   struct ClientList *client = cls;
3454   struct DHTQueryRecord *record = value;
3455   struct DHTRouteSource *pos;
3456
3457   pos = record->head;
3458   while (pos != NULL)
3459   {
3460     if (pos->client == client)
3461       break;
3462     pos = pos->next;
3463   }
3464   if (pos != NULL)
3465   {
3466     GNUNET_CONTAINER_DLL_remove (record->head, record->tail, pos);
3467     GNUNET_CONTAINER_heap_remove_node (pos->hnode);
3468     if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
3469     {
3470       GNUNET_SCHEDULER_cancel (pos->delete_task);
3471       pos->delete_task = GNUNET_SCHEDULER_NO_TASK;
3472     }
3473     if (pos->find_peers_responded != NULL)
3474       GNUNET_CONTAINER_bloomfilter_free (pos->find_peers_responded);
3475     GNUNET_free (pos);
3476   }
3477   if (record->head == NULL)     /* No more entries in DLL */
3478   {
3479     GNUNET_assert (GNUNET_YES ==
3480                    GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
3481                                                          &record->key, record));
3482     GNUNET_free (record);
3483   }
3484   return GNUNET_YES;
3485 }
3486
3487 /**
3488  * Functions with this signature are called whenever a client
3489  * is disconnected on the network level.
3490  *
3491  * @param cls closure (NULL for dht)
3492  * @param client identification of the client; NULL
3493  *        for the last call when the server is destroyed
3494  */
3495 static void
3496 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3497 {
3498   struct ClientList *pos = client_list;
3499   struct ClientList *prev;
3500   struct ClientList *found;
3501   struct PendingMessage *reply;
3502
3503   prev = NULL;
3504   found = NULL;
3505   while (pos != NULL)
3506   {
3507     if (pos->client_handle == client)
3508     {
3509       if (prev != NULL)
3510         prev->next = pos->next;
3511       else
3512         client_list = pos->next;
3513       found = pos;
3514       break;
3515     }
3516     prev = pos;
3517     pos = pos->next;
3518   }
3519
3520   if (found != NULL)
3521   {
3522     if (found->transmit_handle != NULL)
3523       GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
3524
3525     while (NULL != (reply = found->pending_head))
3526     {
3527       GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
3528                                    reply);
3529       GNUNET_free (reply);
3530     }
3531     GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
3532                                            &find_client_records, found);
3533     GNUNET_free (found);
3534   }
3535 }
3536
3537 /**
3538  * Find a client if it exists, add it otherwise.
3539  *
3540  * @param client the server handle to the client
3541  *
3542  * @return the client if found, a new client otherwise
3543  */
3544 static struct ClientList *
3545 find_active_client (struct GNUNET_SERVER_Client *client)
3546 {
3547   struct ClientList *pos = client_list;
3548   struct ClientList *ret;
3549
3550   while (pos != NULL)
3551   {
3552     if (pos->client_handle == client)
3553       return pos;
3554     pos = pos->next;
3555   }
3556
3557   ret = GNUNET_malloc (sizeof (struct ClientList));
3558   ret->client_handle = client;
3559   ret->next = client_list;
3560   client_list = ret;
3561
3562   return ret;
3563 }
3564
3565 #if HAVE_MALICIOUS
3566 /**
3567  * Task to send a malicious put message across the network.
3568  *
3569  * @param cls closure for this task
3570  * @param tc the context under which the task is running
3571  */
3572 static void
3573 malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3574 {
3575   static struct GNUNET_DHT_PutMessage put_message;
3576   static struct DHT_MessageContext msg_ctx;
3577   static GNUNET_HashCode key;
3578   uint32_t random_key;
3579
3580   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
3581     return;
3582   put_message.header.size = htons (sizeof (struct GNUNET_DHT_PutMessage));
3583   put_message.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
3584   put_message.type = htonl (GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE);
3585   put_message.expiration =
3586       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_forever ());
3587   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
3588   random_key =
3589       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
3590   GNUNET_CRYPTO_hash (&random_key, sizeof (uint32_t), &key);
3591   memcpy (&msg_ctx.key, &key, sizeof (GNUNET_HashCode));
3592   msg_ctx.unique_id =
3593       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
3594                      (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX));
3595   msg_ctx.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3596   msg_ctx.msg_options = ntohl (0);
3597   msg_ctx.network_size = log_of_network_size_estimate;
3598   msg_ctx.peer = my_identity;
3599   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE;
3600   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3601 #if DEBUG_DHT_ROUTING
3602   if (dhtlog_handle != NULL)
3603     dhtlog_handle->insert_dhtkey (NULL, &key);
3604 #endif
3605   increment_stats (STAT_PUT_START);
3606   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3607               "%s:%s Sending malicious PUT message with hash %s\n", my_short_id,
3608               "DHT", GNUNET_h2s (&key));
3609   demultiplex_message (&put_message.header, &msg_ctx);
3610   GNUNET_SCHEDULER_add_delayed (malicious_put_frequency, &malicious_put_task,
3611                                 NULL);
3612 }
3613
3614
3615 /**
3616  * Task to send a malicious put message across the network.
3617  *
3618  * @param cls closure for this task
3619  * @param tc the context under which the task is running
3620  */
3621 static void
3622 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3623 {
3624   static struct GNUNET_DHT_GetMessage get_message;
3625   struct DHT_MessageContext msg_ctx;
3626   static GNUNET_HashCode key;
3627   uint32_t random_key;
3628
3629   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
3630     return;
3631
3632   get_message.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
3633   get_message.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
3634   get_message.type = htonl (GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE);
3635   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
3636   random_key =
3637       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
3638   GNUNET_CRYPTO_hash (&random_key, sizeof (uint32_t), &key);
3639   memcpy (&msg_ctx.key, &key, sizeof (GNUNET_HashCode));
3640   msg_ctx.unique_id =
3641       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
3642                      (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX));
3643   msg_ctx.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
3644   msg_ctx.msg_options = ntohl (0);
3645   msg_ctx.network_size = log_of_network_size_estimate;
3646   msg_ctx.peer = my_identity;
3647   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE;
3648   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3649 #if DEBUG_DHT_ROUTING
3650   if (dhtlog_handle != NULL)
3651     dhtlog_handle->insert_dhtkey (NULL, &key);
3652 #endif
3653   increment_stats (STAT_GET_START);
3654   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3655               "%s:%s Sending malicious GET message with hash %s\n", my_short_id,
3656               "DHT", GNUNET_h2s (&key));
3657   demultiplex_message (&get_message.header, &msg_ctx);
3658   GNUNET_SCHEDULER_add_delayed (malicious_get_frequency, &malicious_get_task,
3659                                 NULL);
3660 }
3661 #endif
3662
3663
3664 /**
3665  * Iterator over hash map entries.
3666  *
3667  * @param cls closure
3668  * @param key current key code
3669  * @param value value in the hash map
3670  * @return GNUNET_YES if we should continue to
3671  *         iterate,
3672  *         GNUNET_NO if not.
3673  */
3674 static int
3675 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
3676 {
3677   struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
3678
3679   GNUNET_CONTAINER_bloomfilter_add (bloom, key);
3680   return GNUNET_YES;
3681 }
3682
3683 /**
3684  * Task to send a find peer message for our own peer identifier
3685  * so that we can find the closest peers in the network to ourselves
3686  * and attempt to connect to them.
3687  *
3688  * @param cls closure for this task
3689  * @param tc the context under which the task is running
3690  */
3691 static void
3692 send_find_peer_message (void *cls,
3693                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3694 {
3695   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
3696   struct DHT_MessageContext msg_ctx;
3697   struct GNUNET_TIME_Relative next_send_time;
3698   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
3699
3700   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
3701     return;
3702
3703   if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer))        /* If we are finding peers already, no need to send out our request right now! */
3704   {
3705     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3706                 "Have %d newly found peers since last find peer message sent!\n",
3707                 newly_found_peers);
3708     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
3709                                   &send_find_peer_message, NULL);
3710     newly_found_peers = 0;
3711     return;
3712   }
3713
3714   increment_stats (STAT_FIND_PEER_START);
3715 #if FIND_PEER_WITH_HELLO
3716   find_peer_msg =
3717       GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) +
3718                      GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *)
3719                                         my_hello));
3720   find_peer_msg->header.size =
3721       htons (sizeof (struct GNUNET_DHT_FindPeerMessage) +
3722              GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
3723   memcpy (&find_peer_msg[1], my_hello,
3724           GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
3725 #else
3726   find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
3727   find_peer_msg->header.size =
3728       htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
3729 #endif
3730   find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
3731   temp_bloom =
3732       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
3733   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
3734                                          temp_bloom);
3735   GNUNET_assert (GNUNET_OK ==
3736                  GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
3737                                                             find_peer_msg->
3738                                                             bloomfilter,
3739                                                             DHT_BLOOM_SIZE));
3740   GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
3741   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
3742   memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
3743   msg_ctx.unique_id =
3744       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
3745                      (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
3746   msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
3747   msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
3748   msg_ctx.network_size = log_of_network_size_estimate;
3749   msg_ctx.peer = my_identity;
3750   msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
3751   msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
3752
3753   demultiplex_message (&find_peer_msg->header, &msg_ctx);
3754   GNUNET_free (find_peer_msg);
3755   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3756               "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id,
3757               "DHT", "FIND PEER");
3758   if (newly_found_peers < bucket_size)
3759   {
3760     next_send_time.rel_value =
3761         (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
3762         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
3763                                   DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
3764   }
3765   else
3766   {
3767     next_send_time.rel_value =
3768         DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
3769         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
3770                                   DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value -
3771                                   DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
3772   }
3773
3774   GNUNET_assert (next_send_time.rel_value != 0);
3775   find_peer_context.count = 0;
3776   newly_found_peers = 0;
3777   find_peer_context.start = GNUNET_TIME_absolute_get ();
3778   if (GNUNET_YES == do_find_peer)
3779   {
3780     GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
3781                                   NULL);
3782   }
3783 }
3784
3785 /**
3786  * Handler for any generic DHT messages, calls the appropriate handler
3787  * depending on message type, sends confirmation if responses aren't otherwise
3788  * expected.
3789  *
3790  * @param cls closure for the service
3791  * @param client the client we received this message from
3792  * @param message the actual message received
3793  */
3794 static void
3795 handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
3796                                 const struct GNUNET_MessageHeader *message)
3797 {
3798   const struct GNUNET_DHT_RouteMessage *dht_msg =
3799       (const struct GNUNET_DHT_RouteMessage *) message;
3800   const struct GNUNET_MessageHeader *enc_msg;
3801   struct DHT_MessageContext msg_ctx;
3802
3803   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
3804 #if DEBUG_DHT
3805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3806               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
3807               my_short_id, "DHT", "GENERIC", ntohs (message->type),
3808               GNUNET_h2s (&dht_msg->key), GNUNET_ntohll (dht_msg->unique_id));
3809 #endif
3810 #if DEBUG_DHT_ROUTING
3811   if (dhtlog_handle != NULL)
3812     dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
3813 #endif
3814
3815   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
3816   msg_ctx.client = find_active_client (client);
3817   memcpy (&msg_ctx.key, &dht_msg->key, sizeof (GNUNET_HashCode));
3818   msg_ctx.unique_id = GNUNET_ntohll (dht_msg->unique_id);
3819   msg_ctx.replication = ntohl (dht_msg->desired_replication_level);
3820   msg_ctx.msg_options = ntohl (dht_msg->options);
3821   if (GNUNET_DHT_RO_RECORD_ROUTE ==
3822       (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
3823   {
3824     msg_ctx.path_history = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
3825     memcpy (msg_ctx.path_history, &my_identity,
3826             sizeof (struct GNUNET_PeerIdentity));
3827     msg_ctx.path_history_len = 1;
3828   }
3829   msg_ctx.network_size = log_of_network_size_estimate;
3830   msg_ctx.peer = my_identity;
3831   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 4;  /* Make local routing a higher priority */
3832   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
3833
3834   if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
3835     increment_stats (STAT_GET_START);
3836   else if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
3837     increment_stats (STAT_PUT_START);
3838   else if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER)
3839     increment_stats (STAT_FIND_PEER_START);
3840
3841   if (GNUNET_YES == malicious_dropper)
3842   {
3843     if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET)
3844     {
3845 #if DEBUG_DHT_ROUTING
3846       if ((debug_routes) && (dhtlog_handle != NULL))
3847       {
3848         dhtlog_handle->insert_query (NULL, msg_ctx.unique_id, DHTLOG_GET,
3849                                      msg_ctx.hop_count, GNUNET_NO, &my_identity,
3850                                      &msg_ctx.key);
3851       }
3852 #endif
3853     }
3854     else if (ntohs (enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)
3855     {
3856 #if DEBUG_DHT_ROUTING
3857       if ((debug_routes) && (dhtlog_handle != NULL))
3858       {
3859         dhtlog_handle->insert_query (NULL, msg_ctx.unique_id, DHTLOG_PUT,
3860                                      msg_ctx.hop_count, GNUNET_NO, &my_identity,
3861                                      &msg_ctx.key);
3862       }
3863 #endif
3864     }
3865     GNUNET_SERVER_receive_done (client, GNUNET_OK);
3866     GNUNET_free_non_null (msg_ctx.path_history);
3867     return;
3868   }
3869
3870   demultiplex_message (enc_msg, &msg_ctx);
3871   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3872
3873 }
3874
3875 /**
3876  * Handler for any locally received DHT control messages,
3877  * sets malicious flags mostly for now.
3878  *
3879  * @param cls closure for the service
3880  * @param client the client we received this message from
3881  * @param message the actual message received
3882  *
3883  */
3884 static void
3885 handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client,
3886                             const struct GNUNET_MessageHeader *message)
3887 {
3888   const struct GNUNET_DHT_ControlMessage *dht_control_msg =
3889       (const struct GNUNET_DHT_ControlMessage *) message;
3890
3891 #if DEBUG_DHT
3892   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3893               "`%s:%s': Received `%s' request from client, command %d\n",
3894               my_short_id, "DHT", "CONTROL", ntohs (dht_control_msg->command));
3895 #endif
3896
3897   switch (ntohs (dht_control_msg->command))
3898   {
3899   case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
3900     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3901                 "Sending self seeking find peer request!\n");
3902     GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL);
3903     break;
3904 #if HAVE_MALICIOUS
3905   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
3906     if (ntohs (dht_control_msg->variable) > 0)
3907       malicious_get_frequency.rel_value = ntohs (dht_control_msg->variable);
3908     if (malicious_get_frequency.rel_value == 0)
3909       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
3910     if (malicious_getter != GNUNET_YES)
3911       GNUNET_SCHEDULER_add_now (&malicious_get_task, NULL);
3912     malicious_getter = GNUNET_YES;
3913     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3914                 "%s:%s Initiating malicious GET behavior, frequency %llu\n",
3915                 my_short_id, "DHT", malicious_get_frequency.rel_value);
3916     break;
3917   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT:
3918     if (ntohs (dht_control_msg->variable) > 0)
3919       malicious_put_frequency.rel_value = ntohs (dht_control_msg->variable);
3920     if (malicious_put_frequency.rel_value == 0)
3921       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
3922     if (malicious_putter != GNUNET_YES)
3923       GNUNET_SCHEDULER_add_now (&malicious_put_task, NULL);
3924     malicious_putter = GNUNET_YES;
3925     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3926                 "%s:%s Initiating malicious PUT behavior, frequency %d\n",
3927                 my_short_id, "DHT", malicious_put_frequency);
3928     break;
3929   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP:
3930 #if DEBUG_DHT_ROUTING
3931     if ((malicious_dropper != GNUNET_YES) && (dhtlog_handle != NULL))
3932       dhtlog_handle->set_malicious (&my_identity);
3933 #endif
3934     malicious_dropper = GNUNET_YES;
3935     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3936                 "%s:%s Initiating malicious DROP behavior\n", my_short_id,
3937                 "DHT");
3938     break;
3939 #endif
3940   default:
3941     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3942                 "%s:%s Unknown control command type `%d'!\n", my_short_id,
3943                 "DHT", ntohs (dht_control_msg->command));
3944     break;
3945   }
3946
3947   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3948 }
3949
3950 /**
3951  * Handler for any generic DHT stop messages, calls the appropriate handler
3952  * depending on message type (if processed locally)
3953  *
3954  * @param cls closure for the service
3955  * @param client the client we received this message from
3956  * @param message the actual message received
3957  *
3958  */
3959 static void
3960 handle_dht_local_route_stop (void *cls, struct GNUNET_SERVER_Client *client,
3961                              const struct GNUNET_MessageHeader *message)
3962 {
3963
3964   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
3965       (const struct GNUNET_DHT_StopMessage *) message;
3966   struct DHTQueryRecord *record;
3967   struct DHTRouteSource *pos;
3968
3969 #if DEBUG_DHT
3970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3971               "`%s:%s': Received `%s' request from client, uid %llu\n",
3972               my_short_id, "DHT", "GENERIC STOP",
3973               GNUNET_ntohll (dht_stop_msg->unique_id));
3974 #endif
3975   record =
3976       GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap,
3977                                          &dht_stop_msg->key);
3978   if (record != NULL)
3979   {
3980     pos = record->head;
3981
3982     while (pos != NULL)
3983     {
3984       /* If the client is non-null (local request) and the client matches the requesting client, remove the entry. */
3985       if ((pos->client != NULL) && (pos->client->client_handle == client))
3986       {
3987         if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
3988           GNUNET_SCHEDULER_cancel (pos->delete_task);
3989         pos->delete_task =
3990             GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);
3991       }
3992       pos = pos->next;
3993     }
3994   }
3995
3996   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3997 }
3998
3999
4000 /**
4001  * Core handler for p2p route requests.
4002  *
4003  * @param cls closure
4004  * @param message message
4005  * @param peer peer identity this notification is about
4006  * @param atsi performance data
4007  * @return GNUNET_OK to keep the connection open,
4008  *         GNUNET_SYSERR to close it (signal serious error)
4009  */
4010 static int
4011 handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer,
4012                               const struct GNUNET_MessageHeader *message,
4013                               const struct GNUNET_TRANSPORT_ATS_Information
4014                               *atsi)
4015 {
4016 #if DEBUG_DHT
4017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4018               "`%s:%s': Received P2P request from peer %s\n", my_short_id,
4019               "DHT", GNUNET_i2s (peer));
4020 #endif
4021   struct GNUNET_DHT_P2PRouteMessage *incoming =
4022       (struct GNUNET_DHT_P2PRouteMessage *) message;
4023   struct GNUNET_MessageHeader *enc_msg =
4024       (struct GNUNET_MessageHeader *) &incoming[1];
4025   struct DHT_MessageContext *msg_ctx;
4026   char *route_path;
4027   int path_size;
4028
4029   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4030   {
4031     GNUNET_break_op (0);
4032     return GNUNET_YES;
4033   }
4034
4035   if (malicious_dropper == GNUNET_YES)
4036   {
4037 #if DEBUG_DHT_ROUTING
4038     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4039     {
4040           /** Log routes that die due to high load! */
4041       dhtlog_handle->insert_route (NULL, 
4042 #if HAVE_UID_FOR_TESTING
4043                                    GNUNET_ntohll (incoming->unique_id),
4044 #else
4045                                    0,
4046 #endif
4047                                    DHTLOG_ROUTE, ntohl (incoming->hop_count),
4048                                    GNUNET_SYSERR, &my_identity, &incoming->key,
4049                                    peer, NULL);
4050     }
4051 #endif
4052     return GNUNET_YES;
4053   }
4054
4055   if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
4056   {
4057     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4058                 "Sending of previous replies took too long, backing off!\n");
4059     increment_stats ("# route requests dropped due to high load");
4060     decrease_max_send_delay (get_max_send_delay ());
4061 #if DEBUG_DHT_ROUTING
4062     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4063     {
4064         /** Log routes that die due to high load! */
4065       dhtlog_handle->insert_route (NULL, 
4066 #if HAVE_UID_FOR_TESTING
4067                                    GNUNET_ntohll (incoming->unique_id),
4068 #else
4069                                    0,
4070 #endif
4071                                    DHTLOG_ROUTE, ntohl (incoming->hop_count),
4072                                    GNUNET_SYSERR, &my_identity, &incoming->key,
4073                                    peer, NULL);
4074     }
4075 #endif
4076     return GNUNET_YES;
4077   }
4078   msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
4079   msg_ctx->bloom =
4080       GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
4081                                          DHT_BLOOM_K);
4082   GNUNET_assert (msg_ctx->bloom != NULL);
4083   msg_ctx->hop_count = ntohl (incoming->hop_count);
4084   memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
4085   msg_ctx->replication = ntohl (incoming->desired_replication_level);
4086 #if HAVE_UID_FOR_TESTING
4087   msg_ctx->unique_id = GNUNET_ntohll (incoming->unique_id);
4088 #endif
4089   msg_ctx->msg_options = ntohl (incoming->options);
4090   if (GNUNET_DHT_RO_RECORD_ROUTE ==
4091       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
4092   {
4093     path_size =
4094         ntohl (incoming->outgoing_path_length) *
4095         sizeof (struct GNUNET_PeerIdentity);
4096     if (ntohs (message->size) !=
4097         (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
4098          path_size))
4099     {
4100       GNUNET_break_op (0);
4101       GNUNET_free (msg_ctx);
4102       return GNUNET_YES;
4103     }
4104     route_path = (char *) &incoming[1];
4105     route_path = route_path + ntohs (enc_msg->size);
4106     msg_ctx->path_history =
4107         GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
4108     memcpy (msg_ctx->path_history, route_path, path_size);
4109     memcpy (&msg_ctx->path_history[path_size], &my_identity,
4110             sizeof (struct GNUNET_PeerIdentity));
4111     msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
4112   }
4113   msg_ctx->network_size = ntohl (incoming->network_size);
4114   msg_ctx->peer = *peer;
4115   msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
4116   msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
4117   demultiplex_message (enc_msg, msg_ctx);
4118   if (msg_ctx->bloom != NULL)
4119   {
4120     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
4121     msg_ctx->bloom = NULL;
4122   }
4123   GNUNET_free (msg_ctx);
4124   return GNUNET_YES;
4125 }
4126
4127
4128 /**
4129  * Core handler for p2p route results.
4130  *
4131  * @param cls closure
4132  * @param message message
4133  * @param peer peer identity this notification is about
4134  * @param atsi performance data
4135  *
4136  */
4137 static int
4138 handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer,
4139                              const struct GNUNET_MessageHeader *message,
4140                              const struct GNUNET_TRANSPORT_ATS_Information
4141                              *atsi)
4142 {
4143 #if DEBUG_DHT
4144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4145               "`%s:%s': Received request from peer %s\n", my_short_id, "DHT",
4146               GNUNET_i2s (peer));
4147 #endif
4148   const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
4149       (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
4150   struct GNUNET_MessageHeader *enc_msg =
4151       (struct GNUNET_MessageHeader *) &incoming[1];
4152   struct DHT_MessageContext msg_ctx;
4153
4154 #if DEBUG_PATH
4155   char *path_offset;
4156   unsigned int i;
4157 #endif
4158   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
4159   {
4160     GNUNET_break_op (0);
4161     return GNUNET_YES;
4162   }
4163
4164   if (malicious_dropper == GNUNET_YES)
4165   {
4166 #if DEBUG_DHT_ROUTING
4167     if ((debug_routes_extended) && (dhtlog_handle != NULL))
4168     {
4169           /** Log routes that die due to high load! */
4170       dhtlog_handle->insert_route (NULL, 
4171 #if HAVE_UID_FOR_TESTING
4172                                    GNUNET_ntohll (incoming->unique_id),
4173 #else
4174                                    0,
4175 #endif
4176                                    DHTLOG_ROUTE, ntohl (incoming->hop_count),
4177                                    GNUNET_SYSERR, &my_identity, &incoming->key,
4178                                    peer, NULL);
4179     }
4180 #endif
4181     return GNUNET_YES;
4182   }
4183
4184   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
4185   memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
4186 #if HAVE_UID_FOR_TESTING
4187   msg_ctx.unique_id = GNUNET_ntohll (incoming->unique_id);
4188 #endif
4189   msg_ctx.msg_options = ntohl (incoming->options);
4190   msg_ctx.hop_count = ntohl (incoming->hop_count);
4191   msg_ctx.peer = *peer;
4192   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;  /* Make result routing a higher priority */
4193   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
4194   if ((GNUNET_DHT_RO_RECORD_ROUTE ==
4195        (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
4196       (ntohl (incoming->outgoing_path_length) > 0))
4197   {
4198     if (ntohs (message->size) -
4199         sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
4200         ntohs (enc_msg->size) !=
4201         ntohl (incoming->outgoing_path_length) *
4202         sizeof (struct GNUNET_PeerIdentity))
4203     {
4204 #if DEBUG_DHT
4205       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4206                   "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n",
4207                   ntohs (message->size), ntohs (enc_msg->size),
4208                   ntohs (message->size) -
4209                   sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
4210                   ntohs (enc_msg->size),
4211                   ntohl (incoming->outgoing_path_length) *
4212                   sizeof (struct GNUNET_PeerIdentity));
4213 #endif
4214       GNUNET_break_op (0);
4215       return GNUNET_NO;
4216     }
4217     msg_ctx.path_history = (char *) &incoming[1];
4218     msg_ctx.path_history += ntohs (enc_msg->size);
4219     msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
4220 #if DEBUG_PATH
4221     for (i = 0; i < msg_ctx.path_history_len; i++)
4222     {
4223       path_offset =
4224           &msg_ctx.path_history[i * sizeof (struct GNUNET_PeerIdentity)];
4225       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4226                   "(handle_p2p_route_result) Key %s Found peer %d:%s\n",
4227                   GNUNET_h2s (&msg_ctx.key), i,
4228                   GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
4229     }
4230 #endif
4231   }
4232   route_result_message (enc_msg, &msg_ctx);
4233   return GNUNET_YES;
4234 }
4235
4236
4237 /**
4238  * Receive the HELLO from transport service,
4239  * free current and replace if necessary.
4240  *
4241  * @param cls NULL
4242  * @param message HELLO message of peer
4243  */
4244 static void
4245 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
4246 {
4247 #if DEBUG_DHT
4248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4249               "Received our `%s' from transport service\n", "HELLO");
4250 #endif
4251
4252   GNUNET_assert (message != NULL);
4253   GNUNET_free_non_null (my_hello);
4254   my_hello = GNUNET_malloc (ntohs (message->size));
4255   memcpy (my_hello, message, ntohs (message->size));
4256 }
4257
4258
4259 /**
4260  * Task run during shutdown.
4261  *
4262  * @param cls unused
4263  * @param tc unused
4264  */
4265 static void
4266 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
4267 {
4268   int bucket_count;
4269   struct PeerInfo *pos;
4270
4271   if (NULL != ghh)
4272   {
4273     GNUNET_TRANSPORT_get_hello_cancel (ghh);
4274     ghh = NULL;
4275   }
4276   if (transport_handle != NULL)
4277   {
4278     GNUNET_free_non_null (my_hello);
4279     GNUNET_TRANSPORT_disconnect (transport_handle);
4280     transport_handle = NULL;
4281   }
4282   if (NULL != nse)
4283   {
4284     GNUNET_NSE_disconnect (nse);
4285     nse = NULL;
4286   }
4287   if (coreAPI != NULL)
4288   {
4289 #if DEBUG_DHT
4290     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n",
4291                 my_short_id, "DHT");
4292 #endif
4293     GNUNET_CORE_disconnect (coreAPI);
4294     coreAPI = NULL;
4295   }
4296   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
4297   {
4298     while (k_buckets[bucket_count].head != NULL)
4299     {
4300       pos = k_buckets[bucket_count].head;
4301 #if DEBUG_DHT
4302       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4303                   "%s:%s Removing peer %s from bucket %d!\n", my_short_id,
4304                   "DHT", GNUNET_i2s (&pos->id), bucket_count);
4305 #endif
4306       delete_peer (pos, bucket_count);
4307     }
4308   }
4309   if (datacache != NULL)
4310   {
4311 #if DEBUG_DHT
4312     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n",
4313                 my_short_id, "DHT");
4314 #endif
4315     GNUNET_DATACACHE_destroy (datacache);
4316     datacache = NULL;
4317   }
4318   if (stats != NULL)
4319   {
4320     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
4321     stats = NULL;
4322   }
4323   if (dhtlog_handle != NULL)
4324   {
4325     GNUNET_DHTLOG_disconnect (dhtlog_handle);
4326     dhtlog_handle = NULL;
4327   }
4328   if (block_context != NULL)
4329   {
4330     GNUNET_BLOCK_context_destroy (block_context);
4331     block_context = NULL;
4332   }
4333   GNUNET_free_non_null (my_short_id);
4334   my_short_id = NULL;
4335 }
4336
4337
4338 /**
4339  * To be called on core init/fail.
4340  *
4341  * @param cls service closure
4342  * @param server handle to the server for this service
4343  * @param identity the public identity of this peer
4344  * @param publicKey the public key of this peer
4345  */
4346 void
4347 core_init (void *cls, struct GNUNET_CORE_Handle *server,
4348            const struct GNUNET_PeerIdentity *identity,
4349            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
4350 {
4351
4352   if (server == NULL)
4353   {
4354 #if DEBUG_DHT
4355     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n",
4356                 "dht", GNUNET_i2s (identity));
4357 #endif
4358     GNUNET_SCHEDULER_cancel (cleanup_task);
4359     GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
4360     return;
4361   }
4362 #if DEBUG_DHT
4363   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4364               "%s: Core connection initialized, I am peer: %s\n", "dht",
4365               GNUNET_i2s (identity));
4366 #endif
4367
4368   /* Copy our identity so we can use it */
4369   memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
4370   if (my_short_id != NULL)
4371     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4372                 "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n",
4373                 "DHT SERVICE");
4374   my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity));
4375   if (dhtlog_handle != NULL)
4376     dhtlog_handle->insert_node (NULL, &my_identity);
4377 }
4378
4379
4380 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
4381   {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE,
4382    0},
4383   {&handle_dht_local_route_stop, NULL,
4384    GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP, 0},
4385   {&handle_dht_control_message, NULL, GNUNET_MESSAGE_TYPE_DHT_CONTROL, 0},
4386   {NULL, NULL, 0, 0}
4387 };
4388
4389
4390 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
4391   {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
4392   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
4393   {NULL, 0, 0}
4394 };
4395
4396
4397 /**
4398  * Method called whenever a peer connects.
4399  *
4400  * @param cls closure
4401  * @param peer peer identity this notification is about
4402  * @param atsi performance data
4403  */
4404 static void
4405 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
4406                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4407 {
4408   struct PeerInfo *ret;
4409   struct DHTPutEntry *put_entry;
4410   int peer_bucket;
4411
4412   /* Check for connect to self message */
4413   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
4414     return;
4415
4416 #if DEBUG_DHT
4417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4418               "%s:%s Receives core connect message for peer %s distance %d!\n",
4419               my_short_id, "dht", GNUNET_i2s (peer), distance);
4420 #endif
4421
4422   if (GNUNET_YES ==
4423       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
4424                                               &peer->hashPubKey))
4425   {
4426 #if DEBUG_DHT
4427     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4428                 "%s:%s Received %s message for peer %s, but already have peer in RT!",
4429                 my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
4430 #endif
4431     GNUNET_break (0);
4432     return;
4433   }
4434
4435   if ((datacache != NULL) && (GNUNET_YES == put_peer_identities))
4436   {
4437     put_entry =
4438         GNUNET_malloc (sizeof (struct DHTPutEntry) +
4439                        sizeof (struct GNUNET_PeerIdentity));
4440     put_entry->path_length = 0;
4441     put_entry->data_size = sizeof (struct GNUNET_PeerIdentity);
4442     memcpy (&put_entry[1], peer, sizeof (struct GNUNET_PeerIdentity));
4443     GNUNET_DATACACHE_put (datacache, &peer->hashPubKey,
4444                           sizeof (struct DHTPutEntry) +
4445                           sizeof (struct GNUNET_PeerIdentity),
4446                           (char *) put_entry, GNUNET_BLOCK_TYPE_DHT_HELLO,
4447                           GNUNET_TIME_absolute_get_forever ());
4448     GNUNET_free (put_entry);
4449   }
4450   else if (datacache == NULL)
4451     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4452                 "DHT has no connection to datacache!\n");
4453
4454   peer_bucket = find_current_bucket (&peer->hashPubKey);
4455   GNUNET_assert (peer_bucket >= lowest_bucket);
4456   GNUNET_assert (peer_bucket < MAX_BUCKETS);
4457   ret = GNUNET_malloc (sizeof (struct PeerInfo));
4458 #if 0
4459   ret->latency = latency;
4460   ret->distance = distance;
4461 #endif
4462   ret->id = *peer;
4463   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
4464                                      k_buckets[peer_bucket].tail,
4465                                      k_buckets[peer_bucket].tail, ret);
4466   k_buckets[peer_bucket].peers_size++;
4467 #if DO_UPDATE_PREFERENCE
4468   if ((GNUNET_CRYPTO_hash_matching_bits
4469        (&my_identity.hashPubKey, &peer->hashPubKey) > 0) &&
4470       (k_buckets[peer_bucket].peers_size <= bucket_size))
4471     ret->preference_task =
4472         GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
4473 #endif
4474   if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
4475     enable_next_bucket ();
4476   newly_found_peers++;
4477   GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret,
4478                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
4479   increment_stats (STAT_PEERS_KNOWN);
4480
4481 #if DEBUG_DHT
4482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4483               "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
4484               ret == NULL ? "NOT ADDED" : "PEER ADDED");
4485 #endif
4486 }
4487
4488
4489 /**
4490  * Method called whenever a peer disconnects.
4491  *
4492  * @param cls closure
4493  * @param peer peer identity this notification is about
4494  */
4495 static void
4496 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
4497 {
4498   struct PeerInfo *to_remove;
4499   int current_bucket;
4500
4501   /* Check for disconnect from self message */
4502   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
4503     return;
4504 #if DEBUG_DHT
4505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4506               "%s:%s: Received peer disconnect message for peer `%s' from %s\n",
4507               my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
4508 #endif
4509
4510   if (GNUNET_YES !=
4511       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
4512                                               &peer->hashPubKey))
4513   {
4514     GNUNET_break (0);
4515 #if DEBUG_DHT
4516     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4517                 "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
4518                 my_short_id, "DHT", GNUNET_i2s (peer));
4519 #endif
4520     return;
4521   }
4522   increment_stats (STAT_DISCONNECTS);
4523   GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
4524                  (all_known_peers, &peer->hashPubKey));
4525   to_remove =
4526       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
4527   GNUNET_assert (to_remove != NULL);
4528   if (NULL != to_remove->info_ctx)
4529   {
4530     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
4531     to_remove->info_ctx = NULL;
4532   }
4533   GNUNET_assert (0 ==
4534                  memcmp (peer, &to_remove->id,
4535                          sizeof (struct GNUNET_PeerIdentity)));
4536   current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
4537   delete_peer (to_remove, current_bucket);
4538 }
4539
4540
4541 /**
4542  * Process dht requests.
4543  *
4544  * @param cls closure
4545  * @param server the initialized server
4546  * @param c configuration to use
4547  */
4548 static void
4549 run (void *cls, struct GNUNET_SERVER_Handle *server,
4550      const struct GNUNET_CONFIGURATION_Handle *c)
4551 {
4552   struct GNUNET_TIME_Relative next_send_time;
4553   unsigned long long temp_config_num;
4554
4555   cfg = c;
4556   datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
4557   GNUNET_SERVER_add_handlers (server, plugin_handlers);
4558   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
4559   nse = GNUNET_NSE_connect (cfg, &update_network_size_estimate, NULL);
4560   coreAPI = GNUNET_CORE_connect (cfg,   /* Main configuration */
4561                                  DEFAULT_CORE_QUEUE_SIZE,       /* queue size */
4562                                  NULL,  /* Closure passed to DHT functions */
4563                                  &core_init,    /* Call core_init once connected */
4564                                  &handle_core_connect,  /* Handle connects */
4565                                  &handle_core_disconnect,       /* remove peers on disconnects */
4566                                  NULL,  /* Do we care about "status" updates? */
4567                                  NULL,  /* Don't want notified about all incoming messages */
4568                                  GNUNET_NO,     /* For header only inbound notification */
4569                                  NULL,  /* Don't want notified about all outbound messages */
4570                                  GNUNET_NO,     /* For header only outbound notification */
4571                                  core_handlers);        /* Register these handlers */
4572
4573   if (coreAPI == NULL)
4574     return;
4575   transport_handle =
4576       GNUNET_TRANSPORT_connect (cfg, NULL, NULL, NULL, NULL, NULL);
4577   if (transport_handle != NULL)
4578     ghh = GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
4579   else
4580     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4581                 "Failed to connect to transport service!\n");
4582   block_context = GNUNET_BLOCK_context_create (cfg);
4583   lowest_bucket = MAX_BUCKETS - 1;
4584   forward_list.hashmap =
4585       GNUNET_CONTAINER_multihashmap_create (MAX_OUTSTANDING_FORWARDS / 10);
4586   forward_list.minHeap =
4587       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4588   all_known_peers = GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
4589   GNUNET_assert (all_known_peers != NULL);
4590   if (GNUNET_YES ==
4591       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht_testing",
4592                                             "mysql_logging"))
4593   {
4594     debug_routes = GNUNET_YES;
4595   }
4596
4597   if (GNUNET_YES ==
4598       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "strict_kademlia"))
4599   {
4600     strict_kademlia = GNUNET_YES;
4601   }
4602
4603   if (GNUNET_YES ==
4604       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "stop_on_closest"))
4605   {
4606     stop_on_closest = GNUNET_YES;
4607   }
4608
4609   if (GNUNET_YES ==
4610       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "stop_found"))
4611   {
4612     stop_on_found = GNUNET_YES;
4613   }
4614
4615   if (GNUNET_YES ==
4616       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "malicious_getter"))
4617   {
4618     malicious_getter = GNUNET_YES;
4619     if (GNUNET_NO ==
4620         GNUNET_CONFIGURATION_get_value_time (cfg, "DHT",
4621                                              "MALICIOUS_GET_FREQUENCY",
4622                                              &malicious_get_frequency))
4623       malicious_get_frequency = DEFAULT_MALICIOUS_GET_FREQUENCY;
4624   }
4625
4626   if (GNUNET_YES ==
4627       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "malicious_putter"))
4628   {
4629     malicious_putter = GNUNET_YES;
4630     if (GNUNET_NO ==
4631         GNUNET_CONFIGURATION_get_value_time (cfg, "DHT",
4632                                              "MALICIOUS_PUT_FREQUENCY",
4633                                              &malicious_put_frequency))
4634       malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
4635   }
4636
4637   if (GNUNET_OK ==
4638       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
4639                                              &temp_config_num))
4640   {
4641     bucket_size = (unsigned int) temp_config_num;
4642   }
4643
4644   if (GNUNET_OK !=
4645       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "kad_alpha",
4646                                              &kademlia_replication))
4647   {
4648     kademlia_replication = DEFAULT_KADEMLIA_REPLICATION;
4649   }
4650
4651   if (GNUNET_YES ==
4652       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "malicious_dropper"))
4653   {
4654     malicious_dropper = GNUNET_YES;
4655   }
4656
4657   if (GNUNET_NO ==
4658       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "do_find_peer"))
4659   {
4660     do_find_peer = GNUNET_NO;
4661   }
4662   else
4663     do_find_peer = GNUNET_YES;
4664
4665   if (GNUNET_YES ==
4666       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "use_real_distance"))
4667     use_real_distance = GNUNET_YES;
4668
4669   if (GNUNET_YES ==
4670       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht_testing",
4671                                             "mysql_logging_extended"))
4672   {
4673     debug_routes = GNUNET_YES;
4674     debug_routes_extended = GNUNET_YES;
4675   }
4676
4677 #if DEBUG_DHT_ROUTING
4678   if (GNUNET_YES == debug_routes)
4679   {
4680     dhtlog_handle = GNUNET_DHTLOG_connect (cfg);
4681     if (dhtlog_handle == NULL)
4682     {
4683       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4684                   "Could not connect to mysql logging server, logging will not happen!");
4685     }
4686   }
4687 #endif
4688
4689   if (GNUNET_YES ==
4690       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "paper_forwarding"))
4691     paper_forwarding = GNUNET_YES;
4692
4693   if (GNUNET_YES ==
4694       GNUNET_CONFIGURATION_get_value_yesno (cfg, "dht", "put_peer_identities"))
4695     put_peer_identities = GNUNET_YES;
4696
4697   stats = GNUNET_STATISTICS_create ("dht", cfg);
4698
4699   if (stats != NULL)
4700   {
4701     GNUNET_STATISTICS_set (stats, STAT_ROUTES, 0, GNUNET_NO);
4702     GNUNET_STATISTICS_set (stats, STAT_ROUTE_FORWARDS, 0, GNUNET_NO);
4703     GNUNET_STATISTICS_set (stats, STAT_ROUTE_FORWARDS_CLOSEST, 0, GNUNET_NO);
4704     GNUNET_STATISTICS_set (stats, STAT_RESULTS, 0, GNUNET_NO);
4705     GNUNET_STATISTICS_set (stats, STAT_RESULTS_TO_CLIENT, 0, GNUNET_NO);
4706     GNUNET_STATISTICS_set (stats, STAT_RESULT_FORWARDS, 0, GNUNET_NO);
4707     GNUNET_STATISTICS_set (stats, STAT_GETS, 0, GNUNET_NO);
4708     GNUNET_STATISTICS_set (stats, STAT_PUTS, 0, GNUNET_NO);
4709     GNUNET_STATISTICS_set (stats, STAT_PUTS_INSERTED, 0, GNUNET_NO);
4710     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER, 0, GNUNET_NO);
4711     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER_START, 0, GNUNET_NO);
4712     GNUNET_STATISTICS_set (stats, STAT_GET_START, 0, GNUNET_NO);
4713     GNUNET_STATISTICS_set (stats, STAT_PUT_START, 0, GNUNET_NO);
4714     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER_REPLY, 0, GNUNET_NO);
4715     GNUNET_STATISTICS_set (stats, STAT_FIND_PEER_ANSWER, 0, GNUNET_NO);
4716     GNUNET_STATISTICS_set (stats, STAT_BLOOM_FIND_PEER, 0, GNUNET_NO);
4717     GNUNET_STATISTICS_set (stats, STAT_GET_REPLY, 0, GNUNET_NO);
4718     GNUNET_STATISTICS_set (stats, STAT_GET_RESPONSE_START, 0, GNUNET_NO);
4719     GNUNET_STATISTICS_set (stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
4720     GNUNET_STATISTICS_set (stats, STAT_DISCONNECTS, 0, GNUNET_NO);
4721   }
4722   if (GNUNET_YES == do_find_peer)
4723   {
4724     next_send_time.rel_value =
4725         DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
4726         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
4727                                   (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
4728                                    2) -
4729                                   DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
4730     find_peer_context.start = GNUNET_TIME_absolute_get ();
4731     GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
4732                                   &find_peer_context);
4733   }
4734
4735   /* Scheduled the task to clean up when shutdown is called */
4736   cleanup_task =
4737       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4738                                     &shutdown_task, NULL);
4739 }
4740
4741
4742 /**
4743  * The main function for the dht service.
4744  *
4745  * @param argc number of arguments from the command line
4746  * @param argv command line arguments
4747  * @return 0 ok, 1 on error
4748  */
4749 int
4750 main (int argc, char *const *argv)
4751 {
4752   int ret;
4753
4754 #if HAVE_UID_FOR_TESTING > 1
4755   recent.hashmap = GNUNET_CONTAINER_multihashmap_create (DHT_MAX_RECENT / 2);
4756 #endif
4757   recent.minHeap =
4758       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4759   recent_find_peer_requests =
4760       GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
4761   ret =
4762       (GNUNET_OK ==
4763        GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
4764                            NULL)) ? 0 : 1;
4765 #if HAVE_UID_FOR_TESTING > 1
4766   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (recent.hashmap));
4767   GNUNET_CONTAINER_multihashmap_destroy (recent.hashmap);
4768   recent.hashmap = NULL;
4769 #endif
4770   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (recent.minHeap));
4771   GNUNET_CONTAINER_heap_destroy (recent.minHeap);
4772   recent.minHeap = NULL;
4773   GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
4774   recent_find_peer_requests = NULL;
4775   return ret;
4776 }
4777
4778 /* end of gnunet-service-dht.c */