-fix off-by-1
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_hello.h"
45 #include "gnunet-service-dht_neighbours.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include "dht.h"
49
50 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * Maximum allowed number of pending messages per peer.
74  */
75 #define MAXIMUM_PENDING_PER_PEER 64
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92 /**
93  * Hello address expiration
94  */
95 extern struct GNUNET_TIME_Relative hello_expiration;
96
97
98 GNUNET_NETWORK_STRUCT_BEGIN
99
100 /**
101  * P2P PUT message
102  */
103 struct PeerPutMessage
104 {
105   /**
106    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
107    */
108   struct GNUNET_MessageHeader header;
109
110   /**
111    * Processing options
112    */
113   uint32_t options GNUNET_PACKED;
114
115   /**
116    * Content type.
117    */
118   uint32_t type GNUNET_PACKED;
119
120   /**
121    * Hop count
122    */
123   uint32_t hop_count GNUNET_PACKED;
124
125   /**
126    * Replication level for this message
127    */
128   uint32_t desired_replication_level GNUNET_PACKED;
129
130   /**
131    * Length of the PUT path that follows (if tracked).
132    */
133   uint32_t put_path_length GNUNET_PACKED;
134
135   /**
136    * When does the content expire?
137    */
138   struct GNUNET_TIME_AbsoluteNBO expiration_time;
139
140   /**
141    * Bloomfilter (for peer identities) to stop circular routes
142    */
143   char bloomfilter[DHT_BLOOM_SIZE];
144
145   /**
146    * The key we are storing under.
147    */
148   struct GNUNET_HashCode key;
149
150   /* put path (if tracked) */
151
152   /* Payload */
153
154 };
155
156
157 /**
158  * P2P Result message
159  */
160 struct PeerResultMessage
161 {
162   /**
163    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Content type.
169    */
170   uint32_t type GNUNET_PACKED;
171
172   /**
173    * Length of the PUT path that follows (if tracked).
174    */
175   uint32_t put_path_length GNUNET_PACKED;
176
177   /**
178    * Length of the GET path that follows (if tracked).
179    */
180   uint32_t get_path_length GNUNET_PACKED;
181
182   /**
183    * When does the content expire?
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration_time;
186
187   /**
188    * The key of the corresponding GET request.
189    */
190   struct GNUNET_HashCode key;
191
192   /* put path (if tracked) */
193
194   /* get path (if tracked) */
195
196   /* Payload */
197
198 };
199
200
201 /**
202  * P2P GET message
203  */
204 struct PeerGetMessage
205 {
206   /**
207    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
208    */
209   struct GNUNET_MessageHeader header;
210
211   /**
212    * Processing options
213    */
214   uint32_t options GNUNET_PACKED;
215
216   /**
217    * Desired content type.
218    */
219   uint32_t type GNUNET_PACKED;
220
221   /**
222    * Hop count
223    */
224   uint32_t hop_count GNUNET_PACKED;
225
226   /**
227    * Desired replication level for this request.
228    */
229   uint32_t desired_replication_level GNUNET_PACKED;
230
231   /**
232    * Size of the extended query.
233    */
234   uint32_t xquery_size;
235
236   /**
237    * Bloomfilter mutator.
238    */
239   uint32_t bf_mutator;
240
241   /**
242    * Bloomfilter (for peer identities) to stop circular routes
243    */
244   char bloomfilter[DHT_BLOOM_SIZE];
245
246   /**
247    * The key we are looking for.
248    */
249   struct GNUNET_HashCode key;
250
251   /* xquery */
252
253   /* result bloomfilter */
254
255 };
256 GNUNET_NETWORK_STRUCT_END
257
258 /**
259  * Linked list of messages to send to a particular other peer.
260  */
261 struct P2PPendingMessage
262 {
263   /**
264    * Pointer to next item in the list
265    */
266   struct P2PPendingMessage *next;
267
268   /**
269    * Pointer to previous item in the list
270    */
271   struct P2PPendingMessage *prev;
272
273   /**
274    * Message importance level.  FIXME: used? useful?
275    */
276   unsigned int importance;
277
278   /**
279    * When does this message time out?
280    */
281   struct GNUNET_TIME_Absolute timeout;
282
283   /**
284    * Actual message to be sent, allocated at the end of the struct:
285    * // msg = (cast) &pm[1];
286    * // memcpy (&pm[1], data, len);
287    */
288   const struct GNUNET_MessageHeader *msg;
289
290 };
291
292
293 /**
294  * Entry for a peer in a bucket.
295  */
296 struct PeerInfo
297 {
298   /**
299    * Next peer entry (DLL)
300    */
301   struct PeerInfo *next;
302
303   /**
304    *  Prev peer entry (DLL)
305    */
306   struct PeerInfo *prev;
307
308   /**
309    * Count of outstanding messages for peer.
310    */
311   unsigned int pending_count;
312
313   /**
314    * Head of pending messages to be sent to this peer.
315    */
316   struct P2PPendingMessage *head;
317
318   /**
319    * Tail of pending messages to be sent to this peer.
320    */
321   struct P2PPendingMessage *tail;
322
323   /**
324    * Core handle for sending messages to this peer.
325    */
326   struct GNUNET_CORE_TransmitHandle *th;
327
328   /**
329    * What is the identity of the peer?
330    */
331   struct GNUNET_PeerIdentity id;
332
333 #if 0
334   /**
335    * What is the average latency for replies received?
336    */
337   struct GNUNET_TIME_Relative latency;
338
339   /**
340    * Transport level distance to peer.
341    */
342   unsigned int distance;
343 #endif
344
345 };
346
347
348 /**
349  * Peers are grouped into buckets.
350  */
351 struct PeerBucket
352 {
353   /**
354    * Head of DLL
355    */
356   struct PeerInfo *head;
357
358   /**
359    * Tail of DLL
360    */
361   struct PeerInfo *tail;
362
363   /**
364    * Number of peers in the bucket.
365    */
366   unsigned int peers_size;
367 };
368
369
370 /**
371  * Information about a peer that we would like to connect to.
372  */
373 struct ConnectInfo
374 {
375
376   /**
377    * Handle to active HELLO offer operation, or NULL.
378    */
379   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
380
381   /**
382    * Handle to active connectivity suggestion operation, or NULL.
383    */
384   struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
385
386   /**
387    * How much would we like to connect to this peer?
388    */
389   uint32_t strength;
390 };
391
392
393 /**
394  * Do we cache all results that we are routing in the local datacache?
395  */
396 static int cache_results;
397
398 /**
399  * Should routing details be logged to stderr (for debugging)?
400  */
401 static int log_route_details_stderr;
402
403 /**
404  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
405  */
406 static unsigned int closest_bucket;
407
408 /**
409  * How many peers have we added since we sent out our last
410  * find peer request?
411  */
412 static unsigned int newly_found_peers;
413
414 /**
415  * Option for testing that disables the 'connect' function of the DHT.
416  */
417 static int disable_try_connect;
418
419 /**
420  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
421  */
422 static struct PeerBucket k_buckets[MAX_BUCKETS];
423
424 /**
425  * Hash map of all CORE-connected peers, for easy removal from
426  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
427  */
428 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
429
430 /**
431  * Hash map of all peers we would like to be connected to.
432  * Values are of type `struct ConnectInfo`.
433  */
434 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
435
436 /**
437  * Maximum size for each bucket.
438  */
439 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
440
441 /**
442  * Task that sends FIND PEER requests.
443  */
444 static struct GNUNET_SCHEDULER_Task *find_peer_task;
445
446 /**
447  * Identity of this peer.
448  */
449 static struct GNUNET_PeerIdentity my_identity;
450
451 /**
452  * Hash of the identity of this peer.
453  */
454 static struct GNUNET_HashCode my_identity_hash;
455
456 /**
457  * Handle to CORE.
458  */
459 static struct GNUNET_CORE_Handle *core_api;
460
461 /**
462  * Handle to ATS connectivity.
463  */
464 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
465
466
467 /**
468  * Find the optimal bucket for this key.
469  *
470  * @param hc the hashcode to compare our identity to
471  * @return the proper bucket index, or GNUNET_SYSERR
472  *         on error (same hashcode)
473  */
474 static int
475 find_bucket (const struct GNUNET_HashCode *hc)
476 {
477   unsigned int bits;
478
479   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
480   if (bits == MAX_BUCKETS)
481   {
482     /* How can all bits match? Got my own ID? */
483     GNUNET_break (0);
484     return GNUNET_SYSERR;
485   }
486   return MAX_BUCKETS - bits - 1;
487 }
488
489
490 /**
491  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
492  * Clean up the "oh" field in the @a cls
493  *
494  * @param cls a `struct ConnectInfo`
495  */
496 static void
497 offer_hello_done (void *cls)
498 {
499   struct ConnectInfo *ci = cls;
500
501   ci->oh = NULL;
502 }
503
504
505 /**
506  * Function called for all entries in #all_desired_peers to clean up.
507  *
508  * @param cls NULL
509  * @param peer peer the entry is for
510  * @param value the value to remove
511  * @return #GNUNET_YES
512  */
513 static int
514 free_connect_info (void *cls,
515                    const struct GNUNET_PeerIdentity *peer,
516                    void *value)
517 {
518   struct ConnectInfo *ci = value;
519
520   GNUNET_assert (GNUNET_YES ==
521                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
522                                                        peer,
523                                                        ci));
524   if (NULL != ci->sh)
525   {
526     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
527     ci->sh = NULL;
528   }
529   if (NULL != ci->oh)
530   {
531     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
532     ci->oh = NULL;
533   }
534   GNUNET_free (ci);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Consider if we want to connect to a given peer, and if so
541  * let ATS know.  If applicable, the HELLO is offered to the
542  * TRANSPORT service.
543  *
544  * @param pid peer to consider connectivity requirements for
545  * @param h a HELLO message, or NULL
546  */
547 static void
548 try_connect (const struct GNUNET_PeerIdentity *pid,
549              const struct GNUNET_MessageHeader *h)
550 {
551   int bucket;
552   struct GNUNET_HashCode pid_hash;
553   struct ConnectInfo *ci;
554   uint32_t strength;
555
556   GNUNET_CRYPTO_hash (pid,
557                       sizeof (struct GNUNET_PeerIdentity),
558                       &pid_hash);
559   bucket = find_bucket (&pid_hash);
560   if (bucket < 0)
561     return; /* self? */
562   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
563                                           pid);
564
565   if (k_buckets[bucket].peers_size < bucket_size)
566     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
567   else
568     strength = bucket; /* minimum value of connectivity */
569   if (GNUNET_YES ==
570       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
571                                               pid))
572     strength *= 2; /* double for connected peers */
573   else if (k_buckets[bucket].peers_size > bucket_size)
574     strength = 0; /* bucket full, we really do not care about more */
575
576   if ( (0 == strength) &&
577        (NULL != ci) )
578   {
579     /* release request */
580     GNUNET_assert (GNUNET_YES ==
581                    free_connect_info (NULL,
582                                       pid,
583                                       ci));
584     return;
585   }
586   if (NULL == ci)
587   {
588     ci = GNUNET_new (struct ConnectInfo);
589     GNUNET_assert (GNUNET_OK ==
590                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
591                                                       pid,
592                                                       ci,
593                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
594   }
595   if ( (NULL != GDS_transport_handle) &&
596        (NULL != ci->oh) &&
597        (NULL != h) )
598     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
599   if ( (NULL != GDS_transport_handle) &&
600        (NULL != h) )
601     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
602                                            h,
603                                            &offer_hello_done,
604                                            ci);
605   if ( (NULL != ci->sh) &&
606        (ci->strength != strength) )
607     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
608   if (ci->strength != strength)
609     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
610                                               pid,
611                                               strength);
612   ci->strength = strength;
613 }
614
615
616 /**
617  * Function called for each peer in #all_desired_peers during
618  * #update_connect_preferences() if we have reason to adjust
619  * the strength of our desire to keep connections to certain
620  * peers.  Calls #try_connect() to update the calculations for
621  * the given @a pid.
622  *
623  * @param cls NULL
624  * @param pid peer to update
625  * @param value unused
626  * @return #GNUNET_YES (continue to iterate)
627  */
628 static int
629 update_desire_strength (void *cls,
630                         const struct GNUNET_PeerIdentity *pid,
631                         void *value)
632 {
633   try_connect (pid, NULL);
634   return GNUNET_YES;
635 }
636
637
638 /**
639  * Update our preferences for connectivity as given to ATS.
640  *
641  * @param cls the `struct PeerInfo` of the peer
642  * @param tc scheduler context.
643  */
644 static void
645 update_connect_preferences ()
646 {
647   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
648                                          &update_desire_strength,
649                                          NULL);
650 }
651
652
653 /**
654  * Closure for #add_known_to_bloom().
655  */
656 struct BloomConstructorContext
657 {
658   /**
659    * Bloom filter under construction.
660    */
661   struct GNUNET_CONTAINER_BloomFilter *bloom;
662
663   /**
664    * Mutator to use.
665    */
666   uint32_t bf_mutator;
667 };
668
669
670 /**
671  * Add each of the peers we already know to the bloom filter of
672  * the request so that we don't get duplicate HELLOs.
673  *
674  * @param cls the 'struct BloomConstructorContext'.
675  * @param key peer identity to add to the bloom filter
676  * @param value value the peer information (unused)
677  * @return #GNUNET_YES (we should continue to iterate)
678  */
679 static int
680 add_known_to_bloom (void *cls,
681                     const struct GNUNET_PeerIdentity *key,
682                     void *value)
683 {
684   struct BloomConstructorContext *ctx = cls;
685   struct GNUNET_HashCode key_hash;
686   struct GNUNET_HashCode mh;
687
688   GNUNET_CRYPTO_hash (key,
689                       sizeof (struct GNUNET_PeerIdentity),
690                       &key_hash);
691   GNUNET_BLOCK_mingle_hash (&key_hash,
692                             ctx->bf_mutator,
693                             &mh);
694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
696               GNUNET_i2s (key), ctx->bf_mutator);
697   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
698   return GNUNET_YES;
699 }
700
701
702 /**
703  * Task to send a find peer message for our own peer identifier
704  * so that we can find the closest peers in the network to ourselves
705  * and attempt to connect to them.
706  *
707  * @param cls closure for this task
708  */
709 static void
710 send_find_peer_message (void *cls)
711 {
712   struct GNUNET_TIME_Relative next_send_time;
713   struct BloomConstructorContext bcc;
714   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
715
716   find_peer_task = NULL;
717   if (newly_found_peers > bucket_size)
718   {
719     /* If we are finding many peers already, no need to send out our request right now! */
720     find_peer_task =
721         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
722                                       &send_find_peer_message, NULL);
723     newly_found_peers = 0;
724     return;
725   }
726   bcc.bf_mutator =
727       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
728   bcc.bloom =
729       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
730                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
731   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
732                                          &add_known_to_bloom,
733                                          &bcc);
734   GNUNET_STATISTICS_update (GDS_stats,
735                             gettext_noop ("# FIND PEER messages initiated"), 1,
736                             GNUNET_NO);
737   peer_bf =
738       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
739                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
740   // FIXME: pass priority!?
741   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
742                              GNUNET_DHT_RO_FIND_PEER,
743                              FIND_PEER_REPLICATION_LEVEL, 0,
744                              &my_identity_hash, NULL, 0, bcc.bloom,
745                              bcc.bf_mutator, peer_bf);
746   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
747   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
748   /* schedule next round */
749   next_send_time.rel_value_us =
750       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
751       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
752                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
753                                 (newly_found_peers + 1));
754   newly_found_peers = 0;
755   GNUNET_assert (NULL == find_peer_task);
756   find_peer_task =
757       GNUNET_SCHEDULER_add_delayed (next_send_time,
758                                     &send_find_peer_message,
759                                     NULL);
760 }
761
762
763 /**
764  * Method called whenever a peer connects.
765  *
766  * @param cls closure
767  * @param peer peer identity this notification is about
768  */
769 static void
770 handle_core_connect (void *cls,
771                      const struct GNUNET_PeerIdentity *peer)
772 {
773   struct PeerInfo *ret;
774   struct GNUNET_HashCode phash;
775   int peer_bucket;
776
777   /* Check for connect to self message */
778   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
779     return;
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781               "Connected to %s\n",
782               GNUNET_i2s (peer));
783   if (GNUNET_YES ==
784       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
785                                               peer))
786   {
787     GNUNET_break (0);
788     return;
789   }
790   GNUNET_STATISTICS_update (GDS_stats,
791                             gettext_noop ("# peers connected"),
792                             1,
793                             GNUNET_NO);
794   GNUNET_CRYPTO_hash (peer,
795                       sizeof (struct GNUNET_PeerIdentity),
796                       &phash);
797   peer_bucket = find_bucket (&phash);
798   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
799   ret = GNUNET_new (struct PeerInfo);
800 #if 0
801   ret->latency = latency;
802   ret->distance = distance;
803 #endif
804   ret->id = *peer;
805   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
806                                     k_buckets[peer_bucket].tail,
807                                     ret);
808   k_buckets[peer_bucket].peers_size++;
809   closest_bucket = GNUNET_MAX (closest_bucket,
810                                peer_bucket);
811   GNUNET_assert (GNUNET_OK ==
812                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
813                                                     peer,
814                                                     ret,
815                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
816   if ( (peer_bucket > 0) &&
817        (k_buckets[peer_bucket].peers_size <= bucket_size))
818   {
819     update_connect_preferences ();
820     newly_found_peers++;
821   }
822   if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
823       (GNUNET_YES != disable_try_connect))
824   {
825     /* got a first connection, good time to start with FIND PEER requests... */
826     GNUNET_assert (NULL == find_peer_task);
827     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
828                                                NULL);
829   }
830 }
831
832
833 /**
834  * Method called whenever a peer disconnects.
835  *
836  * @param cls closure
837  * @param peer peer identity this notification is about
838  */
839 static void
840 handle_core_disconnect (void *cls,
841                         const struct GNUNET_PeerIdentity *peer)
842 {
843   struct PeerInfo *to_remove;
844   int current_bucket;
845   struct P2PPendingMessage *pos;
846   unsigned int discarded;
847   struct GNUNET_HashCode phash;
848
849   /* Check for disconnect from self message */
850   if (0 == memcmp (&my_identity,
851                    peer,
852                    sizeof (struct GNUNET_PeerIdentity)))
853     return;
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855               "Disconnected %s\n",
856               GNUNET_i2s (peer));
857   to_remove =
858       GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
859                                          peer);
860   if (NULL == to_remove)
861   {
862     GNUNET_break (0);
863     return;
864   }
865   GNUNET_STATISTICS_update (GDS_stats,
866                             gettext_noop ("# peers connected"),
867                             -1,
868                             GNUNET_NO);
869   GNUNET_assert (GNUNET_YES ==
870                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
871                                                        peer,
872                                                        to_remove));
873   GNUNET_CRYPTO_hash (peer,
874                       sizeof (struct GNUNET_PeerIdentity),
875                       &phash);
876   current_bucket = find_bucket (&phash);
877   GNUNET_assert (current_bucket >= 0);
878   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
879                                k_buckets[current_bucket].tail,
880                                to_remove);
881   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
882   k_buckets[current_bucket].peers_size--;
883   while ( (closest_bucket > 0) &&
884           (0 == k_buckets[closest_bucket].peers_size) )
885     closest_bucket--;
886   if (NULL != to_remove->th)
887   {
888     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
889     to_remove->th = NULL;
890   }
891   discarded = 0;
892   while (NULL != (pos = to_remove->head))
893   {
894     GNUNET_CONTAINER_DLL_remove (to_remove->head,
895                                  to_remove->tail,
896                                  pos);
897     discarded++;
898     GNUNET_free (pos);
899   }
900   if (k_buckets[current_bucket].peers_size < bucket_size)
901     update_connect_preferences ();
902   GNUNET_STATISTICS_update (GDS_stats,
903                             gettext_noop ("# Queued messages discarded (peer disconnected)"),
904                             discarded,
905                             GNUNET_NO);
906   GNUNET_free (to_remove);
907 }
908
909
910 /**
911  * Called when core is ready to send a message we asked for
912  * out to the destination.
913  *
914  * @param cls the 'struct PeerInfo' of the target peer
915  * @param size number of bytes available in @a buf
916  * @param buf where the callee should write the message
917  * @return number of bytes written to @a buf
918  */
919 static size_t
920 core_transmit_notify (void *cls,
921                       size_t size,
922                       void *buf)
923 {
924   struct PeerInfo *peer = cls;
925   char *cbuf = buf;
926   struct P2PPendingMessage *pending;
927   size_t off;
928   size_t msize;
929
930   peer->th = NULL;
931   while ((NULL != (pending = peer->head)) &&
932          (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
933   {
934     GNUNET_STATISTICS_update (GDS_stats,
935                               gettext_noop
936                               ("# Messages dropped (CORE timeout)"),
937                               1,
938                               GNUNET_NO);
939     peer->pending_count--;
940     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
941     GNUNET_free (pending);
942   }
943   if (NULL == pending)
944   {
945     /* no messages pending */
946     return 0;
947   }
948   if (NULL == buf)
949   {
950     peer->th =
951         GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
952                                            GNUNET_CORE_PRIO_BEST_EFFORT,
953                                            GNUNET_TIME_absolute_get_remaining
954                                            (pending->timeout), &peer->id,
955                                            ntohs (pending->msg->size),
956                                            &core_transmit_notify, peer);
957     GNUNET_break (NULL != peer->th);
958     return 0;
959   }
960   off = 0;
961   while ((NULL != (pending = peer->head)) &&
962          (size - off >= (msize = ntohs (pending->msg->size))))
963   {
964     GNUNET_STATISTICS_update (GDS_stats,
965                               gettext_noop
966                               ("# Bytes transmitted to other peers"), msize,
967                               GNUNET_NO);
968     memcpy (&cbuf[off], pending->msg, msize);
969     off += msize;
970     peer->pending_count--;
971     GNUNET_CONTAINER_DLL_remove (peer->head,
972                                  peer->tail,
973                                  pending);
974     GNUNET_free (pending);
975   }
976   if (NULL != (pending = peer->head))
977   {
978     /* technically redundant, but easier to read and
979        avoids bogus gcc warning... */
980     msize = ntohs (pending->msg->size);
981     peer->th =
982       GNUNET_CORE_notify_transmit_ready (core_api,
983                                          GNUNET_NO,
984                                          GNUNET_CORE_PRIO_BEST_EFFORT,
985                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
986                                          &peer->id,
987                                          msize,
988                                          &core_transmit_notify,
989                                          peer);
990     GNUNET_break (NULL != peer->th);
991   }
992   return off;
993 }
994
995
996 /**
997  * Transmit all messages in the peer's message queue.
998  *
999  * @param peer message queue to process
1000  */
1001 static void
1002 process_peer_queue (struct PeerInfo *peer)
1003 {
1004   struct P2PPendingMessage *pending;
1005
1006   if (NULL == (pending = peer->head))
1007     return;
1008   if (NULL != peer->th)
1009     return;
1010   GNUNET_STATISTICS_update (GDS_stats,
1011                             gettext_noop
1012                             ("# Bytes of bandwidth requested from core"),
1013                             ntohs (pending->msg->size), GNUNET_NO);
1014   peer->th =
1015       GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
1016                                          GNUNET_CORE_PRIO_BEST_EFFORT,
1017                                          GNUNET_TIME_absolute_get_remaining
1018                                          (pending->timeout),
1019                                          &peer->id,
1020                                          ntohs (pending->msg->size),
1021                                          &core_transmit_notify,
1022                                          peer);
1023   GNUNET_break (NULL != peer->th);
1024 }
1025
1026
1027 /**
1028  * To how many peers should we (on average) forward the request to
1029  * obtain the desired target_replication count (on average).
1030  *
1031  * @param hop_count number of hops the message has traversed
1032  * @param target_replication the number of total paths desired
1033  * @return Some number of peers to forward the message to
1034  */
1035 static unsigned int
1036 get_forward_count (uint32_t hop_count, uint32_t target_replication)
1037 {
1038   uint32_t random_value;
1039   uint32_t forward_count;
1040   float target_value;
1041
1042   if (hop_count > GDS_NSE_get () * 4.0)
1043   {
1044     /* forcefully terminate */
1045     GNUNET_STATISTICS_update (GDS_stats,
1046                               gettext_noop ("# requests TTL-dropped"),
1047                               1, GNUNET_NO);
1048     return 0;
1049   }
1050   if (hop_count > GDS_NSE_get () * 2.0)
1051   {
1052     /* Once we have reached our ideal number of hops, only forward to 1 peer */
1053     return 1;
1054   }
1055   /* bound by system-wide maximum */
1056   target_replication =
1057       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
1058   target_value =
1059       1 + (target_replication - 1.0) / (GDS_NSE_get () +
1060                                         ((float) (target_replication - 1.0) *
1061                                          hop_count));
1062   /* Set forward count to floor of target_value */
1063   forward_count = (uint32_t) target_value;
1064   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1065   target_value = target_value - forward_count;
1066   random_value =
1067       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1068   if (random_value < (target_value * UINT32_MAX))
1069     forward_count++;
1070   return forward_count;
1071 }
1072
1073
1074 /**
1075  * Compute the distance between have and target as a 32-bit value.
1076  * Differences in the lower bits must count stronger than differences
1077  * in the higher bits.
1078  *
1079  * @param target
1080  * @param have
1081  * @return 0 if have==target, otherwise a number
1082  *           that is larger as the distance between
1083  *           the two hash codes increases
1084  */
1085 static unsigned int
1086 get_distance (const struct GNUNET_HashCode *target,
1087               const struct GNUNET_HashCode *have)
1088 {
1089   unsigned int bucket;
1090   unsigned int msb;
1091   unsigned int lsb;
1092   unsigned int i;
1093
1094   /* We have to represent the distance between two 2^9 (=512)-bit
1095    * numbers as a 2^5 (=32)-bit number with "0" being used for the
1096    * two numbers being identical; furthermore, we need to
1097    * guarantee that a difference in the number of matching
1098    * bits is always represented in the result.
1099    *
1100    * We use 2^32/2^9 numerical values to distinguish between
1101    * hash codes that have the same LSB bit distance and
1102    * use the highest 2^9 bits of the result to signify the
1103    * number of (mis)matching LSB bits; if we have 0 matching
1104    * and hence 512 mismatching LSB bits we return -1 (since
1105    * 512 itself cannot be represented with 9 bits) */
1106
1107   /* first, calculate the most significant 9 bits of our
1108    * result, aka the number of LSBs */
1109   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1110   /* bucket is now a value between 0 and 512 */
1111   if (bucket == 512)
1112     return 0;                   /* perfect match */
1113   if (bucket == 0)
1114     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1115                                  * below, we'd end up with max+1 (overflow)) */
1116
1117   /* calculate the most significant bits of the final result */
1118   msb = (512 - bucket) << (32 - 9);
1119   /* calculate the 32-9 least significant bits of the final result by
1120    * looking at the differences in the 32-9 bits following the
1121    * mismatching bit at 'bucket' */
1122   lsb = 0;
1123   for (i = bucket + 1;
1124        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1125   {
1126     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
1127         GNUNET_CRYPTO_hash_get_bit (have, i))
1128       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
1129                                                  * last bit set will be 31 -- if
1130                                                  * i does not reach 512 first... */
1131   }
1132   return msb | lsb;
1133 }
1134
1135
1136 /**
1137  * Check whether my identity is closer than any known peers.  If a
1138  * non-null bloomfilter is given, check if this is the closest peer
1139  * that hasn't already been routed to.
1140  *
1141  * @param key hash code to check closeness to
1142  * @param bloom bloomfilter, exclude these entries from the decision
1143  * @return #GNUNET_YES if node location is closest,
1144  *         #GNUNET_NO otherwise.
1145  */
1146 static int
1147 am_closest_peer (const struct GNUNET_HashCode *key,
1148                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
1149 {
1150   int bits;
1151   int other_bits;
1152   int bucket_num;
1153   int count;
1154   struct PeerInfo *pos;
1155   struct GNUNET_HashCode phash;
1156
1157   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
1158     return GNUNET_YES;
1159   bucket_num = find_bucket (key);
1160   GNUNET_assert (bucket_num >= 0);
1161   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key);
1162   pos = k_buckets[bucket_num].head;
1163   count = 0;
1164   while ((NULL != pos) && (count < bucket_size))
1165   {
1166     GNUNET_CRYPTO_hash (&pos->id,
1167                         sizeof (struct GNUNET_PeerIdentity),
1168                         &phash);
1169     if ((NULL != bloom) &&
1170         (GNUNET_YES ==
1171          GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1172     {
1173       pos = pos->next;
1174       continue;                 /* Skip already checked entries */
1175     }
1176     other_bits = GNUNET_CRYPTO_hash_matching_bits (&phash, key);
1177     if (other_bits > bits)
1178       return GNUNET_NO;
1179     if (other_bits == bits)     /* We match the same number of bits */
1180       return GNUNET_YES;
1181     pos = pos->next;
1182   }
1183   /* No peers closer, we are the closest! */
1184   return GNUNET_YES;
1185 }
1186
1187
1188 /**
1189  * Select a peer from the routing table that would be a good routing
1190  * destination for sending a message for "key".  The resulting peer
1191  * must not be in the set of blocked peers.<p>
1192  *
1193  * Note that we should not ALWAYS select the closest peer to the
1194  * target, peers further away from the target should be chosen with
1195  * exponentially declining probability.
1196  *
1197  * FIXME: double-check that this is fine
1198  *
1199  *
1200  * @param key the key we are selecting a peer to route to
1201  * @param bloom a bloomfilter containing entries this request has seen already
1202  * @param hops how many hops has this message traversed thus far
1203  * @return Peer to route to, or NULL on error
1204  */
1205 static struct PeerInfo *
1206 select_peer (const struct GNUNET_HashCode *key,
1207              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1208              uint32_t hops)
1209 {
1210   unsigned int bc;
1211   unsigned int count;
1212   unsigned int selected;
1213   struct PeerInfo *pos;
1214   unsigned int dist;
1215   unsigned int smallest_distance;
1216   struct PeerInfo *chosen;
1217   struct GNUNET_HashCode phash;
1218
1219   if (hops >= GDS_NSE_get ())
1220   {
1221     /* greedy selection (closest peer that is not in bloomfilter) */
1222     smallest_distance = UINT_MAX;
1223     chosen = NULL;
1224     for (bc = 0; bc <= closest_bucket; bc++)
1225     {
1226       pos = k_buckets[bc].head;
1227       count = 0;
1228       while ((pos != NULL) && (count < bucket_size))
1229       {
1230         GNUNET_CRYPTO_hash (&pos->id,
1231                             sizeof (struct GNUNET_PeerIdentity),
1232                             &phash);
1233         if ((bloom == NULL) ||
1234             (GNUNET_NO ==
1235              GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1236         {
1237           dist = get_distance (key, &phash);
1238           if (dist < smallest_distance)
1239           {
1240             chosen = pos;
1241             smallest_distance = dist;
1242           }
1243         }
1244         else
1245         {
1246           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1248                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1249           GNUNET_STATISTICS_update (GDS_stats,
1250                                     gettext_noop
1251                                     ("# Peers excluded from routing due to Bloomfilter"),
1252                                     1, GNUNET_NO);
1253           dist = get_distance (key, &phash);
1254           if (dist < smallest_distance)
1255           {
1256             chosen = NULL;
1257             smallest_distance = dist;
1258           }
1259         }
1260         count++;
1261         pos = pos->next;
1262       }
1263     }
1264     if (NULL == chosen)
1265       GNUNET_STATISTICS_update (GDS_stats,
1266                                 gettext_noop ("# Peer selection failed"), 1,
1267                                 GNUNET_NO);
1268     return chosen;
1269   }
1270
1271   /* select "random" peer */
1272   /* count number of peers that are available and not filtered */
1273   count = 0;
1274   for (bc = 0; bc <= closest_bucket; bc++)
1275   {
1276     pos = k_buckets[bc].head;
1277     while ((pos != NULL) && (count < bucket_size))
1278     {
1279       GNUNET_CRYPTO_hash (&pos->id,
1280                           sizeof (struct GNUNET_PeerIdentity),
1281                           &phash);
1282       if ((bloom != NULL) &&
1283           (GNUNET_YES ==
1284            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1285       {
1286         GNUNET_STATISTICS_update (GDS_stats,
1287                                   gettext_noop
1288                                   ("# Peers excluded from routing due to Bloomfilter"),
1289                                   1, GNUNET_NO);
1290         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1292                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1293         pos = pos->next;
1294         continue;               /* Ignore bloomfiltered peers */
1295       }
1296       count++;
1297       pos = pos->next;
1298     }
1299   }
1300   if (0 == count)               /* No peers to select from! */
1301   {
1302     GNUNET_STATISTICS_update (GDS_stats,
1303                               gettext_noop ("# Peer selection failed"), 1,
1304                               GNUNET_NO);
1305     return NULL;
1306   }
1307   /* Now actually choose a peer */
1308   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1309   count = 0;
1310   for (bc = 0; bc <= closest_bucket; bc++)
1311   {
1312     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1313     {
1314       GNUNET_CRYPTO_hash (&pos->id,
1315                           sizeof (struct GNUNET_PeerIdentity),
1316                           &phash);
1317       if ((bloom != NULL) &&
1318           (GNUNET_YES ==
1319            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1320       {
1321         continue;               /* Ignore bloomfiltered peers */
1322       }
1323       if (0 == selected--)
1324         return pos;
1325     }
1326   }
1327   GNUNET_break (0);
1328   return NULL;
1329 }
1330
1331
1332 /**
1333  * Compute the set of peers that the given request should be
1334  * forwarded to.
1335  *
1336  * @param key routing key
1337  * @param bloom bloom filter excluding peers as targets, all selected
1338  *        peers will be added to the bloom filter
1339  * @param hop_count number of hops the request has traversed so far
1340  * @param target_replication desired number of replicas
1341  * @param targets where to store an array of target peers (to be
1342  *         free'd by the caller)
1343  * @return number of peers returned in 'targets'.
1344  */
1345 static unsigned int
1346 get_target_peers (const struct GNUNET_HashCode *key,
1347                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1348                   uint32_t hop_count, uint32_t target_replication,
1349                   struct PeerInfo ***targets)
1350 {
1351   unsigned int ret;
1352   unsigned int off;
1353   struct PeerInfo **rtargets;
1354   struct PeerInfo *nxt;
1355   struct GNUNET_HashCode nhash;
1356
1357   GNUNET_assert (NULL != bloom);
1358   ret = get_forward_count (hop_count, target_replication);
1359   if (0 == ret)
1360   {
1361     *targets = NULL;
1362     return 0;
1363   }
1364   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1365   for (off = 0; off < ret; off++)
1366   {
1367     nxt = select_peer (key, bloom, hop_count);
1368     if (NULL == nxt)
1369       break;
1370     rtargets[off] = nxt;
1371     GNUNET_CRYPTO_hash (&nxt->id,
1372                         sizeof (struct GNUNET_PeerIdentity),
1373                         &nhash);
1374     GNUNET_break (GNUNET_NO ==
1375                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1376                                                      &nhash));
1377     GNUNET_CONTAINER_bloomfilter_add (bloom, &nhash);
1378   }
1379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1380               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1381               off,
1382               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1383               (unsigned int) hop_count,
1384               GNUNET_h2s (key),
1385               ret);
1386   if (0 == off)
1387   {
1388     GNUNET_free (rtargets);
1389     *targets = NULL;
1390     return 0;
1391   }
1392   *targets = rtargets;
1393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1395               GNUNET_h2s (key),
1396               off,
1397               ret);
1398   return off;
1399 }
1400
1401
1402 /**
1403  * Perform a PUT operation.   Forwards the given request to other
1404  * peers.   Does not store the data locally.  Does not give the
1405  * data to local clients.  May do nothing if this is the only
1406  * peer in the network (or if we are the closest peer in the
1407  * network).
1408  *
1409  * @param type type of the block
1410  * @param options routing options
1411  * @param desired_replication_level desired replication count
1412  * @param expiration_time when does the content expire
1413  * @param hop_count how many hops has this message traversed so far
1414  * @param bf Bloom filter of peers this PUT has already traversed
1415  * @param key key for the content
1416  * @param put_path_length number of entries in @a put_path
1417  * @param put_path peers this request has traversed so far (if tracked)
1418  * @param data payload to store
1419  * @param data_size number of bytes in @a data
1420  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1421  */
1422 int
1423 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1424                            enum GNUNET_DHT_RouteOption options,
1425                            uint32_t desired_replication_level,
1426                            struct GNUNET_TIME_Absolute expiration_time,
1427                            uint32_t hop_count,
1428                            struct GNUNET_CONTAINER_BloomFilter *bf,
1429                            const struct GNUNET_HashCode *key,
1430                            unsigned int put_path_length,
1431                            struct GNUNET_PeerIdentity *put_path,
1432                            const void *data, size_t data_size)
1433 {
1434   unsigned int target_count;
1435   unsigned int i;
1436   struct PeerInfo **targets;
1437   struct PeerInfo *target;
1438   struct P2PPendingMessage *pending;
1439   size_t msize;
1440   struct PeerPutMessage *ppm;
1441   struct GNUNET_PeerIdentity *pp;
1442   struct GNUNET_HashCode thash;
1443   unsigned int skip_count;
1444
1445   GNUNET_assert (NULL != bf);
1446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1447               "Adding myself (%s) to PUT bloomfilter for %s\n",
1448               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1449   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1450   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1451                             1, GNUNET_NO);
1452   target_count =
1453       get_target_peers (key, bf, hop_count, desired_replication_level,
1454                         &targets);
1455   if (0 == target_count)
1456   {
1457     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1458                 "Routing PUT for %s terminates after %u hops at %s\n",
1459                 GNUNET_h2s (key), (unsigned int) hop_count,
1460                 GNUNET_i2s (&my_identity));
1461     return GNUNET_NO;
1462   }
1463   msize =
1464       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1465       sizeof (struct PeerPutMessage);
1466   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1467   {
1468     put_path_length = 0;
1469     msize = data_size + sizeof (struct PeerPutMessage);
1470   }
1471   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1472   {
1473     GNUNET_break (0);
1474     GNUNET_free (targets);
1475     return GNUNET_NO;
1476   }
1477   GNUNET_STATISTICS_update (GDS_stats,
1478                             gettext_noop
1479                             ("# PUT messages queued for transmission"),
1480                             target_count, GNUNET_NO);
1481   skip_count = 0;
1482   for (i = 0; i < target_count; i++)
1483   {
1484     target = targets[i];
1485     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1486     {
1487       /* skip */
1488       GNUNET_STATISTICS_update (GDS_stats,
1489                                 gettext_noop ("# P2P messages dropped due to full queue"),
1490                                 1, GNUNET_NO);
1491       skip_count++;
1492       continue;
1493     }
1494     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1495                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
1496                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1497     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1498     pending->importance = 0;    /* FIXME */
1499     pending->timeout = expiration_time;
1500     ppm = (struct PeerPutMessage *) &pending[1];
1501     pending->msg = &ppm->header;
1502     ppm->header.size = htons (msize);
1503     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1504     ppm->options = htonl (options);
1505     ppm->type = htonl (type);
1506     ppm->hop_count = htonl (hop_count + 1);
1507     ppm->desired_replication_level = htonl (desired_replication_level);
1508     ppm->put_path_length = htonl (put_path_length);
1509     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1510     GNUNET_CRYPTO_hash (&target->id,
1511                         sizeof (struct GNUNET_PeerIdentity),
1512                         &thash);
1513     GNUNET_break (GNUNET_YES ==
1514                   GNUNET_CONTAINER_bloomfilter_test (bf,
1515                                                      &thash));
1516     GNUNET_assert (GNUNET_OK ==
1517                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1518                                                               ppm->bloomfilter,
1519                                                               DHT_BLOOM_SIZE));
1520     ppm->key = *key;
1521     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1522     memcpy (pp, put_path,
1523             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1524     memcpy (&pp[put_path_length], data, data_size);
1525     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1526     target->pending_count++;
1527     process_peer_queue (target);
1528   }
1529   GNUNET_free (targets);
1530   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1531 }
1532
1533
1534 /**
1535  * Perform a GET operation.  Forwards the given request to other
1536  * peers.  Does not lookup the key locally.  May do nothing if this is
1537  * the only peer in the network (or if we are the closest peer in the
1538  * network).
1539  *
1540  * @param type type of the block
1541  * @param options routing options
1542  * @param desired_replication_level desired replication count
1543  * @param hop_count how many hops did this request traverse so far?
1544  * @param key key for the content
1545  * @param xquery extended query
1546  * @param xquery_size number of bytes in @a xquery
1547  * @param reply_bf bloomfilter to filter duplicates
1548  * @param reply_bf_mutator mutator for @a reply_bf
1549  * @param peer_bf filter for peers not to select (again)
1550  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1551  */
1552 int
1553 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1554                            enum GNUNET_DHT_RouteOption options,
1555                            uint32_t desired_replication_level,
1556                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1557                            const void *xquery, size_t xquery_size,
1558                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1559                            uint32_t reply_bf_mutator,
1560                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1561 {
1562   unsigned int target_count;
1563   unsigned int i;
1564   struct PeerInfo **targets;
1565   struct PeerInfo *target;
1566   struct P2PPendingMessage *pending;
1567   size_t msize;
1568   struct PeerGetMessage *pgm;
1569   char *xq;
1570   size_t reply_bf_size;
1571   struct GNUNET_HashCode thash;
1572   unsigned int skip_count;
1573
1574   GNUNET_assert (NULL != peer_bf);
1575   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1576                             1, GNUNET_NO);
1577   target_count =
1578       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1579                         &targets);
1580   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1581               "Adding myself (%s) to GET bloomfilter for %s\n",
1582               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1583   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity_hash);
1584   if (0 == target_count)
1585   {
1586     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1587                 "Routing GET for %s terminates after %u hops at %s\n",
1588                 GNUNET_h2s (key), (unsigned int) hop_count,
1589                 GNUNET_i2s (&my_identity));
1590     return GNUNET_NO;
1591   }
1592   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1593   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1594   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1595   {
1596     GNUNET_break (0);
1597     GNUNET_free (targets);
1598     return GNUNET_NO;
1599   }
1600   GNUNET_STATISTICS_update (GDS_stats,
1601                             gettext_noop
1602                             ("# GET messages queued for transmission"),
1603                             target_count, GNUNET_NO);
1604   /* forward request */
1605   skip_count = 0;
1606   for (i = 0; i < target_count; i++)
1607   {
1608     target = targets[i];
1609     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1610     {
1611       /* skip */
1612       GNUNET_STATISTICS_update (GDS_stats,
1613                                 gettext_noop ("# P2P messages dropped due to full queue"),
1614                                 1, GNUNET_NO);
1615       skip_count++;
1616       continue;
1617     }
1618     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
1620                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1621     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1622     pending->importance = 0;    /* FIXME */
1623     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1624     pgm = (struct PeerGetMessage *) &pending[1];
1625     pending->msg = &pgm->header;
1626     pgm->header.size = htons (msize);
1627     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1628     pgm->options = htonl (options);
1629     pgm->type = htonl (type);
1630     pgm->hop_count = htonl (hop_count + 1);
1631     pgm->desired_replication_level = htonl (desired_replication_level);
1632     pgm->xquery_size = htonl (xquery_size);
1633     pgm->bf_mutator = reply_bf_mutator;
1634     GNUNET_CRYPTO_hash (&target->id,
1635                         sizeof (struct GNUNET_PeerIdentity),
1636                         &thash);
1637     GNUNET_break (GNUNET_YES ==
1638                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1639                                                      &thash));
1640     GNUNET_assert (GNUNET_OK ==
1641                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1642                                                               pgm->bloomfilter,
1643                                                               DHT_BLOOM_SIZE));
1644     pgm->key = *key;
1645     xq = (char *) &pgm[1];
1646     memcpy (xq, xquery, xquery_size);
1647     if (NULL != reply_bf)
1648       GNUNET_assert (GNUNET_OK ==
1649                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1650                                                                 &xq
1651                                                                 [xquery_size],
1652                                                                 reply_bf_size));
1653     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1654     target->pending_count++;
1655     process_peer_queue (target);
1656   }
1657   GNUNET_free (targets);
1658   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1659 }
1660
1661
1662 /**
1663  * Handle a reply (route to origin).  Only forwards the reply back to
1664  * the given peer.  Does not do local caching or forwarding to local
1665  * clients.
1666  *
1667  * @param target neighbour that should receive the block (if still connected)
1668  * @param type type of the block
1669  * @param expiration_time when does the content expire
1670  * @param key key for the content
1671  * @param put_path_length number of entries in @a put_path
1672  * @param put_path peers the original PUT traversed (if tracked)
1673  * @param get_path_length number of entries in @a get_path
1674  * @param get_path peers this reply has traversed so far (if tracked)
1675  * @param data payload of the reply
1676  * @param data_size number of bytes in @a data
1677  */
1678 void
1679 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1680                              enum GNUNET_BLOCK_Type type,
1681                              struct GNUNET_TIME_Absolute expiration_time,
1682                              const struct GNUNET_HashCode *key,
1683                              unsigned int put_path_length,
1684                              const struct GNUNET_PeerIdentity *put_path,
1685                              unsigned int get_path_length,
1686                              const struct GNUNET_PeerIdentity *get_path,
1687                              const void *data,
1688                              size_t data_size)
1689 {
1690   struct PeerInfo *pi;
1691   struct P2PPendingMessage *pending;
1692   size_t msize;
1693   struct PeerResultMessage *prm;
1694   struct GNUNET_PeerIdentity *paths;
1695
1696   msize =
1697       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1698                                                        put_path_length) *
1699       sizeof (struct GNUNET_PeerIdentity);
1700   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1701       (get_path_length >
1702        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1703       (put_path_length >
1704        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1705       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1706   {
1707     GNUNET_break (0);
1708     return;
1709   }
1710   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1711                                           target);
1712   if (NULL == pi)
1713   {
1714     /* peer disconnected in the meantime, drop reply */
1715     return;
1716   }
1717   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
1718   {
1719     /* skip */
1720     GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1721                               1, GNUNET_NO);
1722     return;
1723   }
1724
1725   GNUNET_STATISTICS_update (GDS_stats,
1726                             gettext_noop
1727                             ("# RESULT messages queued for transmission"), 1,
1728                             GNUNET_NO);
1729   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1730   pending->importance = 0;      /* FIXME */
1731   pending->timeout = expiration_time;
1732   prm = (struct PeerResultMessage *) &pending[1];
1733   pending->msg = &prm->header;
1734   prm->header.size = htons (msize);
1735   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1736   prm->type = htonl (type);
1737   prm->put_path_length = htonl (put_path_length);
1738   prm->get_path_length = htonl (get_path_length);
1739   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1740   prm->key = *key;
1741   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1742   memcpy (paths, put_path,
1743           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1744   memcpy (&paths[put_path_length], get_path,
1745           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1746   memcpy (&paths[put_path_length + get_path_length], data, data_size);
1747   GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
1748   pi->pending_count++;
1749   process_peer_queue (pi);
1750 }
1751
1752
1753 /**
1754  * To be called on core init/fail.
1755  *
1756  * @param cls service closure
1757  * @param identity the public identity of this peer
1758  */
1759 static void
1760 core_init (void *cls,
1761            const struct GNUNET_PeerIdentity *identity)
1762 {
1763   my_identity = *identity;
1764   GNUNET_CRYPTO_hash (identity,
1765                       sizeof (struct GNUNET_PeerIdentity),
1766                       &my_identity_hash);
1767 }
1768
1769
1770 /**
1771  * Core handler for p2p put requests.
1772  *
1773  * @param cls closure
1774  * @param peer sender of the request
1775  * @param message message
1776  * @param peer peer identity this notification is about
1777  * @return #GNUNET_OK to keep the connection open,
1778  *         #GNUNET_SYSERR to close it (signal serious error)
1779  */
1780 static int
1781 handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1782                     const struct GNUNET_MessageHeader *message)
1783 {
1784   const struct PeerPutMessage *put;
1785   const struct GNUNET_PeerIdentity *put_path;
1786   const void *payload;
1787   uint32_t putlen;
1788   uint16_t msize;
1789   size_t payload_size;
1790   enum GNUNET_DHT_RouteOption options;
1791   struct GNUNET_CONTAINER_BloomFilter *bf;
1792   struct GNUNET_HashCode test_key;
1793   struct GNUNET_HashCode phash;
1794   int forwarded;
1795
1796   msize = ntohs (message->size);
1797   if (msize < sizeof (struct PeerPutMessage))
1798   {
1799     GNUNET_break_op (0);
1800     return GNUNET_YES;
1801   }
1802   put = (const struct PeerPutMessage *) message;
1803   putlen = ntohl (put->put_path_length);
1804   if ((msize <
1805        sizeof (struct PeerPutMessage) +
1806        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1807       (putlen >
1808        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1809   {
1810     GNUNET_break_op (0);
1811     return GNUNET_YES;
1812   }
1813   GNUNET_STATISTICS_update (GDS_stats,
1814                             gettext_noop ("# P2P PUT requests received"), 1,
1815                             GNUNET_NO);
1816   GNUNET_STATISTICS_update (GDS_stats,
1817                             gettext_noop ("# P2P PUT bytes received"), msize,
1818                             GNUNET_NO);
1819   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1820   payload = &put_path[putlen];
1821   options = ntohl (put->options);
1822   payload_size =
1823       msize - (sizeof (struct PeerPutMessage) +
1824                putlen * sizeof (struct GNUNET_PeerIdentity));
1825
1826   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1827               GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1828   GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash);
1829   if (GNUNET_YES == log_route_details_stderr)
1830   {
1831     char *tmp;
1832
1833     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1834     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1835                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1836                  GNUNET_h2s (&put->key), GNUNET_i2s (peer), tmp,
1837                  ntohl(put->hop_count),
1838                  GNUNET_CRYPTO_hash_matching_bits (&phash, &put->key),
1839                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &put->key)
1840                 );
1841     GNUNET_free (tmp);
1842   }
1843   switch (GNUNET_BLOCK_get_key
1844           (GDS_block_context, ntohl (put->type), payload, payload_size,
1845            &test_key))
1846   {
1847   case GNUNET_YES:
1848     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
1849     {
1850       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1851       GNUNET_break_op (0);
1852       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1853                   "PUT with key `%s' for block with key %s\n",
1854                   put_s, GNUNET_h2s_full (&test_key));
1855       GNUNET_free (put_s);
1856       return GNUNET_YES;
1857     }
1858     break;
1859   case GNUNET_NO:
1860     GNUNET_break_op (0);
1861     return GNUNET_YES;
1862   case GNUNET_SYSERR:
1863     /* cannot verify, good luck */
1864     break;
1865   }
1866   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1867   {
1868     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1869                                    ntohl (put->type),
1870                                    GNUNET_BLOCK_EO_NONE,
1871                                    NULL,    /* query */
1872                                    NULL, 0, /* bloom filer */
1873                                    NULL, 0, /* xquery */
1874                                    payload, payload_size))
1875     {
1876     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1877     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1878       break;
1879
1880     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1881     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1882     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1883     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1884     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1885     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1886     default:
1887       GNUNET_break_op (0);
1888       return GNUNET_OK;
1889     }
1890   }
1891
1892   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
1893                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1894   GNUNET_break_op (GNUNET_YES ==
1895                    GNUNET_CONTAINER_bloomfilter_test (bf, &phash));
1896   {
1897     struct GNUNET_PeerIdentity pp[putlen + 1];
1898
1899     /* extend 'put path' by sender */
1900     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1901     {
1902       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1903       pp[putlen] = *peer;
1904       putlen++;
1905     }
1906     else
1907       putlen = 0;
1908
1909     /* give to local clients */
1910     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1911                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1912                               payload_size, payload);
1913     /* store locally */
1914     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1915         (am_closest_peer (&put->key, bf)))
1916       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1917                                 (put->expiration_time), &put->key, putlen, pp,
1918                                 ntohl (put->type), payload_size, payload);
1919     /* route to other peers */
1920     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
1921                                            ntohl (put->desired_replication_level),
1922                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1923                                            ntohl (put->hop_count), bf,
1924                                            &put->key,
1925                                            putlen,
1926                                            pp,
1927                                            payload,
1928                                            payload_size);
1929     /* notify monitoring clients */
1930     GDS_CLIENTS_process_put (options
1931                              | ( (GNUNET_OK == forwarded)
1932                                  ? GNUNET_DHT_RO_LAST_HOP
1933                                  : 0 ),
1934                              ntohl (put->type),
1935                              ntohl (put->hop_count),
1936                              ntohl (put->desired_replication_level),
1937                              putlen, pp,
1938                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1939                              &put->key,
1940                              payload,
1941                              payload_size);
1942   }
1943   GNUNET_CONTAINER_bloomfilter_free (bf);
1944   return GNUNET_YES;
1945 }
1946
1947
1948 /**
1949  * We have received a FIND PEER request.  Send matching
1950  * HELLOs back.
1951  *
1952  * @param sender sender of the FIND PEER request
1953  * @param key peers close to this key are desired
1954  * @param bf peers matching this bf are excluded
1955  * @param bf_mutator mutator for bf
1956  */
1957 static void
1958 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1959                   const struct GNUNET_HashCode * key,
1960                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
1961 {
1962   int bucket_idx;
1963   struct PeerBucket *bucket;
1964   struct PeerInfo *peer;
1965   unsigned int choice;
1966   struct GNUNET_HashCode phash;
1967   struct GNUNET_HashCode mhash;
1968   const struct GNUNET_HELLO_Message *hello;
1969
1970   /* first, check about our own HELLO */
1971   if (NULL != GDS_my_hello)
1972   {
1973     GNUNET_BLOCK_mingle_hash (&my_identity_hash, bf_mutator, &mhash);
1974     if ((NULL == bf) ||
1975         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1976     {
1977       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1978                                    GNUNET_TIME_relative_to_absolute
1979                                    (hello_expiration),
1980                                    key, 0, NULL, 0, NULL, GDS_my_hello,
1981                                    GNUNET_HELLO_size ((const struct
1982                                                        GNUNET_HELLO_Message *)
1983                                                       GDS_my_hello));
1984     }
1985     else
1986     {
1987       GNUNET_STATISTICS_update (GDS_stats,
1988                                 gettext_noop
1989                                 ("# FIND PEER requests ignored due to Bloomfilter"),
1990                                 1, GNUNET_NO);
1991     }
1992   }
1993   else
1994   {
1995     GNUNET_STATISTICS_update (GDS_stats,
1996                               gettext_noop
1997                               ("# FIND PEER requests ignored due to lack of HELLO"),
1998                               1, GNUNET_NO);
1999   }
2000
2001   /* then, also consider sending a random HELLO from the closest bucket */
2002   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
2003     bucket_idx = closest_bucket;
2004   else
2005     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
2006   if (bucket_idx == GNUNET_SYSERR)
2007     return;
2008   bucket = &k_buckets[bucket_idx];
2009   if (bucket->peers_size == 0)
2010     return;
2011   choice =
2012       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
2013   peer = bucket->head;
2014   while (choice > 0)
2015   {
2016     GNUNET_assert (NULL != peer);
2017     peer = peer->next;
2018     choice--;
2019   }
2020   choice = bucket->peers_size;
2021   do
2022   {
2023     peer = peer->next;
2024     if (choice-- == 0)
2025       return;                   /* no non-masked peer available */
2026     if (peer == NULL)
2027       peer = bucket->head;
2028     GNUNET_CRYPTO_hash (&peer->id, sizeof (struct GNUNET_PeerIdentity), &phash);
2029     GNUNET_BLOCK_mingle_hash (&phash, bf_mutator, &mhash);
2030     hello = GDS_HELLO_get (&peer->id);
2031   }
2032   while ((hello == NULL) ||
2033          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
2034   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
2035                                GNUNET_TIME_relative_to_absolute
2036                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
2037                                0, NULL, 0, NULL, hello,
2038                                GNUNET_HELLO_size (hello));
2039 }
2040
2041
2042 /**
2043  * Core handler for p2p get requests.
2044  *
2045  * @param cls closure
2046  * @param peer sender of the request
2047  * @param message message
2048  * @return #GNUNET_OK to keep the connection open,
2049  *         #GNUNET_SYSERR to close it (signal serious error)
2050  */
2051 static int
2052 handle_dht_p2p_get (void *cls,
2053                     const struct GNUNET_PeerIdentity *peer,
2054                     const struct GNUNET_MessageHeader *message)
2055 {
2056   struct PeerGetMessage *get;
2057   uint32_t xquery_size;
2058   size_t reply_bf_size;
2059   uint16_t msize;
2060   enum GNUNET_BLOCK_Type type;
2061   enum GNUNET_DHT_RouteOption options;
2062   enum GNUNET_BLOCK_EvaluationResult eval;
2063   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
2064   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2065   const char *xquery;
2066   struct GNUNET_HashCode phash;
2067   int forwarded;
2068
2069   GNUNET_break (0 !=
2070                 memcmp (peer, &my_identity,
2071                         sizeof (struct GNUNET_PeerIdentity)));
2072   /* parse and validate message */
2073   msize = ntohs (message->size);
2074   if (msize < sizeof (struct PeerGetMessage))
2075   {
2076     GNUNET_break_op (0);
2077     return GNUNET_YES;
2078   }
2079   get = (struct PeerGetMessage *) message;
2080   xquery_size = ntohl (get->xquery_size);
2081   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
2082   {
2083     GNUNET_break_op (0);
2084     return GNUNET_YES;
2085   }
2086   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2087   type = ntohl (get->type);
2088   options = ntohl (get->options);
2089   xquery = (const char *) &get[1];
2090   reply_bf = NULL;
2091   GNUNET_STATISTICS_update (GDS_stats,
2092                             gettext_noop ("# P2P GET requests received"), 1,
2093                             GNUNET_NO);
2094   GNUNET_STATISTICS_update (GDS_stats,
2095                             gettext_noop ("# P2P GET bytes received"), msize,
2096                             GNUNET_NO);
2097   GNUNET_CRYPTO_hash (peer,
2098                       sizeof (struct GNUNET_PeerIdentity),
2099                       &phash);
2100   if (GNUNET_YES == log_route_details_stderr)
2101   {
2102     char *tmp;
2103
2104     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2105     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2106                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2107                  GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp,
2108                  ntohl(get->hop_count),
2109                  GNUNET_CRYPTO_hash_matching_bits (&phash, &get->key),
2110                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key),
2111                  ntohl(get->xquery_size), xquery);
2112     GNUNET_free (tmp);
2113   }
2114
2115   if (reply_bf_size > 0)
2116     reply_bf =
2117         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
2118                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
2119   eval =
2120       GNUNET_BLOCK_evaluate (GDS_block_context,
2121                              type,
2122                              GNUNET_BLOCK_EO_NONE,
2123                              &get->key,
2124                              &reply_bf,
2125                              get->bf_mutator,
2126                              xquery,
2127                              xquery_size,
2128                              NULL,
2129                              0);
2130   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2131   {
2132     /* request invalid or block type not supported */
2133     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2134     if (NULL != reply_bf)
2135       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2136     return GNUNET_YES;
2137   }
2138   peer_bf =
2139       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
2140                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
2141   GNUNET_break_op (GNUNET_YES ==
2142                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2143                                                       &phash));
2144   /* remember request for routing replies */
2145   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
2146                    reply_bf, get->bf_mutator);
2147   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2148               "GET for %s at %s after %u hops\n",
2149               GNUNET_h2s (&get->key),
2150               GNUNET_i2s (&my_identity),
2151               (unsigned int) ntohl (get->hop_count));
2152   /* local lookup (this may update the reply_bf) */
2153   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2154       (am_closest_peer (&get->key, peer_bf)))
2155   {
2156     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2157     {
2158       GNUNET_STATISTICS_update (GDS_stats,
2159                                 gettext_noop
2160                                 ("# P2P FIND PEER requests processed"), 1,
2161                                 GNUNET_NO);
2162       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
2163     }
2164     else
2165     {
2166       eval =
2167           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
2168                                     &reply_bf, get->bf_mutator);
2169     }
2170   }
2171   else
2172   {
2173     GNUNET_STATISTICS_update (GDS_stats,
2174                               gettext_noop ("# P2P GET requests ONLY routed"),
2175                               1, GNUNET_NO);
2176   }
2177
2178   /* P2P forwarding */
2179   forwarded = GNUNET_NO;
2180   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2181     forwarded = GDS_NEIGHBOURS_handle_get (type, options,
2182                                            ntohl (get->desired_replication_level),
2183                                            ntohl (get->hop_count),
2184                                            &get->key,
2185                                            xquery,
2186                                            xquery_size,
2187                                            reply_bf,
2188                                            get->bf_mutator, peer_bf);
2189   GDS_CLIENTS_process_get (options
2190                            | (GNUNET_OK == forwarded)
2191                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2192                            type,
2193                            ntohl (get->hop_count),
2194                            ntohl (get->desired_replication_level),
2195                            0, NULL,
2196                            &get->key);
2197
2198
2199   /* clean up */
2200   if (NULL != reply_bf)
2201     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2202   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2203   return GNUNET_YES;
2204 }
2205
2206
2207 /**
2208  * Core handler for p2p result messages.
2209  *
2210  * @param cls closure
2211  * @param message message
2212  * @param peer peer identity this notification is about
2213  * @return #GNUNET_YES (do not cut p2p connection)
2214  */
2215 static int
2216 handle_dht_p2p_result (void *cls,
2217                        const struct GNUNET_PeerIdentity *peer,
2218                        const struct GNUNET_MessageHeader *message)
2219 {
2220   const struct PeerResultMessage *prm;
2221   const struct GNUNET_PeerIdentity *put_path;
2222   const struct GNUNET_PeerIdentity *get_path;
2223   const void *data;
2224   uint32_t get_path_length;
2225   uint32_t put_path_length;
2226   uint16_t msize;
2227   size_t data_size;
2228   enum GNUNET_BLOCK_Type type;
2229
2230   /* parse and validate message */
2231   msize = ntohs (message->size);
2232   if (msize < sizeof (struct PeerResultMessage))
2233   {
2234     GNUNET_break_op (0);
2235     return GNUNET_YES;
2236   }
2237   prm = (struct PeerResultMessage *) message;
2238   put_path_length = ntohl (prm->put_path_length);
2239   get_path_length = ntohl (prm->get_path_length);
2240   if ((msize <
2241        sizeof (struct PeerResultMessage) + (get_path_length +
2242                                             put_path_length) *
2243        sizeof (struct GNUNET_PeerIdentity)) ||
2244       (get_path_length >
2245        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2246       (put_path_length >
2247        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2248   {
2249     GNUNET_break_op (0);
2250     return GNUNET_YES;
2251   }
2252   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2253   get_path = &put_path[put_path_length];
2254   type = ntohl (prm->type);
2255   data = (const void *) &get_path[get_path_length];
2256   data_size =
2257       msize - (sizeof (struct PeerResultMessage) +
2258                (get_path_length +
2259                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2260   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
2261                             1, GNUNET_NO);
2262   GNUNET_STATISTICS_update (GDS_stats,
2263                             gettext_noop ("# P2P RESULT bytes received"),
2264                             msize, GNUNET_NO);
2265   if (GNUNET_YES == log_route_details_stderr)
2266   {
2267     char *tmp;
2268
2269     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2270     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2271                  "R5N RESULT %s: %s->%s (%u)\n",
2272                  GNUNET_h2s (&prm->key),
2273                  GNUNET_i2s (peer),
2274                  tmp,
2275                  get_path_length + 1);
2276     GNUNET_free (tmp);
2277   }
2278   /* if we got a HELLO, consider it for our own routing table */
2279   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2280   {
2281     const struct GNUNET_MessageHeader *h;
2282     struct GNUNET_PeerIdentity pid;
2283
2284     /* Should be a HELLO, validate and consider using it! */
2285     if (data_size < sizeof (struct GNUNET_MessageHeader))
2286     {
2287       GNUNET_break_op (0);
2288       return GNUNET_YES;
2289     }
2290     h = data;
2291     if (data_size != ntohs (h->size))
2292     {
2293       GNUNET_break_op (0);
2294       return GNUNET_YES;
2295     }
2296     if (GNUNET_OK !=
2297         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2298                              &pid))
2299     {
2300       GNUNET_break_op (0);
2301       return GNUNET_YES;
2302     }
2303     if ( (GNUNET_YES != disable_try_connect) &&
2304          (0 != memcmp (&my_identity,
2305                        &pid,
2306                        sizeof (struct GNUNET_PeerIdentity))) )
2307       try_connect (&pid,
2308                    h);
2309   }
2310
2311   /* append 'peer' to 'get_path' */
2312   {
2313     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2314
2315     memcpy (xget_path,
2316             get_path,
2317             get_path_length * sizeof (struct GNUNET_PeerIdentity));
2318     xget_path[get_path_length] = *peer;
2319     get_path_length++;
2320
2321     /* forward to local clients */
2322     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2323                               &prm->key,
2324                               get_path_length,
2325                               xget_path,
2326                               put_path_length,
2327                               put_path,
2328                               type,
2329                               data_size,
2330                               data);
2331     GDS_CLIENTS_process_get_resp (type,
2332                                   xget_path,
2333                                   get_path_length,
2334                                   put_path, put_path_length,
2335                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2336                                   &prm->key,
2337                                   data,
2338                                   data_size);
2339     if (GNUNET_YES == cache_results)
2340     {
2341       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2342
2343       memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
2344       memcpy (&xput_path[put_path_length],
2345               xget_path,
2346               get_path_length * sizeof (struct GNUNET_PeerIdentity));
2347
2348       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2349                                 &prm->key,
2350                                 get_path_length + put_path_length,
2351                                 xput_path,
2352                                 type,
2353                                 data_size,
2354                                 data);
2355     }
2356     /* forward to other peers */
2357     GDS_ROUTING_process (type,
2358                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2359                          &prm->key,
2360                          put_path_length,
2361                          put_path,
2362                          get_path_length,
2363                          xget_path,
2364                          data,
2365                          data_size);
2366   }
2367
2368   return GNUNET_YES;
2369 }
2370
2371
2372 /**
2373  * Initialize neighbours subsystem.
2374  *
2375  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2376  */
2377 int
2378 GDS_NEIGHBOURS_init ()
2379 {
2380   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2381     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2382     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2383     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2384     {NULL, 0, 0}
2385   };
2386   unsigned long long temp_config_num;
2387
2388   disable_try_connect
2389     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT");
2390   if (GNUNET_OK ==
2391       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2392                                              &temp_config_num))
2393     bucket_size = (unsigned int) temp_config_num;
2394   cache_results
2395     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS");
2396
2397   log_route_details_stderr =
2398     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2399   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2400   core_api =
2401       GNUNET_CORE_connect (GDS_cfg, NULL,
2402                            &core_init,
2403                            &handle_core_connect,
2404                            &handle_core_disconnect,
2405                            NULL, GNUNET_NO,
2406                            NULL, GNUNET_NO,
2407                            core_handlers);
2408   if (core_api == NULL)
2409     return GNUNET_SYSERR;
2410   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2411                                                               GNUNET_NO);
2412   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2413                                                             GNUNET_NO);
2414   return GNUNET_OK;
2415 }
2416
2417
2418 /**
2419  * Shutdown neighbours subsystem.
2420  */
2421 void
2422 GDS_NEIGHBOURS_done ()
2423 {
2424   if (NULL == core_api)
2425     return;
2426   GNUNET_CORE_disconnect (core_api);
2427   core_api = NULL;
2428   GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2429   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2430   all_connected_peers = NULL;
2431   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2432                                          &free_connect_info,
2433                                          NULL);
2434   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2435   all_desired_peers = NULL;
2436   GNUNET_ATS_connectivity_done (ats_ch);
2437   ats_ch = NULL;
2438   if (NULL != find_peer_task)
2439   {
2440     GNUNET_SCHEDULER_cancel (find_peer_task);
2441     find_peer_task = NULL;
2442   }
2443 }
2444
2445
2446 /**
2447  * Get the ID of the local node.
2448  *
2449  * @return identity of the local node
2450  */
2451 struct GNUNET_PeerIdentity *
2452 GDS_NEIGHBOURS_get_id ()
2453 {
2454   return &my_identity;
2455 }
2456
2457
2458 /* end of gnunet-service-dht_neighbours.c */