support BF size adjustments in other plugins
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_datacache.h"
43 #include "gnunet-service-dht_hello.h"
44 #include "gnunet-service-dht_neighbours.h"
45 #include "gnunet-service-dht_nse.h"
46 #include "gnunet-service-dht_routing.h"
47 #include "dht.h"
48
/**
 * Emit a log message of the given @a kind under the
 * "dht-traffic" component.
 */
#define LOG_TRAFFIC(kind, ...) \
  GNUNET_log_from (kind, "dht-traffic", __VA_ARGS__)

/**
 * Set to 1 to enable slow sanity checks (for debugging).
 */
#define SANITY_CHECKS 1
55
/**
 * How many buckets will we allow total: one per bit of a
 * `struct GNUNET_HashCode`.  The expansion is parenthesized so that
 * the macro acts as a single operand in any surrounding expression
 * (division, modulo, casts, ...).
 */
#define MAX_BUCKETS (sizeof (struct GNUNET_HashCode) * 8)
60
/**
 * Default limit on the number of peers kept in one bucket.
 */
#define DEFAULT_BUCKET_SIZE 8

/**
 * Replication level requested for FIND PEER requests.
 */
#define FIND_PEER_REPLICATION_LEVEL 4

/**
 * Upper bound applied to the replication level of all requests.
 */
#define MAXIMUM_REPLICATION_LEVEL 16

/**
 * Upper bound on the number of pending messages per peer.
 */
#define MAXIMUM_PENDING_PER_PEER 64

/**
 * Minimum delay before another find peer request is sent.
 */
#define DHT_MINIMUM_FIND_PEER_INTERVAL \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Maximum delay before another find peer request is sent.
 */
#define DHT_MAXIMUM_FIND_PEER_INTERVAL \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 10)

/**
 * Longest time to wait for transmission of a GET request to
 * another peer.
 */
#define GET_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)

/**
 * Hello address expiration (defined in another translation unit).
 */
extern struct GNUNET_TIME_Relative hello_expiration;
100
101
102 GNUNET_NETWORK_STRUCT_BEGIN
103
104 /**
105  * P2P PUT message
106  */
107 struct PeerPutMessage
108 {
109   /**
110    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
111    */
112   struct GNUNET_MessageHeader header;
113
114   /**
115    * Processing options
116    */
117   uint32_t options GNUNET_PACKED;
118
119   /**
120    * Content type.
121    */
122   uint32_t type GNUNET_PACKED;
123
124   /**
125    * Hop count
126    */
127   uint32_t hop_count GNUNET_PACKED;
128
129   /**
130    * Replication level for this message
131    */
132   uint32_t desired_replication_level GNUNET_PACKED;
133
134   /**
135    * Length of the PUT path that follows (if tracked).
136    */
137   uint32_t put_path_length GNUNET_PACKED;
138
139   /**
140    * When does the content expire?
141    */
142   struct GNUNET_TIME_AbsoluteNBO expiration_time;
143
144   /**
145    * Bloomfilter (for peer identities) to stop circular routes
146    */
147   char bloomfilter[DHT_BLOOM_SIZE];
148
149   /**
150    * The key we are storing under.
151    */
152   struct GNUNET_HashCode key;
153
154   /* put path (if tracked) */
155
156   /* Payload */
157
158 };
159
160
161 /**
162  * P2P Result message
163  */
164 struct PeerResultMessage
165 {
166   /**
167    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
168    */
169   struct GNUNET_MessageHeader header;
170
171   /**
172    * Content type.
173    */
174   uint32_t type GNUNET_PACKED;
175
176   /**
177    * Length of the PUT path that follows (if tracked).
178    */
179   uint32_t put_path_length GNUNET_PACKED;
180
181   /**
182    * Length of the GET path that follows (if tracked).
183    */
184   uint32_t get_path_length GNUNET_PACKED;
185
186   /**
187    * When does the content expire?
188    */
189   struct GNUNET_TIME_AbsoluteNBO expiration_time;
190
191   /**
192    * The key of the corresponding GET request.
193    */
194   struct GNUNET_HashCode key;
195
196   /* put path (if tracked) */
197
198   /* get path (if tracked) */
199
200   /* Payload */
201
202 };
203
204
205 /**
206  * P2P GET message
207  */
208 struct PeerGetMessage
209 {
210   /**
211    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
212    */
213   struct GNUNET_MessageHeader header;
214
215   /**
216    * Processing options
217    */
218   uint32_t options GNUNET_PACKED;
219
220   /**
221    * Desired content type.
222    */
223   uint32_t type GNUNET_PACKED;
224
225   /**
226    * Hop count
227    */
228   uint32_t hop_count GNUNET_PACKED;
229
230   /**
231    * Desired replication level for this request.
232    */
233   uint32_t desired_replication_level GNUNET_PACKED;
234
235   /**
236    * Size of the extended query.
237    */
238   uint32_t xquery_size;
239
240   /**
241    * Bloomfilter mutator.
242    */
243   uint32_t bf_mutator;
244
245   /**
246    * Bloomfilter (for peer identities) to stop circular routes
247    */
248   char bloomfilter[DHT_BLOOM_SIZE];
249
250   /**
251    * The key we are looking for.
252    */
253   struct GNUNET_HashCode key;
254
255   /* xquery */
256
257   /* result bloomfilter */
258
259 };
260 GNUNET_NETWORK_STRUCT_END
261
262
263 /**
264  * Entry for a peer in a bucket.
265  */
266 struct PeerInfo
267 {
268   /**
269    * Next peer entry (DLL)
270    */
271   struct PeerInfo *next;
272
273   /**
274    *  Prev peer entry (DLL)
275    */
276   struct PeerInfo *prev;
277
278   /**
279    * Handle for sending messages to this peer.
280    */
281   struct GNUNET_MQ_Handle *mq;
282
283   /**
284    * What is the identity of the peer?
285    */
286   const struct GNUNET_PeerIdentity *id;
287
288   /**
289    * Hash of @e id.
290    */
291   struct GNUNET_HashCode phash;
292
293   /**
294    * Which bucket is this peer in?
295    */
296   int peer_bucket;
297
298 };
299
300
301 /**
302  * Peers are grouped into buckets.
303  */
struct PeerBucket
{
  /** First element of the doubly-linked list of peers. */
  struct PeerInfo *head;

  /** Last element of the doubly-linked list of peers. */
  struct PeerInfo *tail;

  /** How many peers are in this bucket. */
  unsigned int peers_size;
};
321
322
323 /**
324  * Information about a peer that we would like to connect to.
325  */
struct ConnectInfo
{

  /** Active HELLO offer operation, or NULL if there is none. */
  struct GNUNET_TRANSPORT_OfferHelloHandle *oh;

  /** Active connectivity suggestion operation, or NULL if there is none. */
  struct GNUNET_ATS_ConnectivitySuggestHandle *sh;

  /** How strongly we would like to be connected to this peer. */
  uint32_t strength;
};
344
345
346 /**
347  * Do we cache all results that we are routing in the local datacache?
348  */
349 static int cache_results;
350
351 /**
352  * Should routing details be logged to stderr (for debugging)?
353  */
354 static int log_route_details_stderr;
355
356 /**
357  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
358  */
359 static unsigned int closest_bucket;
360
361 /**
362  * How many peers have we added since we sent out our last
363  * find peer request?
364  */
365 static unsigned int newly_found_peers;
366
367 /**
368  * Option for testing that disables the 'connect' function of the DHT.
369  */
370 static int disable_try_connect;
371
372 /**
373  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
374  */
375 static struct PeerBucket k_buckets[MAX_BUCKETS];
376
377 /**
378  * Hash map of all CORE-connected peers, for easy removal from
379  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
380  */
381 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
382
383 /**
384  * Hash map of all peers we would like to be connected to.
385  * Values are of type `struct ConnectInfo`.
386  */
387 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
388
389 /**
390  * Maximum size for each bucket.
391  */
392 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
393
394 /**
395  * Task that sends FIND PEER requests.
396  */
397 static struct GNUNET_SCHEDULER_Task *find_peer_task;
398
399 /**
400  * Identity of this peer.
401  */
402 static struct GNUNET_PeerIdentity my_identity;
403
404 /**
405  * Hash of the identity of this peer.
406  */
407 static struct GNUNET_HashCode my_identity_hash;
408
409 /**
410  * Handle to CORE.
411  */
412 static struct GNUNET_CORE_Handle *core_api;
413
414 /**
415  * Handle to ATS connectivity.
416  */
417 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
418
419
420 /**
421  * Find the optimal bucket for this key.
422  *
423  * @param hc the hashcode to compare our identity to
424  * @return the proper bucket index, or GNUNET_SYSERR
425  *         on error (same hashcode)
426  */
427 static int
428 find_bucket (const struct GNUNET_HashCode *hc)
429 {
430   unsigned int bits;
431
432   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
433   if (bits == MAX_BUCKETS)
434   {
435     /* How can all bits match? Got my own ID? */
436     GNUNET_break (0);
437     return GNUNET_SYSERR;
438   }
439   return MAX_BUCKETS - bits - 1;
440 }
441
442
443 /**
444  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
445  * Clean up the "oh" field in the @a cls
446  *
447  * @param cls a `struct ConnectInfo`
448  */
449 static void
450 offer_hello_done (void *cls)
451 {
452   struct ConnectInfo *ci = cls;
453
454   ci->oh = NULL;
455 }
456
457
458 /**
459  * Function called for all entries in #all_desired_peers to clean up.
460  *
461  * @param cls NULL
462  * @param peer peer the entry is for
463  * @param value the value to remove
464  * @return #GNUNET_YES
465  */
466 static int
467 free_connect_info (void *cls,
468                    const struct GNUNET_PeerIdentity *peer,
469                    void *value)
470 {
471   struct ConnectInfo *ci = value;
472
473   GNUNET_assert (GNUNET_YES ==
474                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
475                                                        peer,
476                                                        ci));
477   if (NULL != ci->sh)
478   {
479     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
480     ci->sh = NULL;
481   }
482   if (NULL != ci->oh)
483   {
484     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
485     ci->oh = NULL;
486   }
487   GNUNET_free (ci);
488   return GNUNET_YES;
489 }
490
491
492 /**
493  * Consider if we want to connect to a given peer, and if so
494  * let ATS know.  If applicable, the HELLO is offered to the
495  * TRANSPORT service.
496  *
497  * @param pid peer to consider connectivity requirements for
498  * @param h a HELLO message, or NULL
499  */
500 static void
501 try_connect (const struct GNUNET_PeerIdentity *pid,
502              const struct GNUNET_MessageHeader *h)
503 {
504   int bucket;
505   struct GNUNET_HashCode pid_hash;
506   struct ConnectInfo *ci;
507   uint32_t strength;
508
509   GNUNET_CRYPTO_hash (pid,
510                       sizeof (struct GNUNET_PeerIdentity),
511                       &pid_hash);
512   bucket = find_bucket (&pid_hash);
513   if (bucket < 0)
514     return; /* self? */
515   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
516                                           pid);
517
518   if (k_buckets[bucket].peers_size < bucket_size)
519     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
520   else
521     strength = bucket; /* minimum value of connectivity */
522   if (GNUNET_YES ==
523       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
524                                               pid))
525     strength *= 2; /* double for connected peers */
526   else if (k_buckets[bucket].peers_size > bucket_size)
527     strength = 0; /* bucket full, we really do not care about more */
528
529   if ( (0 == strength) &&
530        (NULL != ci) )
531   {
532     /* release request */
533     GNUNET_assert (GNUNET_YES ==
534                    free_connect_info (NULL,
535                                       pid,
536                                       ci));
537     return;
538   }
539   if (NULL == ci)
540   {
541     ci = GNUNET_new (struct ConnectInfo);
542     GNUNET_assert (GNUNET_OK ==
543                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
544                                                       pid,
545                                                       ci,
546                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
547   }
548   if ( (NULL != ci->oh) &&
549        (NULL != h) )
550     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
551   if (NULL != h)
552     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
553                                            h,
554                                            &offer_hello_done,
555                                            ci);
556   if ( (NULL != ci->sh) &&
557        (ci->strength != strength) )
558     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
559   if (ci->strength != strength)
560     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
561                                               pid,
562                                               strength);
563   ci->strength = strength;
564 }
565
566
567 /**
568  * Function called for each peer in #all_desired_peers during
569  * #update_connect_preferences() if we have reason to adjust
570  * the strength of our desire to keep connections to certain
571  * peers.  Calls #try_connect() to update the calculations for
572  * the given @a pid.
573  *
574  * @param cls NULL
575  * @param pid peer to update
576  * @param value unused
577  * @return #GNUNET_YES (continue to iterate)
578  */
579 static int
580 update_desire_strength (void *cls,
581                         const struct GNUNET_PeerIdentity *pid,
582                         void *value)
583 {
584   try_connect (pid, NULL);
585   return GNUNET_YES;
586 }
587
588
589 /**
590  * Update our preferences for connectivity as given to ATS.
591  *
592  * @param cls the `struct PeerInfo` of the peer
593  * @param tc scheduler context.
594  */
595 static void
596 update_connect_preferences ()
597 {
598   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
599                                          &update_desire_strength,
600                                          NULL);
601 }
602
603
604 /**
605  * Add each of the peers we already know to the bloom filter of
606  * the request so that we don't get duplicate HELLOs.
607  *
608  * @param cls the `struct GNUNET_BLOCK_Group`
609  * @param key peer identity to add to the bloom filter
610  * @param value value the peer information (unused)
611  * @return #GNUNET_YES (we should continue to iterate)
612  */
613 static int
614 add_known_to_bloom (void *cls,
615                     const struct GNUNET_PeerIdentity *key,
616                     void *value)
617 {
618   struct GNUNET_BLOCK_Group *bg = cls;
619   struct GNUNET_HashCode key_hash;
620
621   GNUNET_CRYPTO_hash (key,
622                       sizeof (struct GNUNET_PeerIdentity),
623                       &key_hash);
624   GNUNET_BLOCK_group_set_seen (bg,
625                                &key_hash,
626                                1);
627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628               "Adding known peer (%s) to bloomfilter for FIND PEER\n",
629               GNUNET_i2s (key));
630   return GNUNET_YES;
631 }
632
633
634 /**
635  * Task to send a find peer message for our own peer identifier
636  * so that we can find the closest peers in the network to ourselves
637  * and attempt to connect to them.
638  *
639  * @param cls closure for this task
640  */
641 static void
642 send_find_peer_message (void *cls)
643 {
644   struct GNUNET_TIME_Relative next_send_time;
645   struct GNUNET_BLOCK_Group *bg;
646   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
647
648   find_peer_task = NULL;
649   if (newly_found_peers > bucket_size)
650   {
651     /* If we are finding many peers already, no need to send out our request right now! */
652     find_peer_task =
653         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
654                                       &send_find_peer_message,
655                                       NULL);
656     newly_found_peers = 0;
657     return;
658   }
659   bg = GNUNET_BLOCK_group_create (GDS_block_context,
660                                   GNUNET_BLOCK_TYPE_DHT_HELLO,
661                                   GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
662                                                             UINT32_MAX),
663                                   NULL,
664                                   0,
665                                   "filter-size",
666                                   DHT_BLOOM_SIZE,
667                                   NULL);
668   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
669                                          &add_known_to_bloom,
670                                          bg);
671   GNUNET_STATISTICS_update (GDS_stats,
672                             gettext_noop ("# FIND PEER messages initiated"),
673                             1,
674                             GNUNET_NO);
675   peer_bf
676     = GNUNET_CONTAINER_bloomfilter_init (NULL,
677                                          DHT_BLOOM_SIZE,
678                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
679   // FIXME: pass priority!?
680   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
681                              GNUNET_DHT_RO_FIND_PEER,
682                              FIND_PEER_REPLICATION_LEVEL,
683                              0,
684                              &my_identity_hash,
685                              NULL,
686                              0,
687                              bg,
688                              peer_bf);
689   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
690   GNUNET_BLOCK_group_destroy (bg);
691   /* schedule next round */
692   next_send_time.rel_value_us =
693       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
694       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
695                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
696                                 (newly_found_peers + 1));
697   newly_found_peers = 0;
698   GNUNET_assert (NULL == find_peer_task);
699   find_peer_task =
700       GNUNET_SCHEDULER_add_delayed (next_send_time,
701                                     &send_find_peer_message,
702                                     NULL);
703 }
704
705
706 /**
707  * Method called whenever a peer connects.
708  *
709  * @param cls closure
710  * @param peer peer identity this notification is about
711  * @param mq message queue for sending messages to @a peer
712  * @return our `struct PeerInfo` for @a peer
713  */
714 static void *
715 handle_core_connect (void *cls,
716                      const struct GNUNET_PeerIdentity *peer,
717                      struct GNUNET_MQ_Handle *mq)
718 {
719   struct PeerInfo *pi;
720
721   /* Check for connect to self message */
722   if (0 == memcmp (&my_identity,
723                    peer,
724                    sizeof (struct GNUNET_PeerIdentity)))
725     return NULL;
726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727               "Connected to %s\n",
728               GNUNET_i2s (peer));
729   GNUNET_assert (GNUNET_NO ==
730                  GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
731                                                     peer));
732   GNUNET_STATISTICS_update (GDS_stats,
733                             gettext_noop ("# peers connected"),
734                             1,
735                             GNUNET_NO);
736   pi = GNUNET_new (struct PeerInfo);
737   pi->id = peer;
738   pi->mq = mq;
739   GNUNET_CRYPTO_hash (peer,
740                       sizeof (struct GNUNET_PeerIdentity),
741                       &pi->phash);
742   pi->peer_bucket = find_bucket (&pi->phash);
743   GNUNET_assert ( (pi->peer_bucket >= 0) &&
744                   (pi->peer_bucket < MAX_BUCKETS) );
745   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
746                                     k_buckets[pi->peer_bucket].tail,
747                                     pi);
748   k_buckets[pi->peer_bucket].peers_size++;
749   closest_bucket = GNUNET_MAX (closest_bucket,
750                                pi->peer_bucket);
751   GNUNET_assert (GNUNET_OK ==
752                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
753                                                     pi->id,
754                                                     pi,
755                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
756   if ( (pi->peer_bucket > 0) &&
757        (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
758   {
759     update_connect_preferences ();
760     newly_found_peers++;
761   }
762   if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
763        (GNUNET_YES != disable_try_connect))
764   {
765     /* got a first connection, good time to start with FIND PEER requests... */
766     GNUNET_assert (NULL == find_peer_task);
767     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
768                                                NULL);
769   }
770   return pi;
771 }
772
773
774 /**
775  * Method called whenever a peer disconnects.
776  *
777  * @param cls closure
778  * @param peer peer identity this notification is about
779  * @param internal_cls our `struct PeerInfo` for @a peer
780  */
781 static void
782 handle_core_disconnect (void *cls,
783                         const struct GNUNET_PeerIdentity *peer,
784                         void *internal_cls)
785 {
786   struct PeerInfo *to_remove = internal_cls;
787
788   /* Check for disconnect from self message */
789   if (NULL == to_remove)
790     return;
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "Disconnected %s\n",
793               GNUNET_i2s (peer));
794   GNUNET_STATISTICS_update (GDS_stats,
795                             gettext_noop ("# peers connected"),
796                             -1,
797                             GNUNET_NO);
798   GNUNET_assert (GNUNET_YES ==
799                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
800                                                        peer,
801                                                        to_remove));
802   if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
803        (GNUNET_YES != disable_try_connect) )
804   {
805     GNUNET_SCHEDULER_cancel (find_peer_task);
806     find_peer_task = NULL;
807   }
808   GNUNET_assert (to_remove->peer_bucket >= 0);
809   GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
810                                k_buckets[to_remove->peer_bucket].tail,
811                                to_remove);
812   GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
813   k_buckets[to_remove->peer_bucket].peers_size--;
814   while ( (closest_bucket > 0) &&
815           (0 == k_buckets[to_remove->peer_bucket].peers_size) )
816     closest_bucket--;
817   if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
818     update_connect_preferences ();
819   GNUNET_free (to_remove);
820 }
821
822
823 /**
824  * To how many peers should we (on average) forward the request to
825  * obtain the desired target_replication count (on average).
826  *
827  * @param hop_count number of hops the message has traversed
828  * @param target_replication the number of total paths desired
829  * @return Some number of peers to forward the message to
830  */
831 static unsigned int
832 get_forward_count (uint32_t hop_count,
833                    uint32_t target_replication)
834 {
835   uint32_t random_value;
836   uint32_t forward_count;
837   float target_value;
838
839   if (hop_count > GDS_NSE_get () * 4.0)
840   {
841     /* forcefully terminate */
842     GNUNET_STATISTICS_update (GDS_stats,
843                               gettext_noop ("# requests TTL-dropped"),
844                               1, GNUNET_NO);
845     return 0;
846   }
847   if (hop_count > GDS_NSE_get () * 2.0)
848   {
849     /* Once we have reached our ideal number of hops, only forward to 1 peer */
850     return 1;
851   }
852   /* bound by system-wide maximum */
853   target_replication =
854       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
855   target_value =
856       1 + (target_replication - 1.0) / (GDS_NSE_get () +
857                                         ((float) (target_replication - 1.0) *
858                                          hop_count));
859   /* Set forward count to floor of target_value */
860   forward_count = (uint32_t) target_value;
861   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
862   target_value = target_value - forward_count;
863   random_value =
864       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
865   if (random_value < (target_value * UINT32_MAX))
866     forward_count++;
867   return forward_count;
868 }
869
870
871 /**
872  * Compute the distance between have and target as a 32-bit value.
873  * Differences in the lower bits must count stronger than differences
874  * in the higher bits.
875  *
876  * @param target
877  * @param have
878  * @return 0 if have==target, otherwise a number
879  *           that is larger as the distance between
880  *           the two hash codes increases
881  */
882 static unsigned int
883 get_distance (const struct GNUNET_HashCode *target,
884               const struct GNUNET_HashCode *have)
885 {
886   unsigned int bucket;
887   unsigned int msb;
888   unsigned int lsb;
889   unsigned int i;
890
891   /* We have to represent the distance between two 2^9 (=512)-bit
892    * numbers as a 2^5 (=32)-bit number with "0" being used for the
893    * two numbers being identical; furthermore, we need to
894    * guarantee that a difference in the number of matching
895    * bits is always represented in the result.
896    *
897    * We use 2^32/2^9 numerical values to distinguish between
898    * hash codes that have the same LSB bit distance and
899    * use the highest 2^9 bits of the result to signify the
900    * number of (mis)matching LSB bits; if we have 0 matching
901    * and hence 512 mismatching LSB bits we return -1 (since
902    * 512 itself cannot be represented with 9 bits) */
903
904   /* first, calculate the most significant 9 bits of our
905    * result, aka the number of LSBs */
906   bucket = GNUNET_CRYPTO_hash_matching_bits (target,
907                                              have);
908   /* bucket is now a value between 0 and 512 */
909   if (bucket == 512)
910     return 0;                   /* perfect match */
911   if (bucket == 0)
912     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
913                                  * below, we'd end up with max+1 (overflow)) */
914
915   /* calculate the most significant bits of the final result */
916   msb = (512 - bucket) << (32 - 9);
917   /* calculate the 32-9 least significant bits of the final result by
918    * looking at the differences in the 32-9 bits following the
919    * mismatching bit at 'bucket' */
920   lsb = 0;
921   for (i = bucket + 1;
922        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
923   {
924     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
925         GNUNET_CRYPTO_hash_get_bit (have, i))
926       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
927                                                  * last bit set will be 31 -- if
928                                                  * i does not reach 512 first... */
929   }
930   return msb | lsb;
931 }
932
933
934 /**
935  * Check whether my identity is closer than any known peers.  If a
936  * non-null bloomfilter is given, check if this is the closest peer
937  * that hasn't already been routed to.
938  *
939  * @param key hash code to check closeness to
940  * @param bloom bloomfilter, exclude these entries from the decision
941  * @return #GNUNET_YES if node location is closest,
942  *         #GNUNET_NO otherwise.
943  */
944 static int
945 am_closest_peer (const struct GNUNET_HashCode *key,
946                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
947 {
948   int bits;
949   int other_bits;
950   int bucket_num;
951   int count;
952   struct PeerInfo *pos;
953
954   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
955     return GNUNET_YES;
956   bucket_num = find_bucket (key);
957   GNUNET_assert (bucket_num >= 0);
958   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
959                                            key);
960   pos = k_buckets[bucket_num].head;
961   count = 0;
962   while ((NULL != pos) && (count < bucket_size))
963   {
964     if ((NULL != bloom) &&
965         (GNUNET_YES ==
966          GNUNET_CONTAINER_bloomfilter_test (bloom,
967                                             &pos->phash)))
968     {
969       pos = pos->next;
970       continue;                 /* Skip already checked entries */
971     }
972     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
973                                                    key);
974     if (other_bits > bits)
975       return GNUNET_NO;
976     if (other_bits == bits)     /* We match the same number of bits */
977       return GNUNET_YES;
978     pos = pos->next;
979   }
980   /* No peers closer, we are the closest! */
981   return GNUNET_YES;
982 }
983
984
985 /**
986  * Select a peer from the routing table that would be a good routing
987  * destination for sending a message for "key".  The resulting peer
988  * must not be in the set of blocked peers.<p>
989  *
990  * Note that we should not ALWAYS select the closest peer to the
991  * target, peers further away from the target should be chosen with
992  * exponentially declining probability.
993  *
994  * FIXME: double-check that this is fine
995  *
996  *
997  * @param key the key we are selecting a peer to route to
998  * @param bloom a bloomfilter containing entries this request has seen already
999  * @param hops how many hops has this message traversed thus far
1000  * @return Peer to route to, or NULL on error
1001  */
1002 static struct PeerInfo *
1003 select_peer (const struct GNUNET_HashCode *key,
1004              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1005              uint32_t hops)
1006 {
1007   unsigned int bc;
1008   unsigned int count;
1009   unsigned int selected;
1010   struct PeerInfo *pos;
1011   unsigned int dist;
1012   unsigned int smallest_distance;
1013   struct PeerInfo *chosen;
1014
1015   if (hops >= GDS_NSE_get ())
1016   {
1017     /* greedy selection (closest peer that is not in bloomfilter) */
1018     smallest_distance = UINT_MAX;
1019     chosen = NULL;
1020     for (bc = 0; bc <= closest_bucket; bc++)
1021     {
1022       pos = k_buckets[bc].head;
1023       count = 0;
1024       while ((pos != NULL) && (count < bucket_size))
1025       {
1026         if ((bloom == NULL) ||
1027             (GNUNET_NO ==
1028              GNUNET_CONTAINER_bloomfilter_test (bloom,
1029                                                 &pos->phash)))
1030         {
1031           dist = get_distance (key,
1032                                &pos->phash);
1033           if (dist < smallest_distance)
1034           {
1035             chosen = pos;
1036             smallest_distance = dist;
1037           }
1038         }
1039         else
1040         {
1041           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1043                       GNUNET_i2s (pos->id),
1044                       GNUNET_h2s (key));
1045           GNUNET_STATISTICS_update (GDS_stats,
1046                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"),
1047                                     1,
1048                                     GNUNET_NO);
1049           dist = get_distance (key,
1050                                &pos->phash);
1051           if (dist < smallest_distance)
1052           {
1053             chosen = NULL;
1054             smallest_distance = dist;
1055           }
1056         }
1057         count++;
1058         pos = pos->next;
1059       }
1060     }
1061     if (NULL == chosen)
1062       GNUNET_STATISTICS_update (GDS_stats,
1063                                 gettext_noop ("# Peer selection failed"), 1,
1064                                 GNUNET_NO);
1065     return chosen;
1066   }
1067
1068   /* select "random" peer */
1069   /* count number of peers that are available and not filtered */
1070   count = 0;
1071   for (bc = 0; bc <= closest_bucket; bc++)
1072   {
1073     pos = k_buckets[bc].head;
1074     while ((pos != NULL) && (count < bucket_size))
1075     {
1076       if ((bloom != NULL) &&
1077           (GNUNET_YES ==
1078            GNUNET_CONTAINER_bloomfilter_test (bloom,
1079                                               &pos->phash)))
1080       {
1081         GNUNET_STATISTICS_update (GDS_stats,
1082                                   gettext_noop
1083                                   ("# Peers excluded from routing due to Bloomfilter"),
1084                                   1, GNUNET_NO);
1085         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1087                     GNUNET_i2s (pos->id),
1088                     GNUNET_h2s (key));
1089         pos = pos->next;
1090         continue;               /* Ignore bloomfiltered peers */
1091       }
1092       count++;
1093       pos = pos->next;
1094     }
1095   }
1096   if (0 == count)               /* No peers to select from! */
1097   {
1098     GNUNET_STATISTICS_update (GDS_stats,
1099                               gettext_noop ("# Peer selection failed"), 1,
1100                               GNUNET_NO);
1101     return NULL;
1102   }
1103   /* Now actually choose a peer */
1104   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1105                                        count);
1106   count = 0;
1107   for (bc = 0; bc <= closest_bucket; bc++)
1108   {
1109     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1110     {
1111       if ((bloom != NULL) &&
1112           (GNUNET_YES ==
1113            GNUNET_CONTAINER_bloomfilter_test (bloom,
1114                                               &pos->phash)))
1115       {
1116         continue;               /* Ignore bloomfiltered peers */
1117       }
1118       if (0 == selected--)
1119         return pos;
1120     }
1121   }
1122   GNUNET_break (0);
1123   return NULL;
1124 }
1125
1126
1127 /**
1128  * Compute the set of peers that the given request should be
1129  * forwarded to.
1130  *
1131  * @param key routing key
1132  * @param bloom bloom filter excluding peers as targets, all selected
1133  *        peers will be added to the bloom filter
1134  * @param hop_count number of hops the request has traversed so far
1135  * @param target_replication desired number of replicas
1136  * @param targets where to store an array of target peers (to be
1137  *         free'd by the caller)
1138  * @return number of peers returned in 'targets'.
1139  */
1140 static unsigned int
1141 get_target_peers (const struct GNUNET_HashCode *key,
1142                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1143                   uint32_t hop_count,
1144                   uint32_t target_replication,
1145                   struct PeerInfo ***targets)
1146 {
1147   unsigned int ret;
1148   unsigned int off;
1149   struct PeerInfo **rtargets;
1150   struct PeerInfo *nxt;
1151
1152   GNUNET_assert (NULL != bloom);
1153   ret = get_forward_count (hop_count,
1154                            target_replication);
1155   if (0 == ret)
1156   {
1157     *targets = NULL;
1158     return 0;
1159   }
1160   rtargets = GNUNET_new_array (ret,
1161                                struct PeerInfo *);
1162   for (off = 0; off < ret; off++)
1163   {
1164     nxt = select_peer (key, bloom, hop_count);
1165     if (NULL == nxt)
1166       break;
1167     rtargets[off] = nxt;
1168     GNUNET_break (GNUNET_NO ==
1169                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1170                                                      &nxt->phash));
1171     GNUNET_CONTAINER_bloomfilter_add (bloom,
1172                                       &nxt->phash);
1173   }
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1176               off,
1177               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1178               (unsigned int) hop_count,
1179               GNUNET_h2s (key),
1180               ret);
1181   if (0 == off)
1182   {
1183     GNUNET_free (rtargets);
1184     *targets = NULL;
1185     return 0;
1186   }
1187   *targets = rtargets;
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1190               GNUNET_h2s (key),
1191               off,
1192               ret);
1193   return off;
1194 }
1195
1196
1197 /**
1198  * Perform a PUT operation.   Forwards the given request to other
1199  * peers.   Does not store the data locally.  Does not give the
1200  * data to local clients.  May do nothing if this is the only
1201  * peer in the network (or if we are the closest peer in the
1202  * network).
1203  *
1204  * @param type type of the block
1205  * @param options routing options
1206  * @param desired_replication_level desired replication count
1207  * @param expiration_time when does the content expire
1208  * @param hop_count how many hops has this message traversed so far
1209  * @param bf Bloom filter of peers this PUT has already traversed
1210  * @param key key for the content
1211  * @param put_path_length number of entries in @a put_path
1212  * @param put_path peers this request has traversed so far (if tracked)
1213  * @param data payload to store
1214  * @param data_size number of bytes in @a data
1215  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1216  */
1217 int
1218 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1219                            enum GNUNET_DHT_RouteOption options,
1220                            uint32_t desired_replication_level,
1221                            struct GNUNET_TIME_Absolute expiration_time,
1222                            uint32_t hop_count,
1223                            struct GNUNET_CONTAINER_BloomFilter *bf,
1224                            const struct GNUNET_HashCode *key,
1225                            unsigned int put_path_length,
1226                            struct GNUNET_PeerIdentity *put_path,
1227                            const void *data,
1228                            size_t data_size)
1229 {
1230   unsigned int target_count;
1231   unsigned int i;
1232   struct PeerInfo **targets;
1233   struct PeerInfo *target;
1234   size_t msize;
1235   struct GNUNET_MQ_Envelope *env;
1236   struct PeerPutMessage *ppm;
1237   struct GNUNET_PeerIdentity *pp;
1238   unsigned int skip_count;
1239
1240   GNUNET_assert (NULL != bf);
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242               "Adding myself (%s) to PUT bloomfilter for %s\n",
1243               GNUNET_i2s (&my_identity),
1244               GNUNET_h2s (key));
1245   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1246   GNUNET_STATISTICS_update (GDS_stats,
1247                             gettext_noop ("# PUT requests routed"),
1248                             1,
1249                             GNUNET_NO);
1250   target_count
1251     = get_target_peers (key,
1252                         bf,
1253                         hop_count,
1254                         desired_replication_level,
1255                         &targets);
1256   if (0 == target_count)
1257   {
1258     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259                 "Routing PUT for %s terminates after %u hops at %s\n",
1260                 GNUNET_h2s (key),
1261                 (unsigned int) hop_count,
1262                 GNUNET_i2s (&my_identity));
1263     return GNUNET_NO;
1264   }
1265   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
1266   if (msize + sizeof (struct PeerPutMessage)
1267       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1268   {
1269     put_path_length = 0;
1270     msize = data_size;
1271   }
1272   if (msize + sizeof (struct PeerPutMessage)
1273       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1274   {
1275     GNUNET_break (0);
1276     GNUNET_free (targets);
1277     return GNUNET_NO;
1278   }
1279   GNUNET_STATISTICS_update (GDS_stats,
1280                             gettext_noop ("# PUT messages queued for transmission"),
1281                             target_count,
1282                             GNUNET_NO);
1283   skip_count = 0;
1284   for (i = 0; i < target_count; i++)
1285   {
1286     target = targets[i];
1287     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1288     {
1289       /* skip */
1290       GNUNET_STATISTICS_update (GDS_stats,
1291                                 gettext_noop ("# P2P messages dropped due to full queue"),
1292                                 1,
1293                                 GNUNET_NO);
1294       skip_count++;
1295       continue;
1296     }
1297     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298                 "Routing PUT for %s after %u hops to %s\n",
1299                 GNUNET_h2s (key),
1300                 (unsigned int) hop_count,
1301                 GNUNET_i2s (target->id));
1302     env = GNUNET_MQ_msg_extra (ppm,
1303                                msize,
1304                                GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1305     ppm->options = htonl (options);
1306     ppm->type = htonl (type);
1307     ppm->hop_count = htonl (hop_count + 1);
1308     ppm->desired_replication_level = htonl (desired_replication_level);
1309     ppm->put_path_length = htonl (put_path_length);
1310     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1311     GNUNET_break (GNUNET_YES ==
1312                   GNUNET_CONTAINER_bloomfilter_test (bf,
1313                                                      &target->phash));
1314     GNUNET_assert (GNUNET_OK ==
1315                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1316                                                               ppm->bloomfilter,
1317                                                               DHT_BLOOM_SIZE));
1318     ppm->key = *key;
1319     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1320     GNUNET_memcpy (pp,
1321                    put_path,
1322                    sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1323     GNUNET_memcpy (&pp[put_path_length],
1324                    data,
1325                    data_size);
1326     GNUNET_MQ_send (target->mq,
1327                     env);
1328   }
1329   GNUNET_free (targets);
1330   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1331 }
1332
1333
1334 /**
1335  * Perform a GET operation.  Forwards the given request to other
1336  * peers.  Does not lookup the key locally.  May do nothing if this is
1337  * the only peer in the network (or if we are the closest peer in the
1338  * network).
1339  *
1340  * @param type type of the block
1341  * @param options routing options
1342  * @param desired_replication_level desired replication count
1343  * @param hop_count how many hops did this request traverse so far?
1344  * @param key key for the content
1345  * @param xquery extended query
1346  * @param xquery_size number of bytes in @a xquery
1347  * @param bg group to use for filtering replies
1348  * @param peer_bf filter for peers not to select (again)
1349  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1350  */
1351 int
1352 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1353                            enum GNUNET_DHT_RouteOption options,
1354                            uint32_t desired_replication_level,
1355                            uint32_t hop_count,
1356                            const struct GNUNET_HashCode *key,
1357                            const void *xquery,
1358                            size_t xquery_size,
1359                            struct GNUNET_BLOCK_Group *bg,
1360                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1361 {
1362   unsigned int target_count;
1363   struct PeerInfo **targets;
1364   struct PeerInfo *target;
1365   struct GNUNET_MQ_Envelope *env;
1366   size_t msize;
1367   struct PeerGetMessage *pgm;
1368   char *xq;
1369   size_t reply_bf_size;
1370   void *reply_bf;
1371   unsigned int skip_count;
1372   uint32_t bf_nonce;
1373
1374   GNUNET_assert (NULL != peer_bf);
1375   GNUNET_STATISTICS_update (GDS_stats,
1376                             gettext_noop ("# GET requests routed"),
1377                             1,
1378                             GNUNET_NO);
1379   target_count = get_target_peers (key,
1380                                    peer_bf,
1381                                    hop_count,
1382                                    desired_replication_level,
1383                                    &targets);
1384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1385               "Adding myself (%s) to GET bloomfilter for %s\n",
1386               GNUNET_i2s (&my_identity),
1387               GNUNET_h2s (key));
1388   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1389                                     &my_identity_hash);
1390   if (0 == target_count)
1391   {
1392     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393                 "Routing GET for %s terminates after %u hops at %s\n",
1394                 GNUNET_h2s (key),
1395                 (unsigned int) hop_count,
1396                 GNUNET_i2s (&my_identity));
1397     return GNUNET_NO;
1398   }
1399   if (GNUNET_OK !=
1400       GNUNET_BLOCK_group_serialize (bg,
1401                                     &bf_nonce,
1402                                     &reply_bf,
1403                                     &reply_bf_size))
1404   {
1405     reply_bf = NULL;
1406     reply_bf_size = 0;
1407     bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1408                                          UINT32_MAX);
1409   }
1410   msize = xquery_size + reply_bf_size;
1411   if (msize + sizeof (struct PeerGetMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1412   {
1413     GNUNET_break (0);
1414     GNUNET_free_non_null (reply_bf);
1415     GNUNET_free (targets);
1416     return GNUNET_NO;
1417   }
1418   GNUNET_STATISTICS_update (GDS_stats,
1419                             gettext_noop ("# GET messages queued for transmission"),
1420                             target_count,
1421                             GNUNET_NO);
1422   /* forward request */
1423   skip_count = 0;
1424   for (unsigned int i = 0; i < target_count; i++)
1425   {
1426     target = targets[i];
1427     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1428     {
1429       /* skip */
1430       GNUNET_STATISTICS_update (GDS_stats,
1431                                 gettext_noop ("# P2P messages dropped due to full queue"),
1432                                 1, GNUNET_NO);
1433       skip_count++;
1434       continue;
1435     }
1436     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437                 "Routing GET for %s after %u hops to %s\n",
1438                 GNUNET_h2s (key),
1439                 (unsigned int) hop_count,
1440                 GNUNET_i2s (target->id));
1441     env = GNUNET_MQ_msg_extra (pgm,
1442                                msize,
1443                                GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1444     pgm->options = htonl (options);
1445     pgm->type = htonl (type);
1446     pgm->hop_count = htonl (hop_count + 1);
1447     pgm->desired_replication_level = htonl (desired_replication_level);
1448     pgm->xquery_size = htonl (xquery_size);
1449     pgm->bf_mutator = bf_nonce;
1450     GNUNET_break (GNUNET_YES ==
1451                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1452                                                      &target->phash));
1453     GNUNET_assert (GNUNET_OK ==
1454                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1455                                                               pgm->bloomfilter,
1456                                                               DHT_BLOOM_SIZE));
1457     pgm->key = *key;
1458     xq = (char *) &pgm[1];
1459     GNUNET_memcpy (xq,
1460                    xquery,
1461                    xquery_size);
1462     GNUNET_memcpy (&xq[xquery_size],
1463                    reply_bf,
1464                    reply_bf_size);
1465     GNUNET_MQ_send (target->mq,
1466                     env);
1467   }
1468   GNUNET_free (targets);
1469   GNUNET_free_non_null (reply_bf);
1470   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1471 }
1472
1473
1474 /**
1475  * Handle a reply (route to origin).  Only forwards the reply back to
1476  * the given peer.  Does not do local caching or forwarding to local
1477  * clients.
1478  *
1479  * @param target neighbour that should receive the block (if still connected)
1480  * @param type type of the block
1481  * @param expiration_time when does the content expire
1482  * @param key key for the content
1483  * @param put_path_length number of entries in @a put_path
1484  * @param put_path peers the original PUT traversed (if tracked)
1485  * @param get_path_length number of entries in @a get_path
1486  * @param get_path peers this reply has traversed so far (if tracked)
1487  * @param data payload of the reply
1488  * @param data_size number of bytes in @a data
1489  */
1490 void
1491 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1492                              enum GNUNET_BLOCK_Type type,
1493                              struct GNUNET_TIME_Absolute expiration_time,
1494                              const struct GNUNET_HashCode *key,
1495                              unsigned int put_path_length,
1496                              const struct GNUNET_PeerIdentity *put_path,
1497                              unsigned int get_path_length,
1498                              const struct GNUNET_PeerIdentity *get_path,
1499                              const void *data,
1500                              size_t data_size)
1501 {
1502   struct PeerInfo *pi;
1503   struct GNUNET_MQ_Envelope *env;
1504   size_t msize;
1505   struct PeerResultMessage *prm;
1506   struct GNUNET_PeerIdentity *paths;
1507
1508   msize = data_size + (get_path_length + put_path_length) *
1509       sizeof (struct GNUNET_PeerIdentity);
1510   if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1511       (get_path_length >
1512        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1513       (put_path_length >
1514        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1515       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1516   {
1517     GNUNET_break (0);
1518     return;
1519   }
1520   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1521                                           target);
1522   if (NULL == pi)
1523   {
1524     /* peer disconnected in the meantime, drop reply */
1525     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526                 "No matching peer for reply for key %s\n",
1527                 GNUNET_h2s (key));
1528     return;
1529   }
1530   if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1531   {
1532     /* skip */
1533     GNUNET_STATISTICS_update (GDS_stats,
1534                               gettext_noop ("# P2P messages dropped due to full queue"),
1535                               1,
1536                               GNUNET_NO);
1537     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538                 "Peer queue full, ignoring reply for key %s\n",
1539                 GNUNET_h2s (key));
1540     return;
1541   }
1542
1543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1544               "Forwarding reply for key %s to peer %s\n",
1545               GNUNET_h2s (key),
1546               GNUNET_i2s (target));
1547   GNUNET_STATISTICS_update (GDS_stats,
1548                             gettext_noop
1549                             ("# RESULT messages queued for transmission"), 1,
1550                             GNUNET_NO);
1551   env = GNUNET_MQ_msg_extra (prm,
1552                              msize,
1553                              GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1554   prm->type = htonl (type);
1555   prm->put_path_length = htonl (put_path_length);
1556   prm->get_path_length = htonl (get_path_length);
1557   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1558   prm->key = *key;
1559   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1560   GNUNET_memcpy (paths,
1561                  put_path,
1562                  put_path_length * sizeof (struct GNUNET_PeerIdentity));
1563   GNUNET_memcpy (&paths[put_path_length],
1564                  get_path,
1565                  get_path_length * sizeof (struct GNUNET_PeerIdentity));
1566   GNUNET_memcpy (&paths[put_path_length + get_path_length],
1567                  data,
1568                  data_size);
1569   GNUNET_MQ_send (pi->mq,
1570                   env);
1571 }
1572
1573
1574 /**
1575  * To be called on core init/fail.
1576  *
1577  * @param cls service closure
1578  * @param identity the public identity of this peer
1579  */
1580 static void
1581 core_init (void *cls,
1582            const struct GNUNET_PeerIdentity *identity)
1583 {
1584   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1585               "CORE called, I am %s\n",
1586               GNUNET_i2s (identity));
1587   my_identity = *identity;
1588   GNUNET_CRYPTO_hash (identity,
1589                       sizeof (struct GNUNET_PeerIdentity),
1590                       &my_identity_hash);
1591   GNUNET_SERVICE_resume (GDS_service);
1592 }
1593
1594
1595 /**
1596  * Check validity of a p2p put request.
1597  *
1598  * @param cls closure with the `struct PeerInfo` of the sender
1599  * @param message message
1600  * @return #GNUNET_OK if the message is valid
1601  */
1602 static int
1603 check_dht_p2p_put (void *cls,
1604                    const struct PeerPutMessage *put)
1605 {
1606   uint32_t putlen;
1607   uint16_t msize;
1608
1609   msize = ntohs (put->header.size);
1610   putlen = ntohl (put->put_path_length);
1611   if ((msize <
1612        sizeof (struct PeerPutMessage) +
1613        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1614       (putlen >
1615        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1616   {
1617     GNUNET_break_op (0);
1618     return GNUNET_SYSERR;
1619   }
1620   return GNUNET_OK;
1621 }
1622
1623
1624 /**
1625  * Core handler for p2p put requests.
1626  *
1627  * @param cls closure with the `struct PeerInfo` of the sender
1628  * @param message message
1629  */
1630 static void
1631 handle_dht_p2p_put (void *cls,
1632                     const struct PeerPutMessage *put)
1633 {
1634   struct PeerInfo *peer = cls;
1635   const struct GNUNET_PeerIdentity *put_path;
1636   const void *payload;
1637   uint32_t putlen;
1638   uint16_t msize;
1639   size_t payload_size;
1640   enum GNUNET_DHT_RouteOption options;
1641   struct GNUNET_CONTAINER_BloomFilter *bf;
1642   struct GNUNET_HashCode test_key;
1643   int forwarded;
1644
1645   msize = ntohs (put->header.size);
1646   putlen = ntohl (put->put_path_length);
1647   GNUNET_STATISTICS_update (GDS_stats,
1648                             gettext_noop ("# P2P PUT requests received"),
1649                             1,
1650                             GNUNET_NO);
1651   GNUNET_STATISTICS_update (GDS_stats,
1652                             gettext_noop ("# P2P PUT bytes received"),
1653                             msize,
1654                             GNUNET_NO);
1655   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1656   payload = &put_path[putlen];
1657   options = ntohl (put->options);
1658   payload_size = msize - (sizeof (struct PeerPutMessage) +
1659                           putlen * sizeof (struct GNUNET_PeerIdentity));
1660
1661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662               "PUT for `%s' from %s\n",
1663               GNUNET_h2s (&put->key),
1664               GNUNET_i2s (peer->id));
1665   if (GNUNET_YES == log_route_details_stderr)
1666   {
1667     char *tmp;
1668
1669     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1670     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1671                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1672                  GNUNET_h2s (&put->key),
1673                  GNUNET_i2s (peer->id),
1674                  tmp,
1675                  ntohl(put->hop_count),
1676                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1677                                                    &put->key),
1678                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1679                                                    &put->key)
1680                 );
1681     GNUNET_free (tmp);
1682   }
1683   switch (GNUNET_BLOCK_get_key
1684           (GDS_block_context,
1685            ntohl (put->type),
1686            payload,
1687            payload_size,
1688            &test_key))
1689   {
1690   case GNUNET_YES:
1691     if (0 != memcmp (&test_key,
1692                      &put->key,
1693                      sizeof (struct GNUNET_HashCode)))
1694     {
1695       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1696
1697       GNUNET_break_op (0);
1698       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1699                   "PUT with key `%s' for block with key %s\n",
1700                   put_s,
1701                   GNUNET_h2s_full (&test_key));
1702       GNUNET_free (put_s);
1703       return;
1704     }
1705     break;
1706   case GNUNET_NO:
1707     GNUNET_break_op (0);
1708     return;
1709   case GNUNET_SYSERR:
1710     /* cannot verify, good luck */
1711     break;
1712   }
1713   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1714   {
1715     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1716                                    ntohl (put->type),
1717                                    NULL, /* query group */
1718                                    GNUNET_BLOCK_EO_NONE,
1719                                    NULL,    /* query */
1720                                    NULL, 0, /* xquery */
1721                                    payload,
1722                                    payload_size))
1723     {
1724     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1725     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1726       break;
1727
1728     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1729     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1730     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1731     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1732     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1733     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1734     default:
1735       GNUNET_break_op (0);
1736       return;
1737     }
1738   }
1739
1740   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1741                                           DHT_BLOOM_SIZE,
1742                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1743   GNUNET_break_op (GNUNET_YES ==
1744                    GNUNET_CONTAINER_bloomfilter_test (bf,
1745                                                       &peer->phash));
1746   {
1747     struct GNUNET_PeerIdentity pp[putlen + 1];
1748
1749     /* extend 'put path' by sender */
1750     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1751     {
1752 #if SANITY_CHECKS
1753       for (unsigned int i=0;i<=putlen;i++)
1754       {
1755         for (unsigned int j=0;j<i;j++)
1756         {
1757           GNUNET_break (0 != memcmp (&pp[i],
1758                                      &pp[j],
1759                                      sizeof (struct GNUNET_PeerIdentity)));
1760         }
1761         GNUNET_break (0 != memcmp (&pp[i],
1762                                    peer->id,
1763                                    sizeof (struct GNUNET_PeerIdentity)));
1764       }
1765 #endif
1766       GNUNET_memcpy (pp,
1767                      put_path,
1768                      putlen * sizeof (struct GNUNET_PeerIdentity));
1769       pp[putlen] = *peer->id;
1770       putlen++;
1771     }
1772     else
1773       putlen = 0;
1774
1775     /* give to local clients */
1776     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1777                               &put->key,
1778                               0,
1779                               NULL,
1780                               putlen,
1781                               pp,
1782                               ntohl (put->type),
1783                               payload_size,
1784                               payload);
1785     /* store locally */
1786     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1787         (am_closest_peer (&put->key, bf)))
1788       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1789                                 &put->key,
1790                                 putlen,
1791                                 pp,
1792                                 ntohl (put->type),
1793                                 payload_size,
1794                                 payload);
1795     /* route to other peers */
1796     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1797                                            options,
1798                                            ntohl (put->desired_replication_level),
1799                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1800                                            ntohl (put->hop_count),
1801                                            bf,
1802                                            &put->key,
1803                                            putlen,
1804                                            pp,
1805                                            payload,
1806                                            payload_size);
1807     /* notify monitoring clients */
1808     GDS_CLIENTS_process_put (options
1809                              | ( (GNUNET_OK == forwarded)
1810                                  ? GNUNET_DHT_RO_LAST_HOP
1811                                  : 0 ),
1812                              ntohl (put->type),
1813                              ntohl (put->hop_count),
1814                              ntohl (put->desired_replication_level),
1815                              putlen, pp,
1816                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1817                              &put->key,
1818                              payload,
1819                              payload_size);
1820   }
1821   GNUNET_CONTAINER_bloomfilter_free (bf);
1822 }
1823
1824
1825 /**
1826  * We have received a FIND PEER request.  Send matching
1827  * HELLOs back.
1828  *
1829  * @param sender sender of the FIND PEER request
1830  * @param key peers close to this key are desired
1831  * @param bg group for filtering peers
1832  */
1833 static void
1834 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1835                   const struct GNUNET_HashCode *key,
1836                   struct GNUNET_BLOCK_Group *bg)
1837 {
1838   int bucket_idx;
1839   struct PeerBucket *bucket;
1840   struct PeerInfo *peer;
1841   unsigned int choice;
1842   const struct GNUNET_HELLO_Message *hello;
1843   size_t hello_size;
1844
1845   /* first, check about our own HELLO */
1846   if (NULL != GDS_my_hello)
1847   {
1848     hello_size = GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message *) GDS_my_hello);
1849     GNUNET_break (hello_size >= sizeof (struct GNUNET_MessageHeader));
1850     if (GNUNET_BLOCK_EVALUATION_OK_MORE ==
1851         GNUNET_BLOCK_evaluate (GDS_block_context,
1852                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1853                                bg,
1854                                GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1855                                &my_identity_hash,
1856                                NULL, 0,
1857                                GDS_my_hello,
1858                                hello_size))
1859     {
1860       GDS_NEIGHBOURS_handle_reply (sender,
1861                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1862                                    GNUNET_TIME_relative_to_absolute (hello_expiration),
1863                                    key,
1864                                    0,
1865                                    NULL,
1866                                    0,
1867                                    NULL,
1868                                    GDS_my_hello,
1869                                    hello_size);
1870     }
1871     else
1872     {
1873       GNUNET_STATISTICS_update (GDS_stats,
1874                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"),
1875                                 1,
1876                                 GNUNET_NO);
1877     }
1878   }
1879   else
1880   {
1881     GNUNET_STATISTICS_update (GDS_stats,
1882                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"),
1883                               1,
1884                               GNUNET_NO);
1885   }
1886
1887   /* then, also consider sending a random HELLO from the closest bucket */
1888   if (0 == memcmp (&my_identity_hash,
1889                    key,
1890                    sizeof (struct GNUNET_HashCode)))
1891     bucket_idx = closest_bucket;
1892   else
1893     bucket_idx = GNUNET_MIN (closest_bucket,
1894                              find_bucket (key));
1895   if (bucket_idx == GNUNET_SYSERR)
1896     return;
1897   bucket = &k_buckets[bucket_idx];
1898   if (bucket->peers_size == 0)
1899     return;
1900   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1901                                      bucket->peers_size);
1902   peer = bucket->head;
1903   while (choice > 0)
1904   {
1905     GNUNET_assert (NULL != peer);
1906     peer = peer->next;
1907     choice--;
1908   }
1909   choice = bucket->peers_size;
1910   do
1911   {
1912     peer = peer->next;
1913     if (0 == choice--)
1914       return;                   /* no non-masked peer available */
1915     if (NULL == peer)
1916       peer = bucket->head;
1917     hello = GDS_HELLO_get (peer->id);
1918     hello_size = GNUNET_HELLO_size (hello);
1919   } while ( (NULL == hello) ||
1920             (GNUNET_BLOCK_EVALUATION_OK_MORE !=
1921              GNUNET_BLOCK_evaluate (GDS_block_context,
1922                                     GNUNET_BLOCK_TYPE_DHT_HELLO,
1923                                     bg,
1924                                     GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1925                                     &peer->phash,
1926                                     NULL, 0,
1927                                     hello,
1928                                     hello_size)) );
1929   GDS_NEIGHBOURS_handle_reply (sender,
1930                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1931                                GNUNET_TIME_relative_to_absolute
1932                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1933                                key,
1934                                0,
1935                                NULL,
1936                                0,
1937                                NULL,
1938                                hello,
1939                                hello_size);
1940 }
1941
1942
1943 /**
1944  * Handle a result from local datacache for a GET operation.
1945  *
1946  * @param cls the `struct ClientHandle` of the client doing the query
1947  * @param type type of the block
1948  * @param expiration_time when does the content expire
1949  * @param key key for the content
1950  * @param put_path_length number of entries in @a put_path
1951  * @param put_path peers the original PUT traversed (if tracked)
1952  * @param get_path_length number of entries in @a get_path
1953  * @param get_path peers this reply has traversed so far (if tracked)
1954  * @param data payload of the reply
1955  * @param data_size number of bytes in @a data
1956  */
1957 static void
1958 handle_local_result (void *cls,
1959                      enum GNUNET_BLOCK_Type type,
1960                      struct GNUNET_TIME_Absolute expiration_time,
1961                      const struct GNUNET_HashCode *key,
1962                      unsigned int put_path_length,
1963                      const struct GNUNET_PeerIdentity *put_path,
1964                      unsigned int get_path_length,
1965                      const struct GNUNET_PeerIdentity *get_path,
1966                      const void *data,
1967                      size_t data_size)
1968 {
1969   // FIXME: we can probably do better here by
1970   // passing the peer that did the query in the closure...
1971   GDS_ROUTING_process (NULL,
1972                        type,
1973                        expiration_time,
1974                        key,
1975                        put_path_length, put_path,
1976                        0, NULL,
1977                        data, data_size);
1978 }
1979
1980
1981 /**
1982  * Check validity of p2p get request.
1983  *
1984  * @param cls closure with the `struct PeerInfo` of the sender
1985  * @param get the message
1986  * @return #GNUNET_OK if the message is well-formed
1987  */
1988 static int
1989 check_dht_p2p_get (void *cls,
1990                    const struct PeerGetMessage *get)
1991 {
1992   uint32_t xquery_size;
1993   uint16_t msize;
1994
1995   msize = ntohs (get->header.size);
1996   xquery_size = ntohl (get->xquery_size);
1997   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1998   {
1999     GNUNET_break_op (0);
2000     return GNUNET_SYSERR;
2001   }
2002   return GNUNET_OK;
2003 }
2004
2005
2006 /**
2007  * Core handler for p2p get requests.
2008  *
2009  * @param cls closure with the `struct PeerInfo` of the sender
2010  * @param get the message
2011  */
2012 static void
2013 handle_dht_p2p_get (void *cls,
2014                     const struct PeerGetMessage *get)
2015 {
2016   struct PeerInfo *peer = cls;
2017   uint32_t xquery_size;
2018   size_t reply_bf_size;
2019   uint16_t msize;
2020   enum GNUNET_BLOCK_Type type;
2021   enum GNUNET_DHT_RouteOption options;
2022   enum GNUNET_BLOCK_EvaluationResult eval;
2023   struct GNUNET_BLOCK_Group *bg;
2024   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2025   const char *xquery;
2026   int forwarded;
2027
2028   if (NULL == peer)
2029   {
2030     GNUNET_break (0);
2031     return;
2032   }
2033   /* parse and validate message */
2034   msize = ntohs (get->header.size);
2035   xquery_size = ntohl (get->xquery_size);
2036   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2037   type = ntohl (get->type);
2038   options = ntohl (get->options);
2039   xquery = (const char *) &get[1];
2040   GNUNET_STATISTICS_update (GDS_stats,
2041                             gettext_noop ("# P2P GET requests received"),
2042                             1,
2043                             GNUNET_NO);
2044   GNUNET_STATISTICS_update (GDS_stats,
2045                             gettext_noop ("# P2P GET bytes received"),
2046                             msize,
2047                             GNUNET_NO);
2048   if (GNUNET_YES == log_route_details_stderr)
2049   {
2050     char *tmp;
2051
2052     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2053     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2054                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2055                  GNUNET_h2s (&get->key),
2056                  GNUNET_i2s (peer->id),
2057                  tmp,
2058                  ntohl(get->hop_count),
2059                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2060                                                    &get->key),
2061                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2062                                                    &get->key),
2063                  ntohl(get->xquery_size),
2064                  xquery);
2065     GNUNET_free (tmp);
2066   }
2067   eval
2068     = GNUNET_BLOCK_evaluate (GDS_block_context,
2069                              type,
2070                              NULL,
2071                              GNUNET_BLOCK_EO_NONE,
2072                              &get->key,
2073                              xquery,
2074                              xquery_size,
2075                              NULL,
2076                              0);
2077   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2078   {
2079     /* request invalid or block type not supported */
2080     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2081     return;
2082   }
2083   peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2084                                                DHT_BLOOM_SIZE,
2085                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
2086   GNUNET_break_op (GNUNET_YES ==
2087                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2088                                                       &peer->phash));
2089   bg = GNUNET_BLOCK_group_create (GDS_block_context,
2090                                   type,
2091                                   get->bf_mutator,
2092                                   &xquery[xquery_size],
2093                                   reply_bf_size,
2094                                   "filter-size",
2095                                   reply_bf_size,
2096                                   NULL);
2097   /* remember request for routing replies */
2098   GDS_ROUTING_add (peer->id,
2099                    type,
2100                    bg, /* bg now owned by routing, but valid at least until end of this function! */
2101                    options,
2102                    &get->key,
2103                    xquery,
2104                    xquery_size);
2105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2106               "GET for %s at %s after %u hops\n",
2107               GNUNET_h2s (&get->key),
2108               GNUNET_i2s (&my_identity),
2109               (unsigned int) ntohl (get->hop_count));
2110   /* local lookup (this may update the reply_bf) */
2111   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2112       (am_closest_peer (&get->key,
2113                         peer_bf)))
2114   {
2115     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2116     {
2117       GNUNET_STATISTICS_update (GDS_stats,
2118                                 gettext_noop ("# P2P FIND PEER requests processed"),
2119                                 1,
2120                                 GNUNET_NO);
2121       handle_find_peer (peer->id,
2122                         &get->key,
2123                         bg);
2124     }
2125     else
2126     {
2127       eval = GDS_DATACACHE_handle_get (&get->key,
2128                                        type,
2129                                        xquery,
2130                                        xquery_size,
2131                                        bg,
2132                                        &handle_local_result,
2133                                        NULL);
2134     }
2135   }
2136   else
2137   {
2138     GNUNET_STATISTICS_update (GDS_stats,
2139                               gettext_noop ("# P2P GET requests ONLY routed"),
2140                               1,
2141                               GNUNET_NO);
2142   }
2143
2144   /* P2P forwarding */
2145   forwarded = GNUNET_NO;
2146   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2147     forwarded = GDS_NEIGHBOURS_handle_get (type,
2148                                            options,
2149                                            ntohl (get->desired_replication_level),
2150                                            ntohl (get->hop_count),
2151                                            &get->key,
2152                                            xquery,
2153                                            xquery_size,
2154                                            bg,
2155                                            peer_bf);
2156   GDS_CLIENTS_process_get (options
2157                            | (GNUNET_OK == forwarded)
2158                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2159                            type,
2160                            ntohl (get->hop_count),
2161                            ntohl (get->desired_replication_level),
2162                            0,
2163                            NULL,
2164                            &get->key);
2165
2166   /* clean up; note that 'bg' is owned by routing now! */
2167   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2168 }
2169
2170
2171 /**
2172  * Check validity of p2p result message.
2173  *
2174  * @param cls closure
2175  * @param message message
2176  * @return #GNUNET_YES if the message is well-formed
2177  */
2178 static int
2179 check_dht_p2p_result (void *cls,
2180                       const struct PeerResultMessage *prm)
2181 {
2182   uint32_t get_path_length;
2183   uint32_t put_path_length;
2184   uint16_t msize;
2185
2186   msize = ntohs (prm->header.size);
2187   put_path_length = ntohl (prm->put_path_length);
2188   get_path_length = ntohl (prm->get_path_length);
2189   if ((msize <
2190        sizeof (struct PeerResultMessage) + (get_path_length +
2191                                             put_path_length) *
2192        sizeof (struct GNUNET_PeerIdentity)) ||
2193       (get_path_length >
2194        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2195       (put_path_length >
2196        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2197   {
2198     GNUNET_break_op (0);
2199     return GNUNET_SYSERR;
2200   }
2201   return GNUNET_OK;
2202 }
2203
2204
2205 /**
2206  * Core handler for p2p result messages.
2207  *
2208  * @param cls closure
2209  * @param message message
2210  */
2211 static void
2212 handle_dht_p2p_result (void *cls,
2213                        const struct PeerResultMessage *prm)
2214 {
2215   struct PeerInfo *peer = cls;
2216   const struct GNUNET_PeerIdentity *put_path;
2217   const struct GNUNET_PeerIdentity *get_path;
2218   const void *data;
2219   uint32_t get_path_length;
2220   uint32_t put_path_length;
2221   uint16_t msize;
2222   size_t data_size;
2223   enum GNUNET_BLOCK_Type type;
2224
2225   /* parse and validate message */
2226   msize = ntohs (prm->header.size);
2227   put_path_length = ntohl (prm->put_path_length);
2228   get_path_length = ntohl (prm->get_path_length);
2229   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2230   get_path = &put_path[put_path_length];
2231   type = ntohl (prm->type);
2232   data = (const void *) &get_path[get_path_length];
2233   data_size = msize - (sizeof (struct PeerResultMessage) +
2234                        (get_path_length +
2235                         put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2236   GNUNET_STATISTICS_update (GDS_stats,
2237                             gettext_noop ("# P2P RESULTS received"),
2238                             1,
2239                             GNUNET_NO);
2240   GNUNET_STATISTICS_update (GDS_stats,
2241                             gettext_noop ("# P2P RESULT bytes received"),
2242                             msize,
2243                             GNUNET_NO);
2244   if (GNUNET_YES == log_route_details_stderr)
2245   {
2246     char *tmp;
2247
2248     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2249     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2250                  "R5N RESULT %s: %s->%s (%u)\n",
2251                  GNUNET_h2s (&prm->key),
2252                  GNUNET_i2s (peer->id),
2253                  tmp,
2254                  get_path_length + 1);
2255     GNUNET_free (tmp);
2256   }
2257   /* if we got a HELLO, consider it for our own routing table */
2258   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2259   {
2260     const struct GNUNET_MessageHeader *h;
2261     struct GNUNET_PeerIdentity pid;
2262
2263     /* Should be a HELLO, validate and consider using it! */
2264     if (data_size < sizeof (struct GNUNET_HELLO_Message))
2265     {
2266       GNUNET_break_op (0);
2267       return;
2268     }
2269     h = data;
2270     if (data_size != ntohs (h->size))
2271     {
2272       GNUNET_break_op (0);
2273       return;
2274     }
2275     if (GNUNET_OK !=
2276         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2277                              &pid))
2278     {
2279       GNUNET_break_op (0);
2280       return;
2281     }
2282     if ( (GNUNET_YES != disable_try_connect) &&
2283          (0 != memcmp (&my_identity,
2284                        &pid,
2285                        sizeof (struct GNUNET_PeerIdentity))) )
2286       try_connect (&pid,
2287                    h);
2288   }
2289
2290   /* append 'peer' to 'get_path' */
2291   {
2292     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2293
2294 #if SANITY_CHECKS
2295     for (unsigned int i=0;i<=get_path_length;i++)
2296     {
2297       for (unsigned int j=0;j<i;j++)
2298       {
2299         GNUNET_break (0 != memcmp (&get_path[i],
2300                                    &get_path[j],
2301                                    sizeof (struct GNUNET_PeerIdentity)));
2302       }
2303       GNUNET_break (0 != memcmp (&get_path[i],
2304                                  peer->id,
2305                                  sizeof (struct GNUNET_PeerIdentity)));
2306     }
2307 #endif
2308     GNUNET_memcpy (xget_path,
2309                    get_path,
2310                    get_path_length * sizeof (struct GNUNET_PeerIdentity));
2311     xget_path[get_path_length] = *peer->id;
2312     get_path_length++;
2313
2314     /* forward to local clients */
2315     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2316                               &prm->key,
2317                               get_path_length,
2318                               xget_path,
2319                               put_path_length,
2320                               put_path,
2321                               type,
2322                               data_size,
2323                               data);
2324     GDS_CLIENTS_process_get_resp (type,
2325                                   xget_path,
2326                                   get_path_length,
2327                                   put_path, put_path_length,
2328                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2329                                   &prm->key,
2330                                   data,
2331                                   data_size);
2332     if (GNUNET_YES == cache_results)
2333     {
2334       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2335
2336       GNUNET_memcpy (xput_path,
2337                      put_path,
2338                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
2339       GNUNET_memcpy (&xput_path[put_path_length],
2340                      xget_path,
2341                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
2342
2343       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2344                                 &prm->key,
2345                                 get_path_length + put_path_length,
2346                                 xput_path,
2347                                 type,
2348                                 data_size,
2349                                 data);
2350     }
2351     /* forward to other peers */
2352     GDS_ROUTING_process (NULL,
2353                          type,
2354                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2355                          &prm->key,
2356                          put_path_length,
2357                          put_path,
2358                          get_path_length,
2359                          xget_path,
2360                          data,
2361                          data_size);
2362   }
2363 }
2364
2365
2366 /**
2367  * Initialize neighbours subsystem.
2368  *
2369  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2370  */
2371 int
2372 GDS_NEIGHBOURS_init ()
2373 {
2374   struct GNUNET_MQ_MessageHandler core_handlers[] = {
2375     GNUNET_MQ_hd_var_size (dht_p2p_get,
2376                            GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2377                            struct PeerGetMessage,
2378                            NULL),
2379     GNUNET_MQ_hd_var_size (dht_p2p_put,
2380                            GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2381                            struct PeerPutMessage,
2382                            NULL),
2383     GNUNET_MQ_hd_var_size (dht_p2p_result,
2384                            GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2385                            struct PeerResultMessage,
2386                            NULL),
2387     GNUNET_MQ_handler_end ()
2388   };
2389   unsigned long long temp_config_num;
2390
2391   disable_try_connect
2392     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2393                                             "DHT",
2394                                             "DISABLE_TRY_CONNECT");
2395   if (GNUNET_OK ==
2396       GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2397                                              "DHT",
2398                                              "bucket_size",
2399                                              &temp_config_num))
2400     bucket_size = (unsigned int) temp_config_num;
2401   cache_results
2402     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2403                                             "DHT",
2404                                             "CACHE_RESULTS");
2405
2406   log_route_details_stderr =
2407     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2408   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2409   core_api = GNUNET_CORE_connect (GDS_cfg,
2410                                   NULL,
2411                                   &core_init,
2412                                   &handle_core_connect,
2413                                   &handle_core_disconnect,
2414                                   core_handlers);
2415   if (NULL == core_api)
2416     return GNUNET_SYSERR;
2417   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2418                                                               GNUNET_YES);
2419   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2420                                                             GNUNET_NO);
2421   return GNUNET_OK;
2422 }
2423
2424
2425 /**
2426  * Shutdown neighbours subsystem.
2427  */
2428 void
2429 GDS_NEIGHBOURS_done ()
2430 {
2431   if (NULL == core_api)
2432     return;
2433   GNUNET_CORE_disconnect (core_api);
2434   core_api = NULL;
2435   GNUNET_assert (0 ==
2436                  GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2437   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2438   all_connected_peers = NULL;
2439   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2440                                          &free_connect_info,
2441                                          NULL);
2442   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2443   all_desired_peers = NULL;
2444   GNUNET_ATS_connectivity_done (ats_ch);
2445   ats_ch = NULL;
2446   GNUNET_assert (NULL == find_peer_task);
2447 }
2448
2449
2450 /**
2451  * Get the ID of the local node.
2452  *
2453  * @return identity of the local node
2454  */
2455 struct GNUNET_PeerIdentity *
2456 GDS_NEIGHBOURS_get_id ()
2457 {
2458   return &my_identity;
2459 }
2460
2461
2462 /* end of gnunet-service-dht_neighbours.c */