indentation fixes
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_datacache.h"
43 #include "gnunet-service-dht_hello.h"
44 #include "gnunet-service-dht_neighbours.h"
45 #include "gnunet-service-dht_nse.h"
46 #include "gnunet-service-dht_routing.h"
47 #include "dht.h"
48
/**
 * Emit a log message of the given @a kind under the "dht-traffic"
 * component.
 */
#define LOG_TRAFFIC(kind, ...) \
  GNUNET_log_from (kind, "dht-traffic", __VA_ARGS__)
50
/**
 * How many buckets will we allow in total: one per bit of a
 * `struct GNUNET_HashCode`.
 *
 * The replacement list is parenthesized so that the macro behaves as a
 * single value inside any surrounding expression (e.g. `x % MAX_BUCKETS`).
 */
#define MAX_BUCKETS (sizeof (struct GNUNET_HashCode) * 8)
55
/**
 * Default upper bound on the number of peers kept in one bucket.
 */
#define DEFAULT_BUCKET_SIZE 8

/**
 * Replication level we ask for when issuing FIND PEER requests.
 */
#define FIND_PEER_REPLICATION_LEVEL 4

/**
 * Hard cap on the replication level of any request.
 */
#define MAXIMUM_REPLICATION_LEVEL 16

/**
 * Hard cap on the number of messages pending per peer.
 */
#define MAXIMUM_PENDING_PER_PEER 64

/**
 * Minimum delay between two consecutive FIND PEER requests.
 */
#define DHT_MINIMUM_FIND_PEER_INTERVAL \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Maximum delay between two consecutive FIND PEER requests.
 */
#define DHT_MAXIMUM_FIND_PEER_INTERVAL \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 10)

/**
 * Longest time to wait for a GET request to be transmitted to
 * another peer.
 */
#define GET_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)

/**
 * Expiration time for HELLO addresses (defined in another
 * translation unit).
 */
extern struct GNUNET_TIME_Relative hello_expiration;
95
96
97 GNUNET_NETWORK_STRUCT_BEGIN
98
99 /**
100  * P2P PUT message
101  */
102 struct PeerPutMessage
103 {
104   /**
105    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
106    */
107   struct GNUNET_MessageHeader header;
108
109   /**
110    * Processing options
111    */
112   uint32_t options GNUNET_PACKED;
113
114   /**
115    * Content type.
116    */
117   uint32_t type GNUNET_PACKED;
118
119   /**
120    * Hop count
121    */
122   uint32_t hop_count GNUNET_PACKED;
123
124   /**
125    * Replication level for this message
126    */
127   uint32_t desired_replication_level GNUNET_PACKED;
128
129   /**
130    * Length of the PUT path that follows (if tracked).
131    */
132   uint32_t put_path_length GNUNET_PACKED;
133
134   /**
135    * When does the content expire?
136    */
137   struct GNUNET_TIME_AbsoluteNBO expiration_time;
138
139   /**
140    * Bloomfilter (for peer identities) to stop circular routes
141    */
142   char bloomfilter[DHT_BLOOM_SIZE];
143
144   /**
145    * The key we are storing under.
146    */
147   struct GNUNET_HashCode key;
148
149   /* put path (if tracked) */
150
151   /* Payload */
152
153 };
154
155
156 /**
157  * P2P Result message
158  */
159 struct PeerResultMessage
160 {
161   /**
162    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
163    */
164   struct GNUNET_MessageHeader header;
165
166   /**
167    * Content type.
168    */
169   uint32_t type GNUNET_PACKED;
170
171   /**
172    * Length of the PUT path that follows (if tracked).
173    */
174   uint32_t put_path_length GNUNET_PACKED;
175
176   /**
177    * Length of the GET path that follows (if tracked).
178    */
179   uint32_t get_path_length GNUNET_PACKED;
180
181   /**
182    * When does the content expire?
183    */
184   struct GNUNET_TIME_AbsoluteNBO expiration_time;
185
186   /**
187    * The key of the corresponding GET request.
188    */
189   struct GNUNET_HashCode key;
190
191   /* put path (if tracked) */
192
193   /* get path (if tracked) */
194
195   /* Payload */
196
197 };
198
199
200 /**
201  * P2P GET message
202  */
203 struct PeerGetMessage
204 {
205   /**
206    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
207    */
208   struct GNUNET_MessageHeader header;
209
210   /**
211    * Processing options
212    */
213   uint32_t options GNUNET_PACKED;
214
215   /**
216    * Desired content type.
217    */
218   uint32_t type GNUNET_PACKED;
219
220   /**
221    * Hop count
222    */
223   uint32_t hop_count GNUNET_PACKED;
224
225   /**
226    * Desired replication level for this request.
227    */
228   uint32_t desired_replication_level GNUNET_PACKED;
229
230   /**
231    * Size of the extended query.
232    */
233   uint32_t xquery_size;
234
235   /**
236    * Bloomfilter mutator.
237    */
238   uint32_t bf_mutator;
239
240   /**
241    * Bloomfilter (for peer identities) to stop circular routes
242    */
243   char bloomfilter[DHT_BLOOM_SIZE];
244
245   /**
246    * The key we are looking for.
247    */
248   struct GNUNET_HashCode key;
249
250   /* xquery */
251
252   /* result bloomfilter */
253
254 };
255 GNUNET_NETWORK_STRUCT_END
256
257
258 /**
259  * Entry for a peer in a bucket.
260  */
261 struct PeerInfo
262 {
263   /**
264    * Next peer entry (DLL)
265    */
266   struct PeerInfo *next;
267
268   /**
269    *  Prev peer entry (DLL)
270    */
271   struct PeerInfo *prev;
272
273   /**
274    * Handle for sending messages to this peer.
275    */
276   struct GNUNET_MQ_Handle *mq;
277
278   /**
279    * What is the identity of the peer?
280    */
281   const struct GNUNET_PeerIdentity *id;
282
283   /**
284    * Hash of @e id.
285    */
286   struct GNUNET_HashCode phash;
287
288   /**
289    * Which bucket is this peer in?
290    */
291   int peer_bucket;
292
293 };
294
295
/**
 * One bucket of the routing table: a doubly-linked list of peers
 * plus its length.
 */
struct PeerBucket
{
  /**
   * First element of the DLL.
   */
  struct PeerInfo *head;

  /**
   * Last element of the DLL.
   */
  struct PeerInfo *tail;

  /**
   * How many peers are currently in this bucket.
   */
  unsigned int peers_size;
};
316
317
/**
 * Bookkeeping for a peer we would like to be connected to.
 */
struct ConnectInfo
{

  /**
   * Pending HELLO offer operation, or NULL if none is active.
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *oh;

  /**
   * Pending connectivity suggestion operation, or NULL if none is active.
   */
  struct GNUNET_ATS_ConnectivitySuggestHandle *sh;

  /**
   * How strongly we want a connection to this peer.
   */
  uint32_t strength;
};
339
340
341 /**
342  * Do we cache all results that we are routing in the local datacache?
343  */
344 static int cache_results;
345
346 /**
347  * Should routing details be logged to stderr (for debugging)?
348  */
349 static int log_route_details_stderr;
350
351 /**
352  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
353  */
354 static unsigned int closest_bucket;
355
356 /**
357  * How many peers have we added since we sent out our last
358  * find peer request?
359  */
360 static unsigned int newly_found_peers;
361
362 /**
363  * Option for testing that disables the 'connect' function of the DHT.
364  */
365 static int disable_try_connect;
366
367 /**
368  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
369  */
370 static struct PeerBucket k_buckets[MAX_BUCKETS];
371
372 /**
373  * Hash map of all CORE-connected peers, for easy removal from
374  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
375  */
376 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
377
378 /**
379  * Hash map of all peers we would like to be connected to.
380  * Values are of type `struct ConnectInfo`.
381  */
382 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
383
384 /**
385  * Maximum size for each bucket.
386  */
387 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
388
389 /**
390  * Task that sends FIND PEER requests.
391  */
392 static struct GNUNET_SCHEDULER_Task *find_peer_task;
393
394 /**
395  * Identity of this peer.
396  */
397 static struct GNUNET_PeerIdentity my_identity;
398
399 /**
400  * Hash of the identity of this peer.
401  */
402 static struct GNUNET_HashCode my_identity_hash;
403
404 /**
405  * Handle to CORE.
406  */
407 static struct GNUNET_CORE_Handle *core_api;
408
409 /**
410  * Handle to ATS connectivity.
411  */
412 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
413
414
415 /**
416  * Find the optimal bucket for this key.
417  *
418  * @param hc the hashcode to compare our identity to
419  * @return the proper bucket index, or GNUNET_SYSERR
420  *         on error (same hashcode)
421  */
422 static int
423 find_bucket (const struct GNUNET_HashCode *hc)
424 {
425   unsigned int bits;
426
427   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
428   if (bits == MAX_BUCKETS)
429   {
430     /* How can all bits match? Got my own ID? */
431     GNUNET_break (0);
432     return GNUNET_SYSERR;
433   }
434   return MAX_BUCKETS - bits - 1;
435 }
436
437
438 /**
439  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
440  * Clean up the "oh" field in the @a cls
441  *
442  * @param cls a `struct ConnectInfo`
443  */
444 static void
445 offer_hello_done (void *cls)
446 {
447   struct ConnectInfo *ci = cls;
448
449   ci->oh = NULL;
450 }
451
452
453 /**
454  * Function called for all entries in #all_desired_peers to clean up.
455  *
456  * @param cls NULL
457  * @param peer peer the entry is for
458  * @param value the value to remove
459  * @return #GNUNET_YES
460  */
461 static int
462 free_connect_info (void *cls,
463                    const struct GNUNET_PeerIdentity *peer,
464                    void *value)
465 {
466   struct ConnectInfo *ci = value;
467
468   GNUNET_assert (GNUNET_YES ==
469                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
470                                                        peer,
471                                                        ci));
472   if (NULL != ci->sh)
473   {
474     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
475     ci->sh = NULL;
476   }
477   if (NULL != ci->oh)
478   {
479     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
480     ci->oh = NULL;
481   }
482   GNUNET_free (ci);
483   return GNUNET_YES;
484 }
485
486
487 /**
488  * Consider if we want to connect to a given peer, and if so
489  * let ATS know.  If applicable, the HELLO is offered to the
490  * TRANSPORT service.
491  *
492  * @param pid peer to consider connectivity requirements for
493  * @param h a HELLO message, or NULL
494  */
495 static void
496 try_connect (const struct GNUNET_PeerIdentity *pid,
497              const struct GNUNET_MessageHeader *h)
498 {
499   int bucket;
500   struct GNUNET_HashCode pid_hash;
501   struct ConnectInfo *ci;
502   uint32_t strength;
503
504   GNUNET_CRYPTO_hash (pid,
505                       sizeof (struct GNUNET_PeerIdentity),
506                       &pid_hash);
507   bucket = find_bucket (&pid_hash);
508   if (bucket < 0)
509     return; /* self? */
510   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
511                                           pid);
512
513   if (k_buckets[bucket].peers_size < bucket_size)
514     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
515   else
516     strength = bucket; /* minimum value of connectivity */
517   if (GNUNET_YES ==
518       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
519                                               pid))
520     strength *= 2; /* double for connected peers */
521   else if (k_buckets[bucket].peers_size > bucket_size)
522     strength = 0; /* bucket full, we really do not care about more */
523
524   if ( (0 == strength) &&
525        (NULL != ci) )
526   {
527     /* release request */
528     GNUNET_assert (GNUNET_YES ==
529                    free_connect_info (NULL,
530                                       pid,
531                                       ci));
532     return;
533   }
534   if (NULL == ci)
535   {
536     ci = GNUNET_new (struct ConnectInfo);
537     GNUNET_assert (GNUNET_OK ==
538                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
539                                                       pid,
540                                                       ci,
541                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
542   }
543   if ( (NULL != ci->oh) &&
544        (NULL != h) )
545     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
546   if (NULL != h)
547     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
548                                            h,
549                                            &offer_hello_done,
550                                            ci);
551   if ( (NULL != ci->sh) &&
552        (ci->strength != strength) )
553     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
554   if (ci->strength != strength)
555     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
556                                               pid,
557                                               strength);
558   ci->strength = strength;
559 }
560
561
562 /**
563  * Function called for each peer in #all_desired_peers during
564  * #update_connect_preferences() if we have reason to adjust
565  * the strength of our desire to keep connections to certain
566  * peers.  Calls #try_connect() to update the calculations for
567  * the given @a pid.
568  *
569  * @param cls NULL
570  * @param pid peer to update
571  * @param value unused
572  * @return #GNUNET_YES (continue to iterate)
573  */
574 static int
575 update_desire_strength (void *cls,
576                         const struct GNUNET_PeerIdentity *pid,
577                         void *value)
578 {
579   try_connect (pid, NULL);
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Update our preferences for connectivity as given to ATS.
586  *
587  * @param cls the `struct PeerInfo` of the peer
588  * @param tc scheduler context.
589  */
590 static void
591 update_connect_preferences ()
592 {
593   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
594                                          &update_desire_strength,
595                                          NULL);
596 }
597
598
/**
 * State handed to #add_known_to_bloom() while it fills a bloomfilter.
 */
struct BloomConstructorContext
{
  /**
   * The bloomfilter being populated.
   */
  struct GNUNET_CONTAINER_BloomFilter *bloom;

  /**
   * Mutator applied to each hash before it is added.
   */
  uint32_t bf_mutator;
};
614
615
616 /**
617  * Add each of the peers we already know to the bloom filter of
618  * the request so that we don't get duplicate HELLOs.
619  *
620  * @param cls the 'struct BloomConstructorContext'.
621  * @param key peer identity to add to the bloom filter
622  * @param value value the peer information (unused)
623  * @return #GNUNET_YES (we should continue to iterate)
624  */
625 static int
626 add_known_to_bloom (void *cls,
627                     const struct GNUNET_PeerIdentity *key,
628                     void *value)
629 {
630   struct BloomConstructorContext *ctx = cls;
631   struct GNUNET_HashCode key_hash;
632   struct GNUNET_HashCode mh;
633
634   GNUNET_CRYPTO_hash (key,
635                       sizeof (struct GNUNET_PeerIdentity),
636                       &key_hash);
637   GNUNET_BLOCK_mingle_hash (&key_hash,
638                             ctx->bf_mutator,
639                             &mh);
640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
641               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
642               GNUNET_i2s (key),
643               ctx->bf_mutator);
644   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom,
645                                     &mh);
646   return GNUNET_YES;
647 }
648
649
650 /**
651  * Task to send a find peer message for our own peer identifier
652  * so that we can find the closest peers in the network to ourselves
653  * and attempt to connect to them.
654  *
655  * @param cls closure for this task
656  */
657 static void
658 send_find_peer_message (void *cls)
659 {
660   struct GNUNET_TIME_Relative next_send_time;
661   struct BloomConstructorContext bcc;
662   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
663
664   find_peer_task = NULL;
665   if (newly_found_peers > bucket_size)
666   {
667     /* If we are finding many peers already, no need to send out our request right now! */
668     find_peer_task =
669         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
670                                       &send_find_peer_message,
671                                       NULL);
672     newly_found_peers = 0;
673     return;
674   }
675   bcc.bf_mutator =
676       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
677                                 UINT32_MAX);
678   bcc.bloom =
679       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
680                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
681   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
682                                          &add_known_to_bloom,
683                                          &bcc);
684   GNUNET_STATISTICS_update (GDS_stats,
685                             gettext_noop ("# FIND PEER messages initiated"),
686                             1,
687                             GNUNET_NO);
688   peer_bf =
689       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
690                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
691   // FIXME: pass priority!?
692   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
693                              GNUNET_DHT_RO_FIND_PEER,
694                              FIND_PEER_REPLICATION_LEVEL, 0,
695                              &my_identity_hash, NULL, 0, bcc.bloom,
696                              bcc.bf_mutator, peer_bf);
697   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
698   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
699   /* schedule next round */
700   next_send_time.rel_value_us =
701       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
702       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
703                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
704                                 (newly_found_peers + 1));
705   newly_found_peers = 0;
706   GNUNET_assert (NULL == find_peer_task);
707   find_peer_task =
708       GNUNET_SCHEDULER_add_delayed (next_send_time,
709                                     &send_find_peer_message,
710                                     NULL);
711 }
712
713
714 /**
715  * Method called whenever a peer connects.
716  *
717  * @param cls closure
718  * @param peer peer identity this notification is about
719  * @param mq message queue for sending messages to @a peer
720  * @return our `struct PeerInfo` for @a peer
721  */
722 static void *
723 handle_core_connect (void *cls,
724                      const struct GNUNET_PeerIdentity *peer,
725                      struct GNUNET_MQ_Handle *mq)
726 {
727   struct PeerInfo *pi;
728
729   /* Check for connect to self message */
730   if (0 == memcmp (&my_identity,
731                    peer,
732                    sizeof (struct GNUNET_PeerIdentity)))
733     return NULL;
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "Connected to %s\n",
736               GNUNET_i2s (peer));
737   GNUNET_assert (GNUNET_NO ==
738                  GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
739                                                     peer));
740   GNUNET_STATISTICS_update (GDS_stats,
741                             gettext_noop ("# peers connected"),
742                             1,
743                             GNUNET_NO);
744   pi = GNUNET_new (struct PeerInfo);
745   pi->id = peer;
746   pi->mq = mq;
747   GNUNET_CRYPTO_hash (peer,
748                       sizeof (struct GNUNET_PeerIdentity),
749                       &pi->phash);
750   pi->peer_bucket = find_bucket (&pi->phash);
751   GNUNET_assert ( (pi->peer_bucket >= 0) &&
752                   (pi->peer_bucket < MAX_BUCKETS) );
753   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
754                                     k_buckets[pi->peer_bucket].tail,
755                                     pi);
756   k_buckets[pi->peer_bucket].peers_size++;
757   closest_bucket = GNUNET_MAX (closest_bucket,
758                                pi->peer_bucket);
759   GNUNET_assert (GNUNET_OK ==
760                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
761                                                     pi->id,
762                                                     pi,
763                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
764   if ( (pi->peer_bucket > 0) &&
765        (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
766   {
767     update_connect_preferences ();
768     newly_found_peers++;
769   }
770   if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
771        (GNUNET_YES != disable_try_connect))
772   {
773     /* got a first connection, good time to start with FIND PEER requests... */
774     GNUNET_assert (NULL == find_peer_task);
775     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
776                                                NULL);
777   }
778   return pi;
779 }
780
781
782 /**
783  * Method called whenever a peer disconnects.
784  *
785  * @param cls closure
786  * @param peer peer identity this notification is about
787  * @param internal_cls our `struct PeerInfo` for @a peer
788  */
789 static void
790 handle_core_disconnect (void *cls,
791                         const struct GNUNET_PeerIdentity *peer,
792                         void *internal_cls)
793 {
794   struct PeerInfo *to_remove = internal_cls;
795
796   /* Check for disconnect from self message */
797   if (NULL == to_remove)
798     return;
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Disconnected %s\n",
801               GNUNET_i2s (peer));
802   GNUNET_STATISTICS_update (GDS_stats,
803                             gettext_noop ("# peers connected"),
804                             -1,
805                             GNUNET_NO);
806   GNUNET_assert (GNUNET_YES ==
807                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
808                                                        peer,
809                                                        to_remove));
810   if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
811        (GNUNET_YES != disable_try_connect) )
812   {
813     GNUNET_SCHEDULER_cancel (find_peer_task);
814     find_peer_task = NULL;
815   }
816   GNUNET_assert (to_remove->peer_bucket >= 0);
817   GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
818                                k_buckets[to_remove->peer_bucket].tail,
819                                to_remove);
820   GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
821   k_buckets[to_remove->peer_bucket].peers_size--;
822   while ( (closest_bucket > 0) &&
823           (0 == k_buckets[to_remove->peer_bucket].peers_size) )
824     closest_bucket--;
825   if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
826     update_connect_preferences ();
827   GNUNET_free (to_remove);
828 }
829
830
831 /**
832  * To how many peers should we (on average) forward the request to
833  * obtain the desired target_replication count (on average).
834  *
835  * @param hop_count number of hops the message has traversed
836  * @param target_replication the number of total paths desired
837  * @return Some number of peers to forward the message to
838  */
839 static unsigned int
840 get_forward_count (uint32_t hop_count,
841                    uint32_t target_replication)
842 {
843   uint32_t random_value;
844   uint32_t forward_count;
845   float target_value;
846
847   if (hop_count > GDS_NSE_get () * 4.0)
848   {
849     /* forcefully terminate */
850     GNUNET_STATISTICS_update (GDS_stats,
851                               gettext_noop ("# requests TTL-dropped"),
852                               1, GNUNET_NO);
853     return 0;
854   }
855   if (hop_count > GDS_NSE_get () * 2.0)
856   {
857     /* Once we have reached our ideal number of hops, only forward to 1 peer */
858     return 1;
859   }
860   /* bound by system-wide maximum */
861   target_replication =
862       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
863   target_value =
864       1 + (target_replication - 1.0) / (GDS_NSE_get () +
865                                         ((float) (target_replication - 1.0) *
866                                          hop_count));
867   /* Set forward count to floor of target_value */
868   forward_count = (uint32_t) target_value;
869   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
870   target_value = target_value - forward_count;
871   random_value =
872       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
873   if (random_value < (target_value * UINT32_MAX))
874     forward_count++;
875   return forward_count;
876 }
877
878
879 /**
880  * Compute the distance between have and target as a 32-bit value.
881  * Differences in the lower bits must count stronger than differences
882  * in the higher bits.
883  *
884  * @param target
885  * @param have
886  * @return 0 if have==target, otherwise a number
887  *           that is larger as the distance between
888  *           the two hash codes increases
889  */
890 static unsigned int
891 get_distance (const struct GNUNET_HashCode *target,
892               const struct GNUNET_HashCode *have)
893 {
894   unsigned int bucket;
895   unsigned int msb;
896   unsigned int lsb;
897   unsigned int i;
898
899   /* We have to represent the distance between two 2^9 (=512)-bit
900    * numbers as a 2^5 (=32)-bit number with "0" being used for the
901    * two numbers being identical; furthermore, we need to
902    * guarantee that a difference in the number of matching
903    * bits is always represented in the result.
904    *
905    * We use 2^32/2^9 numerical values to distinguish between
906    * hash codes that have the same LSB bit distance and
907    * use the highest 2^9 bits of the result to signify the
908    * number of (mis)matching LSB bits; if we have 0 matching
909    * and hence 512 mismatching LSB bits we return -1 (since
910    * 512 itself cannot be represented with 9 bits) */
911
912   /* first, calculate the most significant 9 bits of our
913    * result, aka the number of LSBs */
914   bucket = GNUNET_CRYPTO_hash_matching_bits (target,
915                                              have);
916   /* bucket is now a value between 0 and 512 */
917   if (bucket == 512)
918     return 0;                   /* perfect match */
919   if (bucket == 0)
920     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
921                                  * below, we'd end up with max+1 (overflow)) */
922
923   /* calculate the most significant bits of the final result */
924   msb = (512 - bucket) << (32 - 9);
925   /* calculate the 32-9 least significant bits of the final result by
926    * looking at the differences in the 32-9 bits following the
927    * mismatching bit at 'bucket' */
928   lsb = 0;
929   for (i = bucket + 1;
930        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
931   {
932     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
933         GNUNET_CRYPTO_hash_get_bit (have, i))
934       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
935                                                  * last bit set will be 31 -- if
936                                                  * i does not reach 512 first... */
937   }
938   return msb | lsb;
939 }
940
941
942 /**
943  * Check whether my identity is closer than any known peers.  If a
944  * non-null bloomfilter is given, check if this is the closest peer
945  * that hasn't already been routed to.
946  *
947  * @param key hash code to check closeness to
948  * @param bloom bloomfilter, exclude these entries from the decision
949  * @return #GNUNET_YES if node location is closest,
950  *         #GNUNET_NO otherwise.
951  */
952 static int
953 am_closest_peer (const struct GNUNET_HashCode *key,
954                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
955 {
956   int bits;
957   int other_bits;
958   int bucket_num;
959   int count;
960   struct PeerInfo *pos;
961
962   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
963     return GNUNET_YES;
964   bucket_num = find_bucket (key);
965   GNUNET_assert (bucket_num >= 0);
966   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
967                                            key);
968   pos = k_buckets[bucket_num].head;
969   count = 0;
970   while ((NULL != pos) && (count < bucket_size))
971   {
972     if ((NULL != bloom) &&
973         (GNUNET_YES ==
974          GNUNET_CONTAINER_bloomfilter_test (bloom,
975                                             &pos->phash)))
976     {
977       pos = pos->next;
978       continue;                 /* Skip already checked entries */
979     }
980     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
981                                                    key);
982     if (other_bits > bits)
983       return GNUNET_NO;
984     if (other_bits == bits)     /* We match the same number of bits */
985       return GNUNET_YES;
986     pos = pos->next;
987   }
988   /* No peers closer, we are the closest! */
989   return GNUNET_YES;
990 }
991
992
993 /**
994  * Select a peer from the routing table that would be a good routing
995  * destination for sending a message for "key".  The resulting peer
996  * must not be in the set of blocked peers.<p>
997  *
998  * Note that we should not ALWAYS select the closest peer to the
999  * target, peers further away from the target should be chosen with
1000  * exponentially declining probability.
1001  *
1002  * FIXME: double-check that this is fine
1003  *
1004  *
1005  * @param key the key we are selecting a peer to route to
1006  * @param bloom a bloomfilter containing entries this request has seen already
1007  * @param hops how many hops has this message traversed thus far
1008  * @return Peer to route to, or NULL on error
1009  */
1010 static struct PeerInfo *
1011 select_peer (const struct GNUNET_HashCode *key,
1012              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1013              uint32_t hops)
1014 {
1015   unsigned int bc;
1016   unsigned int count;
1017   unsigned int selected;
1018   struct PeerInfo *pos;
1019   unsigned int dist;
1020   unsigned int smallest_distance;
1021   struct PeerInfo *chosen;
1022
1023   if (hops >= GDS_NSE_get ())
1024   {
1025     /* greedy selection (closest peer that is not in bloomfilter) */
1026     smallest_distance = UINT_MAX;
1027     chosen = NULL;
1028     for (bc = 0; bc <= closest_bucket; bc++)
1029     {
1030       pos = k_buckets[bc].head;
1031       count = 0;
1032       while ((pos != NULL) && (count < bucket_size))
1033       {
1034         if ((bloom == NULL) ||
1035             (GNUNET_NO ==
1036              GNUNET_CONTAINER_bloomfilter_test (bloom,
1037                                                 &pos->phash)))
1038         {
1039           dist = get_distance (key,
1040                                &pos->phash);
1041           if (dist < smallest_distance)
1042           {
1043             chosen = pos;
1044             smallest_distance = dist;
1045           }
1046         }
1047         else
1048         {
1049           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1050                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1051                       GNUNET_i2s (pos->id),
1052                       GNUNET_h2s (key));
1053           GNUNET_STATISTICS_update (GDS_stats,
1054                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"),
1055                                     1,
1056                                     GNUNET_NO);
1057           dist = get_distance (key,
1058                                &pos->phash);
1059           if (dist < smallest_distance)
1060           {
1061             chosen = NULL;
1062             smallest_distance = dist;
1063           }
1064         }
1065         count++;
1066         pos = pos->next;
1067       }
1068     }
1069     if (NULL == chosen)
1070       GNUNET_STATISTICS_update (GDS_stats,
1071                                 gettext_noop ("# Peer selection failed"), 1,
1072                                 GNUNET_NO);
1073     return chosen;
1074   }
1075
1076   /* select "random" peer */
1077   /* count number of peers that are available and not filtered */
1078   count = 0;
1079   for (bc = 0; bc <= closest_bucket; bc++)
1080   {
1081     pos = k_buckets[bc].head;
1082     while ((pos != NULL) && (count < bucket_size))
1083     {
1084       if ((bloom != NULL) &&
1085           (GNUNET_YES ==
1086            GNUNET_CONTAINER_bloomfilter_test (bloom,
1087                                               &pos->phash)))
1088       {
1089         GNUNET_STATISTICS_update (GDS_stats,
1090                                   gettext_noop
1091                                   ("# Peers excluded from routing due to Bloomfilter"),
1092                                   1, GNUNET_NO);
1093         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1095                     GNUNET_i2s (pos->id),
1096                     GNUNET_h2s (key));
1097         pos = pos->next;
1098         continue;               /* Ignore bloomfiltered peers */
1099       }
1100       count++;
1101       pos = pos->next;
1102     }
1103   }
1104   if (0 == count)               /* No peers to select from! */
1105   {
1106     GNUNET_STATISTICS_update (GDS_stats,
1107                               gettext_noop ("# Peer selection failed"), 1,
1108                               GNUNET_NO);
1109     return NULL;
1110   }
1111   /* Now actually choose a peer */
1112   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1113                                        count);
1114   count = 0;
1115   for (bc = 0; bc <= closest_bucket; bc++)
1116   {
1117     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1118     {
1119       if ((bloom != NULL) &&
1120           (GNUNET_YES ==
1121            GNUNET_CONTAINER_bloomfilter_test (bloom,
1122                                               &pos->phash)))
1123       {
1124         continue;               /* Ignore bloomfiltered peers */
1125       }
1126       if (0 == selected--)
1127         return pos;
1128     }
1129   }
1130   GNUNET_break (0);
1131   return NULL;
1132 }
1133
1134
1135 /**
1136  * Compute the set of peers that the given request should be
1137  * forwarded to.
1138  *
1139  * @param key routing key
1140  * @param bloom bloom filter excluding peers as targets, all selected
1141  *        peers will be added to the bloom filter
1142  * @param hop_count number of hops the request has traversed so far
1143  * @param target_replication desired number of replicas
1144  * @param targets where to store an array of target peers (to be
1145  *         free'd by the caller)
1146  * @return number of peers returned in 'targets'.
1147  */
1148 static unsigned int
1149 get_target_peers (const struct GNUNET_HashCode *key,
1150                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1151                   uint32_t hop_count,
1152                   uint32_t target_replication,
1153                   struct PeerInfo ***targets)
1154 {
1155   unsigned int ret;
1156   unsigned int off;
1157   struct PeerInfo **rtargets;
1158   struct PeerInfo *nxt;
1159
1160   GNUNET_assert (NULL != bloom);
1161   ret = get_forward_count (hop_count,
1162                            target_replication);
1163   if (0 == ret)
1164   {
1165     *targets = NULL;
1166     return 0;
1167   }
1168   rtargets = GNUNET_new_array (ret,
1169                                struct PeerInfo *);
1170   for (off = 0; off < ret; off++)
1171   {
1172     nxt = select_peer (key, bloom, hop_count);
1173     if (NULL == nxt)
1174       break;
1175     rtargets[off] = nxt;
1176     GNUNET_break (GNUNET_NO ==
1177                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1178                                                      &nxt->phash));
1179     GNUNET_CONTAINER_bloomfilter_add (bloom,
1180                                       &nxt->phash);
1181   }
1182   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1184               off,
1185               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1186               (unsigned int) hop_count,
1187               GNUNET_h2s (key),
1188               ret);
1189   if (0 == off)
1190   {
1191     GNUNET_free (rtargets);
1192     *targets = NULL;
1193     return 0;
1194   }
1195   *targets = rtargets;
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1198               GNUNET_h2s (key),
1199               off,
1200               ret);
1201   return off;
1202 }
1203
1204
1205 /**
1206  * Perform a PUT operation.   Forwards the given request to other
1207  * peers.   Does not store the data locally.  Does not give the
1208  * data to local clients.  May do nothing if this is the only
1209  * peer in the network (or if we are the closest peer in the
1210  * network).
1211  *
1212  * @param type type of the block
1213  * @param options routing options
1214  * @param desired_replication_level desired replication count
1215  * @param expiration_time when does the content expire
1216  * @param hop_count how many hops has this message traversed so far
1217  * @param bf Bloom filter of peers this PUT has already traversed
1218  * @param key key for the content
1219  * @param put_path_length number of entries in @a put_path
1220  * @param put_path peers this request has traversed so far (if tracked)
1221  * @param data payload to store
1222  * @param data_size number of bytes in @a data
1223  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1224  */
1225 int
1226 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1227                            enum GNUNET_DHT_RouteOption options,
1228                            uint32_t desired_replication_level,
1229                            struct GNUNET_TIME_Absolute expiration_time,
1230                            uint32_t hop_count,
1231                            struct GNUNET_CONTAINER_BloomFilter *bf,
1232                            const struct GNUNET_HashCode *key,
1233                            unsigned int put_path_length,
1234                            struct GNUNET_PeerIdentity *put_path,
1235                            const void *data,
1236                            size_t data_size)
1237 {
1238   unsigned int target_count;
1239   unsigned int i;
1240   struct PeerInfo **targets;
1241   struct PeerInfo *target;
1242   size_t msize;
1243   struct GNUNET_MQ_Envelope *env;
1244   struct PeerPutMessage *ppm;
1245   struct GNUNET_PeerIdentity *pp;
1246   unsigned int skip_count;
1247
1248   GNUNET_assert (NULL != bf);
1249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250               "Adding myself (%s) to PUT bloomfilter for %s\n",
1251               GNUNET_i2s (&my_identity),
1252               GNUNET_h2s (key));
1253   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1254   GNUNET_STATISTICS_update (GDS_stats,
1255                             gettext_noop ("# PUT requests routed"),
1256                             1,
1257                             GNUNET_NO);
1258   target_count
1259     = get_target_peers (key,
1260                         bf,
1261                         hop_count,
1262                         desired_replication_level,
1263                         &targets);
1264   if (0 == target_count)
1265   {
1266     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1267                 "Routing PUT for %s terminates after %u hops at %s\n",
1268                 GNUNET_h2s (key),
1269                 (unsigned int) hop_count,
1270                 GNUNET_i2s (&my_identity));
1271     return GNUNET_NO;
1272   }
1273   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
1274   if (msize + sizeof (struct PeerPutMessage)
1275       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1276   {
1277     put_path_length = 0;
1278     msize = data_size;
1279   }
1280   if (msize + sizeof (struct PeerPutMessage)
1281       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1282   {
1283     GNUNET_break (0);
1284     GNUNET_free (targets);
1285     return GNUNET_NO;
1286   }
1287   GNUNET_STATISTICS_update (GDS_stats,
1288                             gettext_noop ("# PUT messages queued for transmission"),
1289                             target_count,
1290                             GNUNET_NO);
1291   skip_count = 0;
1292   for (i = 0; i < target_count; i++)
1293   {
1294     target = targets[i];
1295     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1296     {
1297       /* skip */
1298       GNUNET_STATISTICS_update (GDS_stats,
1299                                 gettext_noop ("# P2P messages dropped due to full queue"),
1300                                 1,
1301                                 GNUNET_NO);
1302       skip_count++;
1303       continue;
1304     }
1305     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306                 "Routing PUT for %s after %u hops to %s\n",
1307                 GNUNET_h2s (key),
1308                 (unsigned int) hop_count,
1309                 GNUNET_i2s (target->id));
1310     env = GNUNET_MQ_msg_extra (ppm,
1311                                msize,
1312                                GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1313     ppm->options = htonl (options);
1314     ppm->type = htonl (type);
1315     ppm->hop_count = htonl (hop_count + 1);
1316     ppm->desired_replication_level = htonl (desired_replication_level);
1317     ppm->put_path_length = htonl (put_path_length);
1318     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1319     GNUNET_break (GNUNET_YES ==
1320                   GNUNET_CONTAINER_bloomfilter_test (bf,
1321                                                      &target->phash));
1322     GNUNET_assert (GNUNET_OK ==
1323                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1324                                                               ppm->bloomfilter,
1325                                                               DHT_BLOOM_SIZE));
1326     ppm->key = *key;
1327     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1328     GNUNET_memcpy (pp,
1329                    put_path,
1330                    sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1331     GNUNET_memcpy (&pp[put_path_length],
1332                    data,
1333                    data_size);
1334     GNUNET_MQ_send (target->mq,
1335                     env);
1336   }
1337   GNUNET_free (targets);
1338   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1339 }
1340
1341
1342 /**
1343  * Perform a GET operation.  Forwards the given request to other
1344  * peers.  Does not lookup the key locally.  May do nothing if this is
1345  * the only peer in the network (or if we are the closest peer in the
1346  * network).
1347  *
1348  * @param type type of the block
1349  * @param options routing options
1350  * @param desired_replication_level desired replication count
1351  * @param hop_count how many hops did this request traverse so far?
1352  * @param key key for the content
1353  * @param xquery extended query
1354  * @param xquery_size number of bytes in @a xquery
1355  * @param reply_bf bloomfilter to filter duplicates
1356  * @param reply_bf_mutator mutator for @a reply_bf
1357  * @param peer_bf filter for peers not to select (again)
1358  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1359  */
1360 int
1361 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1362                            enum GNUNET_DHT_RouteOption options,
1363                            uint32_t desired_replication_level,
1364                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1365                            const void *xquery, size_t xquery_size,
1366                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1367                            uint32_t reply_bf_mutator,
1368                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1369 {
1370   unsigned int target_count;
1371   unsigned int i;
1372   struct PeerInfo **targets;
1373   struct PeerInfo *target;
1374   struct GNUNET_MQ_Envelope *env;
1375   size_t msize;
1376   struct PeerGetMessage *pgm;
1377   char *xq;
1378   size_t reply_bf_size;
1379   unsigned int skip_count;
1380
1381   GNUNET_assert (NULL != peer_bf);
1382   GNUNET_STATISTICS_update (GDS_stats,
1383                             gettext_noop ("# GET requests routed"),
1384                             1,
1385                             GNUNET_NO);
1386   target_count = get_target_peers (key,
1387                                    peer_bf,
1388                                    hop_count,
1389                                    desired_replication_level,
1390                                    &targets);
1391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392               "Adding myself (%s) to GET bloomfilter for %s\n",
1393               GNUNET_i2s (&my_identity),
1394               GNUNET_h2s (key));
1395   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1396                                     &my_identity_hash);
1397   if (0 == target_count)
1398   {
1399     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1400                 "Routing GET for %s terminates after %u hops at %s\n",
1401                 GNUNET_h2s (key),
1402                 (unsigned int) hop_count,
1403                 GNUNET_i2s (&my_identity));
1404     return GNUNET_NO;
1405   }
1406   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1407   msize = xquery_size + reply_bf_size;
1408   if (msize + sizeof (struct PeerGetMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1409   {
1410     GNUNET_break (0);
1411     GNUNET_free (targets);
1412     return GNUNET_NO;
1413   }
1414   GNUNET_STATISTICS_update (GDS_stats,
1415                             gettext_noop ("# GET messages queued for transmission"),
1416                             target_count,
1417                             GNUNET_NO);
1418   /* forward request */
1419   skip_count = 0;
1420   for (i = 0; i < target_count; i++)
1421   {
1422     target = targets[i];
1423     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1424     {
1425       /* skip */
1426       GNUNET_STATISTICS_update (GDS_stats,
1427                                 gettext_noop ("# P2P messages dropped due to full queue"),
1428                                 1, GNUNET_NO);
1429       skip_count++;
1430       continue;
1431     }
1432     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433                 "Routing GET for %s after %u hops to %s\n",
1434                 GNUNET_h2s (key),
1435                 (unsigned int) hop_count,
1436                 GNUNET_i2s (target->id));
1437     env = GNUNET_MQ_msg_extra (pgm,
1438                                msize,
1439                                GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1440     pgm->options = htonl (options);
1441     pgm->type = htonl (type);
1442     pgm->hop_count = htonl (hop_count + 1);
1443     pgm->desired_replication_level = htonl (desired_replication_level);
1444     pgm->xquery_size = htonl (xquery_size);
1445     pgm->bf_mutator = reply_bf_mutator;
1446     GNUNET_break (GNUNET_YES ==
1447                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1448                                                      &target->phash));
1449     GNUNET_assert (GNUNET_OK ==
1450                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1451                                                               pgm->bloomfilter,
1452                                                               DHT_BLOOM_SIZE));
1453     pgm->key = *key;
1454     xq = (char *) &pgm[1];
1455     GNUNET_memcpy (xq,
1456                    xquery,
1457                    xquery_size);
1458     if (NULL != reply_bf)
1459       GNUNET_assert (GNUNET_OK ==
1460                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1461                                                                 &xq
1462                                                                 [xquery_size],
1463                                                                 reply_bf_size));
1464     GNUNET_MQ_send (target->mq,
1465                     env);
1466   }
1467   GNUNET_free (targets);
1468   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1469 }
1470
1471
1472 /**
1473  * Handle a reply (route to origin).  Only forwards the reply back to
1474  * the given peer.  Does not do local caching or forwarding to local
1475  * clients.
1476  *
1477  * @param target neighbour that should receive the block (if still connected)
1478  * @param type type of the block
1479  * @param expiration_time when does the content expire
1480  * @param key key for the content
1481  * @param put_path_length number of entries in @a put_path
1482  * @param put_path peers the original PUT traversed (if tracked)
1483  * @param get_path_length number of entries in @a get_path
1484  * @param get_path peers this reply has traversed so far (if tracked)
1485  * @param data payload of the reply
1486  * @param data_size number of bytes in @a data
1487  */
1488 void
1489 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1490                              enum GNUNET_BLOCK_Type type,
1491                              struct GNUNET_TIME_Absolute expiration_time,
1492                              const struct GNUNET_HashCode *key,
1493                              unsigned int put_path_length,
1494                              const struct GNUNET_PeerIdentity *put_path,
1495                              unsigned int get_path_length,
1496                              const struct GNUNET_PeerIdentity *get_path,
1497                              const void *data,
1498                              size_t data_size)
1499 {
1500   struct PeerInfo *pi;
1501   struct GNUNET_MQ_Envelope *env;
1502   size_t msize;
1503   struct PeerResultMessage *prm;
1504   struct GNUNET_PeerIdentity *paths;
1505
1506   msize = data_size + (get_path_length + put_path_length) *
1507       sizeof (struct GNUNET_PeerIdentity);
1508   if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1509       (get_path_length >
1510        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1511       (put_path_length >
1512        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1513       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1514   {
1515     GNUNET_break (0);
1516     return;
1517   }
1518   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1519                                           target);
1520   if (NULL == pi)
1521   {
1522     /* peer disconnected in the meantime, drop reply */
1523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524                 "No matching peer for reply for key %s\n",
1525                 GNUNET_h2s (key));
1526     return;
1527   }
1528   if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1529   {
1530     /* skip */
1531     GNUNET_STATISTICS_update (GDS_stats,
1532                               gettext_noop ("# P2P messages dropped due to full queue"),
1533                               1,
1534                               GNUNET_NO);
1535     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536                 "Peer queue full, ignoring reply for key %s\n",
1537                 GNUNET_h2s (key));
1538     return;
1539   }
1540
1541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542               "Forwarding reply for key %s to peer %s\n",
1543               GNUNET_h2s (key),
1544               GNUNET_i2s (target));
1545   GNUNET_STATISTICS_update (GDS_stats,
1546                             gettext_noop
1547                             ("# RESULT messages queued for transmission"), 1,
1548                             GNUNET_NO);
1549   env = GNUNET_MQ_msg_extra (prm,
1550                              msize,
1551                              GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1552   prm->type = htonl (type);
1553   prm->put_path_length = htonl (put_path_length);
1554   prm->get_path_length = htonl (get_path_length);
1555   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1556   prm->key = *key;
1557   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1558   GNUNET_memcpy (paths,
1559                  put_path,
1560                  put_path_length * sizeof (struct GNUNET_PeerIdentity));
1561   GNUNET_memcpy (&paths[put_path_length],
1562                  get_path,
1563                  get_path_length * sizeof (struct GNUNET_PeerIdentity));
1564   GNUNET_memcpy (&paths[put_path_length + get_path_length],
1565                  data,
1566                  data_size);
1567   GNUNET_MQ_send (pi->mq,
1568                   env);
1569 }
1570
1571
1572 /**
1573  * To be called on core init/fail.
1574  *
1575  * @param cls service closure
1576  * @param identity the public identity of this peer
1577  */
1578 static void
1579 core_init (void *cls,
1580            const struct GNUNET_PeerIdentity *identity)
1581 {
1582   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1583               "CORE called, I am %s\n",
1584               GNUNET_i2s (identity));
1585   my_identity = *identity;
1586   GNUNET_CRYPTO_hash (identity,
1587                       sizeof (struct GNUNET_PeerIdentity),
1588                       &my_identity_hash);
1589   GNUNET_SERVICE_resume (GDS_service);
1590 }
1591
1592
1593 /**
1594  * Check validity of a p2p put request.
1595  *
1596  * @param cls closure with the `struct PeerInfo` of the sender
1597  * @param message message
1598  * @return #GNUNET_OK if the message is valid
1599  */
1600 static int
1601 check_dht_p2p_put (void *cls,
1602                    const struct PeerPutMessage *put)
1603 {
1604   uint32_t putlen;
1605   uint16_t msize;
1606
1607   msize = ntohs (put->header.size);
1608   putlen = ntohl (put->put_path_length);
1609   if ((msize <
1610        sizeof (struct PeerPutMessage) +
1611        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1612       (putlen >
1613        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1614   {
1615     GNUNET_break_op (0);
1616     return GNUNET_SYSERR;
1617   }
1618   return GNUNET_OK;
1619 }
1620
1621
1622 /**
1623  * Core handler for p2p put requests.
1624  *
1625  * @param cls closure with the `struct PeerInfo` of the sender
1626  * @param message message
1627  */
1628 static void
1629 handle_dht_p2p_put (void *cls,
1630                     const struct PeerPutMessage *put)
1631 {
1632   struct PeerInfo *peer = cls;
1633   const struct GNUNET_PeerIdentity *put_path;
1634   const void *payload;
1635   uint32_t putlen;
1636   uint16_t msize;
1637   size_t payload_size;
1638   enum GNUNET_DHT_RouteOption options;
1639   struct GNUNET_CONTAINER_BloomFilter *bf;
1640   struct GNUNET_HashCode test_key;
1641   int forwarded;
1642
1643   msize = ntohs (put->header.size);
1644   putlen = ntohl (put->put_path_length);
1645   GNUNET_STATISTICS_update (GDS_stats,
1646                             gettext_noop ("# P2P PUT requests received"),
1647                             1,
1648                             GNUNET_NO);
1649   GNUNET_STATISTICS_update (GDS_stats,
1650                             gettext_noop ("# P2P PUT bytes received"),
1651                             msize,
1652                             GNUNET_NO);
1653   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1654   payload = &put_path[putlen];
1655   options = ntohl (put->options);
1656   payload_size = msize - (sizeof (struct PeerPutMessage) +
1657                           putlen * sizeof (struct GNUNET_PeerIdentity));
1658
1659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660               "PUT for `%s' from %s\n",
1661               GNUNET_h2s (&put->key),
1662               GNUNET_i2s (peer->id));
1663   if (GNUNET_YES == log_route_details_stderr)
1664   {
1665     char *tmp;
1666
1667     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1668     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1669                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1670                  GNUNET_h2s (&put->key),
1671                  GNUNET_i2s (peer->id),
1672                  tmp,
1673                  ntohl(put->hop_count),
1674                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1675                                                    &put->key),
1676                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1677                                                    &put->key)
1678                 );
1679     GNUNET_free (tmp);
1680   }
1681   switch (GNUNET_BLOCK_get_key
1682           (GDS_block_context,
1683            ntohl (put->type),
1684            payload,
1685            payload_size,
1686            &test_key))
1687   {
1688   case GNUNET_YES:
1689     if (0 != memcmp (&test_key,
1690                      &put->key,
1691                      sizeof (struct GNUNET_HashCode)))
1692     {
1693       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1694
1695       GNUNET_break_op (0);
1696       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1697                   "PUT with key `%s' for block with key %s\n",
1698                   put_s,
1699                   GNUNET_h2s_full (&test_key));
1700       GNUNET_free (put_s);
1701       return;
1702     }
1703     break;
1704   case GNUNET_NO:
1705     GNUNET_break_op (0);
1706     return;
1707   case GNUNET_SYSERR:
1708     /* cannot verify, good luck */
1709     break;
1710   }
1711   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1712   {
1713     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1714                                    ntohl (put->type),
1715                                    GNUNET_BLOCK_EO_NONE,
1716                                    NULL,    /* query */
1717                                    NULL, 0, /* bloom filer */
1718                                    NULL, 0, /* xquery */
1719                                    payload, payload_size))
1720     {
1721     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1722     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1723       break;
1724
1725     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1726     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1727     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1728     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1729     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1730     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1731     default:
1732       GNUNET_break_op (0);
1733       return;
1734     }
1735   }
1736
1737   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1738                                           DHT_BLOOM_SIZE,
1739                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1740   GNUNET_break_op (GNUNET_YES ==
1741                    GNUNET_CONTAINER_bloomfilter_test (bf,
1742                                                       &peer->phash));
1743   {
1744     struct GNUNET_PeerIdentity pp[putlen + 1];
1745
1746     /* extend 'put path' by sender */
1747     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1748     {
1749       GNUNET_memcpy (pp,
1750                      put_path,
1751                      putlen * sizeof (struct GNUNET_PeerIdentity));
1752       pp[putlen] = *peer->id;
1753       putlen++;
1754     }
1755     else
1756       putlen = 0;
1757
1758     /* give to local clients */
1759     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1760                               &put->key,
1761                               0,
1762                               NULL,
1763                               putlen,
1764                               pp,
1765                               ntohl (put->type),
1766                               payload_size,
1767                               payload);
1768     /* store locally */
1769     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1770         (am_closest_peer (&put->key, bf)))
1771       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1772                                 &put->key,
1773                                 putlen,
1774                                 pp,
1775                                 ntohl (put->type),
1776                                 payload_size,
1777                                 payload);
1778     /* route to other peers */
1779     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1780                                            options,
1781                                            ntohl (put->desired_replication_level),
1782                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1783                                            ntohl (put->hop_count),
1784                                            bf,
1785                                            &put->key,
1786                                            putlen,
1787                                            pp,
1788                                            payload,
1789                                            payload_size);
1790     /* notify monitoring clients */
1791     GDS_CLIENTS_process_put (options
1792                              | ( (GNUNET_OK == forwarded)
1793                                  ? GNUNET_DHT_RO_LAST_HOP
1794                                  : 0 ),
1795                              ntohl (put->type),
1796                              ntohl (put->hop_count),
1797                              ntohl (put->desired_replication_level),
1798                              putlen, pp,
1799                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1800                              &put->key,
1801                              payload,
1802                              payload_size);
1803   }
1804   GNUNET_CONTAINER_bloomfilter_free (bf);
1805 }
1806
1807
1808 /**
1809  * We have received a FIND PEER request.  Send matching
1810  * HELLOs back.
1811  *
1812  * @param sender sender of the FIND PEER request
1813  * @param key peers close to this key are desired
1814  * @param bf peers matching this bf are excluded
1815  * @param bf_mutator mutator for bf
1816  */
1817 static void
1818 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1819                   const struct GNUNET_HashCode *key,
1820                   struct GNUNET_CONTAINER_BloomFilter *bf,
1821                   uint32_t bf_mutator)
1822 {
1823   int bucket_idx;
1824   struct PeerBucket *bucket;
1825   struct PeerInfo *peer;
1826   unsigned int choice;
1827   struct GNUNET_HashCode mhash;
1828   const struct GNUNET_HELLO_Message *hello;
1829
1830   /* first, check about our own HELLO */
1831   if (NULL != GDS_my_hello)
1832   {
1833     GNUNET_BLOCK_mingle_hash (&my_identity_hash,
1834                               bf_mutator,
1835                               &mhash);
1836     if ((NULL == bf) ||
1837         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1838     {
1839       size_t hello_size;
1840
1841       hello_size = GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message *) GDS_my_hello);
1842       GNUNET_break (hello_size >= sizeof (struct GNUNET_MessageHeader));
1843       GDS_NEIGHBOURS_handle_reply (sender,
1844                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1845                                    GNUNET_TIME_relative_to_absolute
1846                                    (hello_expiration),
1847                                    key,
1848                                    0,
1849                                    NULL,
1850                                    0,
1851                                    NULL,
1852                                    GDS_my_hello,
1853                                    hello_size);
1854     }
1855     else
1856     {
1857       GNUNET_STATISTICS_update (GDS_stats,
1858                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"),
1859                                 1,
1860                                 GNUNET_NO);
1861     }
1862   }
1863   else
1864   {
1865     GNUNET_STATISTICS_update (GDS_stats,
1866                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"),
1867                               1,
1868                               GNUNET_NO);
1869   }
1870
1871   /* then, also consider sending a random HELLO from the closest bucket */
1872   if (0 == memcmp (&my_identity_hash,
1873                    key,
1874                    sizeof (struct GNUNET_HashCode)))
1875     bucket_idx = closest_bucket;
1876   else
1877     bucket_idx = GNUNET_MIN (closest_bucket,
1878                              find_bucket (key));
1879   if (bucket_idx == GNUNET_SYSERR)
1880     return;
1881   bucket = &k_buckets[bucket_idx];
1882   if (bucket->peers_size == 0)
1883     return;
1884   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1885                                      bucket->peers_size);
1886   peer = bucket->head;
1887   while (choice > 0)
1888   {
1889     GNUNET_assert (NULL != peer);
1890     peer = peer->next;
1891     choice--;
1892   }
1893   choice = bucket->peers_size;
1894   do
1895   {
1896     peer = peer->next;
1897     if (choice-- == 0)
1898       return;                   /* no non-masked peer available */
1899     if (NULL == peer)
1900       peer = bucket->head;
1901     GNUNET_BLOCK_mingle_hash (&peer->phash,
1902                               bf_mutator,
1903                               &mhash);
1904     hello = GDS_HELLO_get (peer->id);
1905   } while ( (hello == NULL) ||
1906             (GNUNET_YES ==
1907              GNUNET_CONTAINER_bloomfilter_test (bf,
1908                                                 &mhash)) );
1909   GDS_NEIGHBOURS_handle_reply (sender,
1910                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1911                                GNUNET_TIME_relative_to_absolute
1912                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1913                                key,
1914                                0,
1915                                NULL,
1916                                0,
1917                                NULL,
1918                                hello,
1919                                GNUNET_HELLO_size (hello));
1920 }
1921
1922
1923 /**
1924  * Handle a result from local datacache for a GET operation.
1925  *
1926  * @param cls the `struct ClientHandle` of the client doing the query
1927  * @param type type of the block
1928  * @param expiration_time when does the content expire
1929  * @param key key for the content
1930  * @param put_path_length number of entries in @a put_path
1931  * @param put_path peers the original PUT traversed (if tracked)
1932  * @param get_path_length number of entries in @a get_path
1933  * @param get_path peers this reply has traversed so far (if tracked)
1934  * @param data payload of the reply
1935  * @param data_size number of bytes in @a data
1936  */
1937 static void
1938 handle_local_result (void *cls,
1939                      enum GNUNET_BLOCK_Type type,
1940                      struct GNUNET_TIME_Absolute expiration_time,
1941                      const struct GNUNET_HashCode *key,
1942                      unsigned int put_path_length,
1943                      const struct GNUNET_PeerIdentity *put_path,
1944                      unsigned int get_path_length,
1945                      const struct GNUNET_PeerIdentity *get_path,
1946                      const void *data,
1947                      size_t data_size)
1948 {
1949   // FIXME: we can probably do better here by
1950   // passing the peer that did the query in the closure...
1951   GDS_ROUTING_process (NULL,
1952                        type,
1953                        expiration_time,
1954                        key,
1955                        put_path_length, put_path,
1956                        0, NULL,
1957                        data, data_size);
1958 }
1959
1960
1961 /**
1962  * Check validity of p2p get request.
1963  *
1964  * @param cls closure with the `struct PeerInfo` of the sender
1965  * @param get the message
1966  * @return #GNUNET_OK if the message is well-formed
1967  */
1968 static int
1969 check_dht_p2p_get (void *cls,
1970                    const struct PeerGetMessage *get)
1971 {
1972   uint32_t xquery_size;
1973   uint16_t msize;
1974
1975   msize = ntohs (get->header.size);
1976   xquery_size = ntohl (get->xquery_size);
1977   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1978   {
1979     GNUNET_break_op (0);
1980     return GNUNET_SYSERR;
1981   }
1982   return GNUNET_OK;
1983 }
1984
1985
1986 /**
1987  * Core handler for p2p get requests.
1988  *
1989  * @param cls closure with the `struct PeerInfo` of the sender
1990  * @param get the message
1991  */
1992 static void
1993 handle_dht_p2p_get (void *cls,
1994                     const struct PeerGetMessage *get)
1995 {
1996   struct PeerInfo *peer = cls;
1997   uint32_t xquery_size;
1998   size_t reply_bf_size;
1999   uint16_t msize;
2000   enum GNUNET_BLOCK_Type type;
2001   enum GNUNET_DHT_RouteOption options;
2002   enum GNUNET_BLOCK_EvaluationResult eval;
2003   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
2004   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2005   const char *xquery;
2006   int forwarded;
2007
2008   if (NULL == peer)
2009   {
2010     GNUNET_break (0);
2011     return;
2012   }
2013   /* parse and validate message */
2014   msize = ntohs (get->header.size);
2015   xquery_size = ntohl (get->xquery_size);
2016   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2017   type = ntohl (get->type);
2018   options = ntohl (get->options);
2019   xquery = (const char *) &get[1];
2020   reply_bf = NULL;
2021   GNUNET_STATISTICS_update (GDS_stats,
2022                             gettext_noop ("# P2P GET requests received"),
2023                             1,
2024                             GNUNET_NO);
2025   GNUNET_STATISTICS_update (GDS_stats,
2026                             gettext_noop ("# P2P GET bytes received"),
2027                             msize,
2028                             GNUNET_NO);
2029   if (GNUNET_YES == log_route_details_stderr)
2030   {
2031     char *tmp;
2032
2033     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2034     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2035                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2036                  GNUNET_h2s (&get->key),
2037                  GNUNET_i2s (peer->id),
2038                  tmp,
2039                  ntohl(get->hop_count),
2040                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2041                                                    &get->key),
2042                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2043                                                    &get->key),
2044                  ntohl(get->xquery_size),
2045                  xquery);
2046     GNUNET_free (tmp);
2047   }
2048
2049   if (reply_bf_size > 0)
2050     reply_bf =
2051         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
2052                                            reply_bf_size,
2053                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
2054   eval =
2055       GNUNET_BLOCK_evaluate (GDS_block_context,
2056                              type,
2057                              GNUNET_BLOCK_EO_NONE,
2058                              &get->key,
2059                              &reply_bf,
2060                              get->bf_mutator,
2061                              xquery,
2062                              xquery_size,
2063                              NULL,
2064                              0);
2065   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2066   {
2067     /* request invalid or block type not supported */
2068     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2069     if (NULL != reply_bf)
2070       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2071     return;
2072   }
2073   peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2074                                                DHT_BLOOM_SIZE,
2075                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
2076   GNUNET_break_op (GNUNET_YES ==
2077                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2078                                                       &peer->phash));
2079   /* remember request for routing replies */
2080   GDS_ROUTING_add (peer->id,
2081                    type,
2082                    options,
2083                    &get->key,
2084                    xquery,
2085                    xquery_size,
2086                    reply_bf,
2087                    get->bf_mutator);
2088   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089               "GET for %s at %s after %u hops\n",
2090               GNUNET_h2s (&get->key),
2091               GNUNET_i2s (&my_identity),
2092               (unsigned int) ntohl (get->hop_count));
2093   /* local lookup (this may update the reply_bf) */
2094   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2095       (am_closest_peer (&get->key,
2096                         peer_bf)))
2097   {
2098     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2099     {
2100       GNUNET_STATISTICS_update (GDS_stats,
2101                                 gettext_noop ("# P2P FIND PEER requests processed"),
2102                                 1,
2103                                 GNUNET_NO);
2104       handle_find_peer (peer->id,
2105                         &get->key,
2106                         reply_bf,
2107                         get->bf_mutator);
2108     }
2109     else
2110     {
2111       eval = GDS_DATACACHE_handle_get (&get->key,
2112                                        type,
2113                                        xquery,
2114                                        xquery_size,
2115                                        &reply_bf,
2116                                        get->bf_mutator,
2117                                        &handle_local_result,
2118                                        NULL);
2119     }
2120   }
2121   else
2122   {
2123     GNUNET_STATISTICS_update (GDS_stats,
2124                               gettext_noop ("# P2P GET requests ONLY routed"),
2125                               1,
2126                               GNUNET_NO);
2127   }
2128
2129   /* P2P forwarding */
2130   forwarded = GNUNET_NO;
2131   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2132     forwarded = GDS_NEIGHBOURS_handle_get (type,
2133                                            options,
2134                                            ntohl (get->desired_replication_level),
2135                                            ntohl (get->hop_count),
2136                                            &get->key,
2137                                            xquery,
2138                                            xquery_size,
2139                                            reply_bf,
2140                                            get->bf_mutator,
2141                                            peer_bf);
2142   GDS_CLIENTS_process_get (options
2143                            | (GNUNET_OK == forwarded)
2144                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2145                            type,
2146                            ntohl (get->hop_count),
2147                            ntohl (get->desired_replication_level),
2148                            0,
2149                            NULL,
2150                            &get->key);
2151
2152
2153   /* clean up */
2154   if (NULL != reply_bf)
2155     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2156   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2157 }
2158
2159
2160 /**
2161  * Check validity of p2p result message.
2162  *
2163  * @param cls closure
2164  * @param message message
2165  * @return #GNUNET_YES if the message is well-formed
2166  */
2167 static int
2168 check_dht_p2p_result (void *cls,
2169                       const struct PeerResultMessage *prm)
2170 {
2171   uint32_t get_path_length;
2172   uint32_t put_path_length;
2173   uint16_t msize;
2174
2175   msize = ntohs (prm->header.size);
2176   put_path_length = ntohl (prm->put_path_length);
2177   get_path_length = ntohl (prm->get_path_length);
2178   if ((msize <
2179        sizeof (struct PeerResultMessage) + (get_path_length +
2180                                             put_path_length) *
2181        sizeof (struct GNUNET_PeerIdentity)) ||
2182       (get_path_length >
2183        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2184       (put_path_length >
2185        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2186   {
2187     GNUNET_break_op (0);
2188     return GNUNET_SYSERR;
2189   }
2190   return GNUNET_OK;
2191 }
2192
2193
2194 /**
2195  * Core handler for p2p result messages.
2196  *
2197  * @param cls closure
2198  * @param message message
2199  */
2200 static void
2201 handle_dht_p2p_result (void *cls,
2202                        const struct PeerResultMessage *prm)
2203 {
2204   struct PeerInfo *peer = cls;
2205   const struct GNUNET_PeerIdentity *put_path;
2206   const struct GNUNET_PeerIdentity *get_path;
2207   const void *data;
2208   uint32_t get_path_length;
2209   uint32_t put_path_length;
2210   uint16_t msize;
2211   size_t data_size;
2212   enum GNUNET_BLOCK_Type type;
2213
2214   /* parse and validate message */
2215   msize = ntohs (prm->header.size);
2216   put_path_length = ntohl (prm->put_path_length);
2217   get_path_length = ntohl (prm->get_path_length);
2218   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2219   get_path = &put_path[put_path_length];
2220   type = ntohl (prm->type);
2221   data = (const void *) &get_path[get_path_length];
2222   data_size = msize - (sizeof (struct PeerResultMessage) +
2223                        (get_path_length +
2224                         put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2225   GNUNET_STATISTICS_update (GDS_stats,
2226                             gettext_noop ("# P2P RESULTS received"),
2227                             1,
2228                             GNUNET_NO);
2229   GNUNET_STATISTICS_update (GDS_stats,
2230                             gettext_noop ("# P2P RESULT bytes received"),
2231                             msize,
2232                             GNUNET_NO);
2233   if (GNUNET_YES == log_route_details_stderr)
2234   {
2235     char *tmp;
2236
2237     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2238     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2239                  "R5N RESULT %s: %s->%s (%u)\n",
2240                  GNUNET_h2s (&prm->key),
2241                  GNUNET_i2s (peer->id),
2242                  tmp,
2243                  get_path_length + 1);
2244     GNUNET_free (tmp);
2245   }
2246   /* if we got a HELLO, consider it for our own routing table */
2247   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2248   {
2249     const struct GNUNET_MessageHeader *h;
2250     struct GNUNET_PeerIdentity pid;
2251
2252     /* Should be a HELLO, validate and consider using it! */
2253     if (data_size < sizeof (struct GNUNET_HELLO_Message))
2254     {
2255       GNUNET_break_op (0);
2256       return;
2257     }
2258     h = data;
2259     if (data_size != ntohs (h->size))
2260     {
2261       GNUNET_break_op (0);
2262       return;
2263     }
2264     if (GNUNET_OK !=
2265         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2266                              &pid))
2267     {
2268       GNUNET_break_op (0);
2269       return;
2270     }
2271     if ( (GNUNET_YES != disable_try_connect) &&
2272          (0 != memcmp (&my_identity,
2273                        &pid,
2274                        sizeof (struct GNUNET_PeerIdentity))) )
2275       try_connect (&pid,
2276                    h);
2277   }
2278
2279   /* append 'peer' to 'get_path' */
2280   {
2281     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2282
2283     GNUNET_memcpy (xget_path,
2284                    get_path,
2285                    get_path_length * sizeof (struct GNUNET_PeerIdentity));
2286     xget_path[get_path_length] = *peer->id;
2287     get_path_length++;
2288
2289     /* forward to local clients */
2290     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2291                               &prm->key,
2292                               get_path_length,
2293                               xget_path,
2294                               put_path_length,
2295                               put_path,
2296                               type,
2297                               data_size,
2298                               data);
2299     GDS_CLIENTS_process_get_resp (type,
2300                                   xget_path,
2301                                   get_path_length,
2302                                   put_path, put_path_length,
2303                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2304                                   &prm->key,
2305                                   data,
2306                                   data_size);
2307     if (GNUNET_YES == cache_results)
2308     {
2309       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2310
2311       GNUNET_memcpy (xput_path,
2312                      put_path,
2313                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
2314       GNUNET_memcpy (&xput_path[put_path_length],
2315                      xget_path,
2316                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
2317
2318       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2319                                 &prm->key,
2320                                 get_path_length + put_path_length,
2321                                 xput_path,
2322                                 type,
2323                                 data_size,
2324                                 data);
2325     }
2326     /* forward to other peers */
2327     GDS_ROUTING_process (NULL,
2328                          type,
2329                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2330                          &prm->key,
2331                          put_path_length,
2332                          put_path,
2333                          get_path_length,
2334                          xget_path,
2335                          data,
2336                          data_size);
2337   }
2338 }
2339
2340
2341 /**
2342  * Initialize neighbours subsystem.
2343  *
2344  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2345  */
2346 int
2347 GDS_NEIGHBOURS_init ()
2348 {
2349   struct GNUNET_MQ_MessageHandler core_handlers[] = {
2350     GNUNET_MQ_hd_var_size (dht_p2p_get,
2351                            GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2352                            struct PeerGetMessage,
2353                            NULL),
2354     GNUNET_MQ_hd_var_size (dht_p2p_put,
2355                            GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2356                            struct PeerPutMessage,
2357                            NULL),
2358     GNUNET_MQ_hd_var_size (dht_p2p_result,
2359                            GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2360                            struct PeerResultMessage,
2361                            NULL),
2362     GNUNET_MQ_handler_end ()
2363   };
2364   unsigned long long temp_config_num;
2365
2366   disable_try_connect
2367     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2368                                             "DHT",
2369                                             "DISABLE_TRY_CONNECT");
2370   if (GNUNET_OK ==
2371       GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2372                                              "DHT",
2373                                              "bucket_size",
2374                                              &temp_config_num))
2375     bucket_size = (unsigned int) temp_config_num;
2376   cache_results
2377     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2378                                             "DHT",
2379                                             "CACHE_RESULTS");
2380
2381   log_route_details_stderr =
2382     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2383   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2384   core_api = GNUNET_CORE_connect (GDS_cfg,
2385                                   NULL,
2386                                   &core_init,
2387                                   &handle_core_connect,
2388                                   &handle_core_disconnect,
2389                                   core_handlers);
2390   if (NULL == core_api)
2391     return GNUNET_SYSERR;
2392   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2393                                                               GNUNET_YES);
2394   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2395                                                             GNUNET_NO);
2396   return GNUNET_OK;
2397 }
2398
2399
2400 /**
2401  * Shutdown neighbours subsystem.
2402  */
2403 void
2404 GDS_NEIGHBOURS_done ()
2405 {
2406   if (NULL == core_api)
2407     return;
2408   GNUNET_CORE_disconnect (core_api);
2409   core_api = NULL;
2410   GNUNET_assert (0 ==
2411                  GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2412   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2413   all_connected_peers = NULL;
2414   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2415                                          &free_connect_info,
2416                                          NULL);
2417   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2418   all_desired_peers = NULL;
2419   GNUNET_ATS_connectivity_done (ats_ch);
2420   ats_ch = NULL;
2421   GNUNET_assert (NULL == find_peer_task);
2422 }
2423
2424
2425 /**
2426  * Get the ID of the local node.
2427  *
2428  * @return identity of the local node
2429  */
2430 struct GNUNET_PeerIdentity *
2431 GDS_NEIGHBOURS_get_id ()
2432 {
2433   return &my_identity;
2434 }
2435
2436
2437 /* end of gnunet-service-dht_neighbours.c */