eliminate constantly hashing PIDs by storing and caching the result
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_hello.h"
45 #include "gnunet-service-dht_neighbours.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include "dht.h"
49
/**
 * Log a message of the given @a kind under the "dht-traffic" component.
 */
#define LOG_TRAFFIC(kind, ...) \
  GNUNET_log_from (kind, "dht-traffic", __VA_ARGS__)
51
/**
 * How many buckets will we allow in total: one per bit of a
 * `struct GNUNET_HashCode`.
 *
 * The expansion is fully parenthesized so that the macro behaves as a
 * single value inside larger expressions (e.g. `x / MAX_BUCKETS`).
 */
#define MAX_BUCKETS (sizeof (struct GNUNET_HashCode) * 8)
56
/**
 * Upper bound on the number of peers kept in one bucket
 * (default for #bucket_size).
 */
#define DEFAULT_BUCKET_SIZE 8

/**
 * Replication level we ask for when issuing FIND PEER requests.
 */
#define FIND_PEER_REPLICATION_LEVEL 4

/**
 * Hard cap on the replication level of any request.
 */
#define MAXIMUM_REPLICATION_LEVEL 16

/**
 * Hard cap on the number of messages that may be pending per peer.
 */
#define MAXIMUM_PENDING_PER_PEER 64

/**
 * Minimum delay between two consecutive find peer requests.
 */
#define DHT_MINIMUM_FIND_PEER_INTERVAL \
  GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Maximum delay between two consecutive find peer requests.
 */
#define DHT_MAXIMUM_FIND_PEER_INTERVAL \
  GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)

/**
 * Longest time we wait for a GET request to be transmitted to
 * another peer.
 */
#define GET_TIMEOUT \
  GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)

/**
 * Expiration time used for HELLO addresses (defined in another
 * translation unit).
 */
extern struct GNUNET_TIME_Relative hello_expiration;
96
97
98 GNUNET_NETWORK_STRUCT_BEGIN
99
100 /**
101  * P2P PUT message
102  */
103 struct PeerPutMessage
104 {
105   /**
106    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
107    */
108   struct GNUNET_MessageHeader header;
109
110   /**
111    * Processing options
112    */
113   uint32_t options GNUNET_PACKED;
114
115   /**
116    * Content type.
117    */
118   uint32_t type GNUNET_PACKED;
119
120   /**
121    * Hop count
122    */
123   uint32_t hop_count GNUNET_PACKED;
124
125   /**
126    * Replication level for this message
127    */
128   uint32_t desired_replication_level GNUNET_PACKED;
129
130   /**
131    * Length of the PUT path that follows (if tracked).
132    */
133   uint32_t put_path_length GNUNET_PACKED;
134
135   /**
136    * When does the content expire?
137    */
138   struct GNUNET_TIME_AbsoluteNBO expiration_time;
139
140   /**
141    * Bloomfilter (for peer identities) to stop circular routes
142    */
143   char bloomfilter[DHT_BLOOM_SIZE];
144
145   /**
146    * The key we are storing under.
147    */
148   struct GNUNET_HashCode key;
149
150   /* put path (if tracked) */
151
152   /* Payload */
153
154 };
155
156
157 /**
158  * P2P Result message
159  */
160 struct PeerResultMessage
161 {
162   /**
163    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Content type.
169    */
170   uint32_t type GNUNET_PACKED;
171
172   /**
173    * Length of the PUT path that follows (if tracked).
174    */
175   uint32_t put_path_length GNUNET_PACKED;
176
177   /**
178    * Length of the GET path that follows (if tracked).
179    */
180   uint32_t get_path_length GNUNET_PACKED;
181
182   /**
183    * When does the content expire?
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration_time;
186
187   /**
188    * The key of the corresponding GET request.
189    */
190   struct GNUNET_HashCode key;
191
192   /* put path (if tracked) */
193
194   /* get path (if tracked) */
195
196   /* Payload */
197
198 };
199
200
201 /**
202  * P2P GET message
203  */
204 struct PeerGetMessage
205 {
206   /**
207    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
208    */
209   struct GNUNET_MessageHeader header;
210
211   /**
212    * Processing options
213    */
214   uint32_t options GNUNET_PACKED;
215
216   /**
217    * Desired content type.
218    */
219   uint32_t type GNUNET_PACKED;
220
221   /**
222    * Hop count
223    */
224   uint32_t hop_count GNUNET_PACKED;
225
226   /**
227    * Desired replication level for this request.
228    */
229   uint32_t desired_replication_level GNUNET_PACKED;
230
231   /**
232    * Size of the extended query.
233    */
234   uint32_t xquery_size;
235
236   /**
237    * Bloomfilter mutator.
238    */
239   uint32_t bf_mutator;
240
241   /**
242    * Bloomfilter (for peer identities) to stop circular routes
243    */
244   char bloomfilter[DHT_BLOOM_SIZE];
245
246   /**
247    * The key we are looking for.
248    */
249   struct GNUNET_HashCode key;
250
251   /* xquery */
252
253   /* result bloomfilter */
254
255 };
256 GNUNET_NETWORK_STRUCT_END
257
258
259 /**
260  * Entry for a peer in a bucket.
261  */
262 struct PeerInfo
263 {
264   /**
265    * Next peer entry (DLL)
266    */
267   struct PeerInfo *next;
268
269   /**
270    *  Prev peer entry (DLL)
271    */
272   struct PeerInfo *prev;
273
274   /**
275    * Handle for sending messages to this peer.
276    */
277   struct GNUNET_MQ_Handle *mq;
278
279   /**
280    * What is the identity of the peer?
281    */
282   const struct GNUNET_PeerIdentity *id;
283
284   /**
285    * Hash of @e id.
286    */
287   struct GNUNET_HashCode phash;
288
289   /**
290    * Which bucket is this peer in?
291    */
292   int peer_bucket;
293
294 };
295
296
/**
 * One bucket of the routing table: a doubly-linked list of peers
 * plus its length.
 */
struct PeerBucket
{
  /**
   * First entry of the DLL.
   */
  struct PeerInfo *head;

  /**
   * Last entry of the DLL.
   */
  struct PeerInfo *tail;

  /**
   * How many peers are currently in the DLL.
   */
  unsigned int peers_size;
};
317
318
/**
 * Bookkeeping for a peer we would like to be connected to
 * (value type of #all_desired_peers).
 */
struct ConnectInfo
{

  /**
   * Pending HELLO offer to TRANSPORT, or NULL if none is active.
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *oh;

  /**
   * Pending connectivity suggestion to ATS, or NULL if none is active.
   */
  struct GNUNET_ATS_ConnectivitySuggestHandle *sh;

  /**
   * Strength of our desire to be connected to this peer.
   */
  uint32_t strength;
};
340
341
342 /**
343  * Do we cache all results that we are routing in the local datacache?
344  */
345 static int cache_results;
346
347 /**
348  * Should routing details be logged to stderr (for debugging)?
349  */
350 static int log_route_details_stderr;
351
352 /**
353  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
354  */
355 static unsigned int closest_bucket;
356
357 /**
358  * How many peers have we added since we sent out our last
359  * find peer request?
360  */
361 static unsigned int newly_found_peers;
362
363 /**
364  * Option for testing that disables the 'connect' function of the DHT.
365  */
366 static int disable_try_connect;
367
368 /**
369  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
370  */
371 static struct PeerBucket k_buckets[MAX_BUCKETS];
372
373 /**
374  * Hash map of all CORE-connected peers, for easy removal from
375  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
376  */
377 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
378
379 /**
380  * Hash map of all peers we would like to be connected to.
381  * Values are of type `struct ConnectInfo`.
382  */
383 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
384
385 /**
386  * Maximum size for each bucket.
387  */
388 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
389
390 /**
391  * Task that sends FIND PEER requests.
392  */
393 static struct GNUNET_SCHEDULER_Task *find_peer_task;
394
395 /**
396  * Identity of this peer.
397  */
398 static struct GNUNET_PeerIdentity my_identity;
399
400 /**
401  * Hash of the identity of this peer.
402  */
403 static struct GNUNET_HashCode my_identity_hash;
404
405 /**
406  * Handle to CORE.
407  */
408 static struct GNUNET_CORE_Handle *core_api;
409
410 /**
411  * Handle to ATS connectivity.
412  */
413 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
414
415
416 /**
417  * Find the optimal bucket for this key.
418  *
419  * @param hc the hashcode to compare our identity to
420  * @return the proper bucket index, or GNUNET_SYSERR
421  *         on error (same hashcode)
422  */
423 static int
424 find_bucket (const struct GNUNET_HashCode *hc)
425 {
426   unsigned int bits;
427
428   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
429   if (bits == MAX_BUCKETS)
430   {
431     /* How can all bits match? Got my own ID? */
432     GNUNET_break (0);
433     return GNUNET_SYSERR;
434   }
435   return MAX_BUCKETS - bits - 1;
436 }
437
438
439 /**
440  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
441  * Clean up the "oh" field in the @a cls
442  *
443  * @param cls a `struct ConnectInfo`
444  */
445 static void
446 offer_hello_done (void *cls)
447 {
448   struct ConnectInfo *ci = cls;
449
450   ci->oh = NULL;
451 }
452
453
454 /**
455  * Function called for all entries in #all_desired_peers to clean up.
456  *
457  * @param cls NULL
458  * @param peer peer the entry is for
459  * @param value the value to remove
460  * @return #GNUNET_YES
461  */
462 static int
463 free_connect_info (void *cls,
464                    const struct GNUNET_PeerIdentity *peer,
465                    void *value)
466 {
467   struct ConnectInfo *ci = value;
468
469   GNUNET_assert (GNUNET_YES ==
470                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
471                                                        peer,
472                                                        ci));
473   if (NULL != ci->sh)
474   {
475     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
476     ci->sh = NULL;
477   }
478   if (NULL != ci->oh)
479   {
480     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
481     ci->oh = NULL;
482   }
483   GNUNET_free (ci);
484   return GNUNET_YES;
485 }
486
487
488 /**
489  * Consider if we want to connect to a given peer, and if so
490  * let ATS know.  If applicable, the HELLO is offered to the
491  * TRANSPORT service.
492  *
493  * @param pid peer to consider connectivity requirements for
494  * @param h a HELLO message, or NULL
495  */
496 static void
497 try_connect (const struct GNUNET_PeerIdentity *pid,
498              const struct GNUNET_MessageHeader *h)
499 {
500   int bucket;
501   struct GNUNET_HashCode pid_hash;
502   struct ConnectInfo *ci;
503   uint32_t strength;
504
505   GNUNET_CRYPTO_hash (pid,
506                       sizeof (struct GNUNET_PeerIdentity),
507                       &pid_hash);
508   bucket = find_bucket (&pid_hash);
509   if (bucket < 0)
510     return; /* self? */
511   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
512                                           pid);
513
514   if (k_buckets[bucket].peers_size < bucket_size)
515     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
516   else
517     strength = bucket; /* minimum value of connectivity */
518   if (GNUNET_YES ==
519       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
520                                               pid))
521     strength *= 2; /* double for connected peers */
522   else if (k_buckets[bucket].peers_size > bucket_size)
523     strength = 0; /* bucket full, we really do not care about more */
524
525   if ( (0 == strength) &&
526        (NULL != ci) )
527   {
528     /* release request */
529     GNUNET_assert (GNUNET_YES ==
530                    free_connect_info (NULL,
531                                       pid,
532                                       ci));
533     return;
534   }
535   if (NULL == ci)
536   {
537     ci = GNUNET_new (struct ConnectInfo);
538     GNUNET_assert (GNUNET_OK ==
539                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
540                                                       pid,
541                                                       ci,
542                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
543   }
544   if ( (NULL != ci->oh) &&
545        (NULL != h) )
546     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
547   if (NULL != h)
548     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
549                                            h,
550                                            &offer_hello_done,
551                                            ci);
552   if ( (NULL != ci->sh) &&
553        (ci->strength != strength) )
554     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
555   if (ci->strength != strength)
556     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
557                                               pid,
558                                               strength);
559   ci->strength = strength;
560 }
561
562
563 /**
564  * Function called for each peer in #all_desired_peers during
565  * #update_connect_preferences() if we have reason to adjust
566  * the strength of our desire to keep connections to certain
567  * peers.  Calls #try_connect() to update the calculations for
568  * the given @a pid.
569  *
570  * @param cls NULL
571  * @param pid peer to update
572  * @param value unused
573  * @return #GNUNET_YES (continue to iterate)
574  */
575 static int
576 update_desire_strength (void *cls,
577                         const struct GNUNET_PeerIdentity *pid,
578                         void *value)
579 {
580   try_connect (pid, NULL);
581   return GNUNET_YES;
582 }
583
584
585 /**
586  * Update our preferences for connectivity as given to ATS.
587  *
588  * @param cls the `struct PeerInfo` of the peer
589  * @param tc scheduler context.
590  */
591 static void
592 update_connect_preferences ()
593 {
594   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
595                                          &update_desire_strength,
596                                          NULL);
597 }
598
599
/**
 * State passed to #add_known_to_bloom() while a bloom filter is
 * being filled.
 */
struct BloomConstructorContext
{
  /**
   * The bloom filter that entries are added to.
   */
  struct GNUNET_CONTAINER_BloomFilter *bloom;

  /**
   * Mutator mixed into each hash before it is added.
   */
  uint32_t bf_mutator;
};
615
616
617 /**
618  * Add each of the peers we already know to the bloom filter of
619  * the request so that we don't get duplicate HELLOs.
620  *
621  * @param cls the 'struct BloomConstructorContext'.
622  * @param key peer identity to add to the bloom filter
623  * @param value value the peer information (unused)
624  * @return #GNUNET_YES (we should continue to iterate)
625  */
626 static int
627 add_known_to_bloom (void *cls,
628                     const struct GNUNET_PeerIdentity *key,
629                     void *value)
630 {
631   struct BloomConstructorContext *ctx = cls;
632   struct GNUNET_HashCode key_hash;
633   struct GNUNET_HashCode mh;
634
635   GNUNET_CRYPTO_hash (key,
636                       sizeof (struct GNUNET_PeerIdentity),
637                       &key_hash);
638   GNUNET_BLOCK_mingle_hash (&key_hash,
639                             ctx->bf_mutator,
640                             &mh);
641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
643               GNUNET_i2s (key),
644               ctx->bf_mutator);
645   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom,
646                                     &mh);
647   return GNUNET_YES;
648 }
649
650
651 /**
652  * Task to send a find peer message for our own peer identifier
653  * so that we can find the closest peers in the network to ourselves
654  * and attempt to connect to them.
655  *
656  * @param cls closure for this task
657  */
658 static void
659 send_find_peer_message (void *cls)
660 {
661   struct GNUNET_TIME_Relative next_send_time;
662   struct BloomConstructorContext bcc;
663   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
664
665   find_peer_task = NULL;
666   if (newly_found_peers > bucket_size)
667   {
668     /* If we are finding many peers already, no need to send out our request right now! */
669     find_peer_task =
670         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
671                                       &send_find_peer_message,
672                                       NULL);
673     newly_found_peers = 0;
674     return;
675   }
676   bcc.bf_mutator =
677       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
678                                 UINT32_MAX);
679   bcc.bloom =
680       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
681                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
682   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
683                                          &add_known_to_bloom,
684                                          &bcc);
685   GNUNET_STATISTICS_update (GDS_stats,
686                             gettext_noop ("# FIND PEER messages initiated"),
687                             1,
688                             GNUNET_NO);
689   peer_bf =
690       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
691                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
692   // FIXME: pass priority!?
693   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
694                              GNUNET_DHT_RO_FIND_PEER,
695                              FIND_PEER_REPLICATION_LEVEL, 0,
696                              &my_identity_hash, NULL, 0, bcc.bloom,
697                              bcc.bf_mutator, peer_bf);
698   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
699   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
700   /* schedule next round */
701   next_send_time.rel_value_us =
702       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
703       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
704                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
705                                 (newly_found_peers + 1));
706   newly_found_peers = 0;
707   GNUNET_assert (NULL == find_peer_task);
708   find_peer_task =
709       GNUNET_SCHEDULER_add_delayed (next_send_time,
710                                     &send_find_peer_message,
711                                     NULL);
712 }
713
714
715 /**
716  * Method called whenever a peer connects.
717  *
718  * @param cls closure
719  * @param peer peer identity this notification is about
720  * @param mq message queue for sending messages to @a peer
721  * @return our `struct PeerInfo` for @a peer
722  */
723 static void *
724 handle_core_connect (void *cls,
725                      const struct GNUNET_PeerIdentity *peer,
726                      struct GNUNET_MQ_Handle *mq)
727 {
728   struct PeerInfo *pi;
729
730   /* Check for connect to self message */
731   if (0 == memcmp (&my_identity,
732                    peer,
733                    sizeof (struct GNUNET_PeerIdentity)))
734     return NULL;
735   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736               "Connected to %s\n",
737               GNUNET_i2s (peer));
738   GNUNET_assert (GNUNET_NO ==
739                  GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
740                                                     peer));
741   GNUNET_STATISTICS_update (GDS_stats,
742                             gettext_noop ("# peers connected"),
743                             1,
744                             GNUNET_NO);
745   pi = GNUNET_new (struct PeerInfo);
746   pi->id = peer;
747   pi->mq = mq;
748   GNUNET_CRYPTO_hash (peer,
749                       sizeof (struct GNUNET_PeerIdentity),
750                       &pi->phash);
751   pi->peer_bucket = find_bucket (&pi->phash);
752   GNUNET_assert ( (pi->peer_bucket >= 0) &&
753                   (pi->peer_bucket < MAX_BUCKETS) );
754   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
755                                     k_buckets[pi->peer_bucket].tail,
756                                     pi);
757   k_buckets[pi->peer_bucket].peers_size++;
758   closest_bucket = GNUNET_MAX (closest_bucket,
759                                pi->peer_bucket);
760   GNUNET_assert (GNUNET_OK ==
761                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
762                                                     pi->id,
763                                                     pi,
764                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
765   if ( (pi->peer_bucket > 0) &&
766        (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
767   {
768     update_connect_preferences ();
769     newly_found_peers++;
770   }
771   if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
772        (GNUNET_YES != disable_try_connect))
773   {
774     /* got a first connection, good time to start with FIND PEER requests... */
775     GNUNET_assert (NULL == find_peer_task);
776     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
777                                                NULL);
778   }
779   return pi;
780 }
781
782
783 /**
784  * Method called whenever a peer disconnects.
785  *
786  * @param cls closure
787  * @param peer peer identity this notification is about
788  * @param internal_cls our `struct PeerInfo` for @a peer
789  */
790 static void
791 handle_core_disconnect (void *cls,
792                         const struct GNUNET_PeerIdentity *peer,
793                         void *internal_cls)
794 {
795   struct PeerInfo *to_remove = internal_cls;
796
797   /* Check for disconnect from self message */
798   if (NULL == to_remove)
799     return;
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801               "Disconnected %s\n",
802               GNUNET_i2s (peer));
803   GNUNET_STATISTICS_update (GDS_stats,
804                             gettext_noop ("# peers connected"),
805                             -1,
806                             GNUNET_NO);
807   GNUNET_assert (GNUNET_YES ==
808                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
809                                                        peer,
810                                                        to_remove));
811   if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
812        (GNUNET_YES != disable_try_connect) )
813   {
814     GNUNET_SCHEDULER_cancel (find_peer_task);
815     find_peer_task = NULL;
816   }
817   GNUNET_assert (to_remove->peer_bucket >= 0);
818   GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
819                                k_buckets[to_remove->peer_bucket].tail,
820                                to_remove);
821   GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
822   k_buckets[to_remove->peer_bucket].peers_size--;
823   while ( (closest_bucket > 0) &&
824           (0 == k_buckets[to_remove->peer_bucket].peers_size) )
825     closest_bucket--;
826   if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
827     update_connect_preferences ();
828   GNUNET_free (to_remove);
829 }
830
831
832 /**
833  * To how many peers should we (on average) forward the request to
834  * obtain the desired target_replication count (on average).
835  *
836  * @param hop_count number of hops the message has traversed
837  * @param target_replication the number of total paths desired
838  * @return Some number of peers to forward the message to
839  */
840 static unsigned int
841 get_forward_count (uint32_t hop_count,
842                    uint32_t target_replication)
843 {
844   uint32_t random_value;
845   uint32_t forward_count;
846   float target_value;
847
848   if (hop_count > GDS_NSE_get () * 4.0)
849   {
850     /* forcefully terminate */
851     GNUNET_STATISTICS_update (GDS_stats,
852                               gettext_noop ("# requests TTL-dropped"),
853                               1, GNUNET_NO);
854     return 0;
855   }
856   if (hop_count > GDS_NSE_get () * 2.0)
857   {
858     /* Once we have reached our ideal number of hops, only forward to 1 peer */
859     return 1;
860   }
861   /* bound by system-wide maximum */
862   target_replication =
863       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
864   target_value =
865       1 + (target_replication - 1.0) / (GDS_NSE_get () +
866                                         ((float) (target_replication - 1.0) *
867                                          hop_count));
868   /* Set forward count to floor of target_value */
869   forward_count = (uint32_t) target_value;
870   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
871   target_value = target_value - forward_count;
872   random_value =
873       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
874   if (random_value < (target_value * UINT32_MAX))
875     forward_count++;
876   return forward_count;
877 }
878
879
880 /**
881  * Compute the distance between have and target as a 32-bit value.
882  * Differences in the lower bits must count stronger than differences
883  * in the higher bits.
884  *
885  * @param target
886  * @param have
887  * @return 0 if have==target, otherwise a number
888  *           that is larger as the distance between
889  *           the two hash codes increases
890  */
891 static unsigned int
892 get_distance (const struct GNUNET_HashCode *target,
893               const struct GNUNET_HashCode *have)
894 {
895   unsigned int bucket;
896   unsigned int msb;
897   unsigned int lsb;
898   unsigned int i;
899
900   /* We have to represent the distance between two 2^9 (=512)-bit
901    * numbers as a 2^5 (=32)-bit number with "0" being used for the
902    * two numbers being identical; furthermore, we need to
903    * guarantee that a difference in the number of matching
904    * bits is always represented in the result.
905    *
906    * We use 2^32/2^9 numerical values to distinguish between
907    * hash codes that have the same LSB bit distance and
908    * use the highest 2^9 bits of the result to signify the
909    * number of (mis)matching LSB bits; if we have 0 matching
910    * and hence 512 mismatching LSB bits we return -1 (since
911    * 512 itself cannot be represented with 9 bits) */
912
913   /* first, calculate the most significant 9 bits of our
914    * result, aka the number of LSBs */
915   bucket = GNUNET_CRYPTO_hash_matching_bits (target,
916                                              have);
917   /* bucket is now a value between 0 and 512 */
918   if (bucket == 512)
919     return 0;                   /* perfect match */
920   if (bucket == 0)
921     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
922                                  * below, we'd end up with max+1 (overflow)) */
923
924   /* calculate the most significant bits of the final result */
925   msb = (512 - bucket) << (32 - 9);
926   /* calculate the 32-9 least significant bits of the final result by
927    * looking at the differences in the 32-9 bits following the
928    * mismatching bit at 'bucket' */
929   lsb = 0;
930   for (i = bucket + 1;
931        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
932   {
933     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
934         GNUNET_CRYPTO_hash_get_bit (have, i))
935       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
936                                                  * last bit set will be 31 -- if
937                                                  * i does not reach 512 first... */
938   }
939   return msb | lsb;
940 }
941
942
943 /**
944  * Check whether my identity is closer than any known peers.  If a
945  * non-null bloomfilter is given, check if this is the closest peer
946  * that hasn't already been routed to.
947  *
948  * @param key hash code to check closeness to
949  * @param bloom bloomfilter, exclude these entries from the decision
950  * @return #GNUNET_YES if node location is closest,
951  *         #GNUNET_NO otherwise.
952  */
953 static int
954 am_closest_peer (const struct GNUNET_HashCode *key,
955                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
956 {
957   int bits;
958   int other_bits;
959   int bucket_num;
960   int count;
961   struct PeerInfo *pos;
962
963   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
964     return GNUNET_YES;
965   bucket_num = find_bucket (key);
966   GNUNET_assert (bucket_num >= 0);
967   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
968                                            key);
969   pos = k_buckets[bucket_num].head;
970   count = 0;
971   while ((NULL != pos) && (count < bucket_size))
972   {
973     if ((NULL != bloom) &&
974         (GNUNET_YES ==
975          GNUNET_CONTAINER_bloomfilter_test (bloom,
976                                             &pos->phash)))
977     {
978       pos = pos->next;
979       continue;                 /* Skip already checked entries */
980     }
981     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
982                                                    key);
983     if (other_bits > bits)
984       return GNUNET_NO;
985     if (other_bits == bits)     /* We match the same number of bits */
986       return GNUNET_YES;
987     pos = pos->next;
988   }
989   /* No peers closer, we are the closest! */
990   return GNUNET_YES;
991 }
992
993
994 /**
995  * Select a peer from the routing table that would be a good routing
996  * destination for sending a message for "key".  The resulting peer
997  * must not be in the set of blocked peers.<p>
998  *
999  * Note that we should not ALWAYS select the closest peer to the
1000  * target, peers further away from the target should be chosen with
1001  * exponentially declining probability.
1002  *
1003  * FIXME: double-check that this is fine
1004  *
1005  *
1006  * @param key the key we are selecting a peer to route to
1007  * @param bloom a bloomfilter containing entries this request has seen already
1008  * @param hops how many hops has this message traversed thus far
1009  * @return Peer to route to, or NULL on error
1010  */
1011 static struct PeerInfo *
1012 select_peer (const struct GNUNET_HashCode *key,
1013              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1014              uint32_t hops)
1015 {
1016   unsigned int bc;
1017   unsigned int count;
1018   unsigned int selected;
1019   struct PeerInfo *pos;
1020   unsigned int dist;
1021   unsigned int smallest_distance;
1022   struct PeerInfo *chosen;
1023
1024   if (hops >= GDS_NSE_get ())
1025   {
1026     /* greedy selection (closest peer that is not in bloomfilter) */
1027     smallest_distance = UINT_MAX;
1028     chosen = NULL;
1029     for (bc = 0; bc <= closest_bucket; bc++)
1030     {
1031       pos = k_buckets[bc].head;
1032       count = 0;
1033       while ((pos != NULL) && (count < bucket_size))
1034       {
1035         if ((bloom == NULL) ||
1036             (GNUNET_NO ==
1037              GNUNET_CONTAINER_bloomfilter_test (bloom,
1038                                                 &pos->phash)))
1039         {
1040           dist = get_distance (key,
1041                                &pos->phash);
1042           if (dist < smallest_distance)
1043           {
1044             chosen = pos;
1045             smallest_distance = dist;
1046           }
1047         }
1048         else
1049         {
1050           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1051                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1052                       GNUNET_i2s (pos->id),
1053                       GNUNET_h2s (key));
1054           GNUNET_STATISTICS_update (GDS_stats,
1055                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"),
1056                                     1,
1057                                     GNUNET_NO);
1058           dist = get_distance (key,
1059                                &pos->phash);
1060           if (dist < smallest_distance)
1061           {
1062             chosen = NULL;
1063             smallest_distance = dist;
1064           }
1065         }
1066         count++;
1067         pos = pos->next;
1068       }
1069     }
1070     if (NULL == chosen)
1071       GNUNET_STATISTICS_update (GDS_stats,
1072                                 gettext_noop ("# Peer selection failed"), 1,
1073                                 GNUNET_NO);
1074     return chosen;
1075   }
1076
1077   /* select "random" peer */
1078   /* count number of peers that are available and not filtered */
1079   count = 0;
1080   for (bc = 0; bc <= closest_bucket; bc++)
1081   {
1082     pos = k_buckets[bc].head;
1083     while ((pos != NULL) && (count < bucket_size))
1084     {
1085       if ((bloom != NULL) &&
1086           (GNUNET_YES ==
1087            GNUNET_CONTAINER_bloomfilter_test (bloom,
1088                                               &pos->phash)))
1089       {
1090         GNUNET_STATISTICS_update (GDS_stats,
1091                                   gettext_noop
1092                                   ("# Peers excluded from routing due to Bloomfilter"),
1093                                   1, GNUNET_NO);
1094         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1096                     GNUNET_i2s (pos->id),
1097                     GNUNET_h2s (key));
1098         pos = pos->next;
1099         continue;               /* Ignore bloomfiltered peers */
1100       }
1101       count++;
1102       pos = pos->next;
1103     }
1104   }
1105   if (0 == count)               /* No peers to select from! */
1106   {
1107     GNUNET_STATISTICS_update (GDS_stats,
1108                               gettext_noop ("# Peer selection failed"), 1,
1109                               GNUNET_NO);
1110     return NULL;
1111   }
1112   /* Now actually choose a peer */
1113   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1114                                        count);
1115   count = 0;
1116   for (bc = 0; bc <= closest_bucket; bc++)
1117   {
1118     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1119     {
1120       if ((bloom != NULL) &&
1121           (GNUNET_YES ==
1122            GNUNET_CONTAINER_bloomfilter_test (bloom,
1123                                               &pos->phash)))
1124       {
1125         continue;               /* Ignore bloomfiltered peers */
1126       }
1127       if (0 == selected--)
1128         return pos;
1129     }
1130   }
1131   GNUNET_break (0);
1132   return NULL;
1133 }
1134
1135
1136 /**
1137  * Compute the set of peers that the given request should be
1138  * forwarded to.
1139  *
1140  * @param key routing key
1141  * @param bloom bloom filter excluding peers as targets, all selected
1142  *        peers will be added to the bloom filter
1143  * @param hop_count number of hops the request has traversed so far
1144  * @param target_replication desired number of replicas
1145  * @param targets where to store an array of target peers (to be
1146  *         free'd by the caller)
1147  * @return number of peers returned in 'targets'.
1148  */
1149 static unsigned int
1150 get_target_peers (const struct GNUNET_HashCode *key,
1151                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1152                   uint32_t hop_count,
1153                   uint32_t target_replication,
1154                   struct PeerInfo ***targets)
1155 {
1156   unsigned int ret;
1157   unsigned int off;
1158   struct PeerInfo **rtargets;
1159   struct PeerInfo *nxt;
1160
1161   GNUNET_assert (NULL != bloom);
1162   ret = get_forward_count (hop_count,
1163                            target_replication);
1164   if (0 == ret)
1165   {
1166     *targets = NULL;
1167     return 0;
1168   }
1169   rtargets = GNUNET_new_array (ret,
1170                                struct PeerInfo *);
1171   for (off = 0; off < ret; off++)
1172   {
1173     nxt = select_peer (key, bloom, hop_count);
1174     if (NULL == nxt)
1175       break;
1176     rtargets[off] = nxt;
1177     GNUNET_break (GNUNET_NO ==
1178                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1179                                                      &nxt->phash));
1180     GNUNET_CONTAINER_bloomfilter_add (bloom,
1181                                       &nxt->phash);
1182   }
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1185               off,
1186               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1187               (unsigned int) hop_count,
1188               GNUNET_h2s (key),
1189               ret);
1190   if (0 == off)
1191   {
1192     GNUNET_free (rtargets);
1193     *targets = NULL;
1194     return 0;
1195   }
1196   *targets = rtargets;
1197   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1199               GNUNET_h2s (key),
1200               off,
1201               ret);
1202   return off;
1203 }
1204
1205
1206 /**
1207  * Perform a PUT operation.   Forwards the given request to other
1208  * peers.   Does not store the data locally.  Does not give the
1209  * data to local clients.  May do nothing if this is the only
1210  * peer in the network (or if we are the closest peer in the
1211  * network).
1212  *
1213  * @param type type of the block
1214  * @param options routing options
1215  * @param desired_replication_level desired replication count
1216  * @param expiration_time when does the content expire
1217  * @param hop_count how many hops has this message traversed so far
1218  * @param bf Bloom filter of peers this PUT has already traversed
1219  * @param key key for the content
1220  * @param put_path_length number of entries in @a put_path
1221  * @param put_path peers this request has traversed so far (if tracked)
1222  * @param data payload to store
1223  * @param data_size number of bytes in @a data
1224  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1225  */
1226 int
1227 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1228                            enum GNUNET_DHT_RouteOption options,
1229                            uint32_t desired_replication_level,
1230                            struct GNUNET_TIME_Absolute expiration_time,
1231                            uint32_t hop_count,
1232                            struct GNUNET_CONTAINER_BloomFilter *bf,
1233                            const struct GNUNET_HashCode *key,
1234                            unsigned int put_path_length,
1235                            struct GNUNET_PeerIdentity *put_path,
1236                            const void *data,
1237                            size_t data_size)
1238 {
1239   unsigned int target_count;
1240   unsigned int i;
1241   struct PeerInfo **targets;
1242   struct PeerInfo *target;
1243   size_t msize;
1244   struct GNUNET_MQ_Envelope *env;
1245   struct PeerPutMessage *ppm;
1246   struct GNUNET_PeerIdentity *pp;
1247   unsigned int skip_count;
1248
1249   GNUNET_assert (NULL != bf);
1250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1251               "Adding myself (%s) to PUT bloomfilter for %s\n",
1252               GNUNET_i2s (&my_identity),
1253               GNUNET_h2s (key));
1254   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1255   GNUNET_STATISTICS_update (GDS_stats,
1256                             gettext_noop ("# PUT requests routed"),
1257                             1,
1258                             GNUNET_NO);
1259   target_count =
1260       get_target_peers (key,
1261                         bf,
1262                         hop_count,
1263                         desired_replication_level,
1264                         &targets);
1265   if (0 == target_count)
1266   {
1267     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268                 "Routing PUT for %s terminates after %u hops at %s\n",
1269                 GNUNET_h2s (key),
1270                 (unsigned int) hop_count,
1271                 GNUNET_i2s (&my_identity));
1272     return GNUNET_NO;
1273   }
1274   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
1275   if (msize + sizeof (struct PeerPutMessage)
1276       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1277   {
1278     put_path_length = 0;
1279     msize = data_size;
1280   }
1281   if (msize + sizeof (struct PeerPutMessage)
1282       >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1283   {
1284     GNUNET_break (0);
1285     GNUNET_free (targets);
1286     return GNUNET_NO;
1287   }
1288   GNUNET_STATISTICS_update (GDS_stats,
1289                             gettext_noop ("# PUT messages queued for transmission"),
1290                             target_count,
1291                             GNUNET_NO);
1292   skip_count = 0;
1293   for (i = 0; i < target_count; i++)
1294   {
1295     target = targets[i];
1296     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1297     {
1298       /* skip */
1299       GNUNET_STATISTICS_update (GDS_stats,
1300                                 gettext_noop ("# P2P messages dropped due to full queue"),
1301                                 1,
1302                                 GNUNET_NO);
1303       skip_count++;
1304       continue;
1305     }
1306     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307                 "Routing PUT for %s after %u hops to %s\n",
1308                 GNUNET_h2s (key),
1309                 (unsigned int) hop_count,
1310                 GNUNET_i2s (target->id));
1311     env = GNUNET_MQ_msg_extra (ppm,
1312                                msize,
1313                                GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1314     ppm->options = htonl (options);
1315     ppm->type = htonl (type);
1316     ppm->hop_count = htonl (hop_count + 1);
1317     ppm->desired_replication_level = htonl (desired_replication_level);
1318     ppm->put_path_length = htonl (put_path_length);
1319     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1320     GNUNET_break (GNUNET_YES ==
1321                   GNUNET_CONTAINER_bloomfilter_test (bf,
1322                                                      &target->phash));
1323     GNUNET_assert (GNUNET_OK ==
1324                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1325                                                               ppm->bloomfilter,
1326                                                               DHT_BLOOM_SIZE));
1327     ppm->key = *key;
1328     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1329     GNUNET_memcpy (pp,
1330                    put_path,
1331                    sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1332     GNUNET_memcpy (&pp[put_path_length],
1333                    data,
1334                    data_size);
1335     GNUNET_MQ_send (target->mq,
1336                     env);
1337   }
1338   GNUNET_free (targets);
1339   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1340 }
1341
1342
1343 /**
1344  * Perform a GET operation.  Forwards the given request to other
1345  * peers.  Does not lookup the key locally.  May do nothing if this is
1346  * the only peer in the network (or if we are the closest peer in the
1347  * network).
1348  *
1349  * @param type type of the block
1350  * @param options routing options
1351  * @param desired_replication_level desired replication count
1352  * @param hop_count how many hops did this request traverse so far?
1353  * @param key key for the content
1354  * @param xquery extended query
1355  * @param xquery_size number of bytes in @a xquery
1356  * @param reply_bf bloomfilter to filter duplicates
1357  * @param reply_bf_mutator mutator for @a reply_bf
1358  * @param peer_bf filter for peers not to select (again)
1359  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1360  */
1361 int
1362 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1363                            enum GNUNET_DHT_RouteOption options,
1364                            uint32_t desired_replication_level,
1365                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1366                            const void *xquery, size_t xquery_size,
1367                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1368                            uint32_t reply_bf_mutator,
1369                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1370 {
1371   unsigned int target_count;
1372   unsigned int i;
1373   struct PeerInfo **targets;
1374   struct PeerInfo *target;
1375   struct GNUNET_MQ_Envelope *env;
1376   size_t msize;
1377   struct PeerGetMessage *pgm;
1378   char *xq;
1379   size_t reply_bf_size;
1380   unsigned int skip_count;
1381
1382   GNUNET_assert (NULL != peer_bf);
1383   GNUNET_STATISTICS_update (GDS_stats,
1384                             gettext_noop ("# GET requests routed"),
1385                             1,
1386                             GNUNET_NO);
1387   target_count = get_target_peers (key,
1388                                    peer_bf,
1389                                    hop_count,
1390                                    desired_replication_level,
1391                                    &targets);
1392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393               "Adding myself (%s) to GET bloomfilter for %s\n",
1394               GNUNET_i2s (&my_identity),
1395               GNUNET_h2s (key));
1396   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1397                                     &my_identity_hash);
1398   if (0 == target_count)
1399   {
1400     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401                 "Routing GET for %s terminates after %u hops at %s\n",
1402                 GNUNET_h2s (key),
1403                 (unsigned int) hop_count,
1404                 GNUNET_i2s (&my_identity));
1405     return GNUNET_NO;
1406   }
1407   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1408   msize = xquery_size + reply_bf_size;
1409   if (msize + sizeof (struct PeerGetMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1410   {
1411     GNUNET_break (0);
1412     GNUNET_free (targets);
1413     return GNUNET_NO;
1414   }
1415   GNUNET_STATISTICS_update (GDS_stats,
1416                             gettext_noop ("# GET messages queued for transmission"),
1417                             target_count,
1418                             GNUNET_NO);
1419   /* forward request */
1420   skip_count = 0;
1421   for (i = 0; i < target_count; i++)
1422   {
1423     target = targets[i];
1424     if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1425     {
1426       /* skip */
1427       GNUNET_STATISTICS_update (GDS_stats,
1428                                 gettext_noop ("# P2P messages dropped due to full queue"),
1429                                 1, GNUNET_NO);
1430       skip_count++;
1431       continue;
1432     }
1433     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1434                 "Routing GET for %s after %u hops to %s\n",
1435                 GNUNET_h2s (key),
1436                 (unsigned int) hop_count,
1437                 GNUNET_i2s (target->id));
1438     env = GNUNET_MQ_msg_extra (pgm,
1439                                msize,
1440                                GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1441     pgm->options = htonl (options);
1442     pgm->type = htonl (type);
1443     pgm->hop_count = htonl (hop_count + 1);
1444     pgm->desired_replication_level = htonl (desired_replication_level);
1445     pgm->xquery_size = htonl (xquery_size);
1446     pgm->bf_mutator = reply_bf_mutator;
1447     GNUNET_break (GNUNET_YES ==
1448                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1449                                                      &target->phash));
1450     GNUNET_assert (GNUNET_OK ==
1451                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1452                                                               pgm->bloomfilter,
1453                                                               DHT_BLOOM_SIZE));
1454     pgm->key = *key;
1455     xq = (char *) &pgm[1];
1456     GNUNET_memcpy (xq,
1457                    xquery,
1458                    xquery_size);
1459     if (NULL != reply_bf)
1460       GNUNET_assert (GNUNET_OK ==
1461                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1462                                                                 &xq
1463                                                                 [xquery_size],
1464                                                                 reply_bf_size));
1465     GNUNET_MQ_send (target->mq,
1466                     env);
1467   }
1468   GNUNET_free (targets);
1469   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1470 }
1471
1472
1473 /**
1474  * Handle a reply (route to origin).  Only forwards the reply back to
1475  * the given peer.  Does not do local caching or forwarding to local
1476  * clients.
1477  *
1478  * @param target neighbour that should receive the block (if still connected)
1479  * @param type type of the block
1480  * @param expiration_time when does the content expire
1481  * @param key key for the content
1482  * @param put_path_length number of entries in @a put_path
1483  * @param put_path peers the original PUT traversed (if tracked)
1484  * @param get_path_length number of entries in @a get_path
1485  * @param get_path peers this reply has traversed so far (if tracked)
1486  * @param data payload of the reply
1487  * @param data_size number of bytes in @a data
1488  */
1489 void
1490 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1491                              enum GNUNET_BLOCK_Type type,
1492                              struct GNUNET_TIME_Absolute expiration_time,
1493                              const struct GNUNET_HashCode *key,
1494                              unsigned int put_path_length,
1495                              const struct GNUNET_PeerIdentity *put_path,
1496                              unsigned int get_path_length,
1497                              const struct GNUNET_PeerIdentity *get_path,
1498                              const void *data,
1499                              size_t data_size)
1500 {
1501   struct PeerInfo *pi;
1502   struct GNUNET_MQ_Envelope *env;
1503   size_t msize;
1504   struct PeerResultMessage *prm;
1505   struct GNUNET_PeerIdentity *paths;
1506
1507   msize = data_size + (get_path_length + put_path_length) *
1508       sizeof (struct GNUNET_PeerIdentity);
1509   if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1510       (get_path_length >
1511        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1512       (put_path_length >
1513        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1514       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1515   {
1516     GNUNET_break (0);
1517     return;
1518   }
1519   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1520                                           target);
1521   if (NULL == pi)
1522   {
1523     /* peer disconnected in the meantime, drop reply */
1524     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525                 "No matching peer for reply for key %s\n",
1526                 GNUNET_h2s (key));
1527     return;
1528   }
1529   if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1530   {
1531     /* skip */
1532     GNUNET_STATISTICS_update (GDS_stats,
1533                               gettext_noop ("# P2P messages dropped due to full queue"),
1534                               1,
1535                               GNUNET_NO);
1536     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537                 "Peer queue full, ignoring reply for key %s\n",
1538                 GNUNET_h2s (key));
1539     return;
1540   }
1541
1542   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543               "Forwarding reply for key %s to peer %s\n",
1544               GNUNET_h2s (key),
1545               GNUNET_i2s (target));
1546   GNUNET_STATISTICS_update (GDS_stats,
1547                             gettext_noop
1548                             ("# RESULT messages queued for transmission"), 1,
1549                             GNUNET_NO);
1550   env = GNUNET_MQ_msg_extra (prm,
1551                              msize,
1552                              GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1553   prm->type = htonl (type);
1554   prm->put_path_length = htonl (put_path_length);
1555   prm->get_path_length = htonl (get_path_length);
1556   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1557   prm->key = *key;
1558   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1559   GNUNET_memcpy (paths,
1560                  put_path,
1561                  put_path_length * sizeof (struct GNUNET_PeerIdentity));
1562   GNUNET_memcpy (&paths[put_path_length],
1563                  get_path,
1564                  get_path_length * sizeof (struct GNUNET_PeerIdentity));
1565   GNUNET_memcpy (&paths[put_path_length + get_path_length],
1566                  data,
1567                  data_size);
1568   GNUNET_MQ_send (pi->mq,
1569                   env);
1570 }
1571
1572
1573 /**
1574  * To be called on core init/fail.
1575  *
1576  * @param cls service closure
1577  * @param identity the public identity of this peer
1578  */
1579 static void
1580 core_init (void *cls,
1581            const struct GNUNET_PeerIdentity *identity)
1582 {
1583   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1584               "CORE called, I am %s\n",
1585               GNUNET_i2s (identity));
1586   my_identity = *identity;
1587   GNUNET_CRYPTO_hash (identity,
1588                       sizeof (struct GNUNET_PeerIdentity),
1589                       &my_identity_hash);
1590   GDS_CLIENTS_init ();
1591 }
1592
1593
1594 /**
1595  * Check validity of a p2p put request.
1596  *
1597  * @param cls closure with the `struct PeerInfo` of the sender
1598  * @param message message
1599  * @return #GNUNET_OK if the message is valid
1600  */
1601 static int
1602 check_dht_p2p_put (void *cls,
1603                    const struct PeerPutMessage *put)
1604 {
1605   uint32_t putlen;
1606   uint16_t msize;
1607
1608   msize = ntohs (put->header.size);
1609   putlen = ntohl (put->put_path_length);
1610   if ((msize <
1611        sizeof (struct PeerPutMessage) +
1612        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1613       (putlen >
1614        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1615   {
1616     GNUNET_break_op (0);
1617     return GNUNET_SYSERR;
1618   }
1619   return GNUNET_OK;
1620 }
1621
1622
1623 /**
1624  * Core handler for p2p put requests.
1625  *
1626  * @param cls closure with the `struct PeerInfo` of the sender
1627  * @param message message
1628  */
1629 static void
1630 handle_dht_p2p_put (void *cls,
1631                     const struct PeerPutMessage *put)
1632 {
1633   struct PeerInfo *peer = cls;
1634   const struct GNUNET_PeerIdentity *put_path;
1635   const void *payload;
1636   uint32_t putlen;
1637   uint16_t msize;
1638   size_t payload_size;
1639   enum GNUNET_DHT_RouteOption options;
1640   struct GNUNET_CONTAINER_BloomFilter *bf;
1641   struct GNUNET_HashCode test_key;
1642   int forwarded;
1643
1644   msize = ntohs (put->header.size);
1645   putlen = ntohl (put->put_path_length);
1646   GNUNET_STATISTICS_update (GDS_stats,
1647                             gettext_noop ("# P2P PUT requests received"),
1648                             1,
1649                             GNUNET_NO);
1650   GNUNET_STATISTICS_update (GDS_stats,
1651                             gettext_noop ("# P2P PUT bytes received"),
1652                             msize,
1653                             GNUNET_NO);
1654   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1655   payload = &put_path[putlen];
1656   options = ntohl (put->options);
1657   payload_size = msize - (sizeof (struct PeerPutMessage) +
1658                           putlen * sizeof (struct GNUNET_PeerIdentity));
1659
1660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661               "PUT for `%s' from %s\n",
1662               GNUNET_h2s (&put->key),
1663               GNUNET_i2s (peer->id));
1664   if (GNUNET_YES == log_route_details_stderr)
1665   {
1666     char *tmp;
1667
1668     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1669     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1670                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1671                  GNUNET_h2s (&put->key),
1672                  GNUNET_i2s (peer->id),
1673                  tmp,
1674                  ntohl(put->hop_count),
1675                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1676                                                    &put->key),
1677                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1678                                                    &put->key)
1679                 );
1680     GNUNET_free (tmp);
1681   }
1682   switch (GNUNET_BLOCK_get_key
1683           (GDS_block_context,
1684            ntohl (put->type),
1685            payload,
1686            payload_size,
1687            &test_key))
1688   {
1689   case GNUNET_YES:
1690     if (0 != memcmp (&test_key,
1691                      &put->key,
1692                      sizeof (struct GNUNET_HashCode)))
1693     {
1694       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1695
1696       GNUNET_break_op (0);
1697       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1698                   "PUT with key `%s' for block with key %s\n",
1699                   put_s,
1700                   GNUNET_h2s_full (&test_key));
1701       GNUNET_free (put_s);
1702       return;
1703     }
1704     break;
1705   case GNUNET_NO:
1706     GNUNET_break_op (0);
1707     return;
1708   case GNUNET_SYSERR:
1709     /* cannot verify, good luck */
1710     break;
1711   }
1712   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1713   {
1714     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1715                                    ntohl (put->type),
1716                                    GNUNET_BLOCK_EO_NONE,
1717                                    NULL,    /* query */
1718                                    NULL, 0, /* bloom filer */
1719                                    NULL, 0, /* xquery */
1720                                    payload, payload_size))
1721     {
1722     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1723     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1724       break;
1725
1726     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1727     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1728     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1729     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1730     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1731     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1732     default:
1733       GNUNET_break_op (0);
1734       return;
1735     }
1736   }
1737
1738   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1739                                           DHT_BLOOM_SIZE,
1740                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1741   GNUNET_break_op (GNUNET_YES ==
1742                    GNUNET_CONTAINER_bloomfilter_test (bf,
1743                                                       &peer->phash));
1744   {
1745     struct GNUNET_PeerIdentity pp[putlen + 1];
1746
1747     /* extend 'put path' by sender */
1748     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1749     {
1750       GNUNET_memcpy (pp,
1751                      put_path,
1752                      putlen * sizeof (struct GNUNET_PeerIdentity));
1753       pp[putlen] = *peer->id;
1754       putlen++;
1755     }
1756     else
1757       putlen = 0;
1758
1759     /* give to local clients */
1760     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1761                               &put->key,
1762                               0,
1763                               NULL,
1764                               putlen,
1765                               pp,
1766                               ntohl (put->type),
1767                               payload_size,
1768                               payload);
1769     /* store locally */
1770     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1771         (am_closest_peer (&put->key, bf)))
1772       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1773                                 (put->expiration_time), &put->key, putlen, pp,
1774                                 ntohl (put->type), payload_size, payload);
1775     /* route to other peers */
1776     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1777                                            options,
1778                                            ntohl (put->desired_replication_level),
1779                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1780                                            ntohl (put->hop_count),
1781                                            bf,
1782                                            &put->key,
1783                                            putlen,
1784                                            pp,
1785                                            payload,
1786                                            payload_size);
1787     /* notify monitoring clients */
1788     GDS_CLIENTS_process_put (options
1789                              | ( (GNUNET_OK == forwarded)
1790                                  ? GNUNET_DHT_RO_LAST_HOP
1791                                  : 0 ),
1792                              ntohl (put->type),
1793                              ntohl (put->hop_count),
1794                              ntohl (put->desired_replication_level),
1795                              putlen, pp,
1796                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1797                              &put->key,
1798                              payload,
1799                              payload_size);
1800   }
1801   GNUNET_CONTAINER_bloomfilter_free (bf);
1802 }
1803
1804
1805 /**
1806  * We have received a FIND PEER request.  Send matching
1807  * HELLOs back.
1808  *
1809  * @param sender sender of the FIND PEER request
1810  * @param key peers close to this key are desired
1811  * @param bf peers matching this bf are excluded
1812  * @param bf_mutator mutator for bf
1813  */
1814 static void
1815 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1816                   const struct GNUNET_HashCode *key,
1817                   struct GNUNET_CONTAINER_BloomFilter *bf,
1818                   uint32_t bf_mutator)
1819 {
1820   int bucket_idx;
1821   struct PeerBucket *bucket;
1822   struct PeerInfo *peer;
1823   unsigned int choice;
1824   struct GNUNET_HashCode mhash;
1825   const struct GNUNET_HELLO_Message *hello;
1826
1827   /* first, check about our own HELLO */
1828   if (NULL != GDS_my_hello)
1829   {
1830     GNUNET_BLOCK_mingle_hash (&my_identity_hash, bf_mutator, &mhash);
1831     if ((NULL == bf) ||
1832         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1833     {
1834       GDS_NEIGHBOURS_handle_reply (sender,
1835                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1836                                    GNUNET_TIME_relative_to_absolute
1837                                    (hello_expiration),
1838                                    key,
1839                                    0,
1840                                    NULL,
1841                                    0,
1842                                    NULL,
1843                                    GDS_my_hello,
1844                                    GNUNET_HELLO_size ((const struct
1845                                                        GNUNET_HELLO_Message *)
1846                                                       GDS_my_hello));
1847     }
1848     else
1849     {
1850       GNUNET_STATISTICS_update (GDS_stats,
1851                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"),
1852                                 1,
1853                                 GNUNET_NO);
1854     }
1855   }
1856   else
1857   {
1858     GNUNET_STATISTICS_update (GDS_stats,
1859                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"),
1860                               1,
1861                               GNUNET_NO);
1862   }
1863
1864   /* then, also consider sending a random HELLO from the closest bucket */
1865   if (0 == memcmp (&my_identity_hash,
1866                    key,
1867                    sizeof (struct GNUNET_HashCode)))
1868     bucket_idx = closest_bucket;
1869   else
1870     bucket_idx = GNUNET_MIN (closest_bucket,
1871                              find_bucket (key));
1872   if (bucket_idx == GNUNET_SYSERR)
1873     return;
1874   bucket = &k_buckets[bucket_idx];
1875   if (bucket->peers_size == 0)
1876     return;
1877   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1878                                      bucket->peers_size);
1879   peer = bucket->head;
1880   while (choice > 0)
1881   {
1882     GNUNET_assert (NULL != peer);
1883     peer = peer->next;
1884     choice--;
1885   }
1886   choice = bucket->peers_size;
1887   do
1888   {
1889     peer = peer->next;
1890     if (choice-- == 0)
1891       return;                   /* no non-masked peer available */
1892     if (NULL == peer)
1893       peer = bucket->head;
1894     GNUNET_BLOCK_mingle_hash (&peer->phash,
1895                               bf_mutator,
1896                               &mhash);
1897     hello = GDS_HELLO_get (peer->id);
1898   } while ( (hello == NULL) ||
1899             (GNUNET_YES ==
1900              GNUNET_CONTAINER_bloomfilter_test (bf,
1901                                                 &mhash)) );
1902   GDS_NEIGHBOURS_handle_reply (sender,
1903                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1904                                GNUNET_TIME_relative_to_absolute
1905                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1906                                key,
1907                                0,
1908                                NULL,
1909                                0,
1910                                NULL,
1911                                hello,
1912                                GNUNET_HELLO_size (hello));
1913 }
1914
1915
1916 /**
1917  * Check validity of p2p get request.
1918  *
1919  * @param cls closure with the `struct PeerInfo` of the sender
1920  * @param get the message
1921  * @return #GNUNET_OK if the message is well-formed
1922  */
1923 static int
1924 check_dht_p2p_get (void *cls,
1925                    const struct PeerGetMessage *get)
1926 {
1927   uint32_t xquery_size;
1928   uint16_t msize;
1929
1930   msize = ntohs (get->header.size);
1931   xquery_size = ntohl (get->xquery_size);
1932   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1933   {
1934     GNUNET_break_op (0);
1935     return GNUNET_SYSERR;
1936   }
1937   return GNUNET_OK;
1938 }
1939
1940
1941 /**
1942  * Core handler for p2p get requests.
1943  *
1944  * @param cls closure with the `struct PeerInfo` of the sender
1945  * @param get the message
1946  */
1947 static void
1948 handle_dht_p2p_get (void *cls,
1949                     const struct PeerGetMessage *get)
1950 {
1951   struct PeerInfo *peer = cls;
1952   uint32_t xquery_size;
1953   size_t reply_bf_size;
1954   uint16_t msize;
1955   enum GNUNET_BLOCK_Type type;
1956   enum GNUNET_DHT_RouteOption options;
1957   enum GNUNET_BLOCK_EvaluationResult eval;
1958   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1959   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1960   const char *xquery;
1961   int forwarded;
1962
1963   if (NULL == peer)
1964   {
1965     GNUNET_break (0);
1966     return;
1967   }
1968   /* parse and validate message */
1969   msize = ntohs (get->header.size);
1970   xquery_size = ntohl (get->xquery_size);
1971   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1972   type = ntohl (get->type);
1973   options = ntohl (get->options);
1974   xquery = (const char *) &get[1];
1975   reply_bf = NULL;
1976   GNUNET_STATISTICS_update (GDS_stats,
1977                             gettext_noop ("# P2P GET requests received"),
1978                             1,
1979                             GNUNET_NO);
1980   GNUNET_STATISTICS_update (GDS_stats,
1981                             gettext_noop ("# P2P GET bytes received"),
1982                             msize,
1983                             GNUNET_NO);
1984   if (GNUNET_YES == log_route_details_stderr)
1985   {
1986     char *tmp;
1987
1988     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1989     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1990                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
1991                  GNUNET_h2s (&get->key),
1992                  GNUNET_i2s (peer->id),
1993                  tmp,
1994                  ntohl(get->hop_count),
1995                  GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1996                                                    &get->key),
1997                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1998                                                    &get->key),
1999                  ntohl(get->xquery_size),
2000                  xquery);
2001     GNUNET_free (tmp);
2002   }
2003
2004   if (reply_bf_size > 0)
2005     reply_bf =
2006         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
2007                                            reply_bf_size,
2008                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
2009   eval =
2010       GNUNET_BLOCK_evaluate (GDS_block_context,
2011                              type,
2012                              GNUNET_BLOCK_EO_NONE,
2013                              &get->key,
2014                              &reply_bf,
2015                              get->bf_mutator,
2016                              xquery,
2017                              xquery_size,
2018                              NULL,
2019                              0);
2020   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2021   {
2022     /* request invalid or block type not supported */
2023     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2024     if (NULL != reply_bf)
2025       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2026     return;
2027   }
2028   peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2029                                                DHT_BLOOM_SIZE,
2030                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
2031   GNUNET_break_op (GNUNET_YES ==
2032                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2033                                                       &peer->phash));
2034   /* remember request for routing replies */
2035   GDS_ROUTING_add (peer->id,
2036                    type,
2037                    options,
2038                    &get->key,
2039                    xquery,
2040                    xquery_size,
2041                    reply_bf,
2042                    get->bf_mutator);
2043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2044               "GET for %s at %s after %u hops\n",
2045               GNUNET_h2s (&get->key),
2046               GNUNET_i2s (&my_identity),
2047               (unsigned int) ntohl (get->hop_count));
2048   /* local lookup (this may update the reply_bf) */
2049   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2050       (am_closest_peer (&get->key,
2051                         peer_bf)))
2052   {
2053     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2054     {
2055       GNUNET_STATISTICS_update (GDS_stats,
2056                                 gettext_noop ("# P2P FIND PEER requests processed"),
2057                                 1,
2058                                 GNUNET_NO);
2059       handle_find_peer (peer->id,
2060                         &get->key,
2061                         reply_bf,
2062                         get->bf_mutator);
2063     }
2064     else
2065     {
2066       eval = GDS_DATACACHE_handle_get (&get->key,
2067                                        type,
2068                                        xquery,
2069                                        xquery_size,
2070                                        &reply_bf,
2071                                        get->bf_mutator);
2072     }
2073   }
2074   else
2075   {
2076     GNUNET_STATISTICS_update (GDS_stats,
2077                               gettext_noop ("# P2P GET requests ONLY routed"),
2078                               1,
2079                               GNUNET_NO);
2080   }
2081
2082   /* P2P forwarding */
2083   forwarded = GNUNET_NO;
2084   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2085     forwarded = GDS_NEIGHBOURS_handle_get (type,
2086                                            options,
2087                                            ntohl (get->desired_replication_level),
2088                                            ntohl (get->hop_count),
2089                                            &get->key,
2090                                            xquery,
2091                                            xquery_size,
2092                                            reply_bf,
2093                                            get->bf_mutator,
2094                                            peer_bf);
2095   GDS_CLIENTS_process_get (options
2096                            | (GNUNET_OK == forwarded)
2097                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2098                            type,
2099                            ntohl (get->hop_count),
2100                            ntohl (get->desired_replication_level),
2101                            0,
2102                            NULL,
2103                            &get->key);
2104
2105
2106   /* clean up */
2107   if (NULL != reply_bf)
2108     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2109   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2110 }
2111
2112
2113 /**
2114  * Check validity of p2p result message.
2115  *
2116  * @param cls closure
2117  * @param message message
2118  * @return #GNUNET_YES if the message is well-formed
2119  */
2120 static int
2121 check_dht_p2p_result (void *cls,
2122                       const struct PeerResultMessage *prm)
2123 {
2124   uint32_t get_path_length;
2125   uint32_t put_path_length;
2126   uint16_t msize;
2127
2128   msize = ntohs (prm->header.size);
2129   put_path_length = ntohl (prm->put_path_length);
2130   get_path_length = ntohl (prm->get_path_length);
2131   if ((msize <
2132        sizeof (struct PeerResultMessage) + (get_path_length +
2133                                             put_path_length) *
2134        sizeof (struct GNUNET_PeerIdentity)) ||
2135       (get_path_length >
2136        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2137       (put_path_length >
2138        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2139   {
2140     GNUNET_break_op (0);
2141     return GNUNET_SYSERR;
2142   }
2143   return GNUNET_OK;
2144 }
2145
2146
2147 /**
2148  * Core handler for p2p result messages.
2149  *
2150  * @param cls closure
2151  * @param message message
2152  */
2153 static void
2154 handle_dht_p2p_result (void *cls,
2155                        const struct PeerResultMessage *prm)
2156 {
2157   struct PeerInfo *peer = cls;
2158   const struct GNUNET_PeerIdentity *put_path;
2159   const struct GNUNET_PeerIdentity *get_path;
2160   const void *data;
2161   uint32_t get_path_length;
2162   uint32_t put_path_length;
2163   uint16_t msize;
2164   size_t data_size;
2165   enum GNUNET_BLOCK_Type type;
2166
2167   /* parse and validate message */
2168   msize = ntohs (prm->header.size);
2169   put_path_length = ntohl (prm->put_path_length);
2170   get_path_length = ntohl (prm->get_path_length);
2171   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2172   get_path = &put_path[put_path_length];
2173   type = ntohl (prm->type);
2174   data = (const void *) &get_path[get_path_length];
2175   data_size = msize - (sizeof (struct PeerResultMessage) +
2176                        (get_path_length +
2177                         put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2178   GNUNET_STATISTICS_update (GDS_stats,
2179                             gettext_noop ("# P2P RESULTS received"),
2180                             1,
2181                             GNUNET_NO);
2182   GNUNET_STATISTICS_update (GDS_stats,
2183                             gettext_noop ("# P2P RESULT bytes received"),
2184                             msize,
2185                             GNUNET_NO);
2186   if (GNUNET_YES == log_route_details_stderr)
2187   {
2188     char *tmp;
2189
2190     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2191     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2192                  "R5N RESULT %s: %s->%s (%u)\n",
2193                  GNUNET_h2s (&prm->key),
2194                  GNUNET_i2s (peer->id),
2195                  tmp,
2196                  get_path_length + 1);
2197     GNUNET_free (tmp);
2198   }
2199   /* if we got a HELLO, consider it for our own routing table */
2200   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2201   {
2202     const struct GNUNET_MessageHeader *h;
2203     struct GNUNET_PeerIdentity pid;
2204
2205     /* Should be a HELLO, validate and consider using it! */
2206     if (data_size < sizeof (struct GNUNET_HELLO_Message))
2207     {
2208       GNUNET_break_op (0);
2209       return;
2210     }
2211     h = data;
2212     if (data_size != ntohs (h->size))
2213     {
2214       GNUNET_break_op (0);
2215       return;
2216     }
2217     if (GNUNET_OK !=
2218         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2219                              &pid))
2220     {
2221       GNUNET_break_op (0);
2222       return;
2223     }
2224     if ( (GNUNET_YES != disable_try_connect) &&
2225          (0 != memcmp (&my_identity,
2226                        &pid,
2227                        sizeof (struct GNUNET_PeerIdentity))) )
2228       try_connect (&pid,
2229                    h);
2230   }
2231
2232   /* append 'peer' to 'get_path' */
2233   {
2234     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2235
2236     GNUNET_memcpy (xget_path,
2237                    get_path,
2238                    get_path_length * sizeof (struct GNUNET_PeerIdentity));
2239     xget_path[get_path_length] = *peer->id;
2240     get_path_length++;
2241
2242     /* forward to local clients */
2243     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2244                               &prm->key,
2245                               get_path_length,
2246                               xget_path,
2247                               put_path_length,
2248                               put_path,
2249                               type,
2250                               data_size,
2251                               data);
2252     GDS_CLIENTS_process_get_resp (type,
2253                                   xget_path,
2254                                   get_path_length,
2255                                   put_path, put_path_length,
2256                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2257                                   &prm->key,
2258                                   data,
2259                                   data_size);
2260     if (GNUNET_YES == cache_results)
2261     {
2262       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2263
2264       GNUNET_memcpy (xput_path,
2265                      put_path,
2266                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
2267       GNUNET_memcpy (&xput_path[put_path_length],
2268                      xget_path,
2269                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
2270
2271       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2272                                 &prm->key,
2273                                 get_path_length + put_path_length,
2274                                 xput_path,
2275                                 type,
2276                                 data_size,
2277                                 data);
2278     }
2279     /* forward to other peers */
2280     GDS_ROUTING_process (type,
2281                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2282                          &prm->key,
2283                          put_path_length,
2284                          put_path,
2285                          get_path_length,
2286                          xget_path,
2287                          data,
2288                          data_size);
2289   }
2290 }
2291
2292
2293 /**
2294  * Initialize neighbours subsystem.
2295  *
2296  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2297  */
2298 int
2299 GDS_NEIGHBOURS_init ()
2300 {
2301   GNUNET_MQ_hd_var_size (dht_p2p_get,
2302                          GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2303                          struct PeerGetMessage);
2304   GNUNET_MQ_hd_var_size (dht_p2p_put,
2305                          GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2306                          struct PeerPutMessage);
2307   GNUNET_MQ_hd_var_size (dht_p2p_result,
2308                          GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2309                          struct PeerResultMessage);
2310   struct GNUNET_MQ_MessageHandler core_handlers[] = {
2311     make_dht_p2p_get_handler (NULL),
2312     make_dht_p2p_put_handler (NULL),
2313     make_dht_p2p_result_handler (NULL),
2314     GNUNET_MQ_handler_end ()
2315   };
2316   unsigned long long temp_config_num;
2317
2318   disable_try_connect
2319     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2320                                             "DHT",
2321                                             "DISABLE_TRY_CONNECT");
2322   if (GNUNET_OK ==
2323       GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2324                                              "DHT",
2325                                              "bucket_size",
2326                                              &temp_config_num))
2327     bucket_size = (unsigned int) temp_config_num;
2328   cache_results
2329     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2330                                             "DHT",
2331                                             "CACHE_RESULTS");
2332
2333   log_route_details_stderr =
2334     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2335   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2336   core_api = GNUNET_CORE_connecT (GDS_cfg,
2337                                   NULL,
2338                                   &core_init,
2339                                   &handle_core_connect,
2340                                   &handle_core_disconnect,
2341                                   core_handlers);
2342   if (NULL == core_api)
2343     return GNUNET_SYSERR;
2344   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2345                                                               GNUNET_YES);
2346   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2347                                                             GNUNET_NO);
2348   return GNUNET_OK;
2349 }
2350
2351
2352 /**
2353  * Shutdown neighbours subsystem.
2354  */
2355 void
2356 GDS_NEIGHBOURS_done ()
2357 {
2358   if (NULL == core_api)
2359     return;
2360   GNUNET_CORE_disconnecT (core_api);
2361   core_api = NULL;
2362   GNUNET_assert (0 ==
2363                  GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2364   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2365   all_connected_peers = NULL;
2366   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2367                                          &free_connect_info,
2368                                          NULL);
2369   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2370   all_desired_peers = NULL;
2371   GNUNET_ATS_connectivity_done (ats_ch);
2372   ats_ch = NULL;
2373   GNUNET_assert (NULL == find_peer_task);
2374 }
2375
2376
2377 /**
2378  * Get the ID of the local node.
2379  *
2380  * @return identity of the local node
2381  */
2382 struct GNUNET_PeerIdentity *
2383 GDS_NEIGHBOURS_get_id ()
2384 {
2385   return &my_identity;
2386 }
2387
2388
2389 /* end of gnunet-service-dht_neighbours.c */