paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file dht/gnunet-service-dht_neighbours.c
21  * @brief GNUnet DHT service's bucket and neighbour management code
22  * @author Christian Grothoff
23  * @author Nathan Evans
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_block_lib.h"
28 #include "gnunet_hello_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_nse_service.h"
32 #include "gnunet_ats_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet-service-dht.h"
40 #include "gnunet-service-dht_datacache.h"
41 #include "gnunet-service-dht_hello.h"
42 #include "gnunet-service-dht_neighbours.h"
43 #include "gnunet-service-dht_nse.h"
44 #include "gnunet-service-dht_routing.h"
45 #include "dht.h"
46
47 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
48
49 /**
50  * Enable slow sanity checks to debug issues.
51  */
52 #define SANITY_CHECKS 1
53
54 /**
55  * How many buckets will we allow total.
56  */
57 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
58
59 /**
60  * What is the maximum number of peers in a given bucket.
61  */
62 #define DEFAULT_BUCKET_SIZE 8
63
64 /**
65  * Desired replication level for FIND PEER requests
66  */
67 #define FIND_PEER_REPLICATION_LEVEL 4
68
69 /**
70  * Maximum allowed replication level for all requests.
71  */
72 #define MAXIMUM_REPLICATION_LEVEL 16
73
74 /**
75  * Maximum allowed number of pending messages per peer.
76  */
77 #define MAXIMUM_PENDING_PER_PEER 64
78
79 /**
80  * How long at least to wait before sending another find peer request.
81  */
82 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
83
84 /**
85  * How long at most to wait before sending another find peer request.
86  */
87 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
88
89 /**
90  * How long at most to wait for transmission of a GET request to another peer?
91  */
92 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
93
94 /**
95  * Hello address expiration
96  */
97 extern struct GNUNET_TIME_Relative hello_expiration;
98
99
100 GNUNET_NETWORK_STRUCT_BEGIN
101
102 /**
103  * P2P PUT message
104  */
105 struct PeerPutMessage
106 {
107   /**
108    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
109    */
110   struct GNUNET_MessageHeader header;
111
112   /**
113    * Processing options
114    */
115   uint32_t options GNUNET_PACKED;
116
117   /**
118    * Content type.
119    */
120   uint32_t type GNUNET_PACKED;
121
122   /**
123    * Hop count
124    */
125   uint32_t hop_count GNUNET_PACKED;
126
127   /**
128    * Replication level for this message
129    */
130   uint32_t desired_replication_level GNUNET_PACKED;
131
132   /**
133    * Length of the PUT path that follows (if tracked).
134    */
135   uint32_t put_path_length GNUNET_PACKED;
136
137   /**
138    * When does the content expire?
139    */
140   struct GNUNET_TIME_AbsoluteNBO expiration_time;
141
142   /**
143    * Bloomfilter (for peer identities) to stop circular routes
144    */
145   char bloomfilter[DHT_BLOOM_SIZE];
146
147   /**
148    * The key we are storing under.
149    */
150   struct GNUNET_HashCode key;
151
152   /* put path (if tracked) */
153
154   /* Payload */
155
156 };
157
158
159 /**
160  * P2P Result message
161  */
162 struct PeerResultMessage
163 {
164   /**
165    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
166    */
167   struct GNUNET_MessageHeader header;
168
169   /**
170    * Content type.
171    */
172   uint32_t type GNUNET_PACKED;
173
174   /**
175    * Length of the PUT path that follows (if tracked).
176    */
177   uint32_t put_path_length GNUNET_PACKED;
178
179   /**
180    * Length of the GET path that follows (if tracked).
181    */
182   uint32_t get_path_length GNUNET_PACKED;
183
184   /**
185    * When does the content expire?
186    */
187   struct GNUNET_TIME_AbsoluteNBO expiration_time;
188
189   /**
190    * The key of the corresponding GET request.
191    */
192   struct GNUNET_HashCode key;
193
194   /* put path (if tracked) */
195
196   /* get path (if tracked) */
197
198   /* Payload */
199
200 };
201
202
203 /**
204  * P2P GET message
205  */
206 struct PeerGetMessage
207 {
208   /**
209    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
210    */
211   struct GNUNET_MessageHeader header;
212
213   /**
214    * Processing options
215    */
216   uint32_t options GNUNET_PACKED;
217
218   /**
219    * Desired content type.
220    */
221   uint32_t type GNUNET_PACKED;
222
223   /**
224    * Hop count
225    */
226   uint32_t hop_count GNUNET_PACKED;
227
228   /**
229    * Desired replication level for this request.
230    */
231   uint32_t desired_replication_level GNUNET_PACKED;
232
233   /**
234    * Size of the extended query.
235    */
236   uint32_t xquery_size;
237
238   /**
239    * Bloomfilter mutator.
240    */
241   uint32_t bf_mutator;
242
243   /**
244    * Bloomfilter (for peer identities) to stop circular routes
245    */
246   char bloomfilter[DHT_BLOOM_SIZE];
247
248   /**
249    * The key we are looking for.
250    */
251   struct GNUNET_HashCode key;
252
253   /* xquery */
254
255   /* result bloomfilter */
256
257 };
258 GNUNET_NETWORK_STRUCT_END
259
260
261 /**
262  * Entry for a peer in a bucket.
263  */
264 struct PeerInfo
265 {
266   /**
267    * Next peer entry (DLL)
268    */
269   struct PeerInfo *next;
270
271   /**
272    *  Prev peer entry (DLL)
273    */
274   struct PeerInfo *prev;
275
276   /**
277    * Handle for sending messages to this peer.
278    */
279   struct GNUNET_MQ_Handle *mq;
280
281   /**
282    * What is the identity of the peer?
283    */
284   const struct GNUNET_PeerIdentity *id;
285
286   /**
287    * Hash of @e id.
288    */
289   struct GNUNET_HashCode phash;
290
291   /**
292    * Which bucket is this peer in?
293    */
294   int peer_bucket;
295
296 };
297
298
299 /**
300  * Peers are grouped into buckets.
301  */
302 struct PeerBucket
303 {
304   /**
305    * Head of DLL
306    */
307   struct PeerInfo *head;
308
309   /**
310    * Tail of DLL
311    */
312   struct PeerInfo *tail;
313
314   /**
315    * Number of peers in the bucket.
316    */
317   unsigned int peers_size;
318 };
319
320
321 /**
322  * Information about a peer that we would like to connect to.
323  */
324 struct ConnectInfo
325 {
326
327   /**
328    * Handle to active HELLO offer operation, or NULL.
329    */
330   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
331
332   /**
333    * Handle to active connectivity suggestion operation, or NULL.
334    */
335   struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
336
337   /**
338    * How much would we like to connect to this peer?
339    */
340   uint32_t strength;
341 };
342
343
344 /**
345  * Do we cache all results that we are routing in the local datacache?
346  */
347 static int cache_results;
348
349 /**
350  * Should routing details be logged to stderr (for debugging)?
351  */
352 static int log_route_details_stderr;
353
354 /**
355  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
356  */
357 static unsigned int closest_bucket;
358
359 /**
360  * How many peers have we added since we sent out our last
361  * find peer request?
362  */
363 static unsigned int newly_found_peers;
364
365 /**
366  * Option for testing that disables the 'connect' function of the DHT.
367  */
368 static int disable_try_connect;
369
370 /**
371  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
372  */
373 static struct PeerBucket k_buckets[MAX_BUCKETS];
374
375 /**
376  * Hash map of all CORE-connected peers, for easy removal from
377  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
378  */
379 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
380
381 /**
382  * Hash map of all peers we would like to be connected to.
383  * Values are of type `struct ConnectInfo`.
384  */
385 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
386
387 /**
388  * Maximum size for each bucket.
389  */
390 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
391
392 /**
393  * Task that sends FIND PEER requests.
394  */
395 static struct GNUNET_SCHEDULER_Task *find_peer_task;
396
397 /**
398  * Identity of this peer.
399  */
400 static struct GNUNET_PeerIdentity my_identity;
401
402 /**
403  * Hash of the identity of this peer.
404  */
405 struct GNUNET_HashCode my_identity_hash;
406
407 /**
408  * Handle to CORE.
409  */
410 static struct GNUNET_CORE_Handle *core_api;
411
412 /**
413  * Handle to ATS connectivity.
414  */
415 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
416
417
418 /**
419  * Find the optimal bucket for this key.
420  *
421  * @param hc the hashcode to compare our identity to
422  * @return the proper bucket index, or #GNUNET_SYSERR
423  *         on error (same hashcode)
424  */
425 static int
426 find_bucket (const struct GNUNET_HashCode *hc)
427 {
428   unsigned int bits;
429
430   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
431   if (bits == MAX_BUCKETS)
432   {
433     /* How can all bits match? Got my own ID? */
434     GNUNET_break (0);
435     return GNUNET_SYSERR;
436   }
437   return MAX_BUCKETS - bits - 1;
438 }
439
440
441 /**
442  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
443  * Clean up the "oh" field in the @a cls
444  *
445  * @param cls a `struct ConnectInfo`
446  */
447 static void
448 offer_hello_done (void *cls)
449 {
450   struct ConnectInfo *ci = cls;
451
452   ci->oh = NULL;
453 }
454
455
456 /**
457  * Function called for all entries in #all_desired_peers to clean up.
458  *
459  * @param cls NULL
460  * @param peer peer the entry is for
461  * @param value the value to remove
462  * @return #GNUNET_YES
463  */
464 static int
465 free_connect_info (void *cls,
466                    const struct GNUNET_PeerIdentity *peer,
467                    void *value)
468 {
469   struct ConnectInfo *ci = value;
470
471   GNUNET_assert (GNUNET_YES ==
472                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
473                                                        peer,
474                                                        ci));
475   if (NULL != ci->sh)
476   {
477     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
478     ci->sh = NULL;
479   }
480   if (NULL != ci->oh)
481   {
482     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
483     ci->oh = NULL;
484   }
485   GNUNET_free (ci);
486   return GNUNET_YES;
487 }
488
489
490 /**
491  * Consider if we want to connect to a given peer, and if so
492  * let ATS know.  If applicable, the HELLO is offered to the
493  * TRANSPORT service.
494  *
495  * @param pid peer to consider connectivity requirements for
496  * @param h a HELLO message, or NULL
497  */
498 static void
499 try_connect (const struct GNUNET_PeerIdentity *pid,
500              const struct GNUNET_MessageHeader *h)
501 {
502   int bucket;
503   struct GNUNET_HashCode pid_hash;
504   struct ConnectInfo *ci;
505   uint32_t strength;
506
507   GNUNET_CRYPTO_hash (pid,
508                       sizeof (struct GNUNET_PeerIdentity),
509                       &pid_hash);
510   bucket = find_bucket (&pid_hash);
511   if (bucket < 0)
512     return; /* self? */
513   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
514                                           pid);
515
516   if (k_buckets[bucket].peers_size < bucket_size)
517     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
518   else
519     strength = bucket; /* minimum value of connectivity */
520   if (GNUNET_YES ==
521       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
522                                               pid))
523     strength *= 2; /* double for connected peers */
524   else if (k_buckets[bucket].peers_size > bucket_size)
525     strength = 0; /* bucket full, we really do not care about more */
526
527   if ( (0 == strength) &&
528        (NULL != ci) )
529   {
530     /* release request */
531     GNUNET_assert (GNUNET_YES ==
532                    free_connect_info (NULL,
533                                       pid,
534                                       ci));
535     return;
536   }
537   if (NULL == ci)
538   {
539     ci = GNUNET_new (struct ConnectInfo);
540     GNUNET_assert (GNUNET_OK ==
541                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
542                                                       pid,
543                                                       ci,
544                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
545   }
546   if ( (NULL != ci->oh) &&
547        (NULL != h) )
548     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
549   if (NULL != h)
550     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
551                                            h,
552                                            &offer_hello_done,
553                                            ci);
554   if ( (NULL != ci->sh) &&
555        (ci->strength != strength) )
556     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
557   if (ci->strength != strength)
558     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
559                                               pid,
560                                               strength);
561   ci->strength = strength;
562 }
563
564
565 /**
566  * Function called for each peer in #all_desired_peers during
567  * #update_connect_preferences() if we have reason to adjust
568  * the strength of our desire to keep connections to certain
569  * peers.  Calls #try_connect() to update the calculations for
570  * the given @a pid.
571  *
572  * @param cls NULL
573  * @param pid peer to update
574  * @param value unused
575  * @return #GNUNET_YES (continue to iterate)
576  */
577 static int
578 update_desire_strength (void *cls,
579                         const struct GNUNET_PeerIdentity *pid,
580                         void *value)
581 {
582   try_connect (pid, NULL);
583   return GNUNET_YES;
584 }
585
586
587 /**
588  * Update our preferences for connectivity as given to ATS.
589  *
590  * @param cls the `struct PeerInfo` of the peer
591  * @param tc scheduler context.
592  */
593 static void
594 update_connect_preferences ()
595 {
596   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
597                                          &update_desire_strength,
598                                          NULL);
599 }
600
601
602 /**
603  * Add each of the peers we already know to the bloom filter of
604  * the request so that we don't get duplicate HELLOs.
605  *
606  * @param cls the `struct GNUNET_BLOCK_Group`
607  * @param key peer identity to add to the bloom filter
608  * @param value value the peer information (unused)
609  * @return #GNUNET_YES (we should continue to iterate)
610  */
611 static int
612 add_known_to_bloom (void *cls,
613                     const struct GNUNET_PeerIdentity *key,
614                     void *value)
615 {
616   struct GNUNET_BLOCK_Group *bg = cls;
617   struct GNUNET_HashCode key_hash;
618
619   GNUNET_CRYPTO_hash (key,
620                       sizeof (struct GNUNET_PeerIdentity),
621                       &key_hash);
622   GNUNET_BLOCK_group_set_seen (bg,
623                                &key_hash,
624                                1);
625   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
626               "Adding known peer (%s) to bloomfilter for FIND PEER\n",
627               GNUNET_i2s (key));
628   return GNUNET_YES;
629 }
630
631
632 /**
633  * Task to send a find peer message for our own peer identifier
634  * so that we can find the closest peers in the network to ourselves
635  * and attempt to connect to them.
636  *
637  * @param cls closure for this task
638  */
639 static void
640 send_find_peer_message (void *cls)
641 {
642   struct GNUNET_TIME_Relative next_send_time;
643   struct GNUNET_BLOCK_Group *bg;
644   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
645
646   find_peer_task = NULL;
647   if (newly_found_peers > bucket_size)
648   {
649     /* If we are finding many peers already, no need to send out our request right now! */
650     find_peer_task =
651         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
652                                       &send_find_peer_message,
653                                       NULL);
654     newly_found_peers = 0;
655     return;
656   }
657   bg = GNUNET_BLOCK_group_create (GDS_block_context,
658                                   GNUNET_BLOCK_TYPE_DHT_HELLO,
659                                   GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
660                                                             UINT32_MAX),
661                                   NULL,
662                                   0,
663                                   "filter-size",
664                                   DHT_BLOOM_SIZE,
665                                   NULL);
666   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
667                                          &add_known_to_bloom,
668                                          bg);
669   GNUNET_STATISTICS_update (GDS_stats,
670                             gettext_noop ("# FIND PEER messages initiated"),
671                             1,
672                             GNUNET_NO);
673   peer_bf
674     = GNUNET_CONTAINER_bloomfilter_init (NULL,
675                                          DHT_BLOOM_SIZE,
676                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
677   // FIXME: pass priority!?
678   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
679                              GNUNET_DHT_RO_FIND_PEER | GNUNET_DHT_RO_RECORD_ROUTE,
680                              FIND_PEER_REPLICATION_LEVEL,
681                              0,
682                              &my_identity_hash,
683                              NULL,
684                              0,
685                              bg,
686                              peer_bf);
687   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
688   GNUNET_BLOCK_group_destroy (bg);
689   /* schedule next round */
690   next_send_time.rel_value_us =
691       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
692       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
693                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
694                                 (newly_found_peers + 1));
695   newly_found_peers = 0;
696   GNUNET_assert (NULL == find_peer_task);
697   find_peer_task =
698       GNUNET_SCHEDULER_add_delayed (next_send_time,
699                                     &send_find_peer_message,
700                                     NULL);
701 }
702
703
704 /**
705  * Method called whenever a peer connects.
706  *
707  * @param cls closure
708  * @param peer peer identity this notification is about
709  * @param mq message queue for sending messages to @a peer
710  * @return our `struct PeerInfo` for @a peer
711  */
712 static void *
713 handle_core_connect (void *cls,
714                      const struct GNUNET_PeerIdentity *peer,
715                      struct GNUNET_MQ_Handle *mq)
716 {
717   struct PeerInfo *pi;
718
719   /* Check for connect to self message */
720   if (0 == memcmp (&my_identity,
721                    peer,
722                    sizeof (struct GNUNET_PeerIdentity)))
723     return NULL;
724   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725               "Connected to %s\n",
726               GNUNET_i2s (peer));
727   GNUNET_assert (GNUNET_NO ==
728                  GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
729                                                     peer));
730   GNUNET_STATISTICS_update (GDS_stats,
731                             gettext_noop ("# peers connected"),
732                             1,
733                             GNUNET_NO);
734   pi = GNUNET_new (struct PeerInfo);
735   pi->id = peer;
736   pi->mq = mq;
737   GNUNET_CRYPTO_hash (peer,
738                       sizeof (struct GNUNET_PeerIdentity),
739                       &pi->phash);
740   pi->peer_bucket = find_bucket (&pi->phash);
741   GNUNET_assert ( (pi->peer_bucket >= 0) &&
742                   (pi->peer_bucket < MAX_BUCKETS) );
743   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
744                                     k_buckets[pi->peer_bucket].tail,
745                                     pi);
746   k_buckets[pi->peer_bucket].peers_size++;
747   closest_bucket = GNUNET_MAX (closest_bucket,
748                                pi->peer_bucket);
749   GNUNET_assert (GNUNET_OK ==
750                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
751                                                     pi->id,
752                                                     pi,
753                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
754   if ( (pi->peer_bucket > 0) &&
755        (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
756   {
757     update_connect_preferences ();
758     newly_found_peers++;
759   }
760   if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
761        (GNUNET_YES != disable_try_connect))
762   {
763     /* got a first connection, good time to start with FIND PEER requests... */
764     GNUNET_assert (NULL == find_peer_task);
765     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
766                                                NULL);
767   }
768   return pi;
769 }
770
771
772 /**
773  * Method called whenever a peer disconnects.
774  *
775  * @param cls closure
776  * @param peer peer identity this notification is about
777  * @param internal_cls our `struct PeerInfo` for @a peer
778  */
779 static void
780 handle_core_disconnect (void *cls,
781                         const struct GNUNET_PeerIdentity *peer,
782                         void *internal_cls)
783 {
784   struct PeerInfo *to_remove = internal_cls;
785
786   /* Check for disconnect from self message */
787   if (NULL == to_remove)
788     return;
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "Disconnected %s\n",
791               GNUNET_i2s (peer));
792   GNUNET_STATISTICS_update (GDS_stats,
793                             gettext_noop ("# peers connected"),
794                             -1,
795                             GNUNET_NO);
796   GNUNET_assert (GNUNET_YES ==
797                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
798                                                        peer,
799                                                        to_remove));
800   if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
801        (GNUNET_YES != disable_try_connect) )
802   {
803     GNUNET_SCHEDULER_cancel (find_peer_task);
804     find_peer_task = NULL;
805   }
806   GNUNET_assert (to_remove->peer_bucket >= 0);
807   GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
808                                k_buckets[to_remove->peer_bucket].tail,
809                                to_remove);
810   GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
811   k_buckets[to_remove->peer_bucket].peers_size--;
812   while ( (closest_bucket > 0) &&
813           (0 == k_buckets[to_remove->peer_bucket].peers_size) )
814     closest_bucket--;
815   if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
816     update_connect_preferences ();
817   GNUNET_free (to_remove);
818 }
819
820
821 /**
822  * To how many peers should we (on average) forward the request to
823  * obtain the desired target_replication count (on average).
824  *
825  * @param hop_count number of hops the message has traversed
826  * @param target_replication the number of total paths desired
827  * @return Some number of peers to forward the message to
828  */
829 static unsigned int
830 get_forward_count (uint32_t hop_count,
831                    uint32_t target_replication)
832 {
833   uint32_t random_value;
834   uint32_t forward_count;
835   float target_value;
836
837   if (hop_count > GDS_NSE_get () * 4.0)
838   {
839     /* forcefully terminate */
840     GNUNET_STATISTICS_update (GDS_stats,
841                               gettext_noop ("# requests TTL-dropped"),
842                               1, GNUNET_NO);
843     return 0;
844   }
845   if (hop_count > GDS_NSE_get () * 2.0)
846   {
847     /* Once we have reached our ideal number of hops, only forward to 1 peer */
848     return 1;
849   }
850   /* bound by system-wide maximum */
851   target_replication =
852       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
853   target_value =
854       1 + (target_replication - 1.0) / (GDS_NSE_get () +
855                                         ((float) (target_replication - 1.0) *
856                                          hop_count));
857   /* Set forward count to floor of target_value */
858   forward_count = (uint32_t) target_value;
859   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
860   target_value = target_value - forward_count;
861   random_value =
862       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
863   if (random_value < (target_value * UINT32_MAX))
864     forward_count++;
865   return forward_count;
866 }
867
868
869 /**
870  * Compute the distance between have and target as a 32-bit value.
871  * Differences in the lower bits must count stronger than differences
872  * in the higher bits.
873  *
874  * @param target
875  * @param have
876  * @return 0 if have==target, otherwise a number
877  *           that is larger as the distance between
878  *           the two hash codes increases
879  */
880 static unsigned int
881 get_distance (const struct GNUNET_HashCode *target,
882               const struct GNUNET_HashCode *have)
883 {
884   unsigned int bucket;
885   unsigned int msb;
886   unsigned int lsb;
887   unsigned int i;
888
889   /* We have to represent the distance between two 2^9 (=512)-bit
890    * numbers as a 2^5 (=32)-bit number with "0" being used for the
891    * two numbers being identical; furthermore, we need to
892    * guarantee that a difference in the number of matching
893    * bits is always represented in the result.
894    *
895    * We use 2^32/2^9 numerical values to distinguish between
896    * hash codes that have the same LSB bit distance and
897    * use the highest 2^9 bits of the result to signify the
898    * number of (mis)matching LSB bits; if we have 0 matching
899    * and hence 512 mismatching LSB bits we return -1 (since
900    * 512 itself cannot be represented with 9 bits) */
901
902   /* first, calculate the most significant 9 bits of our
903    * result, aka the number of LSBs */
904   bucket = GNUNET_CRYPTO_hash_matching_bits (target,
905                                              have);
906   /* bucket is now a value between 0 and 512 */
907   if (bucket == 512)
908     return 0;                   /* perfect match */
909   if (bucket == 0)
910     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
911                                  * below, we'd end up with max+1 (overflow)) */
912
913   /* calculate the most significant bits of the final result */
914   msb = (512 - bucket) << (32 - 9);
915   /* calculate the 32-9 least significant bits of the final result by
916    * looking at the differences in the 32-9 bits following the
917    * mismatching bit at 'bucket' */
918   lsb = 0;
919   for (i = bucket + 1;
920        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
921   {
922     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
923         GNUNET_CRYPTO_hash_get_bit (have, i))
924       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
925                                                  * last bit set will be 31 -- if
926                                                  * i does not reach 512 first... */
927   }
928   return msb | lsb;
929 }
930
931
932 /**
933  * Check whether my identity is closer than any known peers.  If a
934  * non-null bloomfilter is given, check if this is the closest peer
935  * that hasn't already been routed to.
936  *
937  * @param key hash code to check closeness to
938  * @param bloom bloomfilter, exclude these entries from the decision
939  * @return #GNUNET_YES if node location is closest,
940  *         #GNUNET_NO otherwise.
941  */
942 int
943 GDS_am_closest_peer (const struct GNUNET_HashCode *key,
944                      const struct GNUNET_CONTAINER_BloomFilter *bloom)
945 {
946   int bits;
947   int other_bits;
948   int bucket_num;
949   int count;
950   struct PeerInfo *pos;
951
952   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
953     return GNUNET_YES;
954   bucket_num = find_bucket (key);
955   GNUNET_assert (bucket_num >= 0);
956   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
957                                            key);
958   pos = k_buckets[bucket_num].head;
959   count = 0;
960   while ((NULL != pos) && (count < bucket_size))
961   {
962     if ((NULL != bloom) &&
963         (GNUNET_YES ==
964          GNUNET_CONTAINER_bloomfilter_test (bloom,
965                                             &pos->phash)))
966     {
967       pos = pos->next;
968       continue;                 /* Skip already checked entries */
969     }
970     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
971                                                    key);
972     if (other_bits > bits)
973       return GNUNET_NO;
974     if (other_bits == bits)     /* We match the same number of bits */
975       return GNUNET_YES;
976     pos = pos->next;
977   }
978   /* No peers closer, we are the closest! */
979   return GNUNET_YES;
980 }
981
982
983 /**
984  * Select a peer from the routing table that would be a good routing
985  * destination for sending a message for "key".  The resulting peer
986  * must not be in the set of blocked peers.<p>
987  *
988  * Note that we should not ALWAYS select the closest peer to the
989  * target, peers further away from the target should be chosen with
990  * exponentially declining probability.
991  *
992  * FIXME: double-check that this is fine
993  *
994  *
995  * @param key the key we are selecting a peer to route to
996  * @param bloom a bloomfilter containing entries this request has seen already
997  * @param hops how many hops has this message traversed thus far
998  * @return Peer to route to, or NULL on error
999  */
1000 static struct PeerInfo *
1001 select_peer (const struct GNUNET_HashCode *key,
1002              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1003              uint32_t hops)
1004 {
1005   unsigned int bc;
1006   unsigned int count;
1007   unsigned int selected;
1008   struct PeerInfo *pos;
1009   unsigned int dist;
1010   unsigned int smallest_distance;
1011   struct PeerInfo *chosen;
1012
1013   if (hops >= GDS_NSE_get ())
1014   {
1015     /* greedy selection (closest peer that is not in bloomfilter) */
1016     smallest_distance = UINT_MAX;
1017     chosen = NULL;
1018     for (bc = 0; bc <= closest_bucket; bc++)
1019     {
1020       pos = k_buckets[bc].head;
1021       count = 0;
1022       while ((pos != NULL) && (count < bucket_size))
1023       {
1024         if ( (NULL == bloom) ||
1025              (GNUNET_NO ==
1026               GNUNET_CONTAINER_bloomfilter_test (bloom,
1027                                                  &pos->phash)))
1028         {
1029           dist = get_distance (key,
1030                                &pos->phash);
1031           if (dist < smallest_distance)
1032           {
1033             chosen = pos;
1034             smallest_distance = dist;
1035           }
1036         }
1037         else
1038         {
1039           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1041                       GNUNET_i2s (pos->id),
1042                       GNUNET_h2s (key));
1043           GNUNET_STATISTICS_update (GDS_stats,
1044                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"),
1045                                     1,
1046                                     GNUNET_NO);
1047           dist = get_distance (key,
1048                                &pos->phash);
1049           if (dist < smallest_distance)
1050           {
1051             chosen = NULL;
1052             smallest_distance = dist;
1053           }
1054         }
1055         count++;
1056         pos = pos->next;
1057       }
1058     }
1059     if (NULL == chosen)
1060       GNUNET_STATISTICS_update (GDS_stats,
1061                                 gettext_noop ("# Peer selection failed"),
1062                                 1,
1063                                 GNUNET_NO);
1064     else
1065       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066                   "Selected peer `%s' in greedy routing for %s\n",
1067                   GNUNET_i2s (chosen->id),
1068                   GNUNET_h2s (key));
1069     return chosen;
1070   }
1071
1072   /* select "random" peer */
1073   /* count number of peers that are available and not filtered */
1074   count = 0;
1075   for (bc = 0; bc <= closest_bucket; bc++)
1076   {
1077     pos = k_buckets[bc].head;
1078     while ( (NULL != pos) && (count < bucket_size) )
1079     {
1080       if ( (NULL != bloom) &&
1081            (GNUNET_YES ==
1082             GNUNET_CONTAINER_bloomfilter_test (bloom,
1083                                                &pos->phash)) )
1084       {
1085         GNUNET_STATISTICS_update (GDS_stats,
1086                                   gettext_noop
1087                                   ("# Peers excluded from routing due to Bloomfilter"),
1088                                   1, GNUNET_NO);
1089         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1091                     GNUNET_i2s (pos->id),
1092                     GNUNET_h2s (key));
1093         pos = pos->next;
1094         continue;               /* Ignore bloomfiltered peers */
1095       }
1096       count++;
1097       pos = pos->next;
1098     }
1099   }
1100   if (0 == count)               /* No peers to select from! */
1101   {
1102     GNUNET_STATISTICS_update (GDS_stats,
1103                               gettext_noop ("# Peer selection failed"), 1,
1104                               GNUNET_NO);
1105     return NULL;
1106   }
1107   /* Now actually choose a peer */
1108   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1109                                        count);
1110   count = 0;
1111   for (bc = 0; bc <= closest_bucket; bc++)
1112   {
1113     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1114     {
1115       if ((bloom != NULL) &&
1116           (GNUNET_YES ==
1117            GNUNET_CONTAINER_bloomfilter_test (bloom,
1118                                               &pos->phash)))
1119       {
1120         continue;               /* Ignore bloomfiltered peers */
1121       }
1122       if (0 == selected--)
1123       {
1124         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125                     "Selected peer `%s' in random routing for %s\n",
1126                     GNUNET_i2s (pos->id),
1127                     GNUNET_h2s (key));
1128         return pos;
1129       }
1130     }
1131   }
1132   GNUNET_break (0);
1133   return NULL;
1134 }
1135
1136
1137 /**
1138  * Compute the set of peers that the given request should be
1139  * forwarded to.
1140  *
1141  * @param key routing key
1142  * @param bloom bloom filter excluding peers as targets, all selected
1143  *        peers will be added to the bloom filter
1144  * @param hop_count number of hops the request has traversed so far
1145  * @param target_replication desired number of replicas
1146  * @param targets where to store an array of target peers (to be
1147  *         free'd by the caller)
1148  * @return number of peers returned in 'targets'.
1149  */
1150 static unsigned int
1151 get_target_peers (const struct GNUNET_HashCode *key,
1152                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1153                   uint32_t hop_count,
1154                   uint32_t target_replication,
1155                   struct PeerInfo ***targets)
1156 {
1157   unsigned int ret;
1158   unsigned int off;
1159   struct PeerInfo **rtargets;
1160   struct PeerInfo *nxt;
1161
1162   GNUNET_assert (NULL != bloom);
1163   ret = get_forward_count (hop_count,
1164                            target_replication);
1165   if (0 == ret)
1166   {
1167     *targets = NULL;
1168     return 0;
1169   }
1170   rtargets = GNUNET_new_array (ret,
1171                                struct PeerInfo *);
1172   for (off = 0; off < ret; off++)
1173   {
1174     nxt = select_peer (key,
1175                        bloom,
1176                        hop_count);
1177     if (NULL == nxt)
1178       break;
1179     rtargets[off] = nxt;
1180     GNUNET_break (GNUNET_NO ==
1181                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1182                                                      &nxt->phash));
1183     GNUNET_CONTAINER_bloomfilter_add (bloom,
1184                                       &nxt->phash);
1185   }
1186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1188               off,
1189               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1190               (unsigned int) hop_count,
1191               GNUNET_h2s (key),
1192               ret);
1193   if (0 == off)
1194   {
1195     GNUNET_free (rtargets);
1196     *targets = NULL;
1197     return 0;
1198   }
1199   *targets = rtargets;
1200   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1202               GNUNET_h2s (key),
1203               off,
1204               ret);
1205   return off;
1206 }
1207
1208
1209 /**
1210  * Perform a PUT operation.   Forwards the given request to other
1211  * peers.   Does not store the data locally.  Does not give the
1212  * data to local clients.  May do nothing if this is the only
1213  * peer in the network (or if we are the closest peer in the
1214  * network).
1215  *
1216  * @param type type of the block
1217  * @param options routing options
1218  * @param desired_replication_level desired replication count
1219  * @param expiration_time when does the content expire
1220  * @param hop_count how many hops has this message traversed so far
1221  * @param bf Bloom filter of peers this PUT has already traversed
1222  * @param key key for the content
1223  * @param put_path_length number of entries in @a put_path
1224  * @param put_path peers this request has traversed so far (if tracked)
1225  * @param data payload to store
1226  * @param data_size number of bytes in @a data
1227  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1228  */
1229 int
1230 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1231                            enum GNUNET_DHT_RouteOption options,
1232                            uint32_t desired_replication_level,
1233                            struct GNUNET_TIME_Absolute expiration_time,
1234                            uint32_t hop_count,
1235                            struct GNUNET_CONTAINER_BloomFilter *bf,
1236                            const struct GNUNET_HashCode *key,
1237                            unsigned int put_path_length,
1238                            struct GNUNET_PeerIdentity *put_path,
1239                            const void *data,
1240                            size_t data_size)
1241 {
1242   unsigned int target_count;
1243   unsigned int i;
1244   struct PeerInfo **targets;
1245   struct PeerInfo *target;
1246   size_t msize;
1247   struct GNUNET_MQ_Envelope *env;
1248   struct PeerPutMessage *ppm;
1249   struct GNUNET_PeerIdentity *pp;
1250   unsigned int skip_count;
1251
1252   GNUNET_assert (NULL != bf);
1253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1254               "Adding myself (%s) to PUT bloomfilter for %s\n",
1255               GNUNET_i2s (&my_identity),
1256               GNUNET_h2s (key));
1257   GNUNET_CONTAINER_bloomfilter_add (bf,
1258                                     &my_identity_hash);
1259   GNUNET_STATISTICS_update (GDS_stats,
1260                             gettext_noop ("# PUT requests routed"),
1261                             1,
1262                             GNUNET_NO);
1263   target_count
1264     = get_target_peers (key,
1265                         bf,
1266                         hop_count,
1267                         desired_replication_level,
1268                         &targets);
1269   if (0 == target_count)
1270   {
1271     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272                 "Routing PUT for %s terminates after %u hops at %s\n",
1273                 GNUNET_h2s (key),
1274                 (unsigned int) hop_count,
1275                 GNUNET_i2s (&my_identity));
1276     return GNUNET_NO;
1277   }
1278   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
1279   if (msize + sizeof (struct PeerPutMessage)
1280       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1281   {
1282     put_path_length = 0;
1283     msize = data_size;
1284   }
1285   if (msize + sizeof (struct PeerPutMessage)
1286       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1287   {
1288     GNUNET_break (0);
1289     GNUNET_free (targets);
1290     return GNUNET_NO;
1291   }
1292   GNUNET_STATISTICS_update (GDS_stats,
1293                             gettext_noop ("# PUT messages queued for transmission"),
1294                             target_count,
1295                             GNUNET_NO);
1296   skip_count = 0;
1297   for (i = 0; i < target_count; i++)
1298   {
1299     target = targets[i];
1300     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1301     {
1302       /* skip */
1303       GNUNET_STATISTICS_update (GDS_stats,
1304                                 gettext_noop ("# P2P messages dropped due to full queue"),
1305                                 1,
1306                                 GNUNET_NO);
1307       skip_count++;
1308       continue;
1309     }
1310     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1311                 "Routing PUT for %s after %u hops to %s\n",
1312                 GNUNET_h2s (key),
1313                 (unsigned int) hop_count,
1314                 GNUNET_i2s (target->id));
1315     env = GNUNET_MQ_msg_extra (ppm,
1316                                msize,
1317                                GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1318     ppm->options = htonl (options);
1319     ppm->type = htonl (type);
1320     ppm->hop_count = htonl (hop_count + 1);
1321     ppm->desired_replication_level = htonl (desired_replication_level);
1322     ppm->put_path_length = htonl (put_path_length);
1323     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1324     GNUNET_break (GNUNET_YES ==
1325                   GNUNET_CONTAINER_bloomfilter_test (bf,
1326                                                      &target->phash));
1327     GNUNET_assert (GNUNET_OK ==
1328                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1329                                                               ppm->bloomfilter,
1330                                                               DHT_BLOOM_SIZE));
1331     ppm->key = *key;
1332     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1333     GNUNET_memcpy (pp,
1334                    put_path,
1335                    sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1336     GNUNET_memcpy (&pp[put_path_length],
1337                    data,
1338                    data_size);
1339     GNUNET_MQ_send (target->mq,
1340                     env);
1341   }
1342   GNUNET_free (targets);
1343   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1344 }
1345
1346
1347 /**
1348  * Perform a GET operation.  Forwards the given request to other
1349  * peers.  Does not lookup the key locally.  May do nothing if this is
1350  * the only peer in the network (or if we are the closest peer in the
1351  * network).
1352  *
1353  * @param type type of the block
1354  * @param options routing options
1355  * @param desired_replication_level desired replication count
1356  * @param hop_count how many hops did this request traverse so far?
1357  * @param key key for the content
1358  * @param xquery extended query
1359  * @param xquery_size number of bytes in @a xquery
1360  * @param bg group to use for filtering replies
1361  * @param peer_bf filter for peers not to select (again)
1362  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1363  */
1364 int
1365 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1366                            enum GNUNET_DHT_RouteOption options,
1367                            uint32_t desired_replication_level,
1368                            uint32_t hop_count,
1369                            const struct GNUNET_HashCode *key,
1370                            const void *xquery,
1371                            size_t xquery_size,
1372                            struct GNUNET_BLOCK_Group *bg,
1373                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1374 {
1375   unsigned int target_count;
1376   struct PeerInfo **targets;
1377   struct PeerInfo *target;
1378   struct GNUNET_MQ_Envelope *env;
1379   size_t msize;
1380   struct PeerGetMessage *pgm;
1381   char *xq;
1382   size_t reply_bf_size;
1383   void *reply_bf;
1384   unsigned int skip_count;
1385   uint32_t bf_nonce;
1386
1387   GNUNET_assert (NULL != peer_bf);
1388   GNUNET_STATISTICS_update (GDS_stats,
1389                             gettext_noop ("# GET requests routed"),
1390                             1,
1391                             GNUNET_NO);
1392   target_count = get_target_peers (key,
1393                                    peer_bf,
1394                                    hop_count,
1395                                    desired_replication_level,
1396                                    &targets);
1397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398               "Adding myself (%s) to GET bloomfilter for %s\n",
1399               GNUNET_i2s (&my_identity),
1400               GNUNET_h2s (key));
1401   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1402                                     &my_identity_hash);
1403   if (0 == target_count)
1404   {
1405     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406                 "Routing GET for %s terminates after %u hops at %s\n",
1407                 GNUNET_h2s (key),
1408                 (unsigned int) hop_count,
1409                 GNUNET_i2s (&my_identity));
1410     return GNUNET_NO;
1411   }
1412   if (GNUNET_OK !=
1413       GNUNET_BLOCK_group_serialize (bg,
1414                                     &bf_nonce,
1415                                     &reply_bf,
1416                                     &reply_bf_size))
1417   {
1418     reply_bf = NULL;
1419     reply_bf_size = 0;
1420     bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1421                                          UINT32_MAX);
1422   }
1423   msize = xquery_size + reply_bf_size;
1424   if (msize + sizeof (struct PeerGetMessage) >= GNUNET_MAX_MESSAGE_SIZE)
1425   {
1426     GNUNET_break (0);
1427     GNUNET_free_non_null (reply_bf);
1428     GNUNET_free (targets);
1429     return GNUNET_NO;
1430   }
1431   GNUNET_STATISTICS_update (GDS_stats,
1432                             gettext_noop ("# GET messages queued for transmission"),
1433                             target_count,
1434                             GNUNET_NO);
1435   /* forward request */
1436   skip_count = 0;
1437   for (unsigned int i = 0; i < target_count; i++)
1438   {
1439     target = targets[i];
1440     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1441     {
1442       /* skip */
1443       GNUNET_STATISTICS_update (GDS_stats,
1444                                 gettext_noop ("# P2P messages dropped due to full queue"),
1445                                 1, GNUNET_NO);
1446       skip_count++;
1447       continue;
1448     }
1449     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1450                 "Routing GET for %s after %u hops to %s\n",
1451                 GNUNET_h2s (key),
1452                 (unsigned int) hop_count,
1453                 GNUNET_i2s (target->id));
1454     env = GNUNET_MQ_msg_extra (pgm,
1455                                msize,
1456                                GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1457     pgm->options = htonl (options);
1458     pgm->type = htonl (type);
1459     pgm->hop_count = htonl (hop_count + 1);
1460     pgm->desired_replication_level = htonl (desired_replication_level);
1461     pgm->xquery_size = htonl (xquery_size);
1462     pgm->bf_mutator = bf_nonce;
1463     GNUNET_break (GNUNET_YES ==
1464                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1465                                                      &target->phash));
1466     GNUNET_assert (GNUNET_OK ==
1467                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1468                                                               pgm->bloomfilter,
1469                                                               DHT_BLOOM_SIZE));
1470     pgm->key = *key;
1471     xq = (char *) &pgm[1];
1472     GNUNET_memcpy (xq,
1473                    xquery,
1474                    xquery_size);
1475     GNUNET_memcpy (&xq[xquery_size],
1476                    reply_bf,
1477                    reply_bf_size);
1478     GNUNET_MQ_send (target->mq,
1479                     env);
1480   }
1481   GNUNET_free (targets);
1482   GNUNET_free_non_null (reply_bf);
1483   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1484 }
1485
1486
1487 /**
1488  * Handle a reply (route to origin).  Only forwards the reply back to
1489  * the given peer.  Does not do local caching or forwarding to local
1490  * clients.
1491  *
1492  * @param target neighbour that should receive the block (if still connected)
1493  * @param type type of the block
1494  * @param expiration_time when does the content expire
1495  * @param key key for the content
1496  * @param put_path_length number of entries in @a put_path
1497  * @param put_path peers the original PUT traversed (if tracked)
1498  * @param get_path_length number of entries in @a get_path
1499  * @param get_path peers this reply has traversed so far (if tracked)
1500  * @param data payload of the reply
1501  * @param data_size number of bytes in @a data
1502  */
1503 void
1504 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1505                              enum GNUNET_BLOCK_Type type,
1506                              struct GNUNET_TIME_Absolute expiration_time,
1507                              const struct GNUNET_HashCode *key,
1508                              unsigned int put_path_length,
1509                              const struct GNUNET_PeerIdentity *put_path,
1510                              unsigned int get_path_length,
1511                              const struct GNUNET_PeerIdentity *get_path,
1512                              const void *data,
1513                              size_t data_size)
1514 {
1515   struct PeerInfo *pi;
1516   struct GNUNET_MQ_Envelope *env;
1517   size_t msize;
1518   struct PeerResultMessage *prm;
1519   struct GNUNET_PeerIdentity *paths;
1520
1521   msize = data_size + (get_path_length + put_path_length) *
1522       sizeof (struct GNUNET_PeerIdentity);
1523   if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
1524       (get_path_length >
1525        GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1526       (put_path_length >
1527        GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1528       (data_size > GNUNET_MAX_MESSAGE_SIZE))
1529   {
1530     GNUNET_break (0);
1531     return;
1532   }
1533   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1534                                           target);
1535   if (NULL == pi)
1536   {
1537     /* peer disconnected in the meantime, drop reply */
1538     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1539                 "No matching peer for reply for key %s\n",
1540                 GNUNET_h2s (key));
1541     return;
1542   }
1543   if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1544   {
1545     /* skip */
1546     GNUNET_STATISTICS_update (GDS_stats,
1547                               gettext_noop ("# P2P messages dropped due to full queue"),
1548                               1,
1549                               GNUNET_NO);
1550     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551                 "Peer queue full, ignoring reply for key %s\n",
1552                 GNUNET_h2s (key));
1553     return;
1554   }
1555
1556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557               "Forwarding reply for key %s to peer %s\n",
1558               GNUNET_h2s (key),
1559               GNUNET_i2s (target));
1560   GNUNET_STATISTICS_update (GDS_stats,
1561                             gettext_noop
1562                             ("# RESULT messages queued for transmission"), 1,
1563                             GNUNET_NO);
1564   env = GNUNET_MQ_msg_extra (prm,
1565                              msize,
1566                              GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1567   prm->type = htonl (type);
1568   prm->put_path_length = htonl (put_path_length);
1569   prm->get_path_length = htonl (get_path_length);
1570   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1571   prm->key = *key;
1572   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1573   GNUNET_memcpy (paths,
1574                  put_path,
1575                  put_path_length * sizeof (struct GNUNET_PeerIdentity));
1576   GNUNET_memcpy (&paths[put_path_length],
1577                  get_path,
1578                  get_path_length * sizeof (struct GNUNET_PeerIdentity));
1579   GNUNET_memcpy (&paths[put_path_length + get_path_length],
1580                  data,
1581                  data_size);
1582   GNUNET_MQ_send (pi->mq,
1583                   env);
1584 }
1585
1586
1587 /**
1588  * To be called on core init/fail.
1589  *
1590  * @param cls service closure
1591  * @param identity the public identity of this peer
1592  */
1593 static void
1594 core_init (void *cls,
1595            const struct GNUNET_PeerIdentity *identity)
1596 {
1597   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1598               "CORE called, I am %s\n",
1599               GNUNET_i2s (identity));
1600   my_identity = *identity;
1601   GNUNET_CRYPTO_hash (identity,
1602                       sizeof (struct GNUNET_PeerIdentity),
1603                       &my_identity_hash);
1604   GNUNET_SERVICE_resume (GDS_service);
1605 }
1606
1607
1608 /**
1609  * Check validity of a p2p put request.
1610  *
1611  * @param cls closure with the `struct PeerInfo` of the sender
1612  * @param message message
1613  * @return #GNUNET_OK if the message is valid
1614  */
1615 static int
1616 check_dht_p2p_put (void *cls,
1617                    const struct PeerPutMessage *put)
1618 {
1619   uint32_t putlen;
1620   uint16_t msize;
1621
1622   msize = ntohs (put->header.size);
1623   putlen = ntohl (put->put_path_length);
1624   if ((msize <
1625        sizeof (struct PeerPutMessage) +
1626        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1627       (putlen >
1628        GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1629   {
1630     GNUNET_break_op (0);
1631     return GNUNET_SYSERR;
1632   }
1633   return GNUNET_OK;
1634 }
1635
1636
1637 /**
1638  * Core handler for p2p put requests.
1639  *
1640  * @param cls closure with the `struct PeerInfo` of the sender
1641  * @param message message
1642  */
1643 static void
1644 handle_dht_p2p_put (void *cls,
1645                     const struct PeerPutMessage *put)
1646 {
1647   struct PeerInfo *peer = cls;
1648   const struct GNUNET_PeerIdentity *put_path;
1649   const void *payload;
1650   uint32_t putlen;
1651   uint16_t msize;
1652   size_t payload_size;
1653   enum GNUNET_DHT_RouteOption options;
1654   struct GNUNET_CONTAINER_BloomFilter *bf;
1655   struct GNUNET_HashCode test_key;
1656   int forwarded;
1657
1658   msize = ntohs (put->header.size);
1659   putlen = ntohl (put->put_path_length);
1660   GNUNET_STATISTICS_update (GDS_stats,
1661                             gettext_noop ("# P2P PUT requests received"),
1662                             1,
1663                             GNUNET_NO);
1664   GNUNET_STATISTICS_update (GDS_stats,
1665                             gettext_noop ("# P2P PUT bytes received"),
1666                             msize,
1667                             GNUNET_NO);
1668   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1669   payload = &put_path[putlen];
1670   options = ntohl (put->options);
1671   payload_size = msize - (sizeof (struct PeerPutMessage) +
1672                           putlen * sizeof (struct GNUNET_PeerIdentity));
1673
1674   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1675               "PUT for `%s' from %s\n",
1676               GNUNET_h2s (&put->key),
1677               GNUNET_i2s (peer->id));
1678   if (GNUNET_YES == log_route_details_stderr)
1679   {
1680     char *tmp;
1681     char *pp;
1682
1683     pp = GNUNET_STRINGS_pp2s (put_path,
1684                               putlen);
1685     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1686     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1687                  "R5N PUT %s: %s->%s (%u, %u=>%u, PP: %s)\n",
1688                  GNUNET_h2s (&put->key),
1689                  GNUNET_i2s (peer->id),
1690                  tmp,
1691                  ntohl(put->hop_count),
1692                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1693                                                    &put->key),
1694                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1695                                                    &put->key),
1696                  pp);
1697     GNUNET_free (pp);
1698     GNUNET_free (tmp);
1699   }
1700   switch (GNUNET_BLOCK_get_key
1701           (GDS_block_context,
1702            ntohl (put->type),
1703            payload,
1704            payload_size,
1705            &test_key))
1706   {
1707   case GNUNET_YES:
1708     if (0 != memcmp (&test_key,
1709                      &put->key,
1710                      sizeof (struct GNUNET_HashCode)))
1711     {
1712       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1713
1714       GNUNET_break_op (0);
1715       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1716                   "PUT with key `%s' for block with key %s\n",
1717                   put_s,
1718                   GNUNET_h2s_full (&test_key));
1719       GNUNET_free (put_s);
1720       return;
1721     }
1722     break;
1723   case GNUNET_NO:
1724     GNUNET_break_op (0);
1725     return;
1726   case GNUNET_SYSERR:
1727     /* cannot verify, good luck */
1728     break;
1729   }
1730   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1731   {
1732     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1733                                    ntohl (put->type),
1734                                    NULL, /* query group */
1735                                    GNUNET_BLOCK_EO_NONE,
1736                                    NULL,    /* query */
1737                                    NULL, 0, /* xquery */
1738                                    payload,
1739                                    payload_size))
1740     {
1741     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1742     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1743       break;
1744
1745     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1746     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1747     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1748     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1749     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1750     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1751     default:
1752       GNUNET_break_op (0);
1753       return;
1754     }
1755   }
1756
1757   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1758                                           DHT_BLOOM_SIZE,
1759                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1760   GNUNET_break_op (GNUNET_YES ==
1761                    GNUNET_CONTAINER_bloomfilter_test (bf,
1762                                                       &peer->phash));
1763   {
1764     struct GNUNET_PeerIdentity pp[putlen + 1];
1765
1766     /* extend 'put path' by sender */
1767     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1768     {
1769 #if SANITY_CHECKS
1770       for (unsigned int i=0;i<=putlen;i++)
1771       {
1772         for (unsigned int j=0;j<i;j++)
1773         {
1774           GNUNET_break (0 != memcmp (&pp[i],
1775                                      &pp[j],
1776                                      sizeof (struct GNUNET_PeerIdentity)));
1777         }
1778         GNUNET_break (0 != memcmp (&pp[i],
1779                                    peer->id,
1780                                    sizeof (struct GNUNET_PeerIdentity)));
1781       }
1782 #endif
1783       GNUNET_memcpy (pp,
1784                      put_path,
1785                      putlen * sizeof (struct GNUNET_PeerIdentity));
1786       pp[putlen] = *peer->id;
1787       putlen++;
1788     }
1789     else
1790       putlen = 0;
1791
1792     /* give to local clients */
1793     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1794                               &put->key,
1795                               0,
1796                               NULL,
1797                               putlen,
1798                               pp,
1799                               ntohl (put->type),
1800                               payload_size,
1801                               payload);
1802     /* store locally */
1803     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1804         (GDS_am_closest_peer (&put->key, bf)))
1805       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1806                                 &put->key,
1807                                 putlen,
1808                                 pp,
1809                                 ntohl (put->type),
1810                                 payload_size,
1811                                 payload);
1812     /* route to other peers */
1813     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1814                                            options,
1815                                            ntohl (put->desired_replication_level),
1816                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1817                                            ntohl (put->hop_count),
1818                                            bf,
1819                                            &put->key,
1820                                            putlen,
1821                                            pp,
1822                                            payload,
1823                                            payload_size);
1824     /* notify monitoring clients */
1825     GDS_CLIENTS_process_put (options
1826                              | ( (GNUNET_OK == forwarded)
1827                                  ? GNUNET_DHT_RO_LAST_HOP
1828                                  : 0 ),
1829                              ntohl (put->type),
1830                              ntohl (put->hop_count),
1831                              ntohl (put->desired_replication_level),
1832                              putlen, pp,
1833                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1834                              &put->key,
1835                              payload,
1836                              payload_size);
1837   }
1838   GNUNET_CONTAINER_bloomfilter_free (bf);
1839 }
1840
1841
1842 /**
1843  * We have received a FIND PEER request.  Send matching
1844  * HELLOs back.
1845  *
1846  * @param sender sender of the FIND PEER request
1847  * @param key peers close to this key are desired
1848  * @param bg group for filtering peers
1849  */
1850 static void
1851 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1852                   const struct GNUNET_HashCode *key,
1853                   struct GNUNET_BLOCK_Group *bg)
1854 {
1855   int bucket_idx;
1856   struct PeerBucket *bucket;
1857   struct PeerInfo *peer;
1858   unsigned int choice;
1859   const struct GNUNET_HELLO_Message *hello;
1860   size_t hello_size;
1861
1862   /* first, check about our own HELLO */
1863   if (NULL != GDS_my_hello)
1864   {
1865     hello_size = GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message *) GDS_my_hello);
1866     GNUNET_break (hello_size >= sizeof (struct GNUNET_MessageHeader));
1867     if (GNUNET_BLOCK_EVALUATION_OK_MORE ==
1868         GNUNET_BLOCK_evaluate (GDS_block_context,
1869                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1870                                bg,
1871                                GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1872                                &my_identity_hash,
1873                                NULL, 0,
1874                                GDS_my_hello,
1875                                hello_size))
1876     {
1877       GDS_NEIGHBOURS_handle_reply (sender,
1878                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1879                                    GNUNET_TIME_relative_to_absolute (hello_expiration),
1880                                    key,
1881                                    0,
1882                                    NULL,
1883                                    0,
1884                                    NULL,
1885                                    GDS_my_hello,
1886                                    hello_size);
1887     }
1888     else
1889     {
1890       GNUNET_STATISTICS_update (GDS_stats,
1891                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"),
1892                                 1,
1893                                 GNUNET_NO);
1894     }
1895   }
1896   else
1897   {
1898     GNUNET_STATISTICS_update (GDS_stats,
1899                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"),
1900                               1,
1901                               GNUNET_NO);
1902   }
1903
1904   /* then, also consider sending a random HELLO from the closest bucket */
1905   if (0 == memcmp (&my_identity_hash,
1906                    key,
1907                    sizeof (struct GNUNET_HashCode)))
1908     bucket_idx = closest_bucket;
1909   else
1910     bucket_idx = GNUNET_MIN (closest_bucket,
1911                              find_bucket (key));
1912   if (bucket_idx == GNUNET_SYSERR)
1913     return;
1914   bucket = &k_buckets[bucket_idx];
1915   if (bucket->peers_size == 0)
1916     return;
1917   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1918                                      bucket->peers_size);
1919   peer = bucket->head;
1920   while (choice > 0)
1921   {
1922     GNUNET_assert (NULL != peer);
1923     peer = peer->next;
1924     choice--;
1925   }
1926   choice = bucket->peers_size;
1927   do
1928   {
1929     peer = peer->next;
1930     if (0 == choice--)
1931       return;                   /* no non-masked peer available */
1932     if (NULL == peer)
1933       peer = bucket->head;
1934     hello = GDS_HELLO_get (peer->id);
1935   } while ( (NULL == hello) ||
1936             (GNUNET_BLOCK_EVALUATION_OK_MORE !=
1937              GNUNET_BLOCK_evaluate (GDS_block_context,
1938                                     GNUNET_BLOCK_TYPE_DHT_HELLO,
1939                                     bg,
1940                                     GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1941                                     &peer->phash,
1942                                     NULL, 0,
1943                                     hello,
1944                                     (hello_size = GNUNET_HELLO_size (hello)))) );
1945   GDS_NEIGHBOURS_handle_reply (sender,
1946                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1947                                GNUNET_TIME_relative_to_absolute
1948                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1949                                key,
1950                                0,
1951                                NULL,
1952                                0,
1953                                NULL,
1954                                hello,
1955                                hello_size);
1956 }
1957
1958
1959 /**
1960  * Handle a result from local datacache for a GET operation.
1961  *
1962  * @param cls the `struct PeerInfo` for which this is a reply
1963  * @param type type of the block
1964  * @param expiration_time when does the content expire
1965  * @param key key for the content
1966  * @param put_path_length number of entries in @a put_path
1967  * @param put_path peers the original PUT traversed (if tracked)
1968  * @param get_path_length number of entries in @a get_path
1969  * @param get_path peers this reply has traversed so far (if tracked)
1970  * @param data payload of the reply
1971  * @param data_size number of bytes in @a data
1972  */
1973 static void
1974 handle_local_result (void *cls,
1975                      enum GNUNET_BLOCK_Type type,
1976                      struct GNUNET_TIME_Absolute expiration_time,
1977                      const struct GNUNET_HashCode *key,
1978                      unsigned int put_path_length,
1979                      const struct GNUNET_PeerIdentity *put_path,
1980                      unsigned int get_path_length,
1981                      const struct GNUNET_PeerIdentity *get_path,
1982                      const void *data,
1983                      size_t data_size)
1984 {
1985   struct PeerInfo *peer = cls;
1986   char *pp;
1987
1988   pp = GNUNET_STRINGS_pp2s (put_path,
1989                             put_path_length);
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991               "Found local result for %s (PP: %s)\n",
1992               GNUNET_h2s (key),
1993               pp);
1994   GNUNET_free (pp);
1995   GDS_NEIGHBOURS_handle_reply (peer->id,
1996                                type,
1997                                expiration_time,
1998                                key,
1999                                put_path_length, put_path,
2000                                get_path_length, get_path,
2001                                data, data_size);
2002 }
2003
2004
2005 /**
2006  * Check validity of p2p get request.
2007  *
2008  * @param cls closure with the `struct PeerInfo` of the sender
2009  * @param get the message
2010  * @return #GNUNET_OK if the message is well-formed
2011  */
2012 static int
2013 check_dht_p2p_get (void *cls,
2014                    const struct PeerGetMessage *get)
2015 {
2016   uint32_t xquery_size;
2017   uint16_t msize;
2018
2019   msize = ntohs (get->header.size);
2020   xquery_size = ntohl (get->xquery_size);
2021   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
2022   {
2023     GNUNET_break_op (0);
2024     return GNUNET_SYSERR;
2025   }
2026   return GNUNET_OK;
2027 }
2028
2029
2030 /**
2031  * Core handler for p2p get requests.
2032  *
2033  * @param cls closure with the `struct PeerInfo` of the sender
2034  * @param get the message
2035  */
2036 static void
2037 handle_dht_p2p_get (void *cls,
2038                     const struct PeerGetMessage *get)
2039 {
2040   struct PeerInfo *peer = cls;
2041   uint32_t xquery_size;
2042   size_t reply_bf_size;
2043   uint16_t msize;
2044   enum GNUNET_BLOCK_Type type;
2045   enum GNUNET_DHT_RouteOption options;
2046   enum GNUNET_BLOCK_EvaluationResult eval;
2047   struct GNUNET_BLOCK_Group *bg;
2048   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2049   const char *xquery;
2050   int forwarded;
2051
2052   /* parse and validate message */
2053   msize = ntohs (get->header.size);
2054   xquery_size = ntohl (get->xquery_size);
2055   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2056   type = ntohl (get->type);
2057   options = ntohl (get->options);
2058   xquery = (const char *) &get[1];
2059   GNUNET_STATISTICS_update (GDS_stats,
2060                             gettext_noop ("# P2P GET requests received"),
2061                             1,
2062                             GNUNET_NO);
2063   GNUNET_STATISTICS_update (GDS_stats,
2064                             gettext_noop ("# P2P GET bytes received"),
2065                             msize,
2066                             GNUNET_NO);
2067   if (GNUNET_YES == log_route_details_stderr)
2068   {
2069     char *tmp;
2070
2071     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2072     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2073                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2074                  GNUNET_h2s (&get->key),
2075                  GNUNET_i2s (peer->id),
2076                  tmp,
2077                  ntohl(get->hop_count),
2078                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2079                                                    &get->key),
2080                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2081                                                    &get->key),
2082                  ntohl(get->xquery_size),
2083                  xquery);
2084     GNUNET_free (tmp);
2085   }
2086   eval
2087     = GNUNET_BLOCK_evaluate (GDS_block_context,
2088                              type,
2089                              NULL,
2090                              GNUNET_BLOCK_EO_NONE,
2091                              &get->key,
2092                              xquery,
2093                              xquery_size,
2094                              NULL,
2095                              0);
2096   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2097   {
2098     /* request invalid or block type not supported */
2099     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2100     return;
2101   }
2102   peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2103                                                DHT_BLOOM_SIZE,
2104                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
2105   GNUNET_break_op (GNUNET_YES ==
2106                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2107                                                       &peer->phash));
2108   bg = GNUNET_BLOCK_group_create (GDS_block_context,
2109                                   type,
2110                                   get->bf_mutator,
2111                                   &xquery[xquery_size],
2112                                   reply_bf_size,
2113                                   "filter-size",
2114                                   reply_bf_size,
2115                                   NULL);
2116   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2117               "GET for %s at %s after %u hops\n",
2118               GNUNET_h2s (&get->key),
2119               GNUNET_i2s (&my_identity),
2120               (unsigned int) ntohl (get->hop_count));
2121   /* local lookup (this may update the reply_bf) */
2122   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2123       (GDS_am_closest_peer (&get->key,
2124                         peer_bf)))
2125   {
2126     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2127     {
2128       GNUNET_STATISTICS_update (GDS_stats,
2129                                 gettext_noop ("# P2P FIND PEER requests processed"),
2130                                 1,
2131                                 GNUNET_NO);
2132       handle_find_peer (peer->id,
2133                         &get->key,
2134                         bg);
2135     }
2136     else
2137     {
2138       eval = GDS_DATACACHE_handle_get (&get->key,
2139                                        type,
2140                                        xquery,
2141                                        xquery_size,
2142                                        bg,
2143                                        &handle_local_result,
2144                                        peer);
2145     }
2146   }
2147   else
2148   {
2149     GNUNET_STATISTICS_update (GDS_stats,
2150                               gettext_noop ("# P2P GET requests ONLY routed"),
2151                               1,
2152                               GNUNET_NO);
2153   }
2154
2155   /* remember request for routing replies */
2156   GDS_ROUTING_add (peer->id,
2157                         type,
2158                         bg, /* bg now owned by routing, but valid at least until end of this function! */
2159                         options,
2160                         &get->key,
2161                         xquery,
2162                         xquery_size);
2163
2164   /* P2P forwarding */
2165   forwarded = GNUNET_NO;
2166   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2167     forwarded = GDS_NEIGHBOURS_handle_get (type,
2168                                            options,
2169                                            ntohl (get->desired_replication_level),
2170                                            ntohl (get->hop_count),
2171                                            &get->key,
2172                                            xquery,
2173                                            xquery_size,
2174                                            bg,
2175                                            peer_bf);
2176   GDS_CLIENTS_process_get (options
2177                            | (GNUNET_OK == forwarded)
2178                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2179                            type,
2180                            ntohl (get->hop_count),
2181                            ntohl (get->desired_replication_level),
2182                            0,
2183                            NULL,
2184                            &get->key);
2185
2186   /* clean up; note that 'bg' is owned by routing now! */
2187   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2188 }
2189
2190
2191 /**
2192  * Check validity of p2p result message.
2193  *
2194  * @param cls closure
2195  * @param message message
2196  * @return #GNUNET_YES if the message is well-formed
2197  */
2198 static int
2199 check_dht_p2p_result (void *cls,
2200                       const struct PeerResultMessage *prm)
2201 {
2202   uint32_t get_path_length;
2203   uint32_t put_path_length;
2204   uint16_t msize;
2205
2206   msize = ntohs (prm->header.size);
2207   put_path_length = ntohl (prm->put_path_length);
2208   get_path_length = ntohl (prm->get_path_length);
2209   if ((msize <
2210        sizeof (struct PeerResultMessage) + (get_path_length +
2211                                             put_path_length) *
2212        sizeof (struct GNUNET_PeerIdentity)) ||
2213       (get_path_length >
2214        GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2215       (put_path_length >
2216        GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2217   {
2218     GNUNET_break_op (0);
2219     return GNUNET_SYSERR;
2220   }
2221   return GNUNET_OK;
2222 }
2223
2224
2225 /**
2226  * Process a reply, after the @a get_path has been updated.
2227  *
2228  * @param expiration_time when does the reply expire
2229  * @param key key matching the query
2230  * @param get_path_length number of entries in @a get_path
2231  * @param get_path path the reply has taken
2232  * @param put_path_length number of entries in @a put_path
2233  * @param put_path path the PUT has taken
2234  * @param type type of the block
2235  * @param data_size number of bytes in @a data
2236  * @param data payload of the reply
2237  */
2238 static void
2239 process_reply_with_path (struct GNUNET_TIME_Absolute expiration_time,
2240                          const struct GNUNET_HashCode *key,
2241                          unsigned int get_path_length,
2242                          const struct GNUNET_PeerIdentity *get_path,
2243                          unsigned int put_path_length,
2244                          const struct GNUNET_PeerIdentity *put_path,
2245                          enum GNUNET_BLOCK_Type type,
2246                          size_t data_size,
2247                          const void *data)
2248 {
2249   /* forward to local clients */
2250   GDS_CLIENTS_handle_reply (expiration_time,
2251                             key,
2252                             get_path_length,
2253                             get_path,
2254                             put_path_length,
2255                             put_path,
2256                             type,
2257                             data_size,
2258                             data);
2259   GDS_CLIENTS_process_get_resp (type,
2260                                 get_path,
2261                                 get_path_length,
2262                                 put_path,
2263                                 put_path_length,
2264                                 expiration_time,
2265                                 key,
2266                                 data,
2267                                 data_size);
2268   if (GNUNET_YES == cache_results)
2269   {
2270     struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2271
2272     GNUNET_memcpy (xput_path,
2273                    put_path,
2274                    put_path_length * sizeof (struct GNUNET_PeerIdentity));
2275     GNUNET_memcpy (&xput_path[put_path_length],
2276                    get_path,
2277                    get_path_length * sizeof (struct GNUNET_PeerIdentity));
2278
2279     GDS_DATACACHE_handle_put (expiration_time,
2280                               key,
2281                               get_path_length + put_path_length,
2282                               xput_path,
2283                               type,
2284                               data_size,
2285                               data);
2286   }
2287   /* forward to other peers */
2288   GDS_ROUTING_process (type,
2289                        expiration_time,
2290                        key,
2291                        put_path_length,
2292                        put_path,
2293                        get_path_length,
2294                        get_path,
2295                        data,
2296                        data_size);
2297 }
2298
2299
2300 /**
2301  * Core handler for p2p result messages.
2302  *
2303  * @param cls closure
2304  * @param message message
2305  */
2306 static void
2307 handle_dht_p2p_result (void *cls,
2308                        const struct PeerResultMessage *prm)
2309 {
2310   struct PeerInfo *peer = cls;
2311   const struct GNUNET_PeerIdentity *put_path;
2312   const struct GNUNET_PeerIdentity *get_path;
2313   const void *data;
2314   uint32_t get_path_length;
2315   uint32_t put_path_length;
2316   uint16_t msize;
2317   size_t data_size;
2318   enum GNUNET_BLOCK_Type type;
2319
2320   /* parse and validate message */
2321   msize = ntohs (prm->header.size);
2322   put_path_length = ntohl (prm->put_path_length);
2323   get_path_length = ntohl (prm->get_path_length);
2324   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2325   get_path = &put_path[put_path_length];
2326   type = ntohl (prm->type);
2327   data = (const void *) &get_path[get_path_length];
2328   data_size = msize - (sizeof (struct PeerResultMessage) +
2329                        (get_path_length +
2330                         put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2331   GNUNET_STATISTICS_update (GDS_stats,
2332                             gettext_noop ("# P2P RESULTS received"),
2333                             1,
2334                             GNUNET_NO);
2335   GNUNET_STATISTICS_update (GDS_stats,
2336                             gettext_noop ("# P2P RESULT bytes received"),
2337                             msize,
2338                             GNUNET_NO);
2339   if (GNUNET_YES == log_route_details_stderr)
2340   {
2341     char *tmp;
2342     char *pp;
2343     char *gp;
2344
2345     gp = GNUNET_STRINGS_pp2s (get_path,
2346                               get_path_length);
2347     pp = GNUNET_STRINGS_pp2s (put_path,
2348                               put_path_length);
2349     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2350     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2351                  "R5N RESULT %s: %s->%s (GP: %s, PP: %s)\n",
2352                  GNUNET_h2s (&prm->key),
2353                  GNUNET_i2s (peer->id),
2354                  tmp,
2355                  gp,
2356                  pp);
2357     GNUNET_free (gp);
2358     GNUNET_free (pp);
2359     GNUNET_free (tmp);
2360   }
2361   /* if we got a HELLO, consider it for our own routing table */
2362   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2363   {
2364     const struct GNUNET_MessageHeader *h;
2365     struct GNUNET_PeerIdentity pid;
2366
2367     /* Should be a HELLO, validate and consider using it! */
2368     if (data_size < sizeof (struct GNUNET_HELLO_Message))
2369     {
2370       GNUNET_break_op (0);
2371       return;
2372     }
2373     h = data;
2374     if (data_size != ntohs (h->size))
2375     {
2376       GNUNET_break_op (0);
2377       return;
2378     }
2379     if (GNUNET_OK !=
2380         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2381                              &pid))
2382     {
2383       GNUNET_break_op (0);
2384       return;
2385     }
2386     if ( (GNUNET_YES != disable_try_connect) &&
2387          (0 != memcmp (&my_identity,
2388                        &pid,
2389                        sizeof (struct GNUNET_PeerIdentity))) )
2390       try_connect (&pid,
2391                    h);
2392   }
2393
2394
2395   /* First, check if 'peer' is already on the path, and if
2396      so, truncate it instead of expanding. */
2397   for (unsigned int i=0;i<=get_path_length;i++)
2398     if (0 == memcmp (&get_path[i],
2399                      peer->id,
2400                      sizeof (struct GNUNET_PeerIdentity)))
2401     {
2402       process_reply_with_path (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2403                                &prm->key,
2404                                i,
2405                                get_path,
2406                                put_path_length,
2407                                put_path,
2408                                type,
2409                                data_size,
2410                                data);
2411       return;
2412     }
2413
2414   /* Need to append 'peer' to 'get_path' (normal case) */
2415   {
2416     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2417
2418     GNUNET_memcpy (xget_path,
2419                    get_path,
2420                    get_path_length * sizeof (struct GNUNET_PeerIdentity));
2421     xget_path[get_path_length] = *peer->id;
2422
2423     process_reply_with_path (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2424                              &prm->key,
2425                              get_path_length + 1,
2426                              xget_path,
2427                              put_path_length,
2428                              put_path,
2429                              type,
2430                              data_size,
2431                              data);
2432   }
2433 }
2434
2435
2436 /**
2437  * Initialize neighbours subsystem.
2438  *
2439  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2440  */
2441 int
2442 GDS_NEIGHBOURS_init ()
2443 {
2444   struct GNUNET_MQ_MessageHandler core_handlers[] = {
2445     GNUNET_MQ_hd_var_size (dht_p2p_get,
2446                            GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2447                            struct PeerGetMessage,
2448                            NULL),
2449     GNUNET_MQ_hd_var_size (dht_p2p_put,
2450                            GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2451                            struct PeerPutMessage,
2452                            NULL),
2453     GNUNET_MQ_hd_var_size (dht_p2p_result,
2454                            GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2455                            struct PeerResultMessage,
2456                            NULL),
2457     GNUNET_MQ_handler_end ()
2458   };
2459   unsigned long long temp_config_num;
2460
2461   disable_try_connect
2462     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2463                                             "DHT",
2464                                             "DISABLE_TRY_CONNECT");
2465   if (GNUNET_OK ==
2466       GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2467                                              "DHT",
2468                                              "bucket_size",
2469                                              &temp_config_num))
2470     bucket_size = (unsigned int) temp_config_num;
2471   cache_results
2472     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2473                                             "DHT",
2474                                             "CACHE_RESULTS");
2475
2476   log_route_details_stderr =
2477     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2478   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2479   core_api = GNUNET_CORE_connect (GDS_cfg,
2480                                   NULL,
2481                                   &core_init,
2482                                   &handle_core_connect,
2483                                   &handle_core_disconnect,
2484                                   core_handlers);
2485   if (NULL == core_api)
2486     return GNUNET_SYSERR;
2487   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2488                                                               GNUNET_YES);
2489   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2490                                                             GNUNET_NO);
2491   return GNUNET_OK;
2492 }
2493
2494
2495 /**
2496  * Shutdown neighbours subsystem.
2497  */
2498 void
2499 GDS_NEIGHBOURS_done ()
2500 {
2501   if (NULL == core_api)
2502     return;
2503   GNUNET_CORE_disconnect (core_api);
2504   core_api = NULL;
2505   GNUNET_assert (0 ==
2506                  GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2507   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2508   all_connected_peers = NULL;
2509   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2510                                          &free_connect_info,
2511                                          NULL);
2512   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2513   all_desired_peers = NULL;
2514   GNUNET_ATS_connectivity_done (ats_ch);
2515   ats_ch = NULL;
2516   GNUNET_assert (NULL == find_peer_task);
2517 }
2518
2519
2520 /**
2521  * Get the ID of the local node.
2522  *
2523  * @return identity of the local node
2524  */
2525 struct GNUNET_PeerIdentity *
2526 GDS_NEIGHBOURS_get_id ()
2527 {
2528   return &my_identity;
2529 }
2530
2531
2532 /* end of gnunet-service-dht_neighbours.c */