use new connecT API
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_hello.h"
45 #include "gnunet-service-dht_neighbours.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include "dht.h"
49
50 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * Maximum allowed number of pending messages per peer.
74  */
75 #define MAXIMUM_PENDING_PER_PEER 64
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92 /**
93  * Hello address expiration
94  */
95 extern struct GNUNET_TIME_Relative hello_expiration;
96
97
98 GNUNET_NETWORK_STRUCT_BEGIN
99
100 /**
101  * P2P PUT message
102  */
103 struct PeerPutMessage
104 {
105   /**
106    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
107    */
108   struct GNUNET_MessageHeader header;
109
110   /**
111    * Processing options
112    */
113   uint32_t options GNUNET_PACKED;
114
115   /**
116    * Content type.
117    */
118   uint32_t type GNUNET_PACKED;
119
120   /**
121    * Hop count
122    */
123   uint32_t hop_count GNUNET_PACKED;
124
125   /**
126    * Replication level for this message
127    */
128   uint32_t desired_replication_level GNUNET_PACKED;
129
130   /**
131    * Length of the PUT path that follows (if tracked).
132    */
133   uint32_t put_path_length GNUNET_PACKED;
134
135   /**
136    * When does the content expire?
137    */
138   struct GNUNET_TIME_AbsoluteNBO expiration_time;
139
140   /**
141    * Bloomfilter (for peer identities) to stop circular routes
142    */
143   char bloomfilter[DHT_BLOOM_SIZE];
144
145   /**
146    * The key we are storing under.
147    */
148   struct GNUNET_HashCode key;
149
150   /* put path (if tracked) */
151
152   /* Payload */
153
154 };
155
156
157 /**
158  * P2P Result message
159  */
160 struct PeerResultMessage
161 {
162   /**
163    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Content type.
169    */
170   uint32_t type GNUNET_PACKED;
171
172   /**
173    * Length of the PUT path that follows (if tracked).
174    */
175   uint32_t put_path_length GNUNET_PACKED;
176
177   /**
178    * Length of the GET path that follows (if tracked).
179    */
180   uint32_t get_path_length GNUNET_PACKED;
181
182   /**
183    * When does the content expire?
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration_time;
186
187   /**
188    * The key of the corresponding GET request.
189    */
190   struct GNUNET_HashCode key;
191
192   /* put path (if tracked) */
193
194   /* get path (if tracked) */
195
196   /* Payload */
197
198 };
199
200
201 /**
202  * P2P GET message
203  */
204 struct PeerGetMessage
205 {
206   /**
207    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
208    */
209   struct GNUNET_MessageHeader header;
210
211   /**
212    * Processing options
213    */
214   uint32_t options GNUNET_PACKED;
215
216   /**
217    * Desired content type.
218    */
219   uint32_t type GNUNET_PACKED;
220
221   /**
222    * Hop count
223    */
224   uint32_t hop_count GNUNET_PACKED;
225
226   /**
227    * Desired replication level for this request.
228    */
229   uint32_t desired_replication_level GNUNET_PACKED;
230
231   /**
232    * Size of the extended query.
233    */
234   uint32_t xquery_size;
235
236   /**
237    * Bloomfilter mutator.
238    */
239   uint32_t bf_mutator;
240
241   /**
242    * Bloomfilter (for peer identities) to stop circular routes
243    */
244   char bloomfilter[DHT_BLOOM_SIZE];
245
246   /**
247    * The key we are looking for.
248    */
249   struct GNUNET_HashCode key;
250
251   /* xquery */
252
253   /* result bloomfilter */
254
255 };
256 GNUNET_NETWORK_STRUCT_END
257
258 /**
259  * Linked list of messages to send to a particular other peer.
260  */
261 struct P2PPendingMessage
262 {
263   /**
264    * Pointer to next item in the list
265    */
266   struct P2PPendingMessage *next;
267
268   /**
269    * Pointer to previous item in the list
270    */
271   struct P2PPendingMessage *prev;
272
273   /**
274    * Message importance level.  FIXME: used? useful?
275    */
276   unsigned int importance;
277
278   /**
279    * When does this message time out?
280    */
281   struct GNUNET_TIME_Absolute timeout;
282
283   /**
284    * Actual message to be sent, allocated at the end of the struct:
285    * // msg = (cast) &pm[1];
286    * // memcpy (&pm[1], data, len);
287    */
288   const struct GNUNET_MessageHeader *msg;
289
290 };
291
292
293 /**
294  * Entry for a peer in a bucket.
295  */
296 struct PeerInfo
297 {
298   /**
299    * Next peer entry (DLL)
300    */
301   struct PeerInfo *next;
302
303   /**
304    *  Prev peer entry (DLL)
305    */
306   struct PeerInfo *prev;
307
308   /**
309    * Count of outstanding messages for peer.
310    */
311   unsigned int pending_count;
312
313   /**
314    * Head of pending messages to be sent to this peer.
315    */
316   struct P2PPendingMessage *head;
317
318   /**
319    * Tail of pending messages to be sent to this peer.
320    */
321   struct P2PPendingMessage *tail;
322
323   /**
324    * Core handle for sending messages to this peer.
325    */
326   struct GNUNET_CORE_TransmitHandle *th;
327
328   /**
329    * What is the identity of the peer?
330    */
331   struct GNUNET_PeerIdentity id;
332
333 #if 0
334   /**
335    * What is the average latency for replies received?
336    */
337   struct GNUNET_TIME_Relative latency;
338
339   /**
340    * Transport level distance to peer.
341    */
342   unsigned int distance;
343 #endif
344
345 };
346
347
348 /**
349  * Peers are grouped into buckets.
350  */
351 struct PeerBucket
352 {
353   /**
354    * Head of DLL
355    */
356   struct PeerInfo *head;
357
358   /**
359    * Tail of DLL
360    */
361   struct PeerInfo *tail;
362
363   /**
364    * Number of peers in the bucket.
365    */
366   unsigned int peers_size;
367 };
368
369
370 /**
371  * Information about a peer that we would like to connect to.
372  */
373 struct ConnectInfo
374 {
375
376   /**
377    * Handle to active HELLO offer operation, or NULL.
378    */
379   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
380
381   /**
382    * Handle to active connectivity suggestion operation, or NULL.
383    */
384   struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
385
386   /**
387    * How much would we like to connect to this peer?
388    */
389   uint32_t strength;
390 };
391
392
393 /**
394  * Do we cache all results that we are routing in the local datacache?
395  */
396 static int cache_results;
397
398 /**
399  * Should routing details be logged to stderr (for debugging)?
400  */
401 static int log_route_details_stderr;
402
403 /**
404  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
405  */
406 static unsigned int closest_bucket;
407
408 /**
409  * How many peers have we added since we sent out our last
410  * find peer request?
411  */
412 static unsigned int newly_found_peers;
413
414 /**
415  * Option for testing that disables the 'connect' function of the DHT.
416  */
417 static int disable_try_connect;
418
419 /**
420  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
421  */
422 static struct PeerBucket k_buckets[MAX_BUCKETS];
423
424 /**
425  * Hash map of all CORE-connected peers, for easy removal from
426  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
427  */
428 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
429
430 /**
431  * Hash map of all peers we would like to be connected to.
432  * Values are of type `struct ConnectInfo`.
433  */
434 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
435
436 /**
437  * Maximum size for each bucket.
438  */
439 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
440
441 /**
442  * Task that sends FIND PEER requests.
443  */
444 static struct GNUNET_SCHEDULER_Task *find_peer_task;
445
446 /**
447  * Identity of this peer.
448  */
449 static struct GNUNET_PeerIdentity my_identity;
450
451 /**
452  * Hash of the identity of this peer.
453  */
454 static struct GNUNET_HashCode my_identity_hash;
455
456 /**
457  * Handle to CORE.
458  */
459 static struct GNUNET_CORE_Handle *core_api;
460
461 /**
462  * Handle to ATS connectivity.
463  */
464 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
465
466
467 /**
468  * Find the optimal bucket for this key.
469  *
470  * @param hc the hashcode to compare our identity to
471  * @return the proper bucket index, or GNUNET_SYSERR
472  *         on error (same hashcode)
473  */
474 static int
475 find_bucket (const struct GNUNET_HashCode *hc)
476 {
477   unsigned int bits;
478
479   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
480   if (bits == MAX_BUCKETS)
481   {
482     /* How can all bits match? Got my own ID? */
483     GNUNET_break (0);
484     return GNUNET_SYSERR;
485   }
486   return MAX_BUCKETS - bits - 1;
487 }
488
489
490 /**
491  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
492  * Clean up the "oh" field in the @a cls
493  *
494  * @param cls a `struct ConnectInfo`
495  */
496 static void
497 offer_hello_done (void *cls)
498 {
499   struct ConnectInfo *ci = cls;
500
501   ci->oh = NULL;
502 }
503
504
505 /**
506  * Function called for all entries in #all_desired_peers to clean up.
507  *
508  * @param cls NULL
509  * @param peer peer the entry is for
510  * @param value the value to remove
511  * @return #GNUNET_YES
512  */
513 static int
514 free_connect_info (void *cls,
515                    const struct GNUNET_PeerIdentity *peer,
516                    void *value)
517 {
518   struct ConnectInfo *ci = value;
519
520   GNUNET_assert (GNUNET_YES ==
521                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
522                                                        peer,
523                                                        ci));
524   if (NULL != ci->sh)
525   {
526     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
527     ci->sh = NULL;
528   }
529   if (NULL != ci->oh)
530   {
531     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
532     ci->oh = NULL;
533   }
534   GNUNET_free (ci);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Consider if we want to connect to a given peer, and if so
541  * let ATS know.  If applicable, the HELLO is offered to the
542  * TRANSPORT service.
543  *
544  * @param pid peer to consider connectivity requirements for
545  * @param h a HELLO message, or NULL
546  */
547 static void
548 try_connect (const struct GNUNET_PeerIdentity *pid,
549              const struct GNUNET_MessageHeader *h)
550 {
551   int bucket;
552   struct GNUNET_HashCode pid_hash;
553   struct ConnectInfo *ci;
554   uint32_t strength;
555
556   GNUNET_CRYPTO_hash (pid,
557                       sizeof (struct GNUNET_PeerIdentity),
558                       &pid_hash);
559   bucket = find_bucket (&pid_hash);
560   if (bucket < 0)
561     return; /* self? */
562   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
563                                           pid);
564
565   if (k_buckets[bucket].peers_size < bucket_size)
566     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
567   else
568     strength = bucket; /* minimum value of connectivity */
569   if (GNUNET_YES ==
570       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
571                                               pid))
572     strength *= 2; /* double for connected peers */
573   else if (k_buckets[bucket].peers_size > bucket_size)
574     strength = 0; /* bucket full, we really do not care about more */
575
576   if ( (0 == strength) &&
577        (NULL != ci) )
578   {
579     /* release request */
580     GNUNET_assert (GNUNET_YES ==
581                    free_connect_info (NULL,
582                                       pid,
583                                       ci));
584     return;
585   }
586   if (NULL == ci)
587   {
588     ci = GNUNET_new (struct ConnectInfo);
589     GNUNET_assert (GNUNET_OK ==
590                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
591                                                       pid,
592                                                       ci,
593                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
594   }
595   if ( (NULL != GDS_transport_handle) &&
596        (NULL != ci->oh) &&
597        (NULL != h) )
598     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
599   if ( (NULL != GDS_transport_handle) &&
600        (NULL != h) )
601     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
602                                            h,
603                                            &offer_hello_done,
604                                            ci);
605   if ( (NULL != ci->sh) &&
606        (ci->strength != strength) )
607     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
608   if (ci->strength != strength)
609     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
610                                               pid,
611                                               strength);
612   ci->strength = strength;
613 }
614
615
616 /**
617  * Function called for each peer in #all_desired_peers during
618  * #update_connect_preferences() if we have reason to adjust
619  * the strength of our desire to keep connections to certain
620  * peers.  Calls #try_connect() to update the calculations for
621  * the given @a pid.
622  *
623  * @param cls NULL
624  * @param pid peer to update
625  * @param value unused
626  * @return #GNUNET_YES (continue to iterate)
627  */
628 static int
629 update_desire_strength (void *cls,
630                         const struct GNUNET_PeerIdentity *pid,
631                         void *value)
632 {
633   try_connect (pid, NULL);
634   return GNUNET_YES;
635 }
636
637
638 /**
639  * Update our preferences for connectivity as given to ATS.
640  *
641  * @param cls the `struct PeerInfo` of the peer
642  * @param tc scheduler context.
643  */
644 static void
645 update_connect_preferences ()
646 {
647   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
648                                          &update_desire_strength,
649                                          NULL);
650 }
651
652
653 /**
654  * Closure for #add_known_to_bloom().
655  */
656 struct BloomConstructorContext
657 {
658   /**
659    * Bloom filter under construction.
660    */
661   struct GNUNET_CONTAINER_BloomFilter *bloom;
662
663   /**
664    * Mutator to use.
665    */
666   uint32_t bf_mutator;
667 };
668
669
670 /**
671  * Add each of the peers we already know to the bloom filter of
672  * the request so that we don't get duplicate HELLOs.
673  *
674  * @param cls the 'struct BloomConstructorContext'.
675  * @param key peer identity to add to the bloom filter
676  * @param value value the peer information (unused)
677  * @return #GNUNET_YES (we should continue to iterate)
678  */
679 static int
680 add_known_to_bloom (void *cls,
681                     const struct GNUNET_PeerIdentity *key,
682                     void *value)
683 {
684   struct BloomConstructorContext *ctx = cls;
685   struct GNUNET_HashCode key_hash;
686   struct GNUNET_HashCode mh;
687
688   GNUNET_CRYPTO_hash (key,
689                       sizeof (struct GNUNET_PeerIdentity),
690                       &key_hash);
691   GNUNET_BLOCK_mingle_hash (&key_hash,
692                             ctx->bf_mutator,
693                             &mh);
694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
696               GNUNET_i2s (key), ctx->bf_mutator);
697   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
698   return GNUNET_YES;
699 }
700
701
702 /**
703  * Task to send a find peer message for our own peer identifier
704  * so that we can find the closest peers in the network to ourselves
705  * and attempt to connect to them.
706  *
707  * @param cls closure for this task
708  */
709 static void
710 send_find_peer_message (void *cls)
711 {
712   struct GNUNET_TIME_Relative next_send_time;
713   struct BloomConstructorContext bcc;
714   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
715
716   find_peer_task = NULL;
717   if (newly_found_peers > bucket_size)
718   {
719     /* If we are finding many peers already, no need to send out our request right now! */
720     find_peer_task =
721         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
722                                       &send_find_peer_message, NULL);
723     newly_found_peers = 0;
724     return;
725   }
726   bcc.bf_mutator =
727       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
728   bcc.bloom =
729       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
730                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
731   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
732                                          &add_known_to_bloom,
733                                          &bcc);
734   GNUNET_STATISTICS_update (GDS_stats,
735                             gettext_noop ("# FIND PEER messages initiated"), 1,
736                             GNUNET_NO);
737   peer_bf =
738       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
739                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
740   // FIXME: pass priority!?
741   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
742                              GNUNET_DHT_RO_FIND_PEER,
743                              FIND_PEER_REPLICATION_LEVEL, 0,
744                              &my_identity_hash, NULL, 0, bcc.bloom,
745                              bcc.bf_mutator, peer_bf);
746   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
747   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
748   /* schedule next round */
749   next_send_time.rel_value_us =
750       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
751       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
752                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
753                                 (newly_found_peers + 1));
754   newly_found_peers = 0;
755   find_peer_task =
756       GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
757                                     NULL);
758 }
759
760
761 /**
762  * Method called whenever a peer connects.
763  *
764  * @param cls closure
765  * @param peer peer identity this notification is about
766  */
767 static void
768 handle_core_connect (void *cls,
769                      const struct GNUNET_PeerIdentity *peer)
770 {
771   struct PeerInfo *ret;
772   struct GNUNET_HashCode phash;
773   int peer_bucket;
774
775   /* Check for connect to self message */
776   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
777     return;
778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779               "Connected to %s\n",
780               GNUNET_i2s (peer));
781   if (GNUNET_YES ==
782       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
783                                               peer))
784   {
785     GNUNET_break (0);
786     return;
787   }
788   GNUNET_STATISTICS_update (GDS_stats,
789                             gettext_noop ("# peers connected"),
790                             1,
791                             GNUNET_NO);
792   GNUNET_CRYPTO_hash (peer,
793                       sizeof (struct GNUNET_PeerIdentity),
794                       &phash);
795   peer_bucket = find_bucket (&phash);
796   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
797   ret = GNUNET_new (struct PeerInfo);
798 #if 0
799   ret->latency = latency;
800   ret->distance = distance;
801 #endif
802   ret->id = *peer;
803   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
804                                     k_buckets[peer_bucket].tail,
805                                     ret);
806   k_buckets[peer_bucket].peers_size++;
807   closest_bucket = GNUNET_MAX (closest_bucket,
808                                peer_bucket);
809   GNUNET_assert (GNUNET_OK ==
810                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
811                                                     peer,
812                                                     ret,
813                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
814   if ( (peer_bucket > 0) &&
815        (k_buckets[peer_bucket].peers_size <= bucket_size))
816   {
817     update_connect_preferences ();
818     newly_found_peers++;
819   }
820   if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
821       (GNUNET_YES != disable_try_connect))
822   {
823     /* got a first connection, good time to start with FIND PEER requests... */
824     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
825                                                NULL);
826   }
827 }
828
829
830 /**
831  * Method called whenever a peer disconnects.
832  *
833  * @param cls closure
834  * @param peer peer identity this notification is about
835  */
836 static void
837 handle_core_disconnect (void *cls,
838                         const struct GNUNET_PeerIdentity *peer)
839 {
840   struct PeerInfo *to_remove;
841   int current_bucket;
842   struct P2PPendingMessage *pos;
843   unsigned int discarded;
844   struct GNUNET_HashCode phash;
845
846   /* Check for disconnect from self message */
847   if (0 == memcmp (&my_identity,
848                    peer,
849                    sizeof (struct GNUNET_PeerIdentity)))
850     return;
851   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852               "Disconnected %s\n",
853               GNUNET_i2s (peer));
854   to_remove =
855       GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
856                                          peer);
857   if (NULL == to_remove)
858   {
859     GNUNET_break (0);
860     return;
861   }
862   GNUNET_STATISTICS_update (GDS_stats,
863                             gettext_noop ("# peers connected"),
864                             -1,
865                             GNUNET_NO);
866   GNUNET_assert (GNUNET_YES ==
867                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
868                                                        peer,
869                                                        to_remove));
870   GNUNET_CRYPTO_hash (peer,
871                       sizeof (struct GNUNET_PeerIdentity),
872                       &phash);
873   current_bucket = find_bucket (&phash);
874   GNUNET_assert (current_bucket >= 0);
875   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
876                                k_buckets[current_bucket].tail,
877                                to_remove);
878   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
879   k_buckets[current_bucket].peers_size--;
880   while ( (closest_bucket > 0) &&
881           (0 == k_buckets[closest_bucket].peers_size) )
882     closest_bucket--;
883   if (NULL != to_remove->th)
884   {
885     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
886     to_remove->th = NULL;
887   }
888   discarded = 0;
889   while (NULL != (pos = to_remove->head))
890   {
891     GNUNET_CONTAINER_DLL_remove (to_remove->head,
892                                  to_remove->tail,
893                                  pos);
894     discarded++;
895     GNUNET_free (pos);
896   }
897   if (k_buckets[current_bucket].peers_size < bucket_size)
898     update_connect_preferences ();
899   GNUNET_STATISTICS_update (GDS_stats,
900                             gettext_noop ("# Queued messages discarded (peer disconnected)"),
901                             discarded,
902                             GNUNET_NO);
903   GNUNET_free (to_remove);
904 }
905
906
907 /**
908  * Called when core is ready to send a message we asked for
909  * out to the destination.
910  *
911  * @param cls the 'struct PeerInfo' of the target peer
912  * @param size number of bytes available in @a buf
913  * @param buf where the callee should write the message
914  * @return number of bytes written to @a buf
915  */
916 static size_t
917 core_transmit_notify (void *cls,
918                       size_t size,
919                       void *buf)
920 {
921   struct PeerInfo *peer = cls;
922   char *cbuf = buf;
923   struct P2PPendingMessage *pending;
924   size_t off;
925   size_t msize;
926
927   peer->th = NULL;
928   while ((NULL != (pending = peer->head)) &&
929          (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
930   {
931     GNUNET_STATISTICS_update (GDS_stats,
932                               gettext_noop
933                               ("# Messages dropped (CORE timeout)"),
934                               1,
935                               GNUNET_NO);
936     peer->pending_count--;
937     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
938     GNUNET_free (pending);
939   }
940   if (NULL == pending)
941   {
942     /* no messages pending */
943     return 0;
944   }
945   if (NULL == buf)
946   {
947     peer->th =
948         GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
949                                            GNUNET_CORE_PRIO_BEST_EFFORT,
950                                            GNUNET_TIME_absolute_get_remaining
951                                            (pending->timeout), &peer->id,
952                                            ntohs (pending->msg->size),
953                                            &core_transmit_notify, peer);
954     GNUNET_break (NULL != peer->th);
955     return 0;
956   }
957   off = 0;
958   while ((NULL != (pending = peer->head)) &&
959          (size - off >= (msize = ntohs (pending->msg->size))))
960   {
961     GNUNET_STATISTICS_update (GDS_stats,
962                               gettext_noop
963                               ("# Bytes transmitted to other peers"), msize,
964                               GNUNET_NO);
965     memcpy (&cbuf[off], pending->msg, msize);
966     off += msize;
967     peer->pending_count--;
968     GNUNET_CONTAINER_DLL_remove (peer->head,
969                                  peer->tail,
970                                  pending);
971     GNUNET_free (pending);
972   }
973   if (NULL != (pending = peer->head))
974   {
975     /* technically redundant, but easier to read and
976        avoids bogus gcc warning... */
977     msize = ntohs (pending->msg->size);
978     peer->th =
979       GNUNET_CORE_notify_transmit_ready (core_api,
980                                          GNUNET_NO,
981                                          GNUNET_CORE_PRIO_BEST_EFFORT,
982                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
983                                          &peer->id,
984                                          msize,
985                                          &core_transmit_notify,
986                                          peer);
987     GNUNET_break (NULL != peer->th);
988   }
989   return off;
990 }
991
992
993 /**
994  * Transmit all messages in the peer's message queue.
995  *
996  * @param peer message queue to process
997  */
998 static void
999 process_peer_queue (struct PeerInfo *peer)
1000 {
1001   struct P2PPendingMessage *pending;
1002
1003   if (NULL == (pending = peer->head))
1004     return;
1005   if (NULL != peer->th)
1006     return;
1007   GNUNET_STATISTICS_update (GDS_stats,
1008                             gettext_noop
1009                             ("# Bytes of bandwidth requested from core"),
1010                             ntohs (pending->msg->size), GNUNET_NO);
1011   peer->th =
1012       GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
1013                                          GNUNET_CORE_PRIO_BEST_EFFORT,
1014                                          GNUNET_TIME_absolute_get_remaining
1015                                          (pending->timeout),
1016                                          &peer->id,
1017                                          ntohs (pending->msg->size),
1018                                          &core_transmit_notify,
1019                                          peer);
1020   GNUNET_break (NULL != peer->th);
1021 }
1022
1023
1024 /**
1025  * To how many peers should we (on average) forward the request to
1026  * obtain the desired target_replication count (on average).
1027  *
1028  * @param hop_count number of hops the message has traversed
1029  * @param target_replication the number of total paths desired
1030  * @return Some number of peers to forward the message to
1031  */
1032 static unsigned int
1033 get_forward_count (uint32_t hop_count, uint32_t target_replication)
1034 {
1035   uint32_t random_value;
1036   uint32_t forward_count;
1037   float target_value;
1038
1039   if (hop_count > GDS_NSE_get () * 4.0)
1040   {
1041     /* forcefully terminate */
1042     GNUNET_STATISTICS_update (GDS_stats,
1043                               gettext_noop ("# requests TTL-dropped"),
1044                               1, GNUNET_NO);
1045     return 0;
1046   }
1047   if (hop_count > GDS_NSE_get () * 2.0)
1048   {
1049     /* Once we have reached our ideal number of hops, only forward to 1 peer */
1050     return 1;
1051   }
1052   /* bound by system-wide maximum */
1053   target_replication =
1054       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
1055   target_value =
1056       1 + (target_replication - 1.0) / (GDS_NSE_get () +
1057                                         ((float) (target_replication - 1.0) *
1058                                          hop_count));
1059   /* Set forward count to floor of target_value */
1060   forward_count = (uint32_t) target_value;
1061   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1062   target_value = target_value - forward_count;
1063   random_value =
1064       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1065   if (random_value < (target_value * UINT32_MAX))
1066     forward_count++;
1067   return forward_count;
1068 }
1069
1070
1071 /**
1072  * Compute the distance between have and target as a 32-bit value.
1073  * Differences in the lower bits must count stronger than differences
1074  * in the higher bits.
1075  *
1076  * @param target
1077  * @param have
1078  * @return 0 if have==target, otherwise a number
1079  *           that is larger as the distance between
1080  *           the two hash codes increases
1081  */
1082 static unsigned int
1083 get_distance (const struct GNUNET_HashCode *target,
1084               const struct GNUNET_HashCode *have)
1085 {
1086   unsigned int bucket;
1087   unsigned int msb;
1088   unsigned int lsb;
1089   unsigned int i;
1090
1091   /* We have to represent the distance between two 2^9 (=512)-bit
1092    * numbers as a 2^5 (=32)-bit number with "0" being used for the
1093    * two numbers being identical; furthermore, we need to
1094    * guarantee that a difference in the number of matching
1095    * bits is always represented in the result.
1096    *
1097    * We use 2^32/2^9 numerical values to distinguish between
1098    * hash codes that have the same LSB bit distance and
1099    * use the highest 2^9 bits of the result to signify the
1100    * number of (mis)matching LSB bits; if we have 0 matching
1101    * and hence 512 mismatching LSB bits we return -1 (since
1102    * 512 itself cannot be represented with 9 bits) */
1103
1104   /* first, calculate the most significant 9 bits of our
1105    * result, aka the number of LSBs */
1106   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1107   /* bucket is now a value between 0 and 512 */
1108   if (bucket == 512)
1109     return 0;                   /* perfect match */
1110   if (bucket == 0)
1111     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1112                                  * below, we'd end up with max+1 (overflow)) */
1113
1114   /* calculate the most significant bits of the final result */
1115   msb = (512 - bucket) << (32 - 9);
1116   /* calculate the 32-9 least significant bits of the final result by
1117    * looking at the differences in the 32-9 bits following the
1118    * mismatching bit at 'bucket' */
1119   lsb = 0;
1120   for (i = bucket + 1;
1121        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1122   {
1123     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
1124         GNUNET_CRYPTO_hash_get_bit (have, i))
1125       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
1126                                                  * last bit set will be 31 -- if
1127                                                  * i does not reach 512 first... */
1128   }
1129   return msb | lsb;
1130 }
1131
1132
1133 /**
1134  * Check whether my identity is closer than any known peers.  If a
1135  * non-null bloomfilter is given, check if this is the closest peer
1136  * that hasn't already been routed to.
1137  *
1138  * @param key hash code to check closeness to
1139  * @param bloom bloomfilter, exclude these entries from the decision
1140  * @return #GNUNET_YES if node location is closest,
1141  *         #GNUNET_NO otherwise.
1142  */
1143 static int
1144 am_closest_peer (const struct GNUNET_HashCode *key,
1145                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
1146 {
1147   int bits;
1148   int other_bits;
1149   int bucket_num;
1150   int count;
1151   struct PeerInfo *pos;
1152   struct GNUNET_HashCode phash;
1153
1154   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
1155     return GNUNET_YES;
1156   bucket_num = find_bucket (key);
1157   GNUNET_assert (bucket_num >= 0);
1158   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key);
1159   pos = k_buckets[bucket_num].head;
1160   count = 0;
1161   while ((NULL != pos) && (count < bucket_size))
1162   {
1163     GNUNET_CRYPTO_hash (&pos->id,
1164                         sizeof (struct GNUNET_PeerIdentity),
1165                         &phash);
1166     if ((NULL != bloom) &&
1167         (GNUNET_YES ==
1168          GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1169     {
1170       pos = pos->next;
1171       continue;                 /* Skip already checked entries */
1172     }
1173     other_bits = GNUNET_CRYPTO_hash_matching_bits (&phash, key);
1174     if (other_bits > bits)
1175       return GNUNET_NO;
1176     if (other_bits == bits)     /* We match the same number of bits */
1177       return GNUNET_YES;
1178     pos = pos->next;
1179   }
1180   /* No peers closer, we are the closest! */
1181   return GNUNET_YES;
1182 }
1183
1184
1185 /**
1186  * Select a peer from the routing table that would be a good routing
1187  * destination for sending a message for "key".  The resulting peer
1188  * must not be in the set of blocked peers.<p>
1189  *
1190  * Note that we should not ALWAYS select the closest peer to the
1191  * target, peers further away from the target should be chosen with
1192  * exponentially declining probability.
1193  *
1194  * FIXME: double-check that this is fine
1195  *
1196  *
1197  * @param key the key we are selecting a peer to route to
1198  * @param bloom a bloomfilter containing entries this request has seen already
1199  * @param hops how many hops has this message traversed thus far
1200  * @return Peer to route to, or NULL on error
1201  */
1202 static struct PeerInfo *
1203 select_peer (const struct GNUNET_HashCode *key,
1204              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1205              uint32_t hops)
1206 {
1207   unsigned int bc;
1208   unsigned int count;
1209   unsigned int selected;
1210   struct PeerInfo *pos;
1211   unsigned int dist;
1212   unsigned int smallest_distance;
1213   struct PeerInfo *chosen;
1214   struct GNUNET_HashCode phash;
1215
1216   if (hops >= GDS_NSE_get ())
1217   {
1218     /* greedy selection (closest peer that is not in bloomfilter) */
1219     smallest_distance = UINT_MAX;
1220     chosen = NULL;
1221     for (bc = 0; bc <= closest_bucket; bc++)
1222     {
1223       pos = k_buckets[bc].head;
1224       count = 0;
1225       while ((pos != NULL) && (count < bucket_size))
1226       {
1227         GNUNET_CRYPTO_hash (&pos->id,
1228                             sizeof (struct GNUNET_PeerIdentity),
1229                             &phash);
1230         if ((bloom == NULL) ||
1231             (GNUNET_NO ==
1232              GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1233         {
1234           dist = get_distance (key, &phash);
1235           if (dist < smallest_distance)
1236           {
1237             chosen = pos;
1238             smallest_distance = dist;
1239           }
1240         }
1241         else
1242         {
1243           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1245                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1246           GNUNET_STATISTICS_update (GDS_stats,
1247                                     gettext_noop
1248                                     ("# Peers excluded from routing due to Bloomfilter"),
1249                                     1, GNUNET_NO);
1250           dist = get_distance (key, &phash);
1251           if (dist < smallest_distance)
1252           {
1253             chosen = NULL;
1254             smallest_distance = dist;
1255           }
1256         }
1257         count++;
1258         pos = pos->next;
1259       }
1260     }
1261     if (NULL == chosen)
1262       GNUNET_STATISTICS_update (GDS_stats,
1263                                 gettext_noop ("# Peer selection failed"), 1,
1264                                 GNUNET_NO);
1265     return chosen;
1266   }
1267
1268   /* select "random" peer */
1269   /* count number of peers that are available and not filtered */
1270   count = 0;
1271   for (bc = 0; bc <= closest_bucket; bc++)
1272   {
1273     pos = k_buckets[bc].head;
1274     while ((pos != NULL) && (count < bucket_size))
1275     {
1276       GNUNET_CRYPTO_hash (&pos->id,
1277                           sizeof (struct GNUNET_PeerIdentity),
1278                           &phash);
1279       if ((bloom != NULL) &&
1280           (GNUNET_YES ==
1281            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1282       {
1283         GNUNET_STATISTICS_update (GDS_stats,
1284                                   gettext_noop
1285                                   ("# Peers excluded from routing due to Bloomfilter"),
1286                                   1, GNUNET_NO);
1287         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1289                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1290         pos = pos->next;
1291         continue;               /* Ignore bloomfiltered peers */
1292       }
1293       count++;
1294       pos = pos->next;
1295     }
1296   }
1297   if (0 == count)               /* No peers to select from! */
1298   {
1299     GNUNET_STATISTICS_update (GDS_stats,
1300                               gettext_noop ("# Peer selection failed"), 1,
1301                               GNUNET_NO);
1302     return NULL;
1303   }
1304   /* Now actually choose a peer */
1305   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1306   count = 0;
1307   for (bc = 0; bc <= closest_bucket; bc++)
1308   {
1309     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1310     {
1311       GNUNET_CRYPTO_hash (&pos->id,
1312                           sizeof (struct GNUNET_PeerIdentity),
1313                           &phash);
1314       if ((bloom != NULL) &&
1315           (GNUNET_YES ==
1316            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1317       {
1318         continue;               /* Ignore bloomfiltered peers */
1319       }
1320       if (0 == selected--)
1321         return pos;
1322     }
1323   }
1324   GNUNET_break (0);
1325   return NULL;
1326 }
1327
1328
1329 /**
1330  * Compute the set of peers that the given request should be
1331  * forwarded to.
1332  *
1333  * @param key routing key
1334  * @param bloom bloom filter excluding peers as targets, all selected
1335  *        peers will be added to the bloom filter
1336  * @param hop_count number of hops the request has traversed so far
1337  * @param target_replication desired number of replicas
1338  * @param targets where to store an array of target peers (to be
1339  *         free'd by the caller)
1340  * @return number of peers returned in 'targets'.
1341  */
1342 static unsigned int
1343 get_target_peers (const struct GNUNET_HashCode *key,
1344                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1345                   uint32_t hop_count, uint32_t target_replication,
1346                   struct PeerInfo ***targets)
1347 {
1348   unsigned int ret;
1349   unsigned int off;
1350   struct PeerInfo **rtargets;
1351   struct PeerInfo *nxt;
1352   struct GNUNET_HashCode nhash;
1353
1354   GNUNET_assert (NULL != bloom);
1355   ret = get_forward_count (hop_count, target_replication);
1356   if (0 == ret)
1357   {
1358     *targets = NULL;
1359     return 0;
1360   }
1361   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1362   for (off = 0; off < ret; off++)
1363   {
1364     nxt = select_peer (key, bloom, hop_count);
1365     if (NULL == nxt)
1366       break;
1367     rtargets[off] = nxt;
1368     GNUNET_CRYPTO_hash (&nxt->id,
1369                         sizeof (struct GNUNET_PeerIdentity),
1370                         &nhash);
1371     GNUNET_break (GNUNET_NO ==
1372                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1373                                                      &nhash));
1374     GNUNET_CONTAINER_bloomfilter_add (bloom, &nhash);
1375   }
1376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1377               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1378               off,
1379               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1380               (unsigned int) hop_count,
1381               GNUNET_h2s (key),
1382               ret);
1383   if (0 == off)
1384   {
1385     GNUNET_free (rtargets);
1386     *targets = NULL;
1387     return 0;
1388   }
1389   *targets = rtargets;
1390   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1391               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1392               GNUNET_h2s (key),
1393               off,
1394               ret);
1395   return off;
1396 }
1397
1398
1399 /**
1400  * Perform a PUT operation.   Forwards the given request to other
1401  * peers.   Does not store the data locally.  Does not give the
1402  * data to local clients.  May do nothing if this is the only
1403  * peer in the network (or if we are the closest peer in the
1404  * network).
1405  *
1406  * @param type type of the block
1407  * @param options routing options
1408  * @param desired_replication_level desired replication count
1409  * @param expiration_time when does the content expire
1410  * @param hop_count how many hops has this message traversed so far
1411  * @param bf Bloom filter of peers this PUT has already traversed
1412  * @param key key for the content
1413  * @param put_path_length number of entries in @a put_path
1414  * @param put_path peers this request has traversed so far (if tracked)
1415  * @param data payload to store
1416  * @param data_size number of bytes in @a data
1417  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1418  */
1419 int
1420 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1421                            enum GNUNET_DHT_RouteOption options,
1422                            uint32_t desired_replication_level,
1423                            struct GNUNET_TIME_Absolute expiration_time,
1424                            uint32_t hop_count,
1425                            struct GNUNET_CONTAINER_BloomFilter *bf,
1426                            const struct GNUNET_HashCode *key,
1427                            unsigned int put_path_length,
1428                            struct GNUNET_PeerIdentity *put_path,
1429                            const void *data, size_t data_size)
1430 {
1431   unsigned int target_count;
1432   unsigned int i;
1433   struct PeerInfo **targets;
1434   struct PeerInfo *target;
1435   struct P2PPendingMessage *pending;
1436   size_t msize;
1437   struct PeerPutMessage *ppm;
1438   struct GNUNET_PeerIdentity *pp;
1439   struct GNUNET_HashCode thash;
1440   unsigned int skip_count;
1441
1442   GNUNET_assert (NULL != bf);
1443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444               "Adding myself (%s) to PUT bloomfilter for %s\n",
1445               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1446   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1447   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1448                             1, GNUNET_NO);
1449   target_count =
1450       get_target_peers (key, bf, hop_count, desired_replication_level,
1451                         &targets);
1452   if (0 == target_count)
1453   {
1454     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455                 "Routing PUT for %s terminates after %u hops at %s\n",
1456                 GNUNET_h2s (key), (unsigned int) hop_count,
1457                 GNUNET_i2s (&my_identity));
1458     return GNUNET_NO;
1459   }
1460   msize =
1461       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1462       sizeof (struct PeerPutMessage);
1463   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1464   {
1465     put_path_length = 0;
1466     msize = data_size + sizeof (struct PeerPutMessage);
1467   }
1468   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1469   {
1470     GNUNET_break (0);
1471     GNUNET_free (targets);
1472     return GNUNET_NO;
1473   }
1474   GNUNET_STATISTICS_update (GDS_stats,
1475                             gettext_noop
1476                             ("# PUT messages queued for transmission"),
1477                             target_count, GNUNET_NO);
1478   skip_count = 0;
1479   for (i = 0; i < target_count; i++)
1480   {
1481     target = targets[i];
1482     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1483     {
1484       /* skip */
1485       GNUNET_STATISTICS_update (GDS_stats,
1486                                 gettext_noop ("# P2P messages dropped due to full queue"),
1487                                 1, GNUNET_NO);
1488       skip_count++;
1489       continue;
1490     }
1491     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
1493                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1494     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1495     pending->importance = 0;    /* FIXME */
1496     pending->timeout = expiration_time;
1497     ppm = (struct PeerPutMessage *) &pending[1];
1498     pending->msg = &ppm->header;
1499     ppm->header.size = htons (msize);
1500     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1501     ppm->options = htonl (options);
1502     ppm->type = htonl (type);
1503     ppm->hop_count = htonl (hop_count + 1);
1504     ppm->desired_replication_level = htonl (desired_replication_level);
1505     ppm->put_path_length = htonl (put_path_length);
1506     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1507     GNUNET_CRYPTO_hash (&target->id,
1508                         sizeof (struct GNUNET_PeerIdentity),
1509                         &thash);
1510     GNUNET_break (GNUNET_YES ==
1511                   GNUNET_CONTAINER_bloomfilter_test (bf,
1512                                                      &thash));
1513     GNUNET_assert (GNUNET_OK ==
1514                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1515                                                               ppm->bloomfilter,
1516                                                               DHT_BLOOM_SIZE));
1517     ppm->key = *key;
1518     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1519     memcpy (pp, put_path,
1520             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1521     memcpy (&pp[put_path_length], data, data_size);
1522     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1523     target->pending_count++;
1524     process_peer_queue (target);
1525   }
1526   GNUNET_free (targets);
1527   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1528 }
1529
1530
1531 /**
1532  * Perform a GET operation.  Forwards the given request to other
1533  * peers.  Does not lookup the key locally.  May do nothing if this is
1534  * the only peer in the network (or if we are the closest peer in the
1535  * network).
1536  *
1537  * @param type type of the block
1538  * @param options routing options
1539  * @param desired_replication_level desired replication count
1540  * @param hop_count how many hops did this request traverse so far?
1541  * @param key key for the content
1542  * @param xquery extended query
1543  * @param xquery_size number of bytes in @a xquery
1544  * @param reply_bf bloomfilter to filter duplicates
1545  * @param reply_bf_mutator mutator for @a reply_bf
1546  * @param peer_bf filter for peers not to select (again)
1547  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1548  */
1549 int
1550 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1551                            enum GNUNET_DHT_RouteOption options,
1552                            uint32_t desired_replication_level,
1553                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1554                            const void *xquery, size_t xquery_size,
1555                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1556                            uint32_t reply_bf_mutator,
1557                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1558 {
1559   unsigned int target_count;
1560   unsigned int i;
1561   struct PeerInfo **targets;
1562   struct PeerInfo *target;
1563   struct P2PPendingMessage *pending;
1564   size_t msize;
1565   struct PeerGetMessage *pgm;
1566   char *xq;
1567   size_t reply_bf_size;
1568   struct GNUNET_HashCode thash;
1569   unsigned int skip_count;
1570
1571   GNUNET_assert (NULL != peer_bf);
1572   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1573                             1, GNUNET_NO);
1574   target_count =
1575       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1576                         &targets);
1577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578               "Adding myself (%s) to GET bloomfilter for %s\n",
1579               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1580   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity_hash);
1581   if (0 == target_count)
1582   {
1583     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1584                 "Routing GET for %s terminates after %u hops at %s\n",
1585                 GNUNET_h2s (key), (unsigned int) hop_count,
1586                 GNUNET_i2s (&my_identity));
1587     return GNUNET_NO;
1588   }
1589   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1590   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1591   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1592   {
1593     GNUNET_break (0);
1594     GNUNET_free (targets);
1595     return GNUNET_NO;
1596   }
1597   GNUNET_STATISTICS_update (GDS_stats,
1598                             gettext_noop
1599                             ("# GET messages queued for transmission"),
1600                             target_count, GNUNET_NO);
1601   /* forward request */
1602   skip_count = 0;
1603   for (i = 0; i < target_count; i++)
1604   {
1605     target = targets[i];
1606     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1607     {
1608       /* skip */
1609       GNUNET_STATISTICS_update (GDS_stats,
1610                                 gettext_noop ("# P2P messages dropped due to full queue"),
1611                                 1, GNUNET_NO);
1612       skip_count++;
1613       continue;
1614     }
1615     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1616                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
1617                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1618     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1619     pending->importance = 0;    /* FIXME */
1620     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1621     pgm = (struct PeerGetMessage *) &pending[1];
1622     pending->msg = &pgm->header;
1623     pgm->header.size = htons (msize);
1624     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1625     pgm->options = htonl (options);
1626     pgm->type = htonl (type);
1627     pgm->hop_count = htonl (hop_count + 1);
1628     pgm->desired_replication_level = htonl (desired_replication_level);
1629     pgm->xquery_size = htonl (xquery_size);
1630     pgm->bf_mutator = reply_bf_mutator;
1631     GNUNET_CRYPTO_hash (&target->id,
1632                         sizeof (struct GNUNET_PeerIdentity),
1633                         &thash);
1634     GNUNET_break (GNUNET_YES ==
1635                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1636                                                      &thash));
1637     GNUNET_assert (GNUNET_OK ==
1638                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1639                                                               pgm->bloomfilter,
1640                                                               DHT_BLOOM_SIZE));
1641     pgm->key = *key;
1642     xq = (char *) &pgm[1];
1643     memcpy (xq, xquery, xquery_size);
1644     if (NULL != reply_bf)
1645       GNUNET_assert (GNUNET_OK ==
1646                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1647                                                                 &xq
1648                                                                 [xquery_size],
1649                                                                 reply_bf_size));
1650     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1651     target->pending_count++;
1652     process_peer_queue (target);
1653   }
1654   GNUNET_free (targets);
1655   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1656 }
1657
1658
1659 /**
1660  * Handle a reply (route to origin).  Only forwards the reply back to
1661  * the given peer.  Does not do local caching or forwarding to local
1662  * clients.
1663  *
1664  * @param target neighbour that should receive the block (if still connected)
1665  * @param type type of the block
1666  * @param expiration_time when does the content expire
1667  * @param key key for the content
1668  * @param put_path_length number of entries in @a put_path
1669  * @param put_path peers the original PUT traversed (if tracked)
1670  * @param get_path_length number of entries in @a get_path
1671  * @param get_path peers this reply has traversed so far (if tracked)
1672  * @param data payload of the reply
1673  * @param data_size number of bytes in @a data
1674  */
1675 void
1676 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1677                              enum GNUNET_BLOCK_Type type,
1678                              struct GNUNET_TIME_Absolute expiration_time,
1679                              const struct GNUNET_HashCode *key,
1680                              unsigned int put_path_length,
1681                              const struct GNUNET_PeerIdentity *put_path,
1682                              unsigned int get_path_length,
1683                              const struct GNUNET_PeerIdentity *get_path,
1684                              const void *data,
1685                              size_t data_size)
1686 {
1687   struct PeerInfo *pi;
1688   struct P2PPendingMessage *pending;
1689   size_t msize;
1690   struct PeerResultMessage *prm;
1691   struct GNUNET_PeerIdentity *paths;
1692
1693   msize =
1694       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1695                                                        put_path_length) *
1696       sizeof (struct GNUNET_PeerIdentity);
1697   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1698       (get_path_length >
1699        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1700       (put_path_length >
1701        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1702       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1703   {
1704     GNUNET_break (0);
1705     return;
1706   }
1707   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1708                                           target);
1709   if (NULL == pi)
1710   {
1711     /* peer disconnected in the meantime, drop reply */
1712     return;
1713   }
1714   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
1715   {
1716     /* skip */
1717     GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1718                               1, GNUNET_NO);
1719     return;
1720   }
1721
1722   GNUNET_STATISTICS_update (GDS_stats,
1723                             gettext_noop
1724                             ("# RESULT messages queued for transmission"), 1,
1725                             GNUNET_NO);
1726   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1727   pending->importance = 0;      /* FIXME */
1728   pending->timeout = expiration_time;
1729   prm = (struct PeerResultMessage *) &pending[1];
1730   pending->msg = &prm->header;
1731   prm->header.size = htons (msize);
1732   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1733   prm->type = htonl (type);
1734   prm->put_path_length = htonl (put_path_length);
1735   prm->get_path_length = htonl (get_path_length);
1736   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1737   prm->key = *key;
1738   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1739   memcpy (paths, put_path,
1740           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1741   memcpy (&paths[put_path_length], get_path,
1742           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1743   memcpy (&paths[put_path_length + get_path_length], data, data_size);
1744   GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
1745   pi->pending_count++;
1746   process_peer_queue (pi);
1747 }
1748
1749
1750 /**
1751  * To be called on core init/fail.
1752  *
1753  * @param cls service closure
1754  * @param identity the public identity of this peer
1755  */
1756 static void
1757 core_init (void *cls,
1758            const struct GNUNET_PeerIdentity *identity)
1759 {
1760   my_identity = *identity;
1761   GNUNET_CRYPTO_hash (identity,
1762                       sizeof (struct GNUNET_PeerIdentity),
1763                       &my_identity_hash);
1764 }
1765
1766
1767 /**
1768  * Core handler for p2p put requests.
1769  *
1770  * @param cls closure
1771  * @param peer sender of the request
1772  * @param message message
1773  * @param peer peer identity this notification is about
1774  * @return #GNUNET_OK to keep the connection open,
1775  *         #GNUNET_SYSERR to close it (signal serious error)
1776  */
1777 static int
1778 handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1779                     const struct GNUNET_MessageHeader *message)
1780 {
1781   const struct PeerPutMessage *put;
1782   const struct GNUNET_PeerIdentity *put_path;
1783   const void *payload;
1784   uint32_t putlen;
1785   uint16_t msize;
1786   size_t payload_size;
1787   enum GNUNET_DHT_RouteOption options;
1788   struct GNUNET_CONTAINER_BloomFilter *bf;
1789   struct GNUNET_HashCode test_key;
1790   struct GNUNET_HashCode phash;
1791   int forwarded;
1792
1793   msize = ntohs (message->size);
1794   if (msize < sizeof (struct PeerPutMessage))
1795   {
1796     GNUNET_break_op (0);
1797     return GNUNET_YES;
1798   }
1799   put = (const struct PeerPutMessage *) message;
1800   putlen = ntohl (put->put_path_length);
1801   if ((msize <
1802        sizeof (struct PeerPutMessage) +
1803        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1804       (putlen >
1805        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1806   {
1807     GNUNET_break_op (0);
1808     return GNUNET_YES;
1809   }
1810   GNUNET_STATISTICS_update (GDS_stats,
1811                             gettext_noop ("# P2P PUT requests received"), 1,
1812                             GNUNET_NO);
1813   GNUNET_STATISTICS_update (GDS_stats,
1814                             gettext_noop ("# P2P PUT bytes received"), msize,
1815                             GNUNET_NO);
1816   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1817   payload = &put_path[putlen];
1818   options = ntohl (put->options);
1819   payload_size =
1820       msize - (sizeof (struct PeerPutMessage) +
1821                putlen * sizeof (struct GNUNET_PeerIdentity));
1822
1823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1824               GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1825   GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash);
1826   if (GNUNET_YES == log_route_details_stderr)
1827   {
1828     char *tmp;
1829
1830     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1831     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1832                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1833                  GNUNET_h2s (&put->key), GNUNET_i2s (peer), tmp,
1834                  ntohl(put->hop_count),
1835                  GNUNET_CRYPTO_hash_matching_bits (&phash, &put->key),
1836                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &put->key)
1837                 );
1838     GNUNET_free (tmp);
1839   }
1840   switch (GNUNET_BLOCK_get_key
1841           (GDS_block_context, ntohl (put->type), payload, payload_size,
1842            &test_key))
1843   {
1844   case GNUNET_YES:
1845     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
1846     {
1847       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1848       GNUNET_break_op (0);
1849       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1850                   "PUT with key `%s' for block with key %s\n",
1851                   put_s, GNUNET_h2s_full (&test_key));
1852       GNUNET_free (put_s);
1853       return GNUNET_YES;
1854     }
1855     break;
1856   case GNUNET_NO:
1857     GNUNET_break_op (0);
1858     return GNUNET_YES;
1859   case GNUNET_SYSERR:
1860     /* cannot verify, good luck */
1861     break;
1862   }
1863   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1864   {
1865     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1866                                    ntohl (put->type),
1867                                    GNUNET_BLOCK_EO_NONE,
1868                                    NULL,    /* query */
1869                                    NULL, 0, /* bloom filer */
1870                                    NULL, 0, /* xquery */
1871                                    payload, payload_size))
1872     {
1873     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1874     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1875       break;
1876
1877     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1878     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1879     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1880     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1881     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1882     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1883     default:
1884       GNUNET_break_op (0);
1885       return GNUNET_OK;
1886     }
1887   }
1888
1889   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
1890                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1891   GNUNET_break_op (GNUNET_YES ==
1892                    GNUNET_CONTAINER_bloomfilter_test (bf, &phash));
1893   {
1894     struct GNUNET_PeerIdentity pp[putlen + 1];
1895
1896     /* extend 'put path' by sender */
1897     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1898     {
1899       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1900       pp[putlen] = *peer;
1901       putlen++;
1902     }
1903     else
1904       putlen = 0;
1905
1906     /* give to local clients */
1907     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1908                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1909                               payload_size, payload);
1910     /* store locally */
1911     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1912         (am_closest_peer (&put->key, bf)))
1913       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1914                                 (put->expiration_time), &put->key, putlen, pp,
1915                                 ntohl (put->type), payload_size, payload);
1916     /* route to other peers */
1917     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
1918                                            ntohl (put->desired_replication_level),
1919                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1920                                            ntohl (put->hop_count), bf,
1921                                            &put->key,
1922                                            putlen,
1923                                            pp,
1924                                            payload,
1925                                            payload_size);
1926     /* notify monitoring clients */
1927     GDS_CLIENTS_process_put (options
1928                              | ( (GNUNET_OK == forwarded)
1929                                  ? GNUNET_DHT_RO_LAST_HOP
1930                                  : 0 ),
1931                              ntohl (put->type),
1932                              ntohl (put->hop_count),
1933                              ntohl (put->desired_replication_level),
1934                              putlen, pp,
1935                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1936                              &put->key,
1937                              payload,
1938                              payload_size);
1939   }
1940   GNUNET_CONTAINER_bloomfilter_free (bf);
1941   return GNUNET_YES;
1942 }
1943
1944
1945 /**
1946  * We have received a FIND PEER request.  Send matching
1947  * HELLOs back.
1948  *
1949  * @param sender sender of the FIND PEER request
1950  * @param key peers close to this key are desired
1951  * @param bf peers matching this bf are excluded
1952  * @param bf_mutator mutator for bf
1953  */
1954 static void
1955 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1956                   const struct GNUNET_HashCode * key,
1957                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
1958 {
1959   int bucket_idx;
1960   struct PeerBucket *bucket;
1961   struct PeerInfo *peer;
1962   unsigned int choice;
1963   struct GNUNET_HashCode phash;
1964   struct GNUNET_HashCode mhash;
1965   const struct GNUNET_HELLO_Message *hello;
1966
1967   /* first, check about our own HELLO */
1968   if (NULL != GDS_my_hello)
1969   {
1970     GNUNET_BLOCK_mingle_hash (&my_identity_hash, bf_mutator, &mhash);
1971     if ((NULL == bf) ||
1972         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1973     {
1974       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1975                                    GNUNET_TIME_relative_to_absolute
1976                                    (hello_expiration),
1977                                    key, 0, NULL, 0, NULL, GDS_my_hello,
1978                                    GNUNET_HELLO_size ((const struct
1979                                                        GNUNET_HELLO_Message *)
1980                                                       GDS_my_hello));
1981     }
1982     else
1983     {
1984       GNUNET_STATISTICS_update (GDS_stats,
1985                                 gettext_noop
1986                                 ("# FIND PEER requests ignored due to Bloomfilter"),
1987                                 1, GNUNET_NO);
1988     }
1989   }
1990   else
1991   {
1992     GNUNET_STATISTICS_update (GDS_stats,
1993                               gettext_noop
1994                               ("# FIND PEER requests ignored due to lack of HELLO"),
1995                               1, GNUNET_NO);
1996   }
1997
1998   /* then, also consider sending a random HELLO from the closest bucket */
1999   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
2000     bucket_idx = closest_bucket;
2001   else
2002     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
2003   if (bucket_idx == GNUNET_SYSERR)
2004     return;
2005   bucket = &k_buckets[bucket_idx];
2006   if (bucket->peers_size == 0)
2007     return;
2008   choice =
2009       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
2010   peer = bucket->head;
2011   while (choice > 0)
2012   {
2013     GNUNET_assert (NULL != peer);
2014     peer = peer->next;
2015     choice--;
2016   }
2017   choice = bucket->peers_size;
2018   do
2019   {
2020     peer = peer->next;
2021     if (choice-- == 0)
2022       return;                   /* no non-masked peer available */
2023     if (peer == NULL)
2024       peer = bucket->head;
2025     GNUNET_CRYPTO_hash (&peer->id, sizeof (struct GNUNET_PeerIdentity), &phash);
2026     GNUNET_BLOCK_mingle_hash (&phash, bf_mutator, &mhash);
2027     hello = GDS_HELLO_get (&peer->id);
2028   }
2029   while ((hello == NULL) ||
2030          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
2031   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
2032                                GNUNET_TIME_relative_to_absolute
2033                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
2034                                0, NULL, 0, NULL, hello,
2035                                GNUNET_HELLO_size (hello));
2036 }
2037
2038
2039 /**
2040  * Core handler for p2p get requests.
2041  *
2042  * @param cls closure
2043  * @param peer sender of the request
2044  * @param message message
2045  * @return #GNUNET_OK to keep the connection open,
2046  *         #GNUNET_SYSERR to close it (signal serious error)
2047  */
2048 static int
2049 handle_dht_p2p_get (void *cls,
2050                     const struct GNUNET_PeerIdentity *peer,
2051                     const struct GNUNET_MessageHeader *message)
2052 {
2053   struct PeerGetMessage *get;
2054   uint32_t xquery_size;
2055   size_t reply_bf_size;
2056   uint16_t msize;
2057   enum GNUNET_BLOCK_Type type;
2058   enum GNUNET_DHT_RouteOption options;
2059   enum GNUNET_BLOCK_EvaluationResult eval;
2060   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
2061   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2062   const char *xquery;
2063   struct GNUNET_HashCode phash;
2064   int forwarded;
2065
2066   GNUNET_break (0 !=
2067                 memcmp (peer, &my_identity,
2068                         sizeof (struct GNUNET_PeerIdentity)));
2069   /* parse and validate message */
2070   msize = ntohs (message->size);
2071   if (msize < sizeof (struct PeerGetMessage))
2072   {
2073     GNUNET_break_op (0);
2074     return GNUNET_YES;
2075   }
2076   get = (struct PeerGetMessage *) message;
2077   xquery_size = ntohl (get->xquery_size);
2078   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
2079   {
2080     GNUNET_break_op (0);
2081     return GNUNET_YES;
2082   }
2083   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2084   type = ntohl (get->type);
2085   options = ntohl (get->options);
2086   xquery = (const char *) &get[1];
2087   reply_bf = NULL;
2088   GNUNET_STATISTICS_update (GDS_stats,
2089                             gettext_noop ("# P2P GET requests received"), 1,
2090                             GNUNET_NO);
2091   GNUNET_STATISTICS_update (GDS_stats,
2092                             gettext_noop ("# P2P GET bytes received"), msize,
2093                             GNUNET_NO);
2094   GNUNET_CRYPTO_hash (peer,
2095                       sizeof (struct GNUNET_PeerIdentity),
2096                       &phash);
2097   if (GNUNET_YES == log_route_details_stderr)
2098   {
2099     char *tmp;
2100
2101     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2102     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2103                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2104                  GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp,
2105                  ntohl(get->hop_count),
2106                  GNUNET_CRYPTO_hash_matching_bits (&phash, &get->key),
2107                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key),
2108                  ntohl(get->xquery_size), xquery);
2109     GNUNET_free (tmp);
2110   }
2111
2112   if (reply_bf_size > 0)
2113     reply_bf =
2114         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
2115                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
2116   eval =
2117       GNUNET_BLOCK_evaluate (GDS_block_context,
2118                              type,
2119                              GNUNET_BLOCK_EO_NONE,
2120                              &get->key,
2121                              &reply_bf,
2122                              get->bf_mutator,
2123                              xquery,
2124                              xquery_size,
2125                              NULL,
2126                              0);
2127   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2128   {
2129     /* request invalid or block type not supported */
2130     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2131     if (NULL != reply_bf)
2132       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2133     return GNUNET_YES;
2134   }
2135   peer_bf =
2136       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
2137                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
2138   GNUNET_break_op (GNUNET_YES ==
2139                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2140                                                       &phash));
2141   /* remember request for routing replies */
2142   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
2143                    reply_bf, get->bf_mutator);
2144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2145               "GET for %s at %s after %u hops\n",
2146               GNUNET_h2s (&get->key),
2147               GNUNET_i2s (&my_identity),
2148               (unsigned int) ntohl (get->hop_count));
2149   /* local lookup (this may update the reply_bf) */
2150   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2151       (am_closest_peer (&get->key, peer_bf)))
2152   {
2153     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2154     {
2155       GNUNET_STATISTICS_update (GDS_stats,
2156                                 gettext_noop
2157                                 ("# P2P FIND PEER requests processed"), 1,
2158                                 GNUNET_NO);
2159       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
2160     }
2161     else
2162     {
2163       eval =
2164           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
2165                                     &reply_bf, get->bf_mutator);
2166     }
2167   }
2168   else
2169   {
2170     GNUNET_STATISTICS_update (GDS_stats,
2171                               gettext_noop ("# P2P GET requests ONLY routed"),
2172                               1, GNUNET_NO);
2173   }
2174
2175   /* P2P forwarding */
2176   forwarded = GNUNET_NO;
2177   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2178     forwarded = GDS_NEIGHBOURS_handle_get (type, options,
2179                                            ntohl (get->desired_replication_level),
2180                                            ntohl (get->hop_count),
2181                                            &get->key,
2182                                            xquery,
2183                                            xquery_size,
2184                                            reply_bf,
2185                                            get->bf_mutator, peer_bf);
2186   GDS_CLIENTS_process_get (options
2187                            | (GNUNET_OK == forwarded)
2188                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2189                            type,
2190                            ntohl (get->hop_count),
2191                            ntohl (get->desired_replication_level),
2192                            0, NULL,
2193                            &get->key);
2194
2195
2196   /* clean up */
2197   if (NULL != reply_bf)
2198     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2199   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2200   return GNUNET_YES;
2201 }
2202
2203
2204 /**
2205  * Core handler for p2p result messages.
2206  *
2207  * @param cls closure
2208  * @param message message
2209  * @param peer peer identity this notification is about
2210  * @return #GNUNET_YES (do not cut p2p connection)
2211  */
2212 static int
2213 handle_dht_p2p_result (void *cls,
2214                        const struct GNUNET_PeerIdentity *peer,
2215                        const struct GNUNET_MessageHeader *message)
2216 {
2217   const struct PeerResultMessage *prm;
2218   const struct GNUNET_PeerIdentity *put_path;
2219   const struct GNUNET_PeerIdentity *get_path;
2220   const void *data;
2221   uint32_t get_path_length;
2222   uint32_t put_path_length;
2223   uint16_t msize;
2224   size_t data_size;
2225   enum GNUNET_BLOCK_Type type;
2226
2227   /* parse and validate message */
2228   msize = ntohs (message->size);
2229   if (msize < sizeof (struct PeerResultMessage))
2230   {
2231     GNUNET_break_op (0);
2232     return GNUNET_YES;
2233   }
2234   prm = (struct PeerResultMessage *) message;
2235   put_path_length = ntohl (prm->put_path_length);
2236   get_path_length = ntohl (prm->get_path_length);
2237   if ((msize <
2238        sizeof (struct PeerResultMessage) + (get_path_length +
2239                                             put_path_length) *
2240        sizeof (struct GNUNET_PeerIdentity)) ||
2241       (get_path_length >
2242        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2243       (put_path_length >
2244        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2245   {
2246     GNUNET_break_op (0);
2247     return GNUNET_YES;
2248   }
2249   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2250   get_path = &put_path[put_path_length];
2251   type = ntohl (prm->type);
2252   data = (const void *) &get_path[get_path_length];
2253   data_size =
2254       msize - (sizeof (struct PeerResultMessage) +
2255                (get_path_length +
2256                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2257   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
2258                             1, GNUNET_NO);
2259   GNUNET_STATISTICS_update (GDS_stats,
2260                             gettext_noop ("# P2P RESULT bytes received"),
2261                             msize, GNUNET_NO);
2262   if (GNUNET_YES == log_route_details_stderr)
2263   {
2264     char *tmp;
2265
2266     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2267     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2268                  "R5N RESULT %s: %s->%s (%u)\n",
2269                  GNUNET_h2s (&prm->key),
2270                  GNUNET_i2s (peer),
2271                  tmp,
2272                  get_path_length + 1);
2273     GNUNET_free (tmp);
2274   }
2275   /* if we got a HELLO, consider it for our own routing table */
2276   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2277   {
2278     const struct GNUNET_MessageHeader *h;
2279     struct GNUNET_PeerIdentity pid;
2280
2281     /* Should be a HELLO, validate and consider using it! */
2282     if (data_size < sizeof (struct GNUNET_MessageHeader))
2283     {
2284       GNUNET_break_op (0);
2285       return GNUNET_YES;
2286     }
2287     h = data;
2288     if (data_size != ntohs (h->size))
2289     {
2290       GNUNET_break_op (0);
2291       return GNUNET_YES;
2292     }
2293     if (GNUNET_OK !=
2294         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2295                              &pid))
2296     {
2297       GNUNET_break_op (0);
2298       return GNUNET_YES;
2299     }
2300     if ( (GNUNET_YES != disable_try_connect) &&
2301          (0 != memcmp (&my_identity,
2302                        &pid,
2303                        sizeof (struct GNUNET_PeerIdentity))) )
2304       try_connect (&pid,
2305                    h);
2306   }
2307
2308   /* append 'peer' to 'get_path' */
2309   {
2310     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2311
2312     memcpy (xget_path,
2313             get_path,
2314             get_path_length * sizeof (struct GNUNET_PeerIdentity));
2315     xget_path[get_path_length] = *peer;
2316     get_path_length++;
2317
2318     /* forward to local clients */
2319     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2320                               &prm->key,
2321                               get_path_length,
2322                               xget_path,
2323                               put_path_length,
2324                               put_path,
2325                               type,
2326                               data_size,
2327                               data);
2328     GDS_CLIENTS_process_get_resp (type,
2329                                   xget_path,
2330                                   get_path_length,
2331                                   put_path, put_path_length,
2332                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2333                                   &prm->key,
2334                                   data,
2335                                   data_size);
2336     if (GNUNET_YES == cache_results)
2337     {
2338       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2339
2340       memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
2341       memcpy (&xput_path[put_path_length],
2342               xget_path,
2343               get_path_length * sizeof (struct GNUNET_PeerIdentity));
2344
2345       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2346                                 &prm->key,
2347                                 get_path_length + put_path_length,
2348                                 xput_path,
2349                                 type,
2350                                 data_size,
2351                                 data);
2352     }
2353     /* forward to other peers */
2354     GDS_ROUTING_process (type,
2355                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2356                          &prm->key,
2357                          put_path_length,
2358                          put_path,
2359                          get_path_length,
2360                          xget_path,
2361                          data,
2362                          data_size);
2363   }
2364
2365   return GNUNET_YES;
2366 }
2367
2368
2369 /**
2370  * Initialize neighbours subsystem.
2371  *
2372  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2373  */
2374 int
2375 GDS_NEIGHBOURS_init ()
2376 {
2377   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2378     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2379     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2380     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2381     {NULL, 0, 0}
2382   };
2383   unsigned long long temp_config_num;
2384
2385   disable_try_connect
2386     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT");
2387   if (GNUNET_OK ==
2388       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2389                                              &temp_config_num))
2390     bucket_size = (unsigned int) temp_config_num;
2391   cache_results
2392     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS");
2393
2394   log_route_details_stderr =
2395     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2396   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2397   core_api =
2398       GNUNET_CORE_connect (GDS_cfg, NULL,
2399                            &core_init,
2400                            &handle_core_connect,
2401                            &handle_core_disconnect,
2402                            NULL, GNUNET_NO,
2403                            NULL, GNUNET_NO,
2404                            core_handlers);
2405   if (core_api == NULL)
2406     return GNUNET_SYSERR;
2407   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2408                                                               GNUNET_NO);
2409   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2410                                                             GNUNET_NO);
2411   return GNUNET_OK;
2412 }
2413
2414
2415 /**
2416  * Shutdown neighbours subsystem.
2417  */
2418 void
2419 GDS_NEIGHBOURS_done ()
2420 {
2421   if (NULL == core_api)
2422     return;
2423   GNUNET_CORE_disconnect (core_api);
2424   core_api = NULL;
2425   GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2426   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2427   all_connected_peers = NULL;
2428   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2429                                          &free_connect_info,
2430                                          NULL);
2431   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2432   all_desired_peers = NULL;
2433   GNUNET_ATS_connectivity_done (ats_ch);
2434   ats_ch = NULL;
2435   if (NULL != find_peer_task)
2436   {
2437     GNUNET_SCHEDULER_cancel (find_peer_task);
2438     find_peer_task = NULL;
2439   }
2440 }
2441
2442
2443 /**
2444  * Get the ID of the local node.
2445  *
2446  * @return identity of the local node
2447  */
2448 struct GNUNET_PeerIdentity *
2449 GDS_NEIGHBOURS_get_id ()
2450 {
2451   return &my_identity;
2452 }
2453
2454
2455 /* end of gnunet-service-dht_neighbours.c */