fix bit counting mess
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_datacache.h"
43 #include "gnunet-service-dht_hello.h"
44 #include "gnunet-service-dht_neighbours.h"
45 #include "gnunet-service-dht_nse.h"
46 #include "gnunet-service-dht_routing.h"
47 #include "dht.h"
48
/**
 * Log a message via GNUNET_log_from() under the fixed component
 * name "dht-traffic".
 *
 * @param kind log level passed through to GNUNET_log_from()
 * @param ... format string and arguments
 */
#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
                                                __VA_ARGS__)

/**
 * Enable slow sanity checks to debug issues.
 */
#define SANITY_CHECKS 1
56
/**
 * How many buckets will we allow in total: one per bit of a
 * `struct GNUNET_HashCode`.
 *
 * The replacement list is parenthesized so that the macro acts as a
 * single operand wherever it is used (e.g. `x / MAX_BUCKETS`).
 */
#define MAX_BUCKETS (sizeof(struct GNUNET_HashCode) * 8)
61
/**
 * What is the maximum number of peers in a given bucket.
 * Used as the initial value of #bucket_size.
 */
#define DEFAULT_BUCKET_SIZE 8

/**
 * Desired replication level for FIND PEER requests
 * (passed to GDS_NEIGHBOURS_handle_get() by the find-peer task).
 */
#define FIND_PEER_REPLICATION_LEVEL 4

/**
 * Maximum allowed replication level for all requests.
 * Requested levels above this are clamped in get_forward_count().
 */
#define MAXIMUM_REPLICATION_LEVEL 16

/**
 * Maximum allowed number of pending messages per peer.
 */
#define MAXIMUM_PENDING_PER_PEER 64

/**
 * How long at least to wait before sending another find peer request
 * (30 seconds).
 */
#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply ( \
    GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * How long at most to wait before sending another find peer request
 * (10 minutes).  The find-peer task divides this by the number of
 * newly found peers plus one to bound its random extra delay.
 */
#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply ( \
    GNUNET_TIME_UNIT_MINUTES, 10)

/**
 * How long at most to wait for transmission of a GET request to another peer?
 * (2 minutes)
 */
#define GET_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)

/**
 * Hello address expiration.  Only declared here; the definition lives
 * in another translation unit.
 */
extern struct GNUNET_TIME_Relative hello_expiration;
103
104
105 GNUNET_NETWORK_STRUCT_BEGIN
106
/**
 * P2P PUT message.
 *
 * Fixed-size header of a PUT exchanged between peers; declared inside
 * the GNUNET_NETWORK_STRUCT_BEGIN/END region.  The optional put path
 * and the payload follow the struct as trailing data.
 */
struct PeerPutMessage
{
  /**
   * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
   */
  struct GNUNET_MessageHeader header;

  /**
   * Processing options
   */
  uint32_t options GNUNET_PACKED;

  /**
   * Content type.
   */
  uint32_t type GNUNET_PACKED;

  /**
   * Hop count
   */
  uint32_t hop_count GNUNET_PACKED;

  /**
   * Replication level for this message
   */
  uint32_t desired_replication_level GNUNET_PACKED;

  /**
   * Length of the PUT path that follows (if tracked).
   */
  uint32_t put_path_length GNUNET_PACKED;

  /**
   * When does the content expire?
   */
  struct GNUNET_TIME_AbsoluteNBO expiration_time;

  /**
   * Bloomfilter (for peer identities) to stop circular routes;
   * #DHT_BLOOM_SIZE bytes.
   */
  char bloomfilter[DHT_BLOOM_SIZE];

  /**
   * The key we are storing under.
   */
  struct GNUNET_HashCode key;

  /* put path (if tracked) */

  /* Payload */
};
161
162
/**
 * P2P Result message.
 *
 * Fixed-size header of a result exchanged between peers; declared
 * inside the GNUNET_NETWORK_STRUCT_BEGIN/END region.  The optional
 * put path, the optional get path and the payload follow the struct
 * as trailing data, in that order.
 */
struct PeerResultMessage
{
  /**
   * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
   */
  struct GNUNET_MessageHeader header;

  /**
   * Content type.
   */
  uint32_t type GNUNET_PACKED;

  /**
   * Length of the PUT path that follows (if tracked).
   */
  uint32_t put_path_length GNUNET_PACKED;

  /**
   * Length of the GET path that follows (if tracked).
   */
  uint32_t get_path_length GNUNET_PACKED;

  /**
   * When does the content expire?
   */
  struct GNUNET_TIME_AbsoluteNBO expiration_time;

  /**
   * The key of the corresponding GET request.
   */
  struct GNUNET_HashCode key;

  /* put path (if tracked) */

  /* get path (if tracked) */

  /* Payload */
};
204
205
/**
 * P2P GET message.
 *
 * Fixed-size header of a GET exchanged between peers; declared inside
 * the GNUNET_NETWORK_STRUCT_BEGIN/END region.  The extended query and
 * the result bloomfilter follow the struct as trailing data.
 */
struct PeerGetMessage
{
  /**
   * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
   */
  struct GNUNET_MessageHeader header;

  /**
   * Processing options
   */
  uint32_t options GNUNET_PACKED;

  /**
   * Desired content type.
   */
  uint32_t type GNUNET_PACKED;

  /**
   * Hop count
   */
  uint32_t hop_count GNUNET_PACKED;

  /**
   * Desired replication level for this request.
   */
  uint32_t desired_replication_level GNUNET_PACKED;

  /**
   * Size of the extended query.
   * NOTE(review): unlike the fields above this one is not marked
   * GNUNET_PACKED -- confirm that is intentional.
   */
  uint32_t xquery_size;

  /**
   * Bloomfilter mutator.
   * NOTE(review): not marked GNUNET_PACKED either -- confirm.
   */
  uint32_t bf_mutator;

  /**
   * Bloomfilter (for peer identities) to stop circular routes;
   * #DHT_BLOOM_SIZE bytes.
   */
  char bloomfilter[DHT_BLOOM_SIZE];

  /**
   * The key we are looking for.
   */
  struct GNUNET_HashCode key;

  /* xquery */

  /* result bloomfilter */
};
260 GNUNET_NETWORK_STRUCT_END
261
262
/**
 * Entry for a peer in a bucket.  One instance is allocated per
 * connected peer in handle_core_connect() and released in
 * handle_core_disconnect().
 */
struct PeerInfo
{
  /**
   * Next peer entry (DLL)
   */
  struct PeerInfo *next;

  /**
   * Prev peer entry (DLL)
   */
  struct PeerInfo *prev;

  /**
   * Handle for sending messages to this peer (the message queue
   * received in the connect callback).
   */
  struct GNUNET_MQ_Handle *mq;

  /**
   * What is the identity of the peer?  Stores the pointer handed to
   * the connect callback; no copy is made here.
   */
  const struct GNUNET_PeerIdentity *id;

  /**
   * Hash of @e id.
   */
  struct GNUNET_HashCode phash;

  /**
   * Which bucket is this peer in?  Index into #k_buckets as computed
   * by find_bucket() for @e phash.
   */
  int peer_bucket;
};
298
299
/**
 * Peers are grouped into buckets.  Each bucket is a doubly-linked
 * list of `struct PeerInfo` plus a running element count.
 */
struct PeerBucket
{
  /**
   * Head of DLL
   */
  struct PeerInfo *head;

  /**
   * Tail of DLL
   */
  struct PeerInfo *tail;

  /**
   * Number of peers in the bucket (incremented on connect,
   * decremented on disconnect).
   */
  unsigned int peers_size;
};
320
321
/**
 * Information about a peer that we would like to connect to.
 * Stored as values in #all_desired_peers.
 */
struct ConnectInfo
{
  /**
   * Handle to active HELLO offer operation, or NULL.
   * Reset to NULL by offer_hello_done() once the offer completed.
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *oh;

  /**
   * Handle to active connectivity suggestion operation, or NULL.
   */
  struct GNUNET_ATS_ConnectivitySuggestHandle *sh;

  /**
   * How much would we like to connect to this peer?
   * This is the value last computed by try_connect().
   */
  uint32_t strength;
};
342
343
/**
 * Do we cache all results that we are routing in the local datacache?
 */
static int cache_results;

/**
 * Should routing details be logged to stderr (for debugging)?
 */
static int log_route_details_stderr;

/**
 * Highest bucket index currently considered in use, initially 0.
 * Raised with GNUNET_MAX() when a peer connects and decremented when
 * peers disconnect; select_peer() scans buckets 0..closest_bucket
 * (inclusive).
 */
static unsigned int closest_bucket;

/**
 * How many peers have we added since we sent out our last
 * find peer request?
 */
static unsigned int newly_found_peers;

/**
 * Option for testing that disables the 'connect' function of the DHT.
 */
static int disable_try_connect;

/**
 * The buckets.  Array of size #MAX_BUCKETS, indexed by the value
 * returned from find_bucket().
 * NOTE(review): find_bucket() returns MAX_BUCKETS - bits - 1, so
 * index 0 holds peers matching all but one bit of our identity hash;
 * the earlier wording "offset 0 means 0 bits matching" does not agree
 * with that formula -- confirm the intended orientation.
 */
static struct PeerBucket k_buckets[MAX_BUCKETS];

/**
 * Hash map of all CORE-connected peers, for easy removal from
 * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
 */
static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;

/**
 * Hash map of all peers we would like to be connected to.
 * Values are of type `struct ConnectInfo`.
 */
static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;

/**
 * Maximum size for each bucket.
 */
static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;

/**
 * Task that sends FIND PEER requests, or NULL if none is scheduled.
 */
static struct GNUNET_SCHEDULER_Task *find_peer_task;

/**
 * Identity of this peer.
 */
static struct GNUNET_PeerIdentity my_identity;

/**
 * Hash of the identity of this peer.
 * NOTE(review): not declared static, presumably because other DHT
 * source files reference it -- confirm before changing linkage.
 */
struct GNUNET_HashCode my_identity_hash;

/**
 * Handle to CORE.
 */
static struct GNUNET_CORE_Handle *core_api;

/**
 * Handle to ATS connectivity.
 */
static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
416
417
418 /**
419  * Find the optimal bucket for this key.
420  *
421  * @param hc the hashcode to compare our identity to
422  * @return the proper bucket index, or #GNUNET_SYSERR
423  *         on error (same hashcode)
424  */
425 static int
426 find_bucket (const struct GNUNET_HashCode *hc)
427 {
428   unsigned int bits;
429
430   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
431   if (bits == MAX_BUCKETS)
432   {
433     /* How can all bits match? Got my own ID? */
434     GNUNET_break (0);
435     return GNUNET_SYSERR;
436   }
437   return MAX_BUCKETS - bits - 1;
438 }
439
440
441 /**
442  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
443  * Clean up the "oh" field in the @a cls
444  *
445  * @param cls a `struct ConnectInfo`
446  */
447 static void
448 offer_hello_done (void *cls)
449 {
450   struct ConnectInfo *ci = cls;
451
452   ci->oh = NULL;
453 }
454
455
456 /**
457  * Function called for all entries in #all_desired_peers to clean up.
458  *
459  * @param cls NULL
460  * @param peer peer the entry is for
461  * @param value the value to remove
462  * @return #GNUNET_YES
463  */
464 static int
465 free_connect_info (void *cls,
466                    const struct GNUNET_PeerIdentity *peer,
467                    void *value)
468 {
469   struct ConnectInfo *ci = value;
470
471   (void) cls;
472   GNUNET_assert (GNUNET_YES ==
473                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
474                                                        peer,
475                                                        ci));
476   if (NULL != ci->sh)
477   {
478     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
479     ci->sh = NULL;
480   }
481   if (NULL != ci->oh)
482   {
483     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
484     ci->oh = NULL;
485   }
486   GNUNET_free (ci);
487   return GNUNET_YES;
488 }
489
490
491 /**
492  * Consider if we want to connect to a given peer, and if so
493  * let ATS know.  If applicable, the HELLO is offered to the
494  * TRANSPORT service.
495  *
496  * @param pid peer to consider connectivity requirements for
497  * @param h a HELLO message, or NULL
498  */
499 static void
500 try_connect (const struct GNUNET_PeerIdentity *pid,
501              const struct GNUNET_MessageHeader *h)
502 {
503   int bucket;
504   struct GNUNET_HashCode pid_hash;
505   struct ConnectInfo *ci;
506   uint32_t strength;
507
508   GNUNET_CRYPTO_hash (pid,
509                       sizeof(struct GNUNET_PeerIdentity),
510                       &pid_hash);
511   bucket = find_bucket (&pid_hash);
512   if (bucket < 0)
513     return; /* self? */
514   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
515                                           pid);
516
517   if (k_buckets[bucket].peers_size < bucket_size)
518     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
519   else
520     strength = bucket; /* minimum value of connectivity */
521   if (GNUNET_YES ==
522       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
523                                               pid))
524     strength *= 2; /* double for connected peers */
525   else if (k_buckets[bucket].peers_size > bucket_size)
526     strength = 0; /* bucket full, we really do not care about more */
527
528   if ((0 == strength) &&
529       (NULL != ci))
530   {
531     /* release request */
532     GNUNET_assert (GNUNET_YES ==
533                    free_connect_info (NULL,
534                                       pid,
535                                       ci));
536     return;
537   }
538   if (NULL == ci)
539   {
540     ci = GNUNET_new (struct ConnectInfo);
541     GNUNET_assert (GNUNET_OK ==
542                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
543                                                       pid,
544                                                       ci,
545                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
546   }
547   if ((NULL != ci->oh) &&
548       (NULL != h))
549     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
550   if (NULL != h)
551     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
552                                            h,
553                                            &offer_hello_done,
554                                            ci);
555   if ((NULL != ci->sh) &&
556       (ci->strength != strength))
557     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
558   if (ci->strength != strength)
559     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
560                                               pid,
561                                               strength);
562   ci->strength = strength;
563 }
564
565
566 /**
567  * Function called for each peer in #all_desired_peers during
568  * #update_connect_preferences() if we have reason to adjust
569  * the strength of our desire to keep connections to certain
570  * peers.  Calls #try_connect() to update the calculations for
571  * the given @a pid.
572  *
573  * @param cls NULL
574  * @param pid peer to update
575  * @param value unused
576  * @return #GNUNET_YES (continue to iterate)
577  */
578 static int
579 update_desire_strength (void *cls,
580                         const struct GNUNET_PeerIdentity *pid,
581                         void *value)
582 {
583   (void) cls;
584   (void) value;
585   try_connect (pid,
586                NULL);
587   return GNUNET_YES;
588 }
589
590
591 /**
592  * Update our preferences for connectivity as given to ATS.
593  *
594  * @param cls the `struct PeerInfo` of the peer
595  * @param tc scheduler context.
596  */
597 static void
598 update_connect_preferences ()
599 {
600   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
601                                          &update_desire_strength,
602                                          NULL);
603 }
604
605
606 /**
607  * Add each of the peers we already know to the bloom filter of
608  * the request so that we don't get duplicate HELLOs.
609  *
610  * @param cls the `struct GNUNET_BLOCK_Group`
611  * @param key peer identity to add to the bloom filter
612  * @param value value the peer information (unused)
613  * @return #GNUNET_YES (we should continue to iterate)
614  */
615 static int
616 add_known_to_bloom (void *cls,
617                     const struct GNUNET_PeerIdentity *key,
618                     void *value)
619 {
620   struct GNUNET_BLOCK_Group *bg = cls;
621   struct GNUNET_HashCode key_hash;
622
623   (void) cls;
624   (void) value;
625   GNUNET_CRYPTO_hash (key,
626                       sizeof(struct GNUNET_PeerIdentity),
627                       &key_hash);
628   GNUNET_BLOCK_group_set_seen (bg,
629                                &key_hash,
630                                1);
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "Adding known peer (%s) to bloomfilter for FIND PEER\n",
633               GNUNET_i2s (key));
634   return GNUNET_YES;
635 }
636
637
638 /**
639  * Task to send a find peer message for our own peer identifier
640  * so that we can find the closest peers in the network to ourselves
641  * and attempt to connect to them.
642  *
643  * @param cls closure for this task
644  */
645 static void
646 send_find_peer_message (void *cls)
647 {
648   struct GNUNET_TIME_Relative next_send_time;
649   struct GNUNET_BLOCK_Group *bg;
650   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
651
652   (void) cls;
653   find_peer_task = NULL;
654   if (newly_found_peers > bucket_size)
655   {
656     /* If we are finding many peers already, no need to send out our request right now! */
657     find_peer_task =
658       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
659                                     &send_find_peer_message,
660                                     NULL);
661     newly_found_peers = 0;
662     return;
663   }
664   bg = GNUNET_BLOCK_group_create (GDS_block_context,
665                                   GNUNET_BLOCK_TYPE_DHT_HELLO,
666                                   GNUNET_CRYPTO_random_u32 (
667                                     GNUNET_CRYPTO_QUALITY_WEAK,
668                                     UINT32_MAX),
669                                   NULL,
670                                   0,
671                                   "filter-size",
672                                   DHT_BLOOM_SIZE,
673                                   NULL);
674   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
675                                          &add_known_to_bloom,
676                                          bg);
677   GNUNET_STATISTICS_update (GDS_stats,
678                             gettext_noop ("# FIND PEER messages initiated"),
679                             1,
680                             GNUNET_NO);
681   peer_bf
682     = GNUNET_CONTAINER_bloomfilter_init (NULL,
683                                          DHT_BLOOM_SIZE,
684                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
685   // FIXME: pass priority!?
686   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
687                              GNUNET_DHT_RO_FIND_PEER
688                              | GNUNET_DHT_RO_RECORD_ROUTE,
689                              FIND_PEER_REPLICATION_LEVEL,
690                              0,
691                              &my_identity_hash,
692                              NULL,
693                              0,
694                              bg,
695                              peer_bf);
696   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
697   GNUNET_BLOCK_group_destroy (bg);
698   /* schedule next round */
699   next_send_time.rel_value_us =
700     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us
701     + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
702                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us
703                                 / (newly_found_peers + 1));
704   newly_found_peers = 0;
705   GNUNET_assert (NULL == find_peer_task);
706   find_peer_task =
707     GNUNET_SCHEDULER_add_delayed (next_send_time,
708                                   &send_find_peer_message,
709                                   NULL);
710 }
711
712
713 /**
714  * Method called whenever a peer connects.
715  *
716  * @param cls closure
717  * @param peer peer identity this notification is about
718  * @param mq message queue for sending messages to @a peer
719  * @return our `struct PeerInfo` for @a peer
720  */
721 static void *
722 handle_core_connect (void *cls,
723                      const struct GNUNET_PeerIdentity *peer,
724                      struct GNUNET_MQ_Handle *mq)
725 {
726   struct PeerInfo *pi;
727
728   (void) cls;
729   /* Check for connect to self message */
730   if (0 == GNUNET_memcmp (&my_identity,
731                           peer))
732     return NULL;
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "Connected to %s\n",
735               GNUNET_i2s (peer));
736   GNUNET_assert (GNUNET_NO ==
737                  GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
738                                                     peer));
739   GNUNET_STATISTICS_update (GDS_stats,
740                             gettext_noop ("# peers connected"),
741                             1,
742                             GNUNET_NO);
743   pi = GNUNET_new (struct PeerInfo);
744   pi->id = peer;
745   pi->mq = mq;
746   GNUNET_CRYPTO_hash (peer,
747                       sizeof(struct GNUNET_PeerIdentity),
748                       &pi->phash);
749   pi->peer_bucket = find_bucket (&pi->phash);
750   GNUNET_assert ((pi->peer_bucket >= 0) &&
751                  ((unsigned int) pi->peer_bucket < MAX_BUCKETS));
752   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
753                                     k_buckets[pi->peer_bucket].tail,
754                                     pi);
755   k_buckets[pi->peer_bucket].peers_size++;
756   closest_bucket = GNUNET_MAX (closest_bucket,
757                                (unsigned int) pi->peer_bucket);
758   GNUNET_assert (GNUNET_OK ==
759                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
760                                                     pi->id,
761                                                     pi,
762                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
763   if ((pi->peer_bucket > 0) &&
764       (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
765   {
766     update_connect_preferences ();
767     newly_found_peers++;
768   }
769   if ((1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
770       (GNUNET_YES != disable_try_connect))
771   {
772     /* got a first connection, good time to start with FIND PEER requests... */
773     GNUNET_assert (NULL == find_peer_task);
774     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
775                                                NULL);
776   }
777   return pi;
778 }
779
780
781 /**
782  * Method called whenever a peer disconnects.
783  *
784  * @param cls closure
785  * @param peer peer identity this notification is about
786  * @param internal_cls our `struct PeerInfo` for @a peer
787  */
788 static void
789 handle_core_disconnect (void *cls,
790                         const struct GNUNET_PeerIdentity *peer,
791                         void *internal_cls)
792 {
793   struct PeerInfo *to_remove = internal_cls;
794
795   (void) cls;
796   /* Check for disconnect from self message */
797   if (NULL == to_remove)
798     return;
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Disconnected %s\n",
801               GNUNET_i2s (peer));
802   GNUNET_STATISTICS_update (GDS_stats,
803                             gettext_noop ("# peers connected"),
804                             -1,
805                             GNUNET_NO);
806   GNUNET_assert (GNUNET_YES ==
807                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
808                                                        peer,
809                                                        to_remove));
810   if ((0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
811       (GNUNET_YES != disable_try_connect))
812   {
813     GNUNET_SCHEDULER_cancel (find_peer_task);
814     find_peer_task = NULL;
815   }
816   GNUNET_assert (to_remove->peer_bucket >= 0);
817   GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
818                                k_buckets[to_remove->peer_bucket].tail,
819                                to_remove);
820   GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
821   k_buckets[to_remove->peer_bucket].peers_size--;
822   while ((closest_bucket > 0) &&
823          (0 == k_buckets[to_remove->peer_bucket].peers_size))
824     closest_bucket--;
825   if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
826     update_connect_preferences ();
827   GNUNET_free (to_remove);
828 }
829
830
831 /**
832  * To how many peers should we (on average) forward the request to
833  * obtain the desired target_replication count (on average).
834  *
835  * @param hop_count number of hops the message has traversed
836  * @param target_replication the number of total paths desired
837  * @return Some number of peers to forward the message to
838  */
839 static unsigned int
840 get_forward_count (uint32_t hop_count,
841                    uint32_t target_replication)
842 {
843   uint32_t random_value;
844   uint32_t forward_count;
845   float target_value;
846
847   if (hop_count > GDS_NSE_get () * 4.0)
848   {
849     /* forcefully terminate */
850     GNUNET_STATISTICS_update (GDS_stats,
851                               gettext_noop ("# requests TTL-dropped"),
852                               1, GNUNET_NO);
853     return 0;
854   }
855   if (hop_count > GDS_NSE_get () * 2.0)
856   {
857     /* Once we have reached our ideal number of hops, only forward to 1 peer */
858     return 1;
859   }
860   /* bound by system-wide maximum */
861   target_replication =
862     GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
863   target_value =
864     1 + (target_replication - 1.0) / (GDS_NSE_get ()
865                                       + ((float) (target_replication - 1.0)
866                                          * hop_count));
867   /* Set forward count to floor of target_value */
868   forward_count = (uint32_t) target_value;
869   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
870   target_value = target_value - forward_count;
871   random_value =
872     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
873   if (random_value < (target_value * UINT32_MAX))
874     forward_count++;
875   return forward_count;
876 }
877
878
879 /**
880  * Compute the distance between have and target as a 32-bit value.
881  * Differences in the lower bits must count stronger than differences
882  * in the higher bits.
883  *
884  * @param target
885  * @param have
886  * @return 0 if have==target, otherwise a number
887  *           that is larger as the distance between
888  *           the two hash codes increases
889  */
890 static unsigned int
891 get_distance (const struct GNUNET_HashCode *target,
892               const struct GNUNET_HashCode *have)
893 {
894   unsigned int bucket;
895   unsigned int msb;
896   unsigned int lsb;
897   unsigned int i;
898
899   /* We have to represent the distance between two 2^9 (=512)-bit
900    * numbers as a 2^5 (=32)-bit number with "0" being used for the
901    * two numbers being identical; furthermore, we need to
902    * guarantee that a difference in the number of matching
903    * bits is always represented in the result.
904    *
905    * We use 2^32/2^9 numerical values to distinguish between
906    * hash codes that have the same LSB bit distance and
907    * use the highest 2^9 bits of the result to signify the
908    * number of (mis)matching LSB bits; if we have 0 matching
909    * and hence 512 mismatching LSB bits we return -1 (since
910    * 512 itself cannot be represented with 9 bits) *//* first, calculate the most significant 9 bits of our
911    * result, aka the number of LSBs */bucket = GNUNET_CRYPTO_hash_matching_bits (target,
912                                              have);
913   /* bucket is now a value between 0 and 512 */
914   if (bucket == 512)
915     return 0;                   /* perfect match */
916   if (bucket == 0)
917     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
918                                  * below, we'd end up with max+1 (overflow)) */
919
920   /* calculate the most significant bits of the final result */
921   msb = (512 - bucket) << (32 - 9);
922   /* calculate the 32-9 least significant bits of the final result by
923    * looking at the differences in the 32-9 bits following the
924    * mismatching bit at 'bucket' */
925   lsb = 0;
926   for (i = bucket + 1;
927        (i < sizeof(struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9);
928        i++)
929   {
930     if (GNUNET_CRYPTO_hash_get_bit_rtl (target, i) !=
931         GNUNET_CRYPTO_hash_get_bit_rtl (have, i))
932       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
933                                                  * last bit set will be 31 -- if
934                                                  * i does not reach 512 first... */
935   }
936   return msb | lsb;
937 }
938
939
940 /**
941  * Check whether my identity is closer than any known peers.  If a
942  * non-null bloomfilter is given, check if this is the closest peer
943  * that hasn't already been routed to.
944  *
945  * @param key hash code to check closeness to
946  * @param bloom bloomfilter, exclude these entries from the decision
947  * @return #GNUNET_YES if node location is closest,
948  *         #GNUNET_NO otherwise.
949  */
950 int
951 GDS_am_closest_peer (const struct GNUNET_HashCode *key,
952                      const struct GNUNET_CONTAINER_BloomFilter *bloom)
953 {
954   int bits;
955   int other_bits;
956   int bucket_num;
957   struct PeerInfo *pos;
958
959   if (0 == GNUNET_memcmp (&my_identity_hash,
960                           key))
961     return GNUNET_YES;
962   bucket_num = find_bucket (key);
963   GNUNET_assert (bucket_num >= 0);
964   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
965                                            key);
966   pos = k_buckets[bucket_num].head;
967   while (NULL != pos)
968   {
969     if ((NULL != bloom) &&
970         (GNUNET_YES ==
971          GNUNET_CONTAINER_bloomfilter_test (bloom,
972                                             &pos->phash)))
973     {
974       pos = pos->next;
975       continue;                 /* Skip already checked entries */
976     }
977     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
978                                                    key);
979     if (other_bits > bits)
980       return GNUNET_NO;
981     if (other_bits == bits)     /* We match the same number of bits */
982       return GNUNET_YES;
983     pos = pos->next;
984   }
985   /* No peers closer, we are the closest! */
986   return GNUNET_YES;
987 }
988
989
990 /**
991  * Select a peer from the routing table that would be a good routing
992  * destination for sending a message for "key".  The resulting peer
993  * must not be in the set of blocked peers.<p>
994  *
995  * Note that we should not ALWAYS select the closest peer to the
996  * target, peers further away from the target should be chosen with
997  * exponentially declining probability.
998  *
999  * FIXME: double-check that this is fine
1000  *
1001  *
1002  * @param key the key we are selecting a peer to route to
1003  * @param bloom a bloomfilter containing entries this request has seen already
1004  * @param hops how many hops has this message traversed thus far
1005  * @return Peer to route to, or NULL on error
1006  */
1007 static struct PeerInfo *
1008 select_peer (const struct GNUNET_HashCode *key,
1009              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1010              uint32_t hops)
1011 {
1012   unsigned int bc;
1013   unsigned int count;
1014   unsigned int selected;
1015   struct PeerInfo *pos;
1016   unsigned int dist;
1017   unsigned int smallest_distance;
1018   struct PeerInfo *chosen;
1019
1020   if (hops >= GDS_NSE_get ())
1021   {
1022     /* greedy selection (closest peer that is not in bloomfilter) */
1023     smallest_distance = UINT_MAX;
1024     chosen = NULL;
1025     for (bc = 0; bc <= closest_bucket; bc++)
1026     {
1027       pos = k_buckets[bc].head;
1028       count = 0;
1029       while ((pos != NULL) && (count < bucket_size))
1030       {
1031         if ((NULL == bloom) ||
1032             (GNUNET_NO ==
1033              GNUNET_CONTAINER_bloomfilter_test (bloom,
1034                                                 &pos->phash)))
1035         {
1036           dist = get_distance (key,
1037                                &pos->phash);
1038           if (dist < smallest_distance)
1039           {
1040             chosen = pos;
1041             smallest_distance = dist;
1042           }
1043         }
1044         else
1045         {
1046           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1048                       GNUNET_i2s (pos->id),
1049                       GNUNET_h2s (key));
1050           GNUNET_STATISTICS_update (GDS_stats,
1051                                     gettext_noop (
1052                                       "# Peers excluded from routing due to Bloomfilter"),
1053                                     1,
1054                                     GNUNET_NO);
1055           dist = get_distance (key,
1056                                &pos->phash);
1057           if (dist < smallest_distance)
1058           {
1059             chosen = NULL;
1060             smallest_distance = dist;
1061           }
1062         }
1063         count++;
1064         pos = pos->next;
1065       }
1066     }
1067     if (NULL == chosen)
1068       GNUNET_STATISTICS_update (GDS_stats,
1069                                 gettext_noop ("# Peer selection failed"),
1070                                 1,
1071                                 GNUNET_NO);
1072     else
1073       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074                   "Selected peer `%s' in greedy routing for %s\n",
1075                   GNUNET_i2s (chosen->id),
1076                   GNUNET_h2s (key));
1077     return chosen;
1078   }
1079
1080   /* select "random" peer */
1081   /* count number of peers that are available and not filtered */
1082   count = 0;
1083   for (bc = 0; bc <= closest_bucket; bc++)
1084   {
1085     pos = k_buckets[bc].head;
1086     while ((NULL != pos) && (count < bucket_size))
1087     {
1088       if ((NULL != bloom) &&
1089           (GNUNET_YES ==
1090            GNUNET_CONTAINER_bloomfilter_test (bloom,
1091                                               &pos->phash)))
1092       {
1093         GNUNET_STATISTICS_update (GDS_stats,
1094                                   gettext_noop
1095                                   (
1096                                     "# Peers excluded from routing due to Bloomfilter"),
1097                                   1, GNUNET_NO);
1098         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1100                     GNUNET_i2s (pos->id),
1101                     GNUNET_h2s (key));
1102         pos = pos->next;
1103         continue;               /* Ignore bloomfiltered peers */
1104       }
1105       count++;
1106       pos = pos->next;
1107     }
1108   }
1109   if (0 == count)               /* No peers to select from! */
1110   {
1111     GNUNET_STATISTICS_update (GDS_stats,
1112                               gettext_noop ("# Peer selection failed"), 1,
1113                               GNUNET_NO);
1114     return NULL;
1115   }
1116   /* Now actually choose a peer */
1117   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1118                                        count);
1119   count = 0;
1120   for (bc = 0; bc <= closest_bucket; bc++)
1121   {
1122     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size));
1123          pos = pos->next)
1124     {
1125       if ((bloom != NULL) &&
1126           (GNUNET_YES ==
1127            GNUNET_CONTAINER_bloomfilter_test (bloom,
1128                                               &pos->phash)))
1129       {
1130         continue;               /* Ignore bloomfiltered peers */
1131       }
1132       if (0 == selected--)
1133       {
1134         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135                     "Selected peer `%s' in random routing for %s\n",
1136                     GNUNET_i2s (pos->id),
1137                     GNUNET_h2s (key));
1138         return pos;
1139       }
1140     }
1141   }
1142   GNUNET_break (0);
1143   return NULL;
1144 }
1145
1146
1147 /**
1148  * Compute the set of peers that the given request should be
1149  * forwarded to.
1150  *
1151  * @param key routing key
1152  * @param bloom bloom filter excluding peers as targets, all selected
1153  *        peers will be added to the bloom filter
1154  * @param hop_count number of hops the request has traversed so far
1155  * @param target_replication desired number of replicas
1156  * @param targets where to store an array of target peers (to be
1157  *         free'd by the caller)
1158  * @return number of peers returned in 'targets'.
1159  */
1160 static unsigned int
1161 get_target_peers (const struct GNUNET_HashCode *key,
1162                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1163                   uint32_t hop_count,
1164                   uint32_t target_replication,
1165                   struct PeerInfo ***targets)
1166 {
1167   unsigned int ret;
1168   unsigned int off;
1169   struct PeerInfo **rtargets;
1170   struct PeerInfo *nxt;
1171
1172   GNUNET_assert (NULL != bloom);
1173   ret = get_forward_count (hop_count,
1174                            target_replication);
1175   if (0 == ret)
1176   {
1177     *targets = NULL;
1178     return 0;
1179   }
1180   rtargets = GNUNET_new_array (ret,
1181                                struct PeerInfo *);
1182   for (off = 0; off < ret; off++)
1183   {
1184     nxt = select_peer (key,
1185                        bloom,
1186                        hop_count);
1187     if (NULL == nxt)
1188       break;
1189     rtargets[off] = nxt;
1190     GNUNET_break (GNUNET_NO ==
1191                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1192                                                      &nxt->phash));
1193     GNUNET_CONTAINER_bloomfilter_add (bloom,
1194                                       &nxt->phash);
1195   }
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1198               off,
1199               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1200               (unsigned int) hop_count,
1201               GNUNET_h2s (key),
1202               ret);
1203   if (0 == off)
1204   {
1205     GNUNET_free (rtargets);
1206     *targets = NULL;
1207     return 0;
1208   }
1209   *targets = rtargets;
1210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1212               GNUNET_h2s (key),
1213               off,
1214               ret);
1215   return off;
1216 }
1217
1218
1219 /**
1220  * Perform a PUT operation.   Forwards the given request to other
1221  * peers.   Does not store the data locally.  Does not give the
1222  * data to local clients.  May do nothing if this is the only
1223  * peer in the network (or if we are the closest peer in the
1224  * network).
1225  *
1226  * @param type type of the block
1227  * @param options routing options
1228  * @param desired_replication_level desired replication count
1229  * @param expiration_time when does the content expire
1230  * @param hop_count how many hops has this message traversed so far
1231  * @param bf Bloom filter of peers this PUT has already traversed
1232  * @param key key for the content
1233  * @param put_path_length number of entries in @a put_path
1234  * @param put_path peers this request has traversed so far (if tracked)
1235  * @param data payload to store
1236  * @param data_size number of bytes in @a data
1237  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1238  */
1239 int
1240 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1241                            enum GNUNET_DHT_RouteOption options,
1242                            uint32_t desired_replication_level,
1243                            struct GNUNET_TIME_Absolute expiration_time,
1244                            uint32_t hop_count,
1245                            struct GNUNET_CONTAINER_BloomFilter *bf,
1246                            const struct GNUNET_HashCode *key,
1247                            unsigned int put_path_length,
1248                            struct GNUNET_PeerIdentity *put_path,
1249                            const void *data,
1250                            size_t data_size)
1251 {
1252   unsigned int target_count;
1253   unsigned int i;
1254   struct PeerInfo **targets;
1255   struct PeerInfo *target;
1256   size_t msize;
1257   struct GNUNET_MQ_Envelope *env;
1258   struct PeerPutMessage *ppm;
1259   struct GNUNET_PeerIdentity *pp;
1260   unsigned int skip_count;
1261
1262   GNUNET_assert (NULL != bf);
1263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264               "Adding myself (%s) to PUT bloomfilter for %s\n",
1265               GNUNET_i2s (&my_identity),
1266               GNUNET_h2s (key));
1267   GNUNET_CONTAINER_bloomfilter_add (bf,
1268                                     &my_identity_hash);
1269   GNUNET_STATISTICS_update (GDS_stats,
1270                             gettext_noop ("# PUT requests routed"),
1271                             1,
1272                             GNUNET_NO);
1273   target_count
1274     = get_target_peers (key,
1275                         bf,
1276                         hop_count,
1277                         desired_replication_level,
1278                         &targets);
1279   if (0 == target_count)
1280   {
1281     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282                 "Routing PUT for %s terminates after %u hops at %s\n",
1283                 GNUNET_h2s (key),
1284                 (unsigned int) hop_count,
1285                 GNUNET_i2s (&my_identity));
1286     return GNUNET_NO;
1287   }
1288   msize = put_path_length * sizeof(struct GNUNET_PeerIdentity) + data_size;
1289   if (msize + sizeof(struct PeerPutMessage)
1290       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1291   {
1292     put_path_length = 0;
1293     msize = data_size;
1294   }
1295   if (msize + sizeof(struct PeerPutMessage)
1296       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1297   {
1298     GNUNET_break (0);
1299     GNUNET_free (targets);
1300     return GNUNET_NO;
1301   }
1302   GNUNET_STATISTICS_update (GDS_stats,
1303                             gettext_noop (
1304                               "# PUT messages queued for transmission"),
1305                             target_count,
1306                             GNUNET_NO);
1307   skip_count = 0;
1308   for (i = 0; i < target_count; i++)
1309   {
1310     target = targets[i];
1311     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1312     {
1313       /* skip */
1314       GNUNET_STATISTICS_update (GDS_stats,
1315                                 gettext_noop (
1316                                   "# P2P messages dropped due to full queue"),
1317                                 1,
1318                                 GNUNET_NO);
1319       skip_count++;
1320       continue;
1321     }
1322     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323                 "Routing PUT for %s after %u hops to %s\n",
1324                 GNUNET_h2s (key),
1325                 (unsigned int) hop_count,
1326                 GNUNET_i2s (target->id));
1327     env = GNUNET_MQ_msg_extra (ppm,
1328                                msize,
1329                                GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1330     ppm->options = htonl (options);
1331     ppm->type = htonl (type);
1332     ppm->hop_count = htonl (hop_count + 1);
1333     ppm->desired_replication_level = htonl (desired_replication_level);
1334     ppm->put_path_length = htonl (put_path_length);
1335     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1336     GNUNET_break (GNUNET_YES ==
1337                   GNUNET_CONTAINER_bloomfilter_test (bf,
1338                                                      &target->phash));
1339     GNUNET_assert (GNUNET_OK ==
1340                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1341                                                               ppm->bloomfilter,
1342                                                               DHT_BLOOM_SIZE));
1343     ppm->key = *key;
1344     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1345     GNUNET_memcpy (pp,
1346                    put_path,
1347                    sizeof(struct GNUNET_PeerIdentity) * put_path_length);
1348     GNUNET_memcpy (&pp[put_path_length],
1349                    data,
1350                    data_size);
1351     GNUNET_MQ_send (target->mq,
1352                     env);
1353   }
1354   GNUNET_free (targets);
1355   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1356 }
1357
1358
1359 /**
1360  * Perform a GET operation.  Forwards the given request to other
1361  * peers.  Does not lookup the key locally.  May do nothing if this is
1362  * the only peer in the network (or if we are the closest peer in the
1363  * network).
1364  *
1365  * @param type type of the block
1366  * @param options routing options
1367  * @param desired_replication_level desired replication count
1368  * @param hop_count how many hops did this request traverse so far?
1369  * @param key key for the content
1370  * @param xquery extended query
1371  * @param xquery_size number of bytes in @a xquery
1372  * @param bg group to use for filtering replies
1373  * @param peer_bf filter for peers not to select (again)
1374  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1375  */
1376 int
1377 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1378                            enum GNUNET_DHT_RouteOption options,
1379                            uint32_t desired_replication_level,
1380                            uint32_t hop_count,
1381                            const struct GNUNET_HashCode *key,
1382                            const void *xquery,
1383                            size_t xquery_size,
1384                            struct GNUNET_BLOCK_Group *bg,
1385                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1386 {
1387   unsigned int target_count;
1388   struct PeerInfo **targets;
1389   struct PeerInfo *target;
1390   struct GNUNET_MQ_Envelope *env;
1391   size_t msize;
1392   struct PeerGetMessage *pgm;
1393   char *xq;
1394   size_t reply_bf_size;
1395   void *reply_bf;
1396   unsigned int skip_count;
1397   uint32_t bf_nonce;
1398
1399   GNUNET_assert (NULL != peer_bf);
1400   GNUNET_STATISTICS_update (GDS_stats,
1401                             gettext_noop ("# GET requests routed"),
1402                             1,
1403                             GNUNET_NO);
1404   target_count = get_target_peers (key,
1405                                    peer_bf,
1406                                    hop_count,
1407                                    desired_replication_level,
1408                                    &targets);
1409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410               "Adding myself (%s) to GET bloomfilter for %s\n",
1411               GNUNET_i2s (&my_identity),
1412               GNUNET_h2s (key));
1413   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1414                                     &my_identity_hash);
1415   if (0 == target_count)
1416   {
1417     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418                 "Routing GET for %s terminates after %u hops at %s\n",
1419                 GNUNET_h2s (key),
1420                 (unsigned int) hop_count,
1421                 GNUNET_i2s (&my_identity));
1422     return GNUNET_NO;
1423   }
1424   if (GNUNET_OK !=
1425       GNUNET_BLOCK_group_serialize (bg,
1426                                     &bf_nonce,
1427                                     &reply_bf,
1428                                     &reply_bf_size))
1429   {
1430     reply_bf = NULL;
1431     reply_bf_size = 0;
1432     bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1433                                          UINT32_MAX);
1434   }
1435   msize = xquery_size + reply_bf_size;
1436   if (msize + sizeof(struct PeerGetMessage) >= GNUNET_MAX_MESSAGE_SIZE)
1437   {
1438     GNUNET_break (0);
1439     GNUNET_free_non_null (reply_bf);
1440     GNUNET_free (targets);
1441     return GNUNET_NO;
1442   }
1443   GNUNET_STATISTICS_update (GDS_stats,
1444                             gettext_noop (
1445                               "# GET messages queued for transmission"),
1446                             target_count,
1447                             GNUNET_NO);
1448   /* forward request */
1449   skip_count = 0;
1450   for (unsigned int i = 0; i < target_count; i++)
1451   {
1452     target = targets[i];
1453     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1454     {
1455       /* skip */
1456       GNUNET_STATISTICS_update (GDS_stats,
1457                                 gettext_noop (
1458                                   "# P2P messages dropped due to full queue"),
1459                                 1, GNUNET_NO);
1460       skip_count++;
1461       continue;
1462     }
1463     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1464                 "Routing GET for %s after %u hops to %s\n",
1465                 GNUNET_h2s (key),
1466                 (unsigned int) hop_count,
1467                 GNUNET_i2s (target->id));
1468     env = GNUNET_MQ_msg_extra (pgm,
1469                                msize,
1470                                GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1471     pgm->options = htonl (options);
1472     pgm->type = htonl (type);
1473     pgm->hop_count = htonl (hop_count + 1);
1474     pgm->desired_replication_level = htonl (desired_replication_level);
1475     pgm->xquery_size = htonl (xquery_size);
1476     pgm->bf_mutator = bf_nonce;
1477     GNUNET_break (GNUNET_YES ==
1478                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1479                                                      &target->phash));
1480     GNUNET_assert (GNUNET_OK ==
1481                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1482                                                               pgm->bloomfilter,
1483                                                               DHT_BLOOM_SIZE));
1484     pgm->key = *key;
1485     xq = (char *) &pgm[1];
1486     GNUNET_memcpy (xq,
1487                    xquery,
1488                    xquery_size);
1489     GNUNET_memcpy (&xq[xquery_size],
1490                    reply_bf,
1491                    reply_bf_size);
1492     GNUNET_MQ_send (target->mq,
1493                     env);
1494   }
1495   GNUNET_free (targets);
1496   GNUNET_free_non_null (reply_bf);
1497   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1498 }
1499
1500
1501 /**
1502  * Handle a reply (route to origin).  Only forwards the reply back to
1503  * the given peer.  Does not do local caching or forwarding to local
1504  * clients.
1505  *
1506  * @param target neighbour that should receive the block (if still connected)
1507  * @param type type of the block
1508  * @param expiration_time when does the content expire
1509  * @param key key for the content
1510  * @param put_path_length number of entries in @a put_path
1511  * @param put_path peers the original PUT traversed (if tracked)
1512  * @param get_path_length number of entries in @a get_path
1513  * @param get_path peers this reply has traversed so far (if tracked)
1514  * @param data payload of the reply
1515  * @param data_size number of bytes in @a data
1516  */
1517 void
1518 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1519                              enum GNUNET_BLOCK_Type type,
1520                              struct GNUNET_TIME_Absolute expiration_time,
1521                              const struct GNUNET_HashCode *key,
1522                              unsigned int put_path_length,
1523                              const struct GNUNET_PeerIdentity *put_path,
1524                              unsigned int get_path_length,
1525                              const struct GNUNET_PeerIdentity *get_path,
1526                              const void *data,
1527                              size_t data_size)
1528 {
1529   struct PeerInfo *pi;
1530   struct GNUNET_MQ_Envelope *env;
1531   size_t msize;
1532   struct PeerResultMessage *prm;
1533   struct GNUNET_PeerIdentity *paths;
1534
1535   msize = data_size + (get_path_length + put_path_length)
1536           * sizeof(struct GNUNET_PeerIdentity);
1537   if ((msize + sizeof(struct PeerResultMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
1538       (get_path_length >
1539        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
1540       (put_path_length >
1541        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
1542       (data_size > GNUNET_MAX_MESSAGE_SIZE))
1543   {
1544     GNUNET_break (0);
1545     return;
1546   }
1547   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1548                                           target);
1549   if (NULL == pi)
1550   {
1551     /* peer disconnected in the meantime, drop reply */
1552     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553                 "No matching peer for reply for key %s\n",
1554                 GNUNET_h2s (key));
1555     return;
1556   }
1557   if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1558   {
1559     /* skip */
1560     GNUNET_STATISTICS_update (GDS_stats,
1561                               gettext_noop (
1562                                 "# P2P messages dropped due to full queue"),
1563                               1,
1564                               GNUNET_NO);
1565     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566                 "Peer queue full, ignoring reply for key %s\n",
1567                 GNUNET_h2s (key));
1568     return;
1569   }
1570
1571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572               "Forwarding reply for key %s to peer %s\n",
1573               GNUNET_h2s (key),
1574               GNUNET_i2s (target));
1575   GNUNET_STATISTICS_update (GDS_stats,
1576                             gettext_noop
1577                               ("# RESULT messages queued for transmission"), 1,
1578                             GNUNET_NO);
1579   env = GNUNET_MQ_msg_extra (prm,
1580                              msize,
1581                              GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1582   prm->type = htonl (type);
1583   prm->put_path_length = htonl (put_path_length);
1584   prm->get_path_length = htonl (get_path_length);
1585   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1586   prm->key = *key;
1587   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1588   GNUNET_memcpy (paths,
1589                  put_path,
1590                  put_path_length * sizeof(struct GNUNET_PeerIdentity));
1591   GNUNET_memcpy (&paths[put_path_length],
1592                  get_path,
1593                  get_path_length * sizeof(struct GNUNET_PeerIdentity));
1594   GNUNET_memcpy (&paths[put_path_length + get_path_length],
1595                  data,
1596                  data_size);
1597   GNUNET_MQ_send (pi->mq,
1598                   env);
1599 }
1600
1601
1602 /**
1603  * To be called on core init/fail.
1604  *
1605  * @param cls service closure
1606  * @param identity the public identity of this peer
1607  */
1608 static void
1609 core_init (void *cls,
1610            const struct GNUNET_PeerIdentity *identity)
1611 {
1612   (void) cls;
1613   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1614               "CORE called, I am %s\n",
1615               GNUNET_i2s (identity));
1616   my_identity = *identity;
1617   GNUNET_CRYPTO_hash (identity,
1618                       sizeof(struct GNUNET_PeerIdentity),
1619                       &my_identity_hash);
1620   GNUNET_SERVICE_resume (GDS_service);
1621 }
1622
1623
1624 /**
1625  * Check validity of a p2p put request.
1626  *
1627  * @param cls closure with the `struct PeerInfo` of the sender
1628  * @param message message
1629  * @return #GNUNET_OK if the message is valid
1630  */
1631 static int
1632 check_dht_p2p_put (void *cls,
1633                    const struct PeerPutMessage *put)
1634 {
1635   uint32_t putlen;
1636   uint16_t msize;
1637
1638   (void) cls;
1639   msize = ntohs (put->header.size);
1640   putlen = ntohl (put->put_path_length);
1641   if ((msize <
1642        sizeof(struct PeerPutMessage)
1643        + putlen * sizeof(struct GNUNET_PeerIdentity)) ||
1644       (putlen >
1645        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
1646   {
1647     GNUNET_break_op (0);
1648     return GNUNET_SYSERR;
1649   }
1650   return GNUNET_OK;
1651 }
1652
1653
1654 /**
1655  * Core handler for p2p put requests.
1656  *
1657  * @param cls closure with the `struct PeerInfo` of the sender
1658  * @param message message
1659  */
1660 static void
1661 handle_dht_p2p_put (void *cls,
1662                     const struct PeerPutMessage *put)
1663 {
1664   struct PeerInfo *peer = cls;
1665   const struct GNUNET_PeerIdentity *put_path;
1666   const void *payload;
1667   uint32_t putlen;
1668   uint16_t msize;
1669   size_t payload_size;
1670   enum GNUNET_DHT_RouteOption options;
1671   struct GNUNET_CONTAINER_BloomFilter *bf;
1672   struct GNUNET_HashCode test_key;
1673   int forwarded;
1674   struct GNUNET_TIME_Absolute exp_time;
1675
1676   exp_time = GNUNET_TIME_absolute_ntoh (put->expiration_time);
1677   if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us)
1678   {
1679     GNUNET_STATISTICS_update (GDS_stats,
1680                               gettext_noop ("# Expired PUTs discarded"),
1681                               1,
1682                               GNUNET_NO);
1683     return;
1684   }
1685   msize = ntohs (put->header.size);
1686   putlen = ntohl (put->put_path_length);
1687   GNUNET_STATISTICS_update (GDS_stats,
1688                             gettext_noop ("# P2P PUT requests received"),
1689                             1,
1690                             GNUNET_NO);
1691   GNUNET_STATISTICS_update (GDS_stats,
1692                             gettext_noop ("# P2P PUT bytes received"),
1693                             msize,
1694                             GNUNET_NO);
1695   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1696   payload = &put_path[putlen];
1697   options = ntohl (put->options);
1698   payload_size = msize - (sizeof(struct PeerPutMessage)
1699                           + putlen * sizeof(struct GNUNET_PeerIdentity));
1700
1701   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1702               "PUT for `%s' from %s\n",
1703               GNUNET_h2s (&put->key),
1704               GNUNET_i2s (peer->id));
1705   if (GNUNET_YES == log_route_details_stderr)
1706   {
1707     char *tmp;
1708     char *pp;
1709
1710     pp = GNUNET_STRINGS_pp2s (put_path,
1711                               putlen);
1712     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1713     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1714                  "R5N PUT %s: %s->%s (%u, %u=>%u, PP: %s)\n",
1715                  GNUNET_h2s (&put->key),
1716                  GNUNET_i2s (peer->id),
1717                  tmp,
1718                  ntohl (put->hop_count),
1719                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1720                                                    &put->key),
1721                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1722                                                    &put->key),
1723                  pp);
1724     GNUNET_free (pp);
1725     GNUNET_free (tmp);
1726   }
1727   switch (GNUNET_BLOCK_get_key
1728             (GDS_block_context,
1729             ntohl (put->type),
1730             payload,
1731             payload_size,
1732             &test_key))
1733   {
1734   case GNUNET_YES:
1735     if (0 != memcmp (&test_key,
1736                      &put->key,
1737                      sizeof(struct GNUNET_HashCode)))
1738     {
1739       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1740
1741       GNUNET_break_op (0);
1742       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1743                   "PUT with key `%s' for block with key %s\n",
1744                   put_s,
1745                   GNUNET_h2s_full (&test_key));
1746       GNUNET_free (put_s);
1747       return;
1748     }
1749     break;
1750
1751   case GNUNET_NO:
1752     GNUNET_break_op (0);
1753     return;
1754
1755   case GNUNET_SYSERR:
1756     /* cannot verify, good luck */
1757     break;
1758   }
1759   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX)  /* FIXME: do for all tpyes */
1760   {
1761     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1762                                    ntohl (put->type),
1763                                    NULL,  /* query group */
1764                                    GNUNET_BLOCK_EO_NONE,
1765                                    NULL,    /* query */
1766                                    NULL, 0,  /* xquery */
1767                                    payload,
1768                                    payload_size))
1769     {
1770     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1771     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1772       break;
1773
1774     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1775     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1776     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1777     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1778     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1779     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1780     default:
1781       GNUNET_break_op (0);
1782       return;
1783     }
1784   }
1785
1786   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1787                                           DHT_BLOOM_SIZE,
1788                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1789   GNUNET_break_op (GNUNET_YES ==
1790                    GNUNET_CONTAINER_bloomfilter_test (bf,
1791                                                       &peer->phash));
1792   {
1793     struct GNUNET_PeerIdentity pp[putlen + 1];
1794
1795     /* extend 'put path' by sender */
1796     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1797     {
1798 #if SANITY_CHECKS
1799       for (unsigned int i = 0; i <= putlen; i++)
1800       {
1801         for (unsigned int j = 0; j < i; j++)
1802         {
1803           GNUNET_break (0 != memcmp (&pp[i],
1804                                      &pp[j],
1805                                      sizeof(struct GNUNET_PeerIdentity)));
1806         }
1807         GNUNET_break (0 != memcmp (&pp[i],
1808                                    peer->id,
1809                                    sizeof(struct GNUNET_PeerIdentity)));
1810       }
1811 #endif
1812       GNUNET_memcpy (pp,
1813                      put_path,
1814                      putlen * sizeof(struct GNUNET_PeerIdentity));
1815       pp[putlen] = *peer->id;
1816       putlen++;
1817     }
1818     else
1819       putlen = 0;
1820
1821     /* give to local clients */
1822     GDS_CLIENTS_handle_reply (exp_time,
1823                               &put->key,
1824                               0,
1825                               NULL,
1826                               putlen,
1827                               pp,
1828                               ntohl (put->type),
1829                               payload_size,
1830                               payload);
1831     /* store locally */
1832     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1833         (GDS_am_closest_peer (&put->key, bf)))
1834       GDS_DATACACHE_handle_put (exp_time,
1835                                 &put->key,
1836                                 putlen,
1837                                 pp,
1838                                 ntohl (put->type),
1839                                 payload_size,
1840                                 payload);
1841     /* route to other peers */
1842     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1843                                            options,
1844                                            ntohl (
1845                                              put->desired_replication_level),
1846                                            exp_time,
1847                                            ntohl (put->hop_count),
1848                                            bf,
1849                                            &put->key,
1850                                            putlen,
1851                                            pp,
1852                                            payload,
1853                                            payload_size);
1854     /* notify monitoring clients */
1855     GDS_CLIENTS_process_put (options
1856                              | ((GNUNET_OK == forwarded)
1857                                 ? GNUNET_DHT_RO_LAST_HOP
1858                                 : 0),
1859                              ntohl (put->type),
1860                              ntohl (put->hop_count),
1861                              ntohl (put->desired_replication_level),
1862                              putlen, pp,
1863                              exp_time,
1864                              &put->key,
1865                              payload,
1866                              payload_size);
1867   }
1868   GNUNET_CONTAINER_bloomfilter_free (bf);
1869 }
1870
1871
1872 /**
1873  * We have received a FIND PEER request.  Send matching
1874  * HELLOs back.
1875  *
1876  * @param sender sender of the FIND PEER request
1877  * @param key peers close to this key are desired
1878  * @param bg group for filtering peers
1879  */
1880 static void
1881 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1882                   const struct GNUNET_HashCode *key,
1883                   struct GNUNET_BLOCK_Group *bg)
1884 {
1885   int bucket_idx;
1886   struct PeerBucket *bucket;
1887   struct PeerInfo *peer;
1888   unsigned int choice;
1889   const struct GNUNET_HELLO_Message *hello;
1890   size_t hello_size;
1891
1892   /* first, check about our own HELLO */
1893   if (NULL != GDS_my_hello)
1894   {
1895     hello_size = GNUNET_HELLO_size ((const struct
1896                                      GNUNET_HELLO_Message *) GDS_my_hello);
1897     GNUNET_break (hello_size >= sizeof(struct GNUNET_MessageHeader));
1898     if (GNUNET_BLOCK_EVALUATION_OK_MORE ==
1899         GNUNET_BLOCK_evaluate (GDS_block_context,
1900                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1901                                bg,
1902                                GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1903                                &my_identity_hash,
1904                                NULL, 0,
1905                                GDS_my_hello,
1906                                hello_size))
1907     {
1908       GDS_NEIGHBOURS_handle_reply (sender,
1909                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1910                                    GNUNET_TIME_relative_to_absolute (
1911                                      hello_expiration),
1912                                    key,
1913                                    0,
1914                                    NULL,
1915                                    0,
1916                                    NULL,
1917                                    GDS_my_hello,
1918                                    hello_size);
1919     }
1920     else
1921     {
1922       GNUNET_STATISTICS_update (GDS_stats,
1923                                 gettext_noop (
1924                                   "# FIND PEER requests ignored due to Bloomfilter"),
1925                                 1,
1926                                 GNUNET_NO);
1927     }
1928   }
1929   else
1930   {
1931     GNUNET_STATISTICS_update (GDS_stats,
1932                               gettext_noop (
1933                                 "# FIND PEER requests ignored due to lack of HELLO"),
1934                               1,
1935                               GNUNET_NO);
1936   }
1937
1938   /* then, also consider sending a random HELLO from the closest bucket */
1939   if (0 == memcmp (&my_identity_hash,
1940                    key,
1941                    sizeof(struct GNUNET_HashCode)))
1942     bucket_idx = closest_bucket;
1943   else
1944     bucket_idx = GNUNET_MIN ((int) closest_bucket,
1945                              find_bucket (key));
1946   if (bucket_idx < 0)
1947     return;
1948   bucket = &k_buckets[bucket_idx];
1949   if (bucket->peers_size == 0)
1950     return;
1951   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1952                                      bucket->peers_size);
1953   peer = bucket->head;
1954   while (choice > 0)
1955   {
1956     GNUNET_assert (NULL != peer);
1957     peer = peer->next;
1958     choice--;
1959   }
1960   choice = bucket->peers_size;
1961   do
1962   {
1963     peer = peer->next;
1964     if (0 == choice--)
1965       return;                   /* no non-masked peer available */
1966     if (NULL == peer)
1967       peer = bucket->head;
1968     hello = GDS_HELLO_get (peer->id);
1969   }
1970   while ((NULL == hello) ||
1971          (GNUNET_BLOCK_EVALUATION_OK_MORE !=
1972           GNUNET_BLOCK_evaluate (GDS_block_context,
1973                                  GNUNET_BLOCK_TYPE_DHT_HELLO,
1974                                  bg,
1975                                  GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1976                                  &peer->phash,
1977                                  NULL, 0,
1978                                  hello,
1979                                  (hello_size = GNUNET_HELLO_size (hello)))));
1980   GDS_NEIGHBOURS_handle_reply (sender,
1981                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1982                                GNUNET_TIME_relative_to_absolute
1983                                  (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1984                                key,
1985                                0,
1986                                NULL,
1987                                0,
1988                                NULL,
1989                                hello,
1990                                hello_size);
1991 }
1992
1993
1994 /**
1995  * Handle a result from local datacache for a GET operation.
1996  *
1997  * @param cls the `struct PeerInfo` for which this is a reply
1998  * @param type type of the block
1999  * @param expiration_time when does the content expire
2000  * @param key key for the content
2001  * @param put_path_length number of entries in @a put_path
2002  * @param put_path peers the original PUT traversed (if tracked)
2003  * @param get_path_length number of entries in @a get_path
2004  * @param get_path peers this reply has traversed so far (if tracked)
2005  * @param data payload of the reply
2006  * @param data_size number of bytes in @a data
2007  */
2008 static void
2009 handle_local_result (void *cls,
2010                      enum GNUNET_BLOCK_Type type,
2011                      struct GNUNET_TIME_Absolute expiration_time,
2012                      const struct GNUNET_HashCode *key,
2013                      unsigned int put_path_length,
2014                      const struct GNUNET_PeerIdentity *put_path,
2015                      unsigned int get_path_length,
2016                      const struct GNUNET_PeerIdentity *get_path,
2017                      const void *data,
2018                      size_t data_size)
2019 {
2020   struct PeerInfo *peer = cls;
2021   char *pp;
2022
2023   pp = GNUNET_STRINGS_pp2s (put_path,
2024                             put_path_length);
2025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2026               "Found local result for %s (PP: %s)\n",
2027               GNUNET_h2s (key),
2028               pp);
2029   GNUNET_free (pp);
2030   GDS_NEIGHBOURS_handle_reply (peer->id,
2031                                type,
2032                                expiration_time,
2033                                key,
2034                                put_path_length, put_path,
2035                                get_path_length, get_path,
2036                                data, data_size);
2037 }
2038
2039
2040 /**
2041  * Check validity of p2p get request.
2042  *
2043  * @param cls closure with the `struct PeerInfo` of the sender
2044  * @param get the message
2045  * @return #GNUNET_OK if the message is well-formed
2046  */
2047 static int
2048 check_dht_p2p_get (void *cls,
2049                    const struct PeerGetMessage *get)
2050 {
2051   uint32_t xquery_size;
2052   uint16_t msize;
2053
2054   (void) cls;
2055   msize = ntohs (get->header.size);
2056   xquery_size = ntohl (get->xquery_size);
2057   if (msize < sizeof(struct PeerGetMessage) + xquery_size)
2058   {
2059     GNUNET_break_op (0);
2060     return GNUNET_SYSERR;
2061   }
2062   return GNUNET_OK;
2063 }
2064
2065
2066 /**
2067  * Core handler for p2p get requests.
2068  *
2069  * @param cls closure with the `struct PeerInfo` of the sender
2070  * @param get the message
2071  */
2072 static void
2073 handle_dht_p2p_get (void *cls,
2074                     const struct PeerGetMessage *get)
2075 {
2076   struct PeerInfo *peer = cls;
2077   uint32_t xquery_size;
2078   size_t reply_bf_size;
2079   uint16_t msize;
2080   enum GNUNET_BLOCK_Type type;
2081   enum GNUNET_DHT_RouteOption options;
2082   enum GNUNET_BLOCK_EvaluationResult eval;
2083   struct GNUNET_BLOCK_Group *bg;
2084   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2085   const char *xquery;
2086   int forwarded;
2087
2088   /* parse and validate message */
2089   msize = ntohs (get->header.size);
2090   xquery_size = ntohl (get->xquery_size);
2091   reply_bf_size = msize - (sizeof(struct PeerGetMessage) + xquery_size);
2092   type = ntohl (get->type);
2093   options = ntohl (get->options);
2094   xquery = (const char *) &get[1];
2095   GNUNET_STATISTICS_update (GDS_stats,
2096                             gettext_noop ("# P2P GET requests received"),
2097                             1,
2098                             GNUNET_NO);
2099   GNUNET_STATISTICS_update (GDS_stats,
2100                             gettext_noop ("# P2P GET bytes received"),
2101                             msize,
2102                             GNUNET_NO);
2103   if (GNUNET_YES == log_route_details_stderr)
2104   {
2105     char *tmp;
2106
2107     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2108     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2109                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2110                  GNUNET_h2s (&get->key),
2111                  GNUNET_i2s (peer->id),
2112                  tmp,
2113                  ntohl (get->hop_count),
2114                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2115                                                    &get->key),
2116                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2117                                                    &get->key),
2118                  ntohl (get->xquery_size),
2119                  xquery);
2120     GNUNET_free (tmp);
2121   }
2122   eval
2123     = GNUNET_BLOCK_evaluate (GDS_block_context,
2124                              type,
2125                              NULL,
2126                              GNUNET_BLOCK_EO_NONE,
2127                              &get->key,
2128                              xquery,
2129                              xquery_size,
2130                              NULL,
2131                              0);
2132   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2133   {
2134     /* request invalid or block type not supported */
2135     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2136     return;
2137   }
2138   peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2139                                                DHT_BLOOM_SIZE,
2140                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
2141   GNUNET_break_op (GNUNET_YES ==
2142                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2143                                                       &peer->phash));
2144   bg = GNUNET_BLOCK_group_create (GDS_block_context,
2145                                   type,
2146                                   get->bf_mutator,
2147                                   &xquery[xquery_size],
2148                                   reply_bf_size,
2149                                   "filter-size",
2150                                   reply_bf_size,
2151                                   NULL);
2152   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2153               "GET for %s at %s after %u hops\n",
2154               GNUNET_h2s (&get->key),
2155               GNUNET_i2s (&my_identity),
2156               (unsigned int) ntohl (get->hop_count));
2157   /* local lookup (this may update the reply_bf) */
2158   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2159       (GDS_am_closest_peer (&get->key,
2160                             peer_bf)))
2161   {
2162     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2163     {
2164       GNUNET_STATISTICS_update (GDS_stats,
2165                                 gettext_noop (
2166                                   "# P2P FIND PEER requests processed"),
2167                                 1,
2168                                 GNUNET_NO);
2169       handle_find_peer (peer->id,
2170                         &get->key,
2171                         bg);
2172     }
2173     else
2174     {
2175       eval = GDS_DATACACHE_handle_get (&get->key,
2176                                        type,
2177                                        xquery,
2178                                        xquery_size,
2179                                        bg,
2180                                        &handle_local_result,
2181                                        peer);
2182     }
2183   }
2184   else
2185   {
2186     GNUNET_STATISTICS_update (GDS_stats,
2187                               gettext_noop ("# P2P GET requests ONLY routed"),
2188                               1,
2189                               GNUNET_NO);
2190   }
2191
2192   /* remember request for routing replies */
2193   GDS_ROUTING_add (peer->id,
2194                    type,
2195                    bg,      /* bg now owned by routing, but valid at least until end of this function! */
2196                    options,
2197                    &get->key,
2198                    xquery,
2199                    xquery_size);
2200
2201   /* P2P forwarding */
2202   forwarded = GNUNET_NO;
2203   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2204     forwarded = GDS_NEIGHBOURS_handle_get (type,
2205                                            options,
2206                                            ntohl (
2207                                              get->desired_replication_level),
2208                                            ntohl (get->hop_count),
2209                                            &get->key,
2210                                            xquery,
2211                                            xquery_size,
2212                                            bg,
2213                                            peer_bf);
2214   GDS_CLIENTS_process_get (options
2215                            | (GNUNET_OK == forwarded)
2216                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2217                            type,
2218                            ntohl (get->hop_count),
2219                            ntohl (get->desired_replication_level),
2220                            0,
2221                            NULL,
2222                            &get->key);
2223
2224   /* clean up; note that 'bg' is owned by routing now! */
2225   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2226 }
2227
2228
2229 /**
2230  * Check validity of p2p result message.
2231  *
2232  * @param cls closure
2233  * @param message message
2234  * @return #GNUNET_YES if the message is well-formed
2235  */
2236 static int
2237 check_dht_p2p_result (void *cls,
2238                       const struct PeerResultMessage *prm)
2239 {
2240   uint32_t get_path_length;
2241   uint32_t put_path_length;
2242   uint16_t msize;
2243
2244   (void) cls;
2245   msize = ntohs (prm->header.size);
2246   put_path_length = ntohl (prm->put_path_length);
2247   get_path_length = ntohl (prm->get_path_length);
2248   if ((msize <
2249        sizeof(struct PeerResultMessage) + (get_path_length
2250                                            + put_path_length)
2251        * sizeof(struct GNUNET_PeerIdentity)) ||
2252       (get_path_length >
2253        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
2254       (put_path_length >
2255        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
2256   {
2257     GNUNET_break_op (0);
2258     return GNUNET_SYSERR;
2259   }
2260   return GNUNET_OK;
2261 }
2262
2263
2264 /**
2265  * Process a reply, after the @a get_path has been updated.
2266  *
2267  * @param expiration_time when does the reply expire
2268  * @param key key matching the query
2269  * @param get_path_length number of entries in @a get_path
2270  * @param get_path path the reply has taken
2271  * @param put_path_length number of entries in @a put_path
2272  * @param put_path path the PUT has taken
2273  * @param type type of the block
2274  * @param data_size number of bytes in @a data
2275  * @param data payload of the reply
2276  */
2277 static void
2278 process_reply_with_path (struct GNUNET_TIME_Absolute expiration_time,
2279                          const struct GNUNET_HashCode *key,
2280                          unsigned int get_path_length,
2281                          const struct GNUNET_PeerIdentity *get_path,
2282                          unsigned int put_path_length,
2283                          const struct GNUNET_PeerIdentity *put_path,
2284                          enum GNUNET_BLOCK_Type type,
2285                          size_t data_size,
2286                          const void *data)
2287 {
2288   /* forward to local clients */
2289   GDS_CLIENTS_handle_reply (expiration_time,
2290                             key,
2291                             get_path_length,
2292                             get_path,
2293                             put_path_length,
2294                             put_path,
2295                             type,
2296                             data_size,
2297                             data);
2298   GDS_CLIENTS_process_get_resp (type,
2299                                 get_path,
2300                                 get_path_length,
2301                                 put_path,
2302                                 put_path_length,
2303                                 expiration_time,
2304                                 key,
2305                                 data,
2306                                 data_size);
2307   if (GNUNET_YES == cache_results)
2308   {
2309     struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2310
2311     GNUNET_memcpy (xput_path,
2312                    put_path,
2313                    put_path_length * sizeof(struct GNUNET_PeerIdentity));
2314     GNUNET_memcpy (&xput_path[put_path_length],
2315                    get_path,
2316                    get_path_length * sizeof(struct GNUNET_PeerIdentity));
2317
2318     GDS_DATACACHE_handle_put (expiration_time,
2319                               key,
2320                               get_path_length + put_path_length,
2321                               xput_path,
2322                               type,
2323                               data_size,
2324                               data);
2325   }
2326   /* forward to other peers */
2327   GDS_ROUTING_process (type,
2328                        expiration_time,
2329                        key,
2330                        put_path_length,
2331                        put_path,
2332                        get_path_length,
2333                        get_path,
2334                        data,
2335                        data_size);
2336 }
2337
2338
2339 /**
2340  * Core handler for p2p result messages.
2341  *
2342  * @param cls closure
2343  * @param message message
2344  */
2345 static void
2346 handle_dht_p2p_result (void *cls,
2347                        const struct PeerResultMessage *prm)
2348 {
2349   struct PeerInfo *peer = cls;
2350   const struct GNUNET_PeerIdentity *put_path;
2351   const struct GNUNET_PeerIdentity *get_path;
2352   const void *data;
2353   uint32_t get_path_length;
2354   uint32_t put_path_length;
2355   uint16_t msize;
2356   size_t data_size;
2357   enum GNUNET_BLOCK_Type type;
2358   struct GNUNET_TIME_Absolute exp_time;
2359
2360   /* parse and validate message */
2361   exp_time = GNUNET_TIME_absolute_ntoh (prm->expiration_time);
2362   if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us)
2363   {
2364     GNUNET_STATISTICS_update (GDS_stats,
2365                               gettext_noop ("# Expired results discarded"),
2366                               1,
2367                               GNUNET_NO);
2368     return;
2369   }
2370   msize = ntohs (prm->header.size);
2371   put_path_length = ntohl (prm->put_path_length);
2372   get_path_length = ntohl (prm->get_path_length);
2373   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2374   get_path = &put_path[put_path_length];
2375   type = ntohl (prm->type);
2376   data = (const void *) &get_path[get_path_length];
2377   data_size = msize - (sizeof(struct PeerResultMessage)
2378                        + (get_path_length
2379                           + put_path_length) * sizeof(struct
2380                                                       GNUNET_PeerIdentity));
2381   GNUNET_STATISTICS_update (GDS_stats,
2382                             gettext_noop ("# P2P RESULTS received"),
2383                             1,
2384                             GNUNET_NO);
2385   GNUNET_STATISTICS_update (GDS_stats,
2386                             gettext_noop ("# P2P RESULT bytes received"),
2387                             msize,
2388                             GNUNET_NO);
2389   if (GNUNET_YES == log_route_details_stderr)
2390   {
2391     char *tmp;
2392     char *pp;
2393     char *gp;
2394
2395     gp = GNUNET_STRINGS_pp2s (get_path,
2396                               get_path_length);
2397     pp = GNUNET_STRINGS_pp2s (put_path,
2398                               put_path_length);
2399     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2400     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2401                  "R5N RESULT %s: %s->%s (GP: %s, PP: %s)\n",
2402                  GNUNET_h2s (&prm->key),
2403                  GNUNET_i2s (peer->id),
2404                  tmp,
2405                  gp,
2406                  pp);
2407     GNUNET_free (gp);
2408     GNUNET_free (pp);
2409     GNUNET_free (tmp);
2410   }
2411   /* if we got a HELLO, consider it for our own routing table */
2412   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2413   {
2414     const struct GNUNET_MessageHeader *h;
2415     struct GNUNET_PeerIdentity pid;
2416
2417     /* Should be a HELLO, validate and consider using it! */
2418     if (data_size < sizeof(struct GNUNET_HELLO_Message))
2419     {
2420       GNUNET_break_op (0);
2421       return;
2422     }
2423     h = data;
2424     if (data_size != ntohs (h->size))
2425     {
2426       GNUNET_break_op (0);
2427       return;
2428     }
2429     if (GNUNET_OK !=
2430         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2431                              &pid))
2432     {
2433       GNUNET_break_op (0);
2434       return;
2435     }
2436     if ((GNUNET_YES != disable_try_connect) &&
2437         (0 != memcmp (&my_identity,
2438                       &pid,
2439                       sizeof(struct GNUNET_PeerIdentity))))
2440       try_connect (&pid,
2441                    h);
2442   }
2443
2444   /* First, check if 'peer' is already on the path, and if
2445      so, truncate it instead of expanding. */
2446   for (unsigned int i = 0; i <= get_path_length; i++)
2447     if (0 == memcmp (&get_path[i],
2448                      peer->id,
2449                      sizeof(struct GNUNET_PeerIdentity)))
2450     {
2451       process_reply_with_path (exp_time,
2452                                &prm->key,
2453                                i,
2454                                get_path,
2455                                put_path_length,
2456                                put_path,
2457                                type,
2458                                data_size,
2459                                data);
2460       return;
2461     }
2462
2463   /* Need to append 'peer' to 'get_path' (normal case) */
2464   {
2465     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2466
2467     GNUNET_memcpy (xget_path,
2468                    get_path,
2469                    get_path_length * sizeof(struct GNUNET_PeerIdentity));
2470     xget_path[get_path_length] = *peer->id;
2471
2472     process_reply_with_path (exp_time,
2473                              &prm->key,
2474                              get_path_length + 1,
2475                              xget_path,
2476                              put_path_length,
2477                              put_path,
2478                              type,
2479                              data_size,
2480                              data);
2481   }
2482 }
2483
2484
2485 /**
2486  * Initialize neighbours subsystem.
2487  *
2488  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2489  */
2490 int
2491 GDS_NEIGHBOURS_init ()
2492 {
2493   struct GNUNET_MQ_MessageHandler core_handlers[] = {
2494     GNUNET_MQ_hd_var_size (dht_p2p_get,
2495                            GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2496                            struct PeerGetMessage,
2497                            NULL),
2498     GNUNET_MQ_hd_var_size (dht_p2p_put,
2499                            GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2500                            struct PeerPutMessage,
2501                            NULL),
2502     GNUNET_MQ_hd_var_size (dht_p2p_result,
2503                            GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2504                            struct PeerResultMessage,
2505                            NULL),
2506     GNUNET_MQ_handler_end ()
2507   };
2508   unsigned long long temp_config_num;
2509
2510   disable_try_connect
2511     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2512                                             "DHT",
2513                                             "DISABLE_TRY_CONNECT");
2514   if (GNUNET_OK ==
2515       GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2516                                              "DHT",
2517                                              "bucket_size",
2518                                              &temp_config_num))
2519     bucket_size = (unsigned int) temp_config_num;
2520   cache_results
2521     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2522                                             "DHT",
2523                                             "CACHE_RESULTS");
2524
2525   log_route_details_stderr =
2526     (NULL != getenv ("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2527   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2528   core_api = GNUNET_CORE_connect (GDS_cfg,
2529                                   NULL,
2530                                   &core_init,
2531                                   &handle_core_connect,
2532                                   &handle_core_disconnect,
2533                                   core_handlers);
2534   if (NULL == core_api)
2535     return GNUNET_SYSERR;
2536   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2537                                                               GNUNET_YES);
2538   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2539                                                             GNUNET_NO);
2540   return GNUNET_OK;
2541 }
2542
2543
2544 /**
2545  * Shutdown neighbours subsystem.
2546  */
2547 void
2548 GDS_NEIGHBOURS_done ()
2549 {
2550   if (NULL == core_api)
2551     return;
2552   GNUNET_CORE_disconnect (core_api);
2553   core_api = NULL;
2554   GNUNET_assert (0 ==
2555                  GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2556   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2557   all_connected_peers = NULL;
2558   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2559                                          &free_connect_info,
2560                                          NULL);
2561   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2562   all_desired_peers = NULL;
2563   GNUNET_ATS_connectivity_done (ats_ch);
2564   ats_ch = NULL;
2565   GNUNET_assert (NULL == find_peer_task);
2566 }
2567
2568
2569 /**
2570  * Get the ID of the local node.
2571  *
2572  * @return identity of the local node
2573  */
2574 struct GNUNET_PeerIdentity *
2575 GDS_NEIGHBOURS_get_id ()
2576 {
2577   return &my_identity;
2578 }
2579
2580
2581 /* end of gnunet-service-dht_neighbours.c */