More API function tests...
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_block_group_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_ats_service.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_hello.h"
45 #include "gnunet-service-dht_neighbours.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include "dht.h"
49
50 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
51
52 /**
53  * Enable slow sanity checks to debug issues.
54  */
55 #define SANITY_CHECKS 1
56
57 /**
58  * How many buckets will we allow total.
59  */
60 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
61
62 /**
63  * What is the maximum number of peers in a given bucket.
64  */
65 #define DEFAULT_BUCKET_SIZE 8
66
67 /**
68  * Desired replication level for FIND PEER requests
69  */
70 #define FIND_PEER_REPLICATION_LEVEL 4
71
72 /**
73  * Maximum allowed replication level for all requests.
74  */
75 #define MAXIMUM_REPLICATION_LEVEL 16
76
77 /**
78  * Maximum allowed number of pending messages per peer.
79  */
80 #define MAXIMUM_PENDING_PER_PEER 64
81
82 /**
83  * How long at least to wait before sending another find peer request.
84  */
85 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
86
87 /**
88  * How long at most to wait before sending another find peer request.
89  */
90 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
91
92 /**
93  * How long at most to wait for transmission of a GET request to another peer?
94  */
95 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
96
97 /**
98  * Hello address expiration
99  */
100 extern struct GNUNET_TIME_Relative hello_expiration;
101
102
103 GNUNET_NETWORK_STRUCT_BEGIN
104
105 /**
106  * P2P PUT message
107  */
108 struct PeerPutMessage
109 {
110   /**
111    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
112    */
113   struct GNUNET_MessageHeader header;
114
115   /**
116    * Processing options
117    */
118   uint32_t options GNUNET_PACKED;
119
120   /**
121    * Content type.
122    */
123   uint32_t type GNUNET_PACKED;
124
125   /**
126    * Hop count
127    */
128   uint32_t hop_count GNUNET_PACKED;
129
130   /**
131    * Replication level for this message
132    */
133   uint32_t desired_replication_level GNUNET_PACKED;
134
135   /**
136    * Length of the PUT path that follows (if tracked).
137    */
138   uint32_t put_path_length GNUNET_PACKED;
139
140   /**
141    * When does the content expire?
142    */
143   struct GNUNET_TIME_AbsoluteNBO expiration_time;
144
145   /**
146    * Bloomfilter (for peer identities) to stop circular routes
147    */
148   char bloomfilter[DHT_BLOOM_SIZE];
149
150   /**
151    * The key we are storing under.
152    */
153   struct GNUNET_HashCode key;
154
155   /* put path (if tracked) */
156
157   /* Payload */
158
159 };
160
161
162 /**
163  * P2P Result message
164  */
165 struct PeerResultMessage
166 {
167   /**
168    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
169    */
170   struct GNUNET_MessageHeader header;
171
172   /**
173    * Content type.
174    */
175   uint32_t type GNUNET_PACKED;
176
177   /**
178    * Length of the PUT path that follows (if tracked).
179    */
180   uint32_t put_path_length GNUNET_PACKED;
181
182   /**
183    * Length of the GET path that follows (if tracked).
184    */
185   uint32_t get_path_length GNUNET_PACKED;
186
187   /**
188    * When does the content expire?
189    */
190   struct GNUNET_TIME_AbsoluteNBO expiration_time;
191
192   /**
193    * The key of the corresponding GET request.
194    */
195   struct GNUNET_HashCode key;
196
197   /* put path (if tracked) */
198
199   /* get path (if tracked) */
200
201   /* Payload */
202
203 };
204
205
206 /**
207  * P2P GET message
208  */
209 struct PeerGetMessage
210 {
211   /**
212    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
213    */
214   struct GNUNET_MessageHeader header;
215
216   /**
217    * Processing options
218    */
219   uint32_t options GNUNET_PACKED;
220
221   /**
222    * Desired content type.
223    */
224   uint32_t type GNUNET_PACKED;
225
226   /**
227    * Hop count
228    */
229   uint32_t hop_count GNUNET_PACKED;
230
231   /**
232    * Desired replication level for this request.
233    */
234   uint32_t desired_replication_level GNUNET_PACKED;
235
236   /**
237    * Size of the extended query.
238    */
239   uint32_t xquery_size;
240
241   /**
242    * Bloomfilter mutator.
243    */
244   uint32_t bf_mutator;
245
246   /**
247    * Bloomfilter (for peer identities) to stop circular routes
248    */
249   char bloomfilter[DHT_BLOOM_SIZE];
250
251   /**
252    * The key we are looking for.
253    */
254   struct GNUNET_HashCode key;
255
256   /* xquery */
257
258   /* result bloomfilter */
259
260 };
261 GNUNET_NETWORK_STRUCT_END
262
263
264 /**
265  * Entry for a peer in a bucket.
266  */
267 struct PeerInfo
268 {
269   /**
270    * Next peer entry (DLL)
271    */
272   struct PeerInfo *next;
273
274   /**
275    *  Prev peer entry (DLL)
276    */
277   struct PeerInfo *prev;
278
279   /**
280    * Handle for sending messages to this peer.
281    */
282   struct GNUNET_MQ_Handle *mq;
283
284   /**
285    * What is the identity of the peer?
286    */
287   const struct GNUNET_PeerIdentity *id;
288
289   /**
290    * Hash of @e id.
291    */
292   struct GNUNET_HashCode phash;
293
294   /**
295    * Which bucket is this peer in?
296    */
297   int peer_bucket;
298
299 };
300
301
302 /**
303  * Peers are grouped into buckets.
304  */
305 struct PeerBucket
306 {
307   /**
308    * Head of DLL
309    */
310   struct PeerInfo *head;
311
312   /**
313    * Tail of DLL
314    */
315   struct PeerInfo *tail;
316
317   /**
318    * Number of peers in the bucket.
319    */
320   unsigned int peers_size;
321 };
322
323
324 /**
325  * Information about a peer that we would like to connect to.
326  */
327 struct ConnectInfo
328 {
329
330   /**
331    * Handle to active HELLO offer operation, or NULL.
332    */
333   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
334
335   /**
336    * Handle to active connectivity suggestion operation, or NULL.
337    */
338   struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
339
340   /**
341    * How much would we like to connect to this peer?
342    */
343   uint32_t strength;
344 };
345
346
347 /**
348  * Do we cache all results that we are routing in the local datacache?
349  */
350 static int cache_results;
351
352 /**
353  * Should routing details be logged to stderr (for debugging)?
354  */
355 static int log_route_details_stderr;
356
357 /**
358  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
359  */
360 static unsigned int closest_bucket;
361
362 /**
363  * How many peers have we added since we sent out our last
364  * find peer request?
365  */
366 static unsigned int newly_found_peers;
367
368 /**
369  * Option for testing that disables the 'connect' function of the DHT.
370  */
371 static int disable_try_connect;
372
373 /**
374  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
375  */
376 static struct PeerBucket k_buckets[MAX_BUCKETS];
377
378 /**
379  * Hash map of all CORE-connected peers, for easy removal from
380  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
381  */
382 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
383
384 /**
385  * Hash map of all peers we would like to be connected to.
386  * Values are of type `struct ConnectInfo`.
387  */
388 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
389
390 /**
391  * Maximum size for each bucket.
392  */
393 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
394
395 /**
396  * Task that sends FIND PEER requests.
397  */
398 static struct GNUNET_SCHEDULER_Task *find_peer_task;
399
400 /**
401  * Identity of this peer.
402  */
403 static struct GNUNET_PeerIdentity my_identity;
404
405 /**
406  * Hash of the identity of this peer.
407  */
408 static struct GNUNET_HashCode my_identity_hash;
409
410 /**
411  * Handle to CORE.
412  */
413 static struct GNUNET_CORE_Handle *core_api;
414
415 /**
416  * Handle to ATS connectivity.
417  */
418 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
419
420
421 /**
422  * Find the optimal bucket for this key.
423  *
424  * @param hc the hashcode to compare our identity to
425  * @return the proper bucket index, or GNUNET_SYSERR
426  *         on error (same hashcode)
427  */
428 static int
429 find_bucket (const struct GNUNET_HashCode *hc)
430 {
431   unsigned int bits;
432
433   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
434   if (bits == MAX_BUCKETS)
435   {
436     /* How can all bits match? Got my own ID? */
437     GNUNET_break (0);
438     return GNUNET_SYSERR;
439   }
440   return MAX_BUCKETS - bits - 1;
441 }
442
443
444 /**
445  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
446  * Clean up the "oh" field in the @a cls
447  *
448  * @param cls a `struct ConnectInfo`
449  */
450 static void
451 offer_hello_done (void *cls)
452 {
453   struct ConnectInfo *ci = cls;
454
455   ci->oh = NULL;
456 }
457
458
459 /**
460  * Function called for all entries in #all_desired_peers to clean up.
461  *
462  * @param cls NULL
463  * @param peer peer the entry is for
464  * @param value the value to remove
465  * @return #GNUNET_YES
466  */
467 static int
468 free_connect_info (void *cls,
469                    const struct GNUNET_PeerIdentity *peer,
470                    void *value)
471 {
472   struct ConnectInfo *ci = value;
473
474   GNUNET_assert (GNUNET_YES ==
475                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
476                                                        peer,
477                                                        ci));
478   if (NULL != ci->sh)
479   {
480     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
481     ci->sh = NULL;
482   }
483   if (NULL != ci->oh)
484   {
485     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
486     ci->oh = NULL;
487   }
488   GNUNET_free (ci);
489   return GNUNET_YES;
490 }
491
492
493 /**
494  * Consider if we want to connect to a given peer, and if so
495  * let ATS know.  If applicable, the HELLO is offered to the
496  * TRANSPORT service.
497  *
498  * @param pid peer to consider connectivity requirements for
499  * @param h a HELLO message, or NULL
500  */
501 static void
502 try_connect (const struct GNUNET_PeerIdentity *pid,
503              const struct GNUNET_MessageHeader *h)
504 {
505   int bucket;
506   struct GNUNET_HashCode pid_hash;
507   struct ConnectInfo *ci;
508   uint32_t strength;
509
510   GNUNET_CRYPTO_hash (pid,
511                       sizeof (struct GNUNET_PeerIdentity),
512                       &pid_hash);
513   bucket = find_bucket (&pid_hash);
514   if (bucket < 0)
515     return; /* self? */
516   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
517                                           pid);
518
519   if (k_buckets[bucket].peers_size < bucket_size)
520     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
521   else
522     strength = bucket; /* minimum value of connectivity */
523   if (GNUNET_YES ==
524       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
525                                               pid))
526     strength *= 2; /* double for connected peers */
527   else if (k_buckets[bucket].peers_size > bucket_size)
528     strength = 0; /* bucket full, we really do not care about more */
529
530   if ( (0 == strength) &&
531        (NULL != ci) )
532   {
533     /* release request */
534     GNUNET_assert (GNUNET_YES ==
535                    free_connect_info (NULL,
536                                       pid,
537                                       ci));
538     return;
539   }
540   if (NULL == ci)
541   {
542     ci = GNUNET_new (struct ConnectInfo);
543     GNUNET_assert (GNUNET_OK ==
544                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
545                                                       pid,
546                                                       ci,
547                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
548   }
549   if ( (NULL != ci->oh) &&
550        (NULL != h) )
551     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
552   if (NULL != h)
553     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
554                                            h,
555                                            &offer_hello_done,
556                                            ci);
557   if ( (NULL != ci->sh) &&
558        (ci->strength != strength) )
559     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
560   if (ci->strength != strength)
561     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
562                                               pid,
563                                               strength);
564   ci->strength = strength;
565 }
566
567
568 /**
569  * Function called for each peer in #all_desired_peers during
570  * #update_connect_preferences() if we have reason to adjust
571  * the strength of our desire to keep connections to certain
572  * peers.  Calls #try_connect() to update the calculations for
573  * the given @a pid.
574  *
575  * @param cls NULL
576  * @param pid peer to update
577  * @param value unused
578  * @return #GNUNET_YES (continue to iterate)
579  */
580 static int
581 update_desire_strength (void *cls,
582                         const struct GNUNET_PeerIdentity *pid,
583                         void *value)
584 {
585   try_connect (pid, NULL);
586   return GNUNET_YES;
587 }
588
589
590 /**
591  * Update our preferences for connectivity as given to ATS.
592  *
593  * @param cls the `struct PeerInfo` of the peer
594  * @param tc scheduler context.
595  */
596 static void
597 update_connect_preferences ()
598 {
599   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
600                                          &update_desire_strength,
601                                          NULL);
602 }
603
604
605 /**
606  * Add each of the peers we already know to the bloom filter of
607  * the request so that we don't get duplicate HELLOs.
608  *
609  * @param cls the `struct GNUNET_BLOCK_Group`
610  * @param key peer identity to add to the bloom filter
611  * @param value value the peer information (unused)
612  * @return #GNUNET_YES (we should continue to iterate)
613  */
614 static int
615 add_known_to_bloom (void *cls,
616                     const struct GNUNET_PeerIdentity *key,
617                     void *value)
618 {
619   struct GNUNET_BLOCK_Group *bg = cls;
620   struct GNUNET_HashCode key_hash;
621
622   GNUNET_CRYPTO_hash (key,
623                       sizeof (struct GNUNET_PeerIdentity),
624                       &key_hash);
625   GNUNET_BLOCK_GROUP_bf_test_and_set (bg,
626                                       &key_hash);
627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628               "Adding known peer (%s) to bloomfilter for FIND PEER\n",
629               GNUNET_i2s (key));
630   return GNUNET_YES;
631 }
632
633
634 /**
635  * Task to send a find peer message for our own peer identifier
636  * so that we can find the closest peers in the network to ourselves
637  * and attempt to connect to them.
638  *
639  * @param cls closure for this task
640  */
641 static void
642 send_find_peer_message (void *cls)
643 {
644   struct GNUNET_TIME_Relative next_send_time;
645   struct GNUNET_BLOCK_Group *bg;
646   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
647
648   find_peer_task = NULL;
649   if (newly_found_peers > bucket_size)
650   {
651     /* If we are finding many peers already, no need to send out our request right now! */
652     find_peer_task =
653         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
654                                       &send_find_peer_message,
655                                       NULL);
656     newly_found_peers = 0;
657     return;
658   }
659   bg = GNUNET_BLOCK_GROUP_bf_create (NULL,
660                                      DHT_BLOOM_SIZE,
661                                      GNUNET_CONSTANTS_BLOOMFILTER_K,
662                                      GNUNET_BLOCK_TYPE_DHT_HELLO,
663                                      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
664                                                                UINT32_MAX),
665                                      NULL,
666                                      0);
667   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
668                                          &add_known_to_bloom,
669                                          bg);
670   GNUNET_STATISTICS_update (GDS_stats,
671                             gettext_noop ("# FIND PEER messages initiated"),
672                             1,
673                             GNUNET_NO);
674   peer_bf
675     = GNUNET_CONTAINER_bloomfilter_init (NULL,
676                                          DHT_BLOOM_SIZE,
677                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
678   // FIXME: pass priority!?
679   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
680                              GNUNET_DHT_RO_FIND_PEER,
681                              FIND_PEER_REPLICATION_LEVEL,
682                              0,
683                              &my_identity_hash,
684                              NULL,
685                              0,
686                              bg,
687                              peer_bf);
688   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
689   GNUNET_BLOCK_group_destroy (bg);
690   /* schedule next round */
691   next_send_time.rel_value_us =
692       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
693       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
694                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
695                                 (newly_found_peers + 1));
696   newly_found_peers = 0;
697   GNUNET_assert (NULL == find_peer_task);
698   find_peer_task =
699       GNUNET_SCHEDULER_add_delayed (next_send_time,
700                                     &send_find_peer_message,
701                                     NULL);
702 }
703
704
705 /**
706  * Method called whenever a peer connects.
707  *
708  * @param cls closure
709  * @param peer peer identity this notification is about
710  * @param mq message queue for sending messages to @a peer
711  * @return our `struct PeerInfo` for @a peer
712  */
713 static void *
714 handle_core_connect (void *cls,
715                      const struct GNUNET_PeerIdentity *peer,
716                      struct GNUNET_MQ_Handle *mq)
717 {
718   struct PeerInfo *pi;
719
720   /* Check for connect to self message */
721   if (0 == memcmp (&my_identity,
722                    peer,
723                    sizeof (struct GNUNET_PeerIdentity)))
724     return NULL;
725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726               "Connected to %s\n",
727               GNUNET_i2s (peer));
728   GNUNET_assert (GNUNET_NO ==
729                  GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
730                                                     peer));
731   GNUNET_STATISTICS_update (GDS_stats,
732                             gettext_noop ("# peers connected"),
733                             1,
734                             GNUNET_NO);
735   pi = GNUNET_new (struct PeerInfo);
736   pi->id = peer;
737   pi->mq = mq;
738   GNUNET_CRYPTO_hash (peer,
739                       sizeof (struct GNUNET_PeerIdentity),
740                       &pi->phash);
741   pi->peer_bucket = find_bucket (&pi->phash);
742   GNUNET_assert ( (pi->peer_bucket >= 0) &&
743                   (pi->peer_bucket < MAX_BUCKETS) );
744   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
745                                     k_buckets[pi->peer_bucket].tail,
746                                     pi);
747   k_buckets[pi->peer_bucket].peers_size++;
748   closest_bucket = GNUNET_MAX (closest_bucket,
749                                pi->peer_bucket);
750   GNUNET_assert (GNUNET_OK ==
751                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
752                                                     pi->id,
753                                                     pi,
754                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
755   if ( (pi->peer_bucket > 0) &&
756        (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
757   {
758     update_connect_preferences ();
759     newly_found_peers++;
760   }
761   if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
762        (GNUNET_YES != disable_try_connect))
763   {
764     /* got a first connection, good time to start with FIND PEER requests... */
765     GNUNET_assert (NULL == find_peer_task);
766     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
767                                                NULL);
768   }
769   return pi;
770 }
771
772
773 /**
774  * Method called whenever a peer disconnects.
775  *
776  * @param cls closure
777  * @param peer peer identity this notification is about
778  * @param internal_cls our `struct PeerInfo` for @a peer
779  */
780 static void
781 handle_core_disconnect (void *cls,
782                         const struct GNUNET_PeerIdentity *peer,
783                         void *internal_cls)
784 {
785   struct PeerInfo *to_remove = internal_cls;
786
787   /* Check for disconnect from self message */
788   if (NULL == to_remove)
789     return;
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "Disconnected %s\n",
792               GNUNET_i2s (peer));
793   GNUNET_STATISTICS_update (GDS_stats,
794                             gettext_noop ("# peers connected"),
795                             -1,
796                             GNUNET_NO);
797   GNUNET_assert (GNUNET_YES ==
798                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
799                                                        peer,
800                                                        to_remove));
801   if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
802        (GNUNET_YES != disable_try_connect) )
803   {
804     GNUNET_SCHEDULER_cancel (find_peer_task);
805     find_peer_task = NULL;
806   }
807   GNUNET_assert (to_remove->peer_bucket >= 0);
808   GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
809                                k_buckets[to_remove->peer_bucket].tail,
810                                to_remove);
811   GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
812   k_buckets[to_remove->peer_bucket].peers_size--;
813   while ( (closest_bucket > 0) &&
814           (0 == k_buckets[to_remove->peer_bucket].peers_size) )
815     closest_bucket--;
816   if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
817     update_connect_preferences ();
818   GNUNET_free (to_remove);
819 }
820
821
822 /**
823  * To how many peers should we (on average) forward the request to
824  * obtain the desired target_replication count (on average).
825  *
826  * @param hop_count number of hops the message has traversed
827  * @param target_replication the number of total paths desired
828  * @return Some number of peers to forward the message to
829  */
830 static unsigned int
831 get_forward_count (uint32_t hop_count,
832                    uint32_t target_replication)
833 {
834   uint32_t random_value;
835   uint32_t forward_count;
836   float target_value;
837
838   if (hop_count > GDS_NSE_get () * 4.0)
839   {
840     /* forcefully terminate */
841     GNUNET_STATISTICS_update (GDS_stats,
842                               gettext_noop ("# requests TTL-dropped"),
843                               1, GNUNET_NO);
844     return 0;
845   }
846   if (hop_count > GDS_NSE_get () * 2.0)
847   {
848     /* Once we have reached our ideal number of hops, only forward to 1 peer */
849     return 1;
850   }
851   /* bound by system-wide maximum */
852   target_replication =
853       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
854   target_value =
855       1 + (target_replication - 1.0) / (GDS_NSE_get () +
856                                         ((float) (target_replication - 1.0) *
857                                          hop_count));
858   /* Set forward count to floor of target_value */
859   forward_count = (uint32_t) target_value;
860   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
861   target_value = target_value - forward_count;
862   random_value =
863       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
864   if (random_value < (target_value * UINT32_MAX))
865     forward_count++;
866   return forward_count;
867 }
868
869
870 /**
871  * Compute the distance between have and target as a 32-bit value.
872  * Differences in the lower bits must count stronger than differences
873  * in the higher bits.
874  *
875  * @param target
876  * @param have
877  * @return 0 if have==target, otherwise a number
878  *           that is larger as the distance between
879  *           the two hash codes increases
880  */
881 static unsigned int
882 get_distance (const struct GNUNET_HashCode *target,
883               const struct GNUNET_HashCode *have)
884 {
885   unsigned int bucket;
886   unsigned int msb;
887   unsigned int lsb;
888   unsigned int i;
889
890   /* We have to represent the distance between two 2^9 (=512)-bit
891    * numbers as a 2^5 (=32)-bit number with "0" being used for the
892    * two numbers being identical; furthermore, we need to
893    * guarantee that a difference in the number of matching
894    * bits is always represented in the result.
895    *
896    * We use 2^32/2^9 numerical values to distinguish between
897    * hash codes that have the same LSB bit distance and
898    * use the highest 2^9 bits of the result to signify the
899    * number of (mis)matching LSB bits; if we have 0 matching
900    * and hence 512 mismatching LSB bits we return -1 (since
901    * 512 itself cannot be represented with 9 bits) */
902
903   /* first, calculate the most significant 9 bits of our
904    * result, aka the number of LSBs */
905   bucket = GNUNET_CRYPTO_hash_matching_bits (target,
906                                              have);
907   /* bucket is now a value between 0 and 512 */
908   if (bucket == 512)
909     return 0;                   /* perfect match */
910   if (bucket == 0)
911     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
912                                  * below, we'd end up with max+1 (overflow)) */
913
914   /* calculate the most significant bits of the final result */
915   msb = (512 - bucket) << (32 - 9);
916   /* calculate the 32-9 least significant bits of the final result by
917    * looking at the differences in the 32-9 bits following the
918    * mismatching bit at 'bucket' */
919   lsb = 0;
920   for (i = bucket + 1;
921        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
922   {
923     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
924         GNUNET_CRYPTO_hash_get_bit (have, i))
925       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
926                                                  * last bit set will be 31 -- if
927                                                  * i does not reach 512 first... */
928   }
929   return msb | lsb;
930 }
931
932
933 /**
934  * Check whether my identity is closer than any known peers.  If a
935  * non-null bloomfilter is given, check if this is the closest peer
936  * that hasn't already been routed to.
937  *
938  * @param key hash code to check closeness to
939  * @param bloom bloomfilter, exclude these entries from the decision
940  * @return #GNUNET_YES if node location is closest,
941  *         #GNUNET_NO otherwise.
942  */
943 static int
944 am_closest_peer (const struct GNUNET_HashCode *key,
945                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
946 {
947   int bits;
948   int other_bits;
949   int bucket_num;
950   int count;
951   struct PeerInfo *pos;
952
953   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
954     return GNUNET_YES;
955   bucket_num = find_bucket (key);
956   GNUNET_assert (bucket_num >= 0);
957   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
958                                            key);
959   pos = k_buckets[bucket_num].head;
960   count = 0;
961   while ((NULL != pos) && (count < bucket_size))
962   {
963     if ((NULL != bloom) &&
964         (GNUNET_YES ==
965          GNUNET_CONTAINER_bloomfilter_test (bloom,
966                                             &pos->phash)))
967     {
968       pos = pos->next;
969       continue;                 /* Skip already checked entries */
970     }
971     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
972                                                    key);
973     if (other_bits > bits)
974       return GNUNET_NO;
975     if (other_bits == bits)     /* We match the same number of bits */
976       return GNUNET_YES;
977     pos = pos->next;
978   }
979   /* No peers closer, we are the closest! */
980   return GNUNET_YES;
981 }
982
983
984 /**
985  * Select a peer from the routing table that would be a good routing
986  * destination for sending a message for "key".  The resulting peer
987  * must not be in the set of blocked peers.<p>
988  *
989  * Note that we should not ALWAYS select the closest peer to the
990  * target, peers further away from the target should be chosen with
991  * exponentially declining probability.
992  *
993  * FIXME: double-check that this is fine
994  *
995  *
996  * @param key the key we are selecting a peer to route to
997  * @param bloom a bloomfilter containing entries this request has seen already
998  * @param hops how many hops has this message traversed thus far
999  * @return Peer to route to, or NULL on error
1000  */
1001 static struct PeerInfo *
1002 select_peer (const struct GNUNET_HashCode *key,
1003              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1004              uint32_t hops)
1005 {
1006   unsigned int bc;
1007   unsigned int count;
1008   unsigned int selected;
1009   struct PeerInfo *pos;
1010   unsigned int dist;
1011   unsigned int smallest_distance;
1012   struct PeerInfo *chosen;
1013
1014   if (hops >= GDS_NSE_get ())
1015   {
1016     /* greedy selection (closest peer that is not in bloomfilter) */
1017     smallest_distance = UINT_MAX;
1018     chosen = NULL;
1019     for (bc = 0; bc <= closest_bucket; bc++)
1020     {
1021       pos = k_buckets[bc].head;
1022       count = 0;
1023       while ((pos != NULL) && (count < bucket_size))
1024       {
1025         if ((bloom == NULL) ||
1026             (GNUNET_NO ==
1027              GNUNET_CONTAINER_bloomfilter_test (bloom,
1028                                                 &pos->phash)))
1029         {
1030           dist = get_distance (key,
1031                                &pos->phash);
1032           if (dist < smallest_distance)
1033           {
1034             chosen = pos;
1035             smallest_distance = dist;
1036           }
1037         }
1038         else
1039         {
1040           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1042                       GNUNET_i2s (pos->id),
1043                       GNUNET_h2s (key));
1044           GNUNET_STATISTICS_update (GDS_stats,
1045                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"),
1046                                     1,
1047                                     GNUNET_NO);
1048           dist = get_distance (key,
1049                                &pos->phash);
1050           if (dist < smallest_distance)
1051           {
1052             chosen = NULL;
1053             smallest_distance = dist;
1054           }
1055         }
1056         count++;
1057         pos = pos->next;
1058       }
1059     }
1060     if (NULL == chosen)
1061       GNUNET_STATISTICS_update (GDS_stats,
1062                                 gettext_noop ("# Peer selection failed"), 1,
1063                                 GNUNET_NO);
1064     return chosen;
1065   }
1066
1067   /* select "random" peer */
1068   /* count number of peers that are available and not filtered */
1069   count = 0;
1070   for (bc = 0; bc <= closest_bucket; bc++)
1071   {
1072     pos = k_buckets[bc].head;
1073     while ((pos != NULL) && (count < bucket_size))
1074     {
1075       if ((bloom != NULL) &&
1076           (GNUNET_YES ==
1077            GNUNET_CONTAINER_bloomfilter_test (bloom,
1078                                               &pos->phash)))
1079       {
1080         GNUNET_STATISTICS_update (GDS_stats,
1081                                   gettext_noop
1082                                   ("# Peers excluded from routing due to Bloomfilter"),
1083                                   1, GNUNET_NO);
1084         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1086                     GNUNET_i2s (pos->id),
1087                     GNUNET_h2s (key));
1088         pos = pos->next;
1089         continue;               /* Ignore bloomfiltered peers */
1090       }
1091       count++;
1092       pos = pos->next;
1093     }
1094   }
1095   if (0 == count)               /* No peers to select from! */
1096   {
1097     GNUNET_STATISTICS_update (GDS_stats,
1098                               gettext_noop ("# Peer selection failed"), 1,
1099                               GNUNET_NO);
1100     return NULL;
1101   }
1102   /* Now actually choose a peer */
1103   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1104                                        count);
1105   count = 0;
1106   for (bc = 0; bc <= closest_bucket; bc++)
1107   {
1108     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1109     {
1110       if ((bloom != NULL) &&
1111           (GNUNET_YES ==
1112            GNUNET_CONTAINER_bloomfilter_test (bloom,
1113                                               &pos->phash)))
1114       {
1115         continue;               /* Ignore bloomfiltered peers */
1116       }
1117       if (0 == selected--)
1118         return pos;
1119     }
1120   }
1121   GNUNET_break (0);
1122   return NULL;
1123 }
1124
1125
1126 /**
1127  * Compute the set of peers that the given request should be
1128  * forwarded to.
1129  *
1130  * @param key routing key
1131  * @param bloom bloom filter excluding peers as targets, all selected
1132  *        peers will be added to the bloom filter
1133  * @param hop_count number of hops the request has traversed so far
1134  * @param target_replication desired number of replicas
1135  * @param targets where to store an array of target peers (to be
1136  *         free'd by the caller)
1137  * @return number of peers returned in 'targets'.
1138  */
1139 static unsigned int
1140 get_target_peers (const struct GNUNET_HashCode *key,
1141                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1142                   uint32_t hop_count,
1143                   uint32_t target_replication,
1144                   struct PeerInfo ***targets)
1145 {
1146   unsigned int ret;
1147   unsigned int off;
1148   struct PeerInfo **rtargets;
1149   struct PeerInfo *nxt;
1150
1151   GNUNET_assert (NULL != bloom);
1152   ret = get_forward_count (hop_count,
1153                            target_replication);
1154   if (0 == ret)
1155   {
1156     *targets = NULL;
1157     return 0;
1158   }
1159   rtargets = GNUNET_new_array (ret,
1160                                struct PeerInfo *);
1161   for (off = 0; off < ret; off++)
1162   {
1163     nxt = select_peer (key, bloom, hop_count);
1164     if (NULL == nxt)
1165       break;
1166     rtargets[off] = nxt;
1167     GNUNET_break (GNUNET_NO ==
1168                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1169                                                      &nxt->phash));
1170     GNUNET_CONTAINER_bloomfilter_add (bloom,
1171                                       &nxt->phash);
1172   }
1173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1175               off,
1176               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1177               (unsigned int) hop_count,
1178               GNUNET_h2s (key),
1179               ret);
1180   if (0 == off)
1181   {
1182     GNUNET_free (rtargets);
1183     *targets = NULL;
1184     return 0;
1185   }
1186   *targets = rtargets;
1187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1189               GNUNET_h2s (key),
1190               off,
1191               ret);
1192   return off;
1193 }
1194
1195
1196 /**
1197  * Perform a PUT operation.   Forwards the given request to other
1198  * peers.   Does not store the data locally.  Does not give the
1199  * data to local clients.  May do nothing if this is the only
1200  * peer in the network (or if we are the closest peer in the
1201  * network).
1202  *
1203  * @param type type of the block
1204  * @param options routing options
1205  * @param desired_replication_level desired replication count
1206  * @param expiration_time when does the content expire
1207  * @param hop_count how many hops has this message traversed so far
1208  * @param bf Bloom filter of peers this PUT has already traversed
1209  * @param key key for the content
1210  * @param put_path_length number of entries in @a put_path
1211  * @param put_path peers this request has traversed so far (if tracked)
1212  * @param data payload to store
1213  * @param data_size number of bytes in @a data
1214  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1215  */
1216 int
1217 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1218                            enum GNUNET_DHT_RouteOption options,
1219                            uint32_t desired_replication_level,
1220                            struct GNUNET_TIME_Absolute expiration_time,
1221                            uint32_t hop_count,
1222                            struct GNUNET_CONTAINER_BloomFilter *bf,
1223                            const struct GNUNET_HashCode *key,
1224                            unsigned int put_path_length,
1225                            struct GNUNET_PeerIdentity *put_path,
1226                            const void *data,
1227                            size_t data_size)
1228 {
1229   unsigned int target_count;
1230   unsigned int i;
1231   struct PeerInfo **targets;
1232   struct PeerInfo *target;
1233   size_t msize;
1234   struct GNUNET_MQ_Envelope *env;
1235   struct PeerPutMessage *ppm;
1236   struct GNUNET_PeerIdentity *pp;
1237   unsigned int skip_count;
1238
1239   GNUNET_assert (NULL != bf);
1240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241               "Adding myself (%s) to PUT bloomfilter for %s\n",
1242               GNUNET_i2s (&my_identity),
1243               GNUNET_h2s (key));
1244   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1245   GNUNET_STATISTICS_update (GDS_stats,
1246                             gettext_noop ("# PUT requests routed"),
1247                             1,
1248                             GNUNET_NO);
1249   target_count
1250     = get_target_peers (key,
1251                         bf,
1252                         hop_count,
1253                         desired_replication_level,
1254                         &targets);
1255   if (0 == target_count)
1256   {
1257     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                 "Routing PUT for %s terminates after %u hops at %s\n",
1259                 GNUNET_h2s (key),
1260                 (unsigned int) hop_count,
1261                 GNUNET_i2s (&my_identity));
1262     return GNUNET_NO;
1263   }
1264   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
1265   if (msize + sizeof (struct PeerPutMessage)
1266       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1267   {
1268     put_path_length = 0;
1269     msize = data_size;
1270   }
1271   if (msize + sizeof (struct PeerPutMessage)
1272       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1273   {
1274     GNUNET_break (0);
1275     GNUNET_free (targets);
1276     return GNUNET_NO;
1277   }
1278   GNUNET_STATISTICS_update (GDS_stats,
1279                             gettext_noop ("# PUT messages queued for transmission"),
1280                             target_count,
1281                             GNUNET_NO);
1282   skip_count = 0;
1283   for (i = 0; i < target_count; i++)
1284   {
1285     target = targets[i];
1286     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1287     {
1288       /* skip */
1289       GNUNET_STATISTICS_update (GDS_stats,
1290                                 gettext_noop ("# P2P messages dropped due to full queue"),
1291                                 1,
1292                                 GNUNET_NO);
1293       skip_count++;
1294       continue;
1295     }
1296     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1297                 "Routing PUT for %s after %u hops to %s\n",
1298                 GNUNET_h2s (key),
1299                 (unsigned int) hop_count,
1300                 GNUNET_i2s (target->id));
1301     env = GNUNET_MQ_msg_extra (ppm,
1302                                msize,
1303                                GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1304     ppm->options = htonl (options);
1305     ppm->type = htonl (type);
1306     ppm->hop_count = htonl (hop_count + 1);
1307     ppm->desired_replication_level = htonl (desired_replication_level);
1308     ppm->put_path_length = htonl (put_path_length);
1309     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1310     GNUNET_break (GNUNET_YES ==
1311                   GNUNET_CONTAINER_bloomfilter_test (bf,
1312                                                      &target->phash));
1313     GNUNET_assert (GNUNET_OK ==
1314                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1315                                                               ppm->bloomfilter,
1316                                                               DHT_BLOOM_SIZE));
1317     ppm->key = *key;
1318     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1319     GNUNET_memcpy (pp,
1320                    put_path,
1321                    sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1322     GNUNET_memcpy (&pp[put_path_length],
1323                    data,
1324                    data_size);
1325     GNUNET_MQ_send (target->mq,
1326                     env);
1327   }
1328   GNUNET_free (targets);
1329   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1330 }
1331
1332
1333 /**
1334  * Perform a GET operation.  Forwards the given request to other
1335  * peers.  Does not lookup the key locally.  May do nothing if this is
1336  * the only peer in the network (or if we are the closest peer in the
1337  * network).
1338  *
1339  * @param type type of the block
1340  * @param options routing options
1341  * @param desired_replication_level desired replication count
1342  * @param hop_count how many hops did this request traverse so far?
1343  * @param key key for the content
1344  * @param xquery extended query
1345  * @param xquery_size number of bytes in @a xquery
1346  * @param bg group to use for filtering replies
1347  * @param peer_bf filter for peers not to select (again)
1348  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1349  */
1350 int
1351 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1352                            enum GNUNET_DHT_RouteOption options,
1353                            uint32_t desired_replication_level,
1354                            uint32_t hop_count,
1355                            const struct GNUNET_HashCode *key,
1356                            const void *xquery,
1357                            size_t xquery_size,
1358                            struct GNUNET_BLOCK_Group *bg,
1359                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1360 {
1361   unsigned int target_count;
1362   struct PeerInfo **targets;
1363   struct PeerInfo *target;
1364   struct GNUNET_MQ_Envelope *env;
1365   size_t msize;
1366   struct PeerGetMessage *pgm;
1367   char *xq;
1368   size_t reply_bf_size;
1369   void *reply_bf;
1370   unsigned int skip_count;
1371   uint32_t bf_nonce;
1372
1373   GNUNET_assert (NULL != peer_bf);
1374   GNUNET_STATISTICS_update (GDS_stats,
1375                             gettext_noop ("# GET requests routed"),
1376                             1,
1377                             GNUNET_NO);
1378   target_count = get_target_peers (key,
1379                                    peer_bf,
1380                                    hop_count,
1381                                    desired_replication_level,
1382                                    &targets);
1383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384               "Adding myself (%s) to GET bloomfilter for %s\n",
1385               GNUNET_i2s (&my_identity),
1386               GNUNET_h2s (key));
1387   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1388                                     &my_identity_hash);
1389   if (0 == target_count)
1390   {
1391     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392                 "Routing GET for %s terminates after %u hops at %s\n",
1393                 GNUNET_h2s (key),
1394                 (unsigned int) hop_count,
1395                 GNUNET_i2s (&my_identity));
1396     return GNUNET_NO;
1397   }
1398   if (GNUNET_OK !=
1399       GNUNET_BLOCK_group_serialize (bg,
1400                                     &bf_nonce,
1401                                     &reply_bf,
1402                                     &reply_bf_size))
1403   {
1404     reply_bf = NULL;
1405     reply_bf_size = 0;
1406     bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1407                                          UINT32_MAX);
1408   }
1409   msize = xquery_size + reply_bf_size;
1410   if (msize + sizeof (struct PeerGetMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1411   {
1412     GNUNET_break (0);
1413     GNUNET_free_non_null (reply_bf);
1414     GNUNET_free (targets);
1415     return GNUNET_NO;
1416   }
1417   GNUNET_STATISTICS_update (GDS_stats,
1418                             gettext_noop ("# GET messages queued for transmission"),
1419                             target_count,
1420                             GNUNET_NO);
1421   /* forward request */
1422   skip_count = 0;
1423   for (unsigned int i = 0; i < target_count; i++)
1424   {
1425     target = targets[i];
1426     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1427     {
1428       /* skip */
1429       GNUNET_STATISTICS_update (GDS_stats,
1430                                 gettext_noop ("# P2P messages dropped due to full queue"),
1431                                 1, GNUNET_NO);
1432       skip_count++;
1433       continue;
1434     }
1435     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436                 "Routing GET for %s after %u hops to %s\n",
1437                 GNUNET_h2s (key),
1438                 (unsigned int) hop_count,
1439                 GNUNET_i2s (target->id));
1440     env = GNUNET_MQ_msg_extra (pgm,
1441                                msize,
1442                                GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1443     pgm->options = htonl (options);
1444     pgm->type = htonl (type);
1445     pgm->hop_count = htonl (hop_count + 1);
1446     pgm->desired_replication_level = htonl (desired_replication_level);
1447     pgm->xquery_size = htonl (xquery_size);
1448     pgm->bf_mutator = bf_nonce;
1449     GNUNET_break (GNUNET_YES ==
1450                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1451                                                      &target->phash));
1452     GNUNET_assert (GNUNET_OK ==
1453                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1454                                                               pgm->bloomfilter,
1455                                                               DHT_BLOOM_SIZE));
1456     pgm->key = *key;
1457     xq = (char *) &pgm[1];
1458     GNUNET_memcpy (xq,
1459                    xquery,
1460                    xquery_size);
1461     GNUNET_memcpy (&xq[xquery_size],
1462                    reply_bf,
1463                    reply_bf_size);
1464     GNUNET_MQ_send (target->mq,
1465                     env);
1466   }
1467   GNUNET_free (targets);
1468   GNUNET_free_non_null (reply_bf);
1469   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1470 }
1471
1472
1473 /**
1474  * Handle a reply (route to origin).  Only forwards the reply back to
1475  * the given peer.  Does not do local caching or forwarding to local
1476  * clients.
1477  *
1478  * @param target neighbour that should receive the block (if still connected)
1479  * @param type type of the block
1480  * @param expiration_time when does the content expire
1481  * @param key key for the content
1482  * @param put_path_length number of entries in @a put_path
1483  * @param put_path peers the original PUT traversed (if tracked)
1484  * @param get_path_length number of entries in @a get_path
1485  * @param get_path peers this reply has traversed so far (if tracked)
1486  * @param data payload of the reply
1487  * @param data_size number of bytes in @a data
1488  */
1489 void
1490 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1491                              enum GNUNET_BLOCK_Type type,
1492                              struct GNUNET_TIME_Absolute expiration_time,
1493                              const struct GNUNET_HashCode *key,
1494                              unsigned int put_path_length,
1495                              const struct GNUNET_PeerIdentity *put_path,
1496                              unsigned int get_path_length,
1497                              const struct GNUNET_PeerIdentity *get_path,
1498                              const void *data,
1499                              size_t data_size)
1500 {
1501   struct PeerInfo *pi;
1502   struct GNUNET_MQ_Envelope *env;
1503   size_t msize;
1504   struct PeerResultMessage *prm;
1505   struct GNUNET_PeerIdentity *paths;
1506
1507   msize = data_size + (get_path_length + put_path_length) *
1508       sizeof (struct GNUNET_PeerIdentity);
1509   if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1510       (get_path_length >
1511        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1512       (put_path_length >
1513        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1514       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1515   {
1516     GNUNET_break (0);
1517     return;
1518   }
1519   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1520                                           target);
1521   if (NULL == pi)
1522   {
1523     /* peer disconnected in the meantime, drop reply */
1524     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525                 "No matching peer for reply for key %s\n",
1526                 GNUNET_h2s (key));
1527     return;
1528   }
1529   if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1530   {
1531     /* skip */
1532     GNUNET_STATISTICS_update (GDS_stats,
1533                               gettext_noop ("# P2P messages dropped due to full queue"),
1534                               1,
1535                               GNUNET_NO);
1536     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537                 "Peer queue full, ignoring reply for key %s\n",
1538                 GNUNET_h2s (key));
1539     return;
1540   }
1541
1542   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543               "Forwarding reply for key %s to peer %s\n",
1544               GNUNET_h2s (key),
1545               GNUNET_i2s (target));
1546   GNUNET_STATISTICS_update (GDS_stats,
1547                             gettext_noop
1548                             ("# RESULT messages queued for transmission"), 1,
1549                             GNUNET_NO);
1550   env = GNUNET_MQ_msg_extra (prm,
1551                              msize,
1552                              GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1553   prm->type = htonl (type);
1554   prm->put_path_length = htonl (put_path_length);
1555   prm->get_path_length = htonl (get_path_length);
1556   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1557   prm->key = *key;
1558   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1559   GNUNET_memcpy (paths,
1560                  put_path,
1561                  put_path_length * sizeof (struct GNUNET_PeerIdentity));
1562   GNUNET_memcpy (&paths[put_path_length],
1563                  get_path,
1564                  get_path_length * sizeof (struct GNUNET_PeerIdentity));
1565   GNUNET_memcpy (&paths[put_path_length + get_path_length],
1566                  data,
1567                  data_size);
1568   GNUNET_MQ_send (pi->mq,
1569                   env);
1570 }
1571
1572
1573 /**
1574  * To be called on core init/fail.
1575  *
1576  * @param cls service closure
1577  * @param identity the public identity of this peer
1578  */
1579 static void
1580 core_init (void *cls,
1581            const struct GNUNET_PeerIdentity *identity)
1582 {
1583   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1584               "CORE called, I am %s\n",
1585               GNUNET_i2s (identity));
1586   my_identity = *identity;
1587   GNUNET_CRYPTO_hash (identity,
1588                       sizeof (struct GNUNET_PeerIdentity),
1589                       &my_identity_hash);
1590   GNUNET_SERVICE_resume (GDS_service);
1591 }
1592
1593
1594 /**
1595  * Check validity of a p2p put request.
1596  *
1597  * @param cls closure with the `struct PeerInfo` of the sender
1598  * @param message message
1599  * @return #GNUNET_OK if the message is valid
1600  */
1601 static int
1602 check_dht_p2p_put (void *cls,
1603                    const struct PeerPutMessage *put)
1604 {
1605   uint32_t putlen;
1606   uint16_t msize;
1607
1608   msize = ntohs (put->header.size);
1609   putlen = ntohl (put->put_path_length);
1610   if ((msize <
1611        sizeof (struct PeerPutMessage) +
1612        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1613       (putlen >
1614        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1615   {
1616     GNUNET_break_op (0);
1617     return GNUNET_SYSERR;
1618   }
1619   return GNUNET_OK;
1620 }
1621
1622
1623 /**
1624  * Core handler for p2p put requests.
1625  *
1626  * @param cls closure with the `struct PeerInfo` of the sender
1627  * @param message message
1628  */
1629 static void
1630 handle_dht_p2p_put (void *cls,
1631                     const struct PeerPutMessage *put)
1632 {
1633   struct PeerInfo *peer = cls;
1634   const struct GNUNET_PeerIdentity *put_path;
1635   const void *payload;
1636   uint32_t putlen;
1637   uint16_t msize;
1638   size_t payload_size;
1639   enum GNUNET_DHT_RouteOption options;
1640   struct GNUNET_CONTAINER_BloomFilter *bf;
1641   struct GNUNET_HashCode test_key;
1642   int forwarded;
1643
1644   msize = ntohs (put->header.size);
1645   putlen = ntohl (put->put_path_length);
1646   GNUNET_STATISTICS_update (GDS_stats,
1647                             gettext_noop ("# P2P PUT requests received"),
1648                             1,
1649                             GNUNET_NO);
1650   GNUNET_STATISTICS_update (GDS_stats,
1651                             gettext_noop ("# P2P PUT bytes received"),
1652                             msize,
1653                             GNUNET_NO);
1654   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1655   payload = &put_path[putlen];
1656   options = ntohl (put->options);
1657   payload_size = msize - (sizeof (struct PeerPutMessage) +
1658                           putlen * sizeof (struct GNUNET_PeerIdentity));
1659
1660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661               "PUT for `%s' from %s\n",
1662               GNUNET_h2s (&put->key),
1663               GNUNET_i2s (peer->id));
1664   if (GNUNET_YES == log_route_details_stderr)
1665   {
1666     char *tmp;
1667
1668     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1669     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1670                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1671                  GNUNET_h2s (&put->key),
1672                  GNUNET_i2s (peer->id),
1673                  tmp,
1674                  ntohl(put->hop_count),
1675                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1676                                                    &put->key),
1677                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1678                                                    &put->key)
1679                 );
1680     GNUNET_free (tmp);
1681   }
1682   switch (GNUNET_BLOCK_get_key
1683           (GDS_block_context,
1684            ntohl (put->type),
1685            payload,
1686            payload_size,
1687            &test_key))
1688   {
1689   case GNUNET_YES:
1690     if (0 != memcmp (&test_key,
1691                      &put->key,
1692                      sizeof (struct GNUNET_HashCode)))
1693     {
1694       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1695
1696       GNUNET_break_op (0);
1697       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1698                   "PUT with key `%s' for block with key %s\n",
1699                   put_s,
1700                   GNUNET_h2s_full (&test_key));
1701       GNUNET_free (put_s);
1702       return;
1703     }
1704     break;
1705   case GNUNET_NO:
1706     GNUNET_break_op (0);
1707     return;
1708   case GNUNET_SYSERR:
1709     /* cannot verify, good luck */
1710     break;
1711   }
1712   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1713   {
1714     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1715                                    ntohl (put->type),
1716                                    NULL, /* query group */
1717                                    GNUNET_BLOCK_EO_NONE,
1718                                    NULL,    /* query */
1719                                    NULL, 0, /* xquery */
1720                                    payload,
1721                                    payload_size))
1722     {
1723     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1724     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1725       break;
1726
1727     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1728     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1729     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1730     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1731     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1732     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1733     default:
1734       GNUNET_break_op (0);
1735       return;
1736     }
1737   }
1738
1739   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1740                                           DHT_BLOOM_SIZE,
1741                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1742   GNUNET_break_op (GNUNET_YES ==
1743                    GNUNET_CONTAINER_bloomfilter_test (bf,
1744                                                       &peer->phash));
1745   {
1746     struct GNUNET_PeerIdentity pp[putlen + 1];
1747
1748     /* extend 'put path' by sender */
1749     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1750     {
1751 #if SANITY_CHECKS
1752       for (unsigned int i=0;i<=putlen;i++)
1753       {
1754         for (unsigned int j=0;j<i;j++)
1755         {
1756           GNUNET_break (0 != memcmp (&pp[i],
1757                                      &pp[j],
1758                                      sizeof (struct GNUNET_PeerIdentity)));
1759         }
1760         GNUNET_break (0 != memcmp (&pp[i],
1761                                    peer->id,
1762                                    sizeof (struct GNUNET_PeerIdentity)));
1763       }
1764 #endif
1765       GNUNET_memcpy (pp,
1766                      put_path,
1767                      putlen * sizeof (struct GNUNET_PeerIdentity));
1768       pp[putlen] = *peer->id;
1769       putlen++;
1770     }
1771     else
1772       putlen = 0;
1773
1774     /* give to local clients */
1775     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1776                               &put->key,
1777                               0,
1778                               NULL,
1779                               putlen,
1780                               pp,
1781                               ntohl (put->type),
1782                               payload_size,
1783                               payload);
1784     /* store locally */
1785     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1786         (am_closest_peer (&put->key, bf)))
1787       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1788                                 &put->key,
1789                                 putlen,
1790                                 pp,
1791                                 ntohl (put->type),
1792                                 payload_size,
1793                                 payload);
1794     /* route to other peers */
1795     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1796                                            options,
1797                                            ntohl (put->desired_replication_level),
1798                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1799                                            ntohl (put->hop_count),
1800                                            bf,
1801                                            &put->key,
1802                                            putlen,
1803                                            pp,
1804                                            payload,
1805                                            payload_size);
1806     /* notify monitoring clients */
1807     GDS_CLIENTS_process_put (options
1808                              | ( (GNUNET_OK == forwarded)
1809                                  ? GNUNET_DHT_RO_LAST_HOP
1810                                  : 0 ),
1811                              ntohl (put->type),
1812                              ntohl (put->hop_count),
1813                              ntohl (put->desired_replication_level),
1814                              putlen, pp,
1815                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1816                              &put->key,
1817                              payload,
1818                              payload_size);
1819   }
1820   GNUNET_CONTAINER_bloomfilter_free (bf);
1821 }
1822
1823
1824 /**
1825  * We have received a FIND PEER request.  Send matching
1826  * HELLOs back.
1827  *
1828  * @param sender sender of the FIND PEER request
1829  * @param key peers close to this key are desired
1830  * @param bg group for filtering peers
1831  */
1832 static void
1833 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1834                   const struct GNUNET_HashCode *key,
1835                   struct GNUNET_BLOCK_Group *bg)
1836 {
1837   int bucket_idx;
1838   struct PeerBucket *bucket;
1839   struct PeerInfo *peer;
1840   unsigned int choice;
1841   const struct GNUNET_HELLO_Message *hello;
1842
1843   /* first, check about our own HELLO */
1844   if (NULL != GDS_my_hello)
1845   {
1846     if (GNUNET_YES !=
1847         GNUNET_BLOCK_GROUP_bf_test_and_set (bg,
1848                                             &my_identity_hash))
1849     {
1850       size_t hello_size;
1851
1852       hello_size = GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message *) GDS_my_hello);
1853       GNUNET_break (hello_size >= sizeof (struct GNUNET_MessageHeader));
1854       GDS_NEIGHBOURS_handle_reply (sender,
1855                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1856                                    GNUNET_TIME_relative_to_absolute
1857                                    (hello_expiration),
1858                                    key,
1859                                    0,
1860                                    NULL,
1861                                    0,
1862                                    NULL,
1863                                    GDS_my_hello,
1864                                    hello_size);
1865     }
1866     else
1867     {
1868       GNUNET_STATISTICS_update (GDS_stats,
1869                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"),
1870                                 1,
1871                                 GNUNET_NO);
1872     }
1873   }
1874   else
1875   {
1876     GNUNET_STATISTICS_update (GDS_stats,
1877                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"),
1878                               1,
1879                               GNUNET_NO);
1880   }
1881
1882   /* then, also consider sending a random HELLO from the closest bucket */
1883   if (0 == memcmp (&my_identity_hash,
1884                    key,
1885                    sizeof (struct GNUNET_HashCode)))
1886     bucket_idx = closest_bucket;
1887   else
1888     bucket_idx = GNUNET_MIN (closest_bucket,
1889                              find_bucket (key));
1890   if (bucket_idx == GNUNET_SYSERR)
1891     return;
1892   bucket = &k_buckets[bucket_idx];
1893   if (bucket->peers_size == 0)
1894     return;
1895   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1896                                      bucket->peers_size);
1897   peer = bucket->head;
1898   while (choice > 0)
1899   {
1900     GNUNET_assert (NULL != peer);
1901     peer = peer->next;
1902     choice--;
1903   }
1904   choice = bucket->peers_size;
1905   do
1906   {
1907     peer = peer->next;
1908     if (0 == choice--)
1909       return;                   /* no non-masked peer available */
1910     if (NULL == peer)
1911       peer = bucket->head;
1912     hello = GDS_HELLO_get (peer->id);
1913   } while ( (NULL == hello) ||
1914             (GNUNET_YES ==
1915              GNUNET_BLOCK_GROUP_bf_test_and_set (bg,
1916                                                  &peer->phash)) );
1917   GDS_NEIGHBOURS_handle_reply (sender,
1918                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1919                                GNUNET_TIME_relative_to_absolute
1920                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1921                                key,
1922                                0,
1923                                NULL,
1924                                0,
1925                                NULL,
1926                                hello,
1927                                GNUNET_HELLO_size (hello));
1928 }
1929
1930
1931 /**
1932  * Handle a result from local datacache for a GET operation.
1933  *
1934  * @param cls the `struct ClientHandle` of the client doing the query
1935  * @param type type of the block
1936  * @param expiration_time when does the content expire
1937  * @param key key for the content
1938  * @param put_path_length number of entries in @a put_path
1939  * @param put_path peers the original PUT traversed (if tracked)
1940  * @param get_path_length number of entries in @a get_path
1941  * @param get_path peers this reply has traversed so far (if tracked)
1942  * @param data payload of the reply
1943  * @param data_size number of bytes in @a data
1944  */
1945 static void
1946 handle_local_result (void *cls,
1947                      enum GNUNET_BLOCK_Type type,
1948                      struct GNUNET_TIME_Absolute expiration_time,
1949                      const struct GNUNET_HashCode *key,
1950                      unsigned int put_path_length,
1951                      const struct GNUNET_PeerIdentity *put_path,
1952                      unsigned int get_path_length,
1953                      const struct GNUNET_PeerIdentity *get_path,
1954                      const void *data,
1955                      size_t data_size)
1956 {
1957   // FIXME: we can probably do better here by
1958   // passing the peer that did the query in the closure...
1959   GDS_ROUTING_process (NULL,
1960                        type,
1961                        expiration_time,
1962                        key,
1963                        put_path_length, put_path,
1964                        0, NULL,
1965                        data, data_size);
1966 }
1967
1968
1969 /**
1970  * Check validity of p2p get request.
1971  *
1972  * @param cls closure with the `struct PeerInfo` of the sender
1973  * @param get the message
1974  * @return #GNUNET_OK if the message is well-formed
1975  */
1976 static int
1977 check_dht_p2p_get (void *cls,
1978                    const struct PeerGetMessage *get)
1979 {
1980   uint32_t xquery_size;
1981   uint16_t msize;
1982
1983   msize = ntohs (get->header.size);
1984   xquery_size = ntohl (get->xquery_size);
1985   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1986   {
1987     GNUNET_break_op (0);
1988     return GNUNET_SYSERR;
1989   }
1990   return GNUNET_OK;
1991 }
1992
1993
1994 /**
1995  * Core handler for p2p get requests.
1996  *
1997  * @param cls closure with the `struct PeerInfo` of the sender
1998  * @param get the message
1999  */
2000 static void
2001 handle_dht_p2p_get (void *cls,
2002                     const struct PeerGetMessage *get)
2003 {
2004   struct PeerInfo *peer = cls;
2005   uint32_t xquery_size;
2006   size_t reply_bf_size;
2007   uint16_t msize;
2008   enum GNUNET_BLOCK_Type type;
2009   enum GNUNET_DHT_RouteOption options;
2010   enum GNUNET_BLOCK_EvaluationResult eval;
2011   struct GNUNET_BLOCK_Group *bg;
2012   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2013   const char *xquery;
2014   int forwarded;
2015
2016   if (NULL == peer)
2017   {
2018     GNUNET_break (0);
2019     return;
2020   }
2021   /* parse and validate message */
2022   msize = ntohs (get->header.size);
2023   xquery_size = ntohl (get->xquery_size);
2024   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2025   type = ntohl (get->type);
2026   options = ntohl (get->options);
2027   xquery = (const char *) &get[1];
2028   GNUNET_STATISTICS_update (GDS_stats,
2029                             gettext_noop ("# P2P GET requests received"),
2030                             1,
2031                             GNUNET_NO);
2032   GNUNET_STATISTICS_update (GDS_stats,
2033                             gettext_noop ("# P2P GET bytes received"),
2034                             msize,
2035                             GNUNET_NO);
2036   if (GNUNET_YES == log_route_details_stderr)
2037   {
2038     char *tmp;
2039
2040     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2041     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2042                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2043                  GNUNET_h2s (&get->key),
2044                  GNUNET_i2s (peer->id),
2045                  tmp,
2046                  ntohl(get->hop_count),
2047                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2048                                                    &get->key),
2049                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2050                                                    &get->key),
2051                  ntohl(get->xquery_size),
2052                  xquery);
2053     GNUNET_free (tmp);
2054   }
2055   bg = GNUNET_BLOCK_group_create (GDS_block_context,
2056                                   type,
2057                                   get->bf_mutator,
2058                                   &xquery[xquery_size],
2059                                   reply_bf_size);
2060   eval
2061     = GNUNET_BLOCK_evaluate (GDS_block_context,
2062                              type,
2063                              bg,
2064                              GNUNET_BLOCK_EO_NONE,
2065                              &get->key,
2066                              xquery,
2067                              xquery_size,
2068                              NULL,
2069                              0);
2070   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2071   {
2072     /* request invalid or block type not supported */
2073     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2074     GNUNET_BLOCK_group_destroy (bg);
2075     return;
2076   }
2077   peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2078                                                DHT_BLOOM_SIZE,
2079                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
2080   GNUNET_break_op (GNUNET_YES ==
2081                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2082                                                       &peer->phash));
2083   /* remember request for routing replies */
2084   GDS_ROUTING_add (peer->id,
2085                    type,
2086                    bg, /* bg now owned by routing, but valid at least until end of this function! */
2087                    options,
2088                    &get->key,
2089                    xquery,
2090                    xquery_size);
2091   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2092               "GET for %s at %s after %u hops\n",
2093               GNUNET_h2s (&get->key),
2094               GNUNET_i2s (&my_identity),
2095               (unsigned int) ntohl (get->hop_count));
2096   /* local lookup (this may update the reply_bf) */
2097   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2098       (am_closest_peer (&get->key,
2099                         peer_bf)))
2100   {
2101     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2102     {
2103       GNUNET_STATISTICS_update (GDS_stats,
2104                                 gettext_noop ("# P2P FIND PEER requests processed"),
2105                                 1,
2106                                 GNUNET_NO);
2107       handle_find_peer (peer->id,
2108                         &get->key,
2109                         bg);
2110     }
2111     else
2112     {
2113       eval = GDS_DATACACHE_handle_get (&get->key,
2114                                        type,
2115                                        xquery,
2116                                        xquery_size,
2117                                        bg,
2118                                        &handle_local_result,
2119                                        NULL);
2120     }
2121   }
2122   else
2123   {
2124     GNUNET_STATISTICS_update (GDS_stats,
2125                               gettext_noop ("# P2P GET requests ONLY routed"),
2126                               1,
2127                               GNUNET_NO);
2128   }
2129
2130   /* P2P forwarding */
2131   forwarded = GNUNET_NO;
2132   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2133     forwarded = GDS_NEIGHBOURS_handle_get (type,
2134                                            options,
2135                                            ntohl (get->desired_replication_level),
2136                                            ntohl (get->hop_count),
2137                                            &get->key,
2138                                            xquery,
2139                                            xquery_size,
2140                                            bg,
2141                                            peer_bf);
2142   GDS_CLIENTS_process_get (options
2143                            | (GNUNET_OK == forwarded)
2144                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2145                            type,
2146                            ntohl (get->hop_count),
2147                            ntohl (get->desired_replication_level),
2148                            0,
2149                            NULL,
2150                            &get->key);
2151
2152   /* clean up; note that 'bg' is owned by routing now! */
2153   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2154 }
2155
2156
2157 /**
2158  * Check validity of p2p result message.
2159  *
2160  * @param cls closure
2161  * @param message message
2162  * @return #GNUNET_YES if the message is well-formed
2163  */
2164 static int
2165 check_dht_p2p_result (void *cls,
2166                       const struct PeerResultMessage *prm)
2167 {
2168   uint32_t get_path_length;
2169   uint32_t put_path_length;
2170   uint16_t msize;
2171
2172   msize = ntohs (prm->header.size);
2173   put_path_length = ntohl (prm->put_path_length);
2174   get_path_length = ntohl (prm->get_path_length);
2175   if ((msize <
2176        sizeof (struct PeerResultMessage) + (get_path_length +
2177                                             put_path_length) *
2178        sizeof (struct GNUNET_PeerIdentity)) ||
2179       (get_path_length >
2180        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2181       (put_path_length >
2182        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2183   {
2184     GNUNET_break_op (0);
2185     return GNUNET_SYSERR;
2186   }
2187   return GNUNET_OK;
2188 }
2189
2190
2191 /**
2192  * Core handler for p2p result messages.
2193  *
2194  * @param cls closure
2195  * @param message message
2196  */
2197 static void
2198 handle_dht_p2p_result (void *cls,
2199                        const struct PeerResultMessage *prm)
2200 {
2201   struct PeerInfo *peer = cls;
2202   const struct GNUNET_PeerIdentity *put_path;
2203   const struct GNUNET_PeerIdentity *get_path;
2204   const void *data;
2205   uint32_t get_path_length;
2206   uint32_t put_path_length;
2207   uint16_t msize;
2208   size_t data_size;
2209   enum GNUNET_BLOCK_Type type;
2210
2211   /* parse and validate message */
2212   msize = ntohs (prm->header.size);
2213   put_path_length = ntohl (prm->put_path_length);
2214   get_path_length = ntohl (prm->get_path_length);
2215   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2216   get_path = &put_path[put_path_length];
2217   type = ntohl (prm->type);
2218   data = (const void *) &get_path[get_path_length];
2219   data_size = msize - (sizeof (struct PeerResultMessage) +
2220                        (get_path_length +
2221                         put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2222   GNUNET_STATISTICS_update (GDS_stats,
2223                             gettext_noop ("# P2P RESULTS received"),
2224                             1,
2225                             GNUNET_NO);
2226   GNUNET_STATISTICS_update (GDS_stats,
2227                             gettext_noop ("# P2P RESULT bytes received"),
2228                             msize,
2229                             GNUNET_NO);
2230   if (GNUNET_YES == log_route_details_stderr)
2231   {
2232     char *tmp;
2233
2234     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2235     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2236                  "R5N RESULT %s: %s->%s (%u)\n",
2237                  GNUNET_h2s (&prm->key),
2238                  GNUNET_i2s (peer->id),
2239                  tmp,
2240                  get_path_length + 1);
2241     GNUNET_free (tmp);
2242   }
2243   /* if we got a HELLO, consider it for our own routing table */
2244   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2245   {
2246     const struct GNUNET_MessageHeader *h;
2247     struct GNUNET_PeerIdentity pid;
2248
2249     /* Should be a HELLO, validate and consider using it! */
2250     if (data_size < sizeof (struct GNUNET_HELLO_Message))
2251     {
2252       GNUNET_break_op (0);
2253       return;
2254     }
2255     h = data;
2256     if (data_size != ntohs (h->size))
2257     {
2258       GNUNET_break_op (0);
2259       return;
2260     }
2261     if (GNUNET_OK !=
2262         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2263                              &pid))
2264     {
2265       GNUNET_break_op (0);
2266       return;
2267     }
2268     if ( (GNUNET_YES != disable_try_connect) &&
2269          (0 != memcmp (&my_identity,
2270                        &pid,
2271                        sizeof (struct GNUNET_PeerIdentity))) )
2272       try_connect (&pid,
2273                    h);
2274   }
2275
2276   /* append 'peer' to 'get_path' */
2277   {
2278     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2279
2280 #if SANITY_CHECKS
2281     for (unsigned int i=0;i<=get_path_length;i++)
2282     {
2283       for (unsigned int j=0;j<i;j++)
2284       {
2285         GNUNET_break (0 != memcmp (&get_path[i],
2286                                    &get_path[j],
2287                                    sizeof (struct GNUNET_PeerIdentity)));
2288       }
2289       GNUNET_break (0 != memcmp (&get_path[i],
2290                                  peer->id,
2291                                  sizeof (struct GNUNET_PeerIdentity)));
2292     }
2293 #endif
2294     GNUNET_memcpy (xget_path,
2295                    get_path,
2296                    get_path_length * sizeof (struct GNUNET_PeerIdentity));
2297     xget_path[get_path_length] = *peer->id;
2298     get_path_length++;
2299
2300     /* forward to local clients */
2301     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2302                               &prm->key,
2303                               get_path_length,
2304                               xget_path,
2305                               put_path_length,
2306                               put_path,
2307                               type,
2308                               data_size,
2309                               data);
2310     GDS_CLIENTS_process_get_resp (type,
2311                                   xget_path,
2312                                   get_path_length,
2313                                   put_path, put_path_length,
2314                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2315                                   &prm->key,
2316                                   data,
2317                                   data_size);
2318     if (GNUNET_YES == cache_results)
2319     {
2320       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2321
2322       GNUNET_memcpy (xput_path,
2323                      put_path,
2324                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
2325       GNUNET_memcpy (&xput_path[put_path_length],
2326                      xget_path,
2327                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
2328
2329       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2330                                 &prm->key,
2331                                 get_path_length + put_path_length,
2332                                 xput_path,
2333                                 type,
2334                                 data_size,
2335                                 data);
2336     }
2337     /* forward to other peers */
2338     GDS_ROUTING_process (NULL,
2339                          type,
2340                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2341                          &prm->key,
2342                          put_path_length,
2343                          put_path,
2344                          get_path_length,
2345                          xget_path,
2346                          data,
2347                          data_size);
2348   }
2349 }
2350
2351
2352 /**
2353  * Initialize neighbours subsystem.
2354  *
2355  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2356  */
2357 int
2358 GDS_NEIGHBOURS_init ()
2359 {
2360   struct GNUNET_MQ_MessageHandler core_handlers[] = {
2361     GNUNET_MQ_hd_var_size (dht_p2p_get,
2362                            GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2363                            struct PeerGetMessage,
2364                            NULL),
2365     GNUNET_MQ_hd_var_size (dht_p2p_put,
2366                            GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2367                            struct PeerPutMessage,
2368                            NULL),
2369     GNUNET_MQ_hd_var_size (dht_p2p_result,
2370                            GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2371                            struct PeerResultMessage,
2372                            NULL),
2373     GNUNET_MQ_handler_end ()
2374   };
2375   unsigned long long temp_config_num;
2376
2377   disable_try_connect
2378     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2379                                             "DHT",
2380                                             "DISABLE_TRY_CONNECT");
2381   if (GNUNET_OK ==
2382       GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2383                                              "DHT",
2384                                              "bucket_size",
2385                                              &temp_config_num))
2386     bucket_size = (unsigned int) temp_config_num;
2387   cache_results
2388     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2389                                             "DHT",
2390                                             "CACHE_RESULTS");
2391
2392   log_route_details_stderr =
2393     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2394   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2395   core_api = GNUNET_CORE_connect (GDS_cfg,
2396                                   NULL,
2397                                   &core_init,
2398                                   &handle_core_connect,
2399                                   &handle_core_disconnect,
2400                                   core_handlers);
2401   if (NULL == core_api)
2402     return GNUNET_SYSERR;
2403   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2404                                                               GNUNET_YES);
2405   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2406                                                             GNUNET_NO);
2407   return GNUNET_OK;
2408 }
2409
2410
2411 /**
2412  * Shutdown neighbours subsystem.
2413  */
2414 void
2415 GDS_NEIGHBOURS_done ()
2416 {
2417   if (NULL == core_api)
2418     return;
2419   GNUNET_CORE_disconnect (core_api);
2420   core_api = NULL;
2421   GNUNET_assert (0 ==
2422                  GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2423   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2424   all_connected_peers = NULL;
2425   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2426                                          &free_connect_info,
2427                                          NULL);
2428   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2429   all_desired_peers = NULL;
2430   GNUNET_ATS_connectivity_done (ats_ch);
2431   ats_ch = NULL;
2432   GNUNET_assert (NULL == find_peer_task);
2433 }
2434
2435
2436 /**
2437  * Get the ID of the local node.
2438  *
2439  * @return identity of the local node
2440  */
2441 struct GNUNET_PeerIdentity *
2442 GDS_NEIGHBOURS_get_id ()
2443 {
2444   return &my_identity;
2445 }
2446
2447
2448 /* end of gnunet-service-dht_neighbours.c */