-ensure stats queues do not grow too big
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_hello.h"
45 #include "gnunet-service-dht_neighbours.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include "dht.h"
49
50 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * Maximum allowed number of pending messages per peer.
74  */
75 #define MAXIMUM_PENDING_PER_PEER 64
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92 /**
93  * Hello address expiration
94  */
95 extern struct GNUNET_TIME_Relative hello_expiration;
96
97
98 GNUNET_NETWORK_STRUCT_BEGIN
99
100 /**
101  * P2P PUT message
102  */
103 struct PeerPutMessage
104 {
105   /**
106    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
107    */
108   struct GNUNET_MessageHeader header;
109
110   /**
111    * Processing options
112    */
113   uint32_t options GNUNET_PACKED;
114
115   /**
116    * Content type.
117    */
118   uint32_t type GNUNET_PACKED;
119
120   /**
121    * Hop count
122    */
123   uint32_t hop_count GNUNET_PACKED;
124
125   /**
126    * Replication level for this message
127    */
128   uint32_t desired_replication_level GNUNET_PACKED;
129
130   /**
131    * Length of the PUT path that follows (if tracked).
132    */
133   uint32_t put_path_length GNUNET_PACKED;
134
135   /**
136    * When does the content expire?
137    */
138   struct GNUNET_TIME_AbsoluteNBO expiration_time;
139
140   /**
141    * Bloomfilter (for peer identities) to stop circular routes
142    */
143   char bloomfilter[DHT_BLOOM_SIZE];
144
145   /**
146    * The key we are storing under.
147    */
148   struct GNUNET_HashCode key;
149
150   /* put path (if tracked) */
151
152   /* Payload */
153
154 };
155
156
157 /**
158  * P2P Result message
159  */
160 struct PeerResultMessage
161 {
162   /**
163    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Content type.
169    */
170   uint32_t type GNUNET_PACKED;
171
172   /**
173    * Length of the PUT path that follows (if tracked).
174    */
175   uint32_t put_path_length GNUNET_PACKED;
176
177   /**
178    * Length of the GET path that follows (if tracked).
179    */
180   uint32_t get_path_length GNUNET_PACKED;
181
182   /**
183    * When does the content expire?
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration_time;
186
187   /**
188    * The key of the corresponding GET request.
189    */
190   struct GNUNET_HashCode key;
191
192   /* put path (if tracked) */
193
194   /* get path (if tracked) */
195
196   /* Payload */
197
198 };
199
200
201 /**
202  * P2P GET message
203  */
204 struct PeerGetMessage
205 {
206   /**
207    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
208    */
209   struct GNUNET_MessageHeader header;
210
211   /**
212    * Processing options
213    */
214   uint32_t options GNUNET_PACKED;
215
216   /**
217    * Desired content type.
218    */
219   uint32_t type GNUNET_PACKED;
220
221   /**
222    * Hop count
223    */
224   uint32_t hop_count GNUNET_PACKED;
225
226   /**
227    * Desired replication level for this request.
228    */
229   uint32_t desired_replication_level GNUNET_PACKED;
230
231   /**
232    * Size of the extended query.
233    */
234   uint32_t xquery_size;
235
236   /**
237    * Bloomfilter mutator.
238    */
239   uint32_t bf_mutator;
240
241   /**
242    * Bloomfilter (for peer identities) to stop circular routes
243    */
244   char bloomfilter[DHT_BLOOM_SIZE];
245
246   /**
247    * The key we are looking for.
248    */
249   struct GNUNET_HashCode key;
250
251   /* xquery */
252
253   /* result bloomfilter */
254
255 };
256 GNUNET_NETWORK_STRUCT_END
257
258 /**
259  * Linked list of messages to send to a particular other peer.
260  */
261 struct P2PPendingMessage
262 {
263   /**
264    * Pointer to next item in the list
265    */
266   struct P2PPendingMessage *next;
267
268   /**
269    * Pointer to previous item in the list
270    */
271   struct P2PPendingMessage *prev;
272
273   /**
274    * Message importance level.  FIXME: used? useful?
275    */
276   unsigned int importance;
277
278   /**
279    * When does this message time out?
280    */
281   struct GNUNET_TIME_Absolute timeout;
282
283   /**
284    * Actual message to be sent, allocated at the end of the struct:
285    * // msg = (cast) &pm[1];
286    * // memcpy (&pm[1], data, len);
287    */
288   const struct GNUNET_MessageHeader *msg;
289
290 };
291
292
293 /**
294  * Entry for a peer in a bucket.
295  */
296 struct PeerInfo
297 {
298   /**
299    * Next peer entry (DLL)
300    */
301   struct PeerInfo *next;
302
303   /**
304    *  Prev peer entry (DLL)
305    */
306   struct PeerInfo *prev;
307
308   /**
309    * Count of outstanding messages for peer.
310    */
311   unsigned int pending_count;
312
313   /**
314    * Head of pending messages to be sent to this peer.
315    */
316   struct P2PPendingMessage *head;
317
318   /**
319    * Tail of pending messages to be sent to this peer.
320    */
321   struct P2PPendingMessage *tail;
322
323   /**
324    * Core handle for sending messages to this peer.
325    */
326   struct GNUNET_CORE_TransmitHandle *th;
327
328   /**
329    * What is the identity of the peer?
330    */
331   struct GNUNET_PeerIdentity id;
332
333 #if 0
334   /**
335    * What is the average latency for replies received?
336    */
337   struct GNUNET_TIME_Relative latency;
338
339   /**
340    * Transport level distance to peer.
341    */
342   unsigned int distance;
343 #endif
344
345 };
346
347
348 /**
349  * Peers are grouped into buckets.
350  */
351 struct PeerBucket
352 {
353   /**
354    * Head of DLL
355    */
356   struct PeerInfo *head;
357
358   /**
359    * Tail of DLL
360    */
361   struct PeerInfo *tail;
362
363   /**
364    * Number of peers in the bucket.
365    */
366   unsigned int peers_size;
367 };
368
369
370 /**
371  * Information about a peer that we would like to connect to.
372  */
373 struct ConnectInfo
374 {
375
376   /**
377    * Handle to active HELLO offer operation, or NULL.
378    */
379   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
380
381   /**
382    * Handle to active connectivity suggestion operation, or NULL.
383    */
384   struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
385
386   /**
387    * How much would we like to connect to this peer?
388    */
389   uint32_t strength;
390 };
391
392
393 /**
394  * Do we cache all results that we are routing in the local datacache?
395  */
396 static int cache_results;
397
398 /**
399  * Should routing details be logged to stderr (for debugging)?
400  */
401 static int log_route_details_stderr;
402
403 /**
404  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
405  */
406 static unsigned int closest_bucket;
407
408 /**
409  * How many peers have we added since we sent out our last
410  * find peer request?
411  */
412 static unsigned int newly_found_peers;
413
414 /**
415  * Option for testing that disables the 'connect' function of the DHT.
416  */
417 static int disable_try_connect;
418
419 /**
420  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
421  */
422 static struct PeerBucket k_buckets[MAX_BUCKETS];
423
424 /**
425  * Hash map of all CORE-connected peers, for easy removal from
426  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
427  */
428 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
429
430 /**
431  * Hash map of all peers we would like to be connected to.
432  * Values are of type `struct ConnectInfo`.
433  */
434 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
435
436 /**
437  * Maximum size for each bucket.
438  */
439 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
440
441 /**
442  * Task that sends FIND PEER requests.
443  */
444 static struct GNUNET_SCHEDULER_Task *find_peer_task;
445
446 /**
447  * Identity of this peer.
448  */
449 static struct GNUNET_PeerIdentity my_identity;
450
451 /**
452  * Hash of the identity of this peer.
453  */
454 static struct GNUNET_HashCode my_identity_hash;
455
456 /**
457  * Handle to CORE.
458  */
459 static struct GNUNET_CORE_Handle *core_api;
460
461 /**
462  * Handle to ATS connectivity.
463  */
464 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
465
466
467 /**
468  * Find the optimal bucket for this key.
469  *
470  * @param hc the hashcode to compare our identity to
471  * @return the proper bucket index, or GNUNET_SYSERR
472  *         on error (same hashcode)
473  */
474 static int
475 find_bucket (const struct GNUNET_HashCode *hc)
476 {
477   unsigned int bits;
478
479   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
480   if (bits == MAX_BUCKETS)
481   {
482     /* How can all bits match? Got my own ID? */
483     GNUNET_break (0);
484     return GNUNET_SYSERR;
485   }
486   return MAX_BUCKETS - bits - 1;
487 }
488
489
490 /**
491  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
492  * Clean up the "oh" field in the @a cls
493  *
494  * @param cls a `struct ConnectInfo`
495  */
496 static void
497 offer_hello_done (void *cls)
498 {
499   struct ConnectInfo *ci = cls;
500
501   ci->oh = NULL;
502 }
503
504
505 /**
506  * Function called for all entries in #all_desired_peers to clean up.
507  *
508  * @param cls NULL
509  * @param peer peer the entry is for
510  * @param value the value to remove
511  * @return #GNUNET_YES
512  */
513 static int
514 free_connect_info (void *cls,
515                    const struct GNUNET_PeerIdentity *peer,
516                    void *value)
517 {
518   struct ConnectInfo *ci = value;
519
520   GNUNET_assert (GNUNET_YES ==
521                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
522                                                        peer,
523                                                        ci));
524   if (NULL != ci->sh)
525   {
526     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
527     ci->sh = NULL;
528   }
529   if (NULL != ci->oh)
530   {
531     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
532     ci->oh = NULL;
533   }
534   GNUNET_free (ci);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Consider if we want to connect to a given peer, and if so
541  * let ATS know.  If applicable, the HELLO is offered to the
542  * TRANSPORT service.
543  *
544  * @param pid peer to consider connectivity requirements for
545  * @param h a HELLO message, or NULL
546  */
547 static void
548 try_connect (const struct GNUNET_PeerIdentity *pid,
549              const struct GNUNET_MessageHeader *h)
550 {
551   int bucket;
552   struct GNUNET_HashCode pid_hash;
553   struct ConnectInfo *ci;
554   uint32_t strength;
555
556   GNUNET_CRYPTO_hash (pid,
557                       sizeof (struct GNUNET_PeerIdentity),
558                       &pid_hash);
559   bucket = find_bucket (&pid_hash);
560   if (bucket < 0)
561     return; /* self? */
562   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
563                                           pid);
564
565   if (k_buckets[bucket].peers_size < bucket_size)
566     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
567   else
568     strength = bucket; /* minimum value of connectivity */
569   if (GNUNET_YES ==
570       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
571                                               pid))
572     strength *= 2; /* double for connected peers */
573   else if (k_buckets[bucket].peers_size > bucket_size)
574     strength = 0; /* bucket full, we really do not care about more */
575
576   if ( (0 == strength) &&
577        (NULL != ci) )
578   {
579     /* release request */
580     GNUNET_assert (GNUNET_YES ==
581                    free_connect_info (NULL,
582                                       pid,
583                                       ci));
584     return;
585   }
586   if (NULL == ci)
587   {
588     ci = GNUNET_new (struct ConnectInfo);
589     GNUNET_assert (GNUNET_OK ==
590                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
591                                                       pid,
592                                                       ci,
593                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
594   }
595   if ( (NULL != GDS_transport_handle) &&
596        (NULL != ci->oh) &&
597        (NULL != h) )
598     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
599   if ( (NULL != GDS_transport_handle) &&
600        (NULL != h) )
601     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
602                                            h,
603                                            &offer_hello_done,
604                                            ci);
605   if ( (NULL != ci->sh) &&
606        (ci->strength != strength) )
607     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
608   if (ci->strength != strength)
609     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
610                                               pid,
611                                               strength);
612   ci->strength = strength;
613 }
614
615
616 /**
617  * Function called for each peer in #all_desired_peers during
618  * #update_connect_preferences() if we have reason to adjust
619  * the strength of our desire to keep connections to certain
620  * peers.  Calls #try_connect() to update the calculations for
621  * the given @a pid.
622  *
623  * @param cls NULL
624  * @param pid peer to update
625  * @param value unused
626  * @return #GNUNET_YES (continue to iterate)
627  */
628 static int
629 update_desire_strength (void *cls,
630                         const struct GNUNET_PeerIdentity *pid,
631                         void *value)
632 {
633   try_connect (pid, NULL);
634   return GNUNET_YES;
635 }
636
637
638 /**
639  * Update our preferences for connectivity as given to ATS.
640  *
641  * @param cls the `struct PeerInfo` of the peer
642  * @param tc scheduler context.
643  */
644 static void
645 update_connect_preferences ()
646 {
647   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
648                                          &update_desire_strength,
649                                          NULL);
650 }
651
652
653 /**
654  * Closure for #add_known_to_bloom().
655  */
656 struct BloomConstructorContext
657 {
658   /**
659    * Bloom filter under construction.
660    */
661   struct GNUNET_CONTAINER_BloomFilter *bloom;
662
663   /**
664    * Mutator to use.
665    */
666   uint32_t bf_mutator;
667 };
668
669
670 /**
671  * Add each of the peers we already know to the bloom filter of
672  * the request so that we don't get duplicate HELLOs.
673  *
674  * @param cls the 'struct BloomConstructorContext'.
675  * @param key peer identity to add to the bloom filter
676  * @param value value the peer information (unused)
677  * @return #GNUNET_YES (we should continue to iterate)
678  */
679 static int
680 add_known_to_bloom (void *cls,
681                     const struct GNUNET_PeerIdentity *key,
682                     void *value)
683 {
684   struct BloomConstructorContext *ctx = cls;
685   struct GNUNET_HashCode key_hash;
686   struct GNUNET_HashCode mh;
687
688   GNUNET_CRYPTO_hash (key,
689                       sizeof (struct GNUNET_PeerIdentity),
690                       &key_hash);
691   GNUNET_BLOCK_mingle_hash (&key_hash,
692                             ctx->bf_mutator,
693                             &mh);
694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
696               GNUNET_i2s (key), ctx->bf_mutator);
697   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
698   return GNUNET_YES;
699 }
700
701
702 /**
703  * Task to send a find peer message for our own peer identifier
704  * so that we can find the closest peers in the network to ourselves
705  * and attempt to connect to them.
706  *
707  * @param cls closure for this task
708  */
709 static void
710 send_find_peer_message (void *cls)
711 {
712   struct GNUNET_TIME_Relative next_send_time;
713   struct BloomConstructorContext bcc;
714   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
715
716   find_peer_task = NULL;
717   if (newly_found_peers > bucket_size)
718   {
719     /* If we are finding many peers already, no need to send out our request right now! */
720     find_peer_task =
721         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
722                                       &send_find_peer_message,
723                                       NULL);
724     newly_found_peers = 0;
725     return;
726   }
727   bcc.bf_mutator =
728       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
729                                 UINT32_MAX);
730   bcc.bloom =
731       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
732                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
733   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
734                                          &add_known_to_bloom,
735                                          &bcc);
736   GNUNET_STATISTICS_update (GDS_stats,
737                             gettext_noop ("# FIND PEER messages initiated"),
738                             1,
739                             GNUNET_NO);
740   peer_bf =
741       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
742                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
743   // FIXME: pass priority!?
744   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
745                              GNUNET_DHT_RO_FIND_PEER,
746                              FIND_PEER_REPLICATION_LEVEL, 0,
747                              &my_identity_hash, NULL, 0, bcc.bloom,
748                              bcc.bf_mutator, peer_bf);
749   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
750   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
751   /* schedule next round */
752   next_send_time.rel_value_us =
753       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
754       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
755                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
756                                 (newly_found_peers + 1));
757   newly_found_peers = 0;
758   GNUNET_assert (NULL == find_peer_task);
759   find_peer_task =
760       GNUNET_SCHEDULER_add_delayed (next_send_time,
761                                     &send_find_peer_message,
762                                     NULL);
763 }
764
765
766 /**
767  * Method called whenever a peer connects.
768  *
769  * @param cls closure
770  * @param peer peer identity this notification is about
771  */
772 static void
773 handle_core_connect (void *cls,
774                      const struct GNUNET_PeerIdentity *peer)
775 {
776   struct PeerInfo *ret;
777   struct GNUNET_HashCode phash;
778   int peer_bucket;
779
780   /* Check for connect to self message */
781   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
782     return;
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Connected to %s\n",
785               GNUNET_i2s (peer));
786   if (GNUNET_YES ==
787       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
788                                               peer))
789   {
790     GNUNET_break (0);
791     return;
792   }
793   GNUNET_STATISTICS_update (GDS_stats,
794                             gettext_noop ("# peers connected"),
795                             1,
796                             GNUNET_NO);
797   GNUNET_CRYPTO_hash (peer,
798                       sizeof (struct GNUNET_PeerIdentity),
799                       &phash);
800   peer_bucket = find_bucket (&phash);
801   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
802   ret = GNUNET_new (struct PeerInfo);
803 #if 0
804   ret->latency = latency;
805   ret->distance = distance;
806 #endif
807   ret->id = *peer;
808   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
809                                     k_buckets[peer_bucket].tail,
810                                     ret);
811   k_buckets[peer_bucket].peers_size++;
812   closest_bucket = GNUNET_MAX (closest_bucket,
813                                peer_bucket);
814   GNUNET_assert (GNUNET_OK ==
815                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
816                                                     peer,
817                                                     ret,
818                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
819   if ( (peer_bucket > 0) &&
820        (k_buckets[peer_bucket].peers_size <= bucket_size))
821   {
822     update_connect_preferences ();
823     newly_found_peers++;
824   }
825   if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
826        (GNUNET_YES != disable_try_connect))
827   {
828     /* got a first connection, good time to start with FIND PEER requests... */
829     GNUNET_assert (NULL == find_peer_task);
830     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
831                                                NULL);
832   }
833 }
834
835
836 /**
837  * Method called whenever a peer disconnects.
838  *
839  * @param cls closure
840  * @param peer peer identity this notification is about
841  */
842 static void
843 handle_core_disconnect (void *cls,
844                         const struct GNUNET_PeerIdentity *peer)
845 {
846   struct PeerInfo *to_remove;
847   int current_bucket;
848   struct P2PPendingMessage *pos;
849   unsigned int discarded;
850   struct GNUNET_HashCode phash;
851
852   /* Check for disconnect from self message */
853   if (0 == memcmp (&my_identity,
854                    peer,
855                    sizeof (struct GNUNET_PeerIdentity)))
856     return;
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "Disconnected %s\n",
859               GNUNET_i2s (peer));
860   to_remove =
861       GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
862                                          peer);
863   if (NULL == to_remove)
864   {
865     GNUNET_break (0);
866     return;
867   }
868   GNUNET_STATISTICS_update (GDS_stats,
869                             gettext_noop ("# peers connected"),
870                             -1,
871                             GNUNET_NO);
872   GNUNET_assert (GNUNET_YES ==
873                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
874                                                        peer,
875                                                        to_remove));
876   if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
877         (GNUNET_YES != disable_try_connect)) )
878   {
879     GNUNET_SCHEDULER_cancel (find_peer_task);
880     find_peer_task = NULL;
881   }
882   GNUNET_CRYPTO_hash (peer,
883                       sizeof (struct GNUNET_PeerIdentity),
884                       &phash);
885   current_bucket = find_bucket (&phash);
886   GNUNET_assert (current_bucket >= 0);
887   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
888                                k_buckets[current_bucket].tail,
889                                to_remove);
890   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
891   k_buckets[current_bucket].peers_size--;
892   while ( (closest_bucket > 0) &&
893           (0 == k_buckets[closest_bucket].peers_size) )
894     closest_bucket--;
895   if (NULL != to_remove->th)
896   {
897     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
898     to_remove->th = NULL;
899   }
900   discarded = 0;
901   while (NULL != (pos = to_remove->head))
902   {
903     GNUNET_CONTAINER_DLL_remove (to_remove->head,
904                                  to_remove->tail,
905                                  pos);
906     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                 "Dropping message of type %u due to disconnect\n",
908                 ntohs (pos->msg->type));
909     discarded++;
910     GNUNET_free (pos);
911   }
912   if (k_buckets[current_bucket].peers_size < bucket_size)
913     update_connect_preferences ();
914   GNUNET_STATISTICS_update (GDS_stats,
915                             gettext_noop ("# Queued messages discarded (peer disconnected)"),
916                             discarded,
917                             GNUNET_NO);
918   GNUNET_free (to_remove);
919 }
920
921
922 /**
923  * Called when core is ready to send a message we asked for
924  * out to the destination.
925  *
926  * @param cls the 'struct PeerInfo' of the target peer
927  * @param size number of bytes available in @a buf
928  * @param buf where the callee should write the message
929  * @return number of bytes written to @a buf
930  */
931 static size_t
932 core_transmit_notify (void *cls,
933                       size_t size,
934                       void *buf)
935 {
936   struct PeerInfo *peer = cls;
937   char *cbuf = buf;
938   struct P2PPendingMessage *pending;
939   size_t off;
940   size_t msize;
941
942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943               "DHT ctn called with buffer of %u bytes\n",
944               (unsigned int) size);
945   peer->th = NULL;
946   while ((NULL != (pending = peer->head)) &&
947          (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
948   {
949     GNUNET_STATISTICS_update (GDS_stats,
950                               gettext_noop
951                               ("# Messages dropped (CORE timeout)"),
952                               1,
953                               GNUNET_NO);
954     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955                 "Dropping message of type %u due to timeout\n",
956                 ntohs (pending->msg->type));
957     peer->pending_count--;
958     GNUNET_CONTAINER_DLL_remove (peer->head,
959                                  peer->tail,
960                                  pending);
961     GNUNET_free (pending);
962   }
963   if (NULL == pending)
964   {
965     /* no messages pending */
966     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967                 "No messages pending\n");
968     return 0;
969   }
970   if (NULL == buf)
971   {
972     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973                 "Got NULL buffer, trying again\n");
974     peer->th =
975         GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
976                                            GNUNET_CORE_PRIO_BEST_EFFORT,
977                                            GNUNET_TIME_absolute_get_remaining
978                                            (pending->timeout),
979                                            &peer->id,
980                                            ntohs (pending->msg->size),
981                                            &core_transmit_notify,
982                                            peer);
983     GNUNET_break (NULL != peer->th);
984     return 0;
985   }
986   off = 0;
987   while ((NULL != (pending = peer->head)) &&
988          (size - off >= (msize = ntohs (pending->msg->size))))
989   {
990     GNUNET_STATISTICS_update (GDS_stats,
991                               gettext_noop
992                               ("# Bytes transmitted to other peers"), msize,
993                               GNUNET_NO);
994     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995                 "Transmitting message of type %u to %s\n",
996                 ntohs (pending->msg->type),
997                 GNUNET_i2s (&peer->id));
998     memcpy (&cbuf[off],
999             pending->msg,
1000             msize);
1001     off += msize;
1002     peer->pending_count--;
1003     GNUNET_CONTAINER_DLL_remove (peer->head,
1004                                  peer->tail,
1005                                  pending);
1006     GNUNET_free (pending);
1007   }
1008   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1009               "%u bytes fit in %u bytes available, next message is %u bytes\n",
1010               (unsigned int) off,
1011               (unsigned int) size,
1012               (NULL != peer->head) ? ntohs (peer->head->msg->size) : 0);
1013   if (NULL != (pending = peer->head))
1014   {
1015     /* technically redundant, but easier to read and
1016        avoids bogus gcc warning... */
1017     msize = ntohs (pending->msg->size);
1018     peer->th =
1019       GNUNET_CORE_notify_transmit_ready (core_api,
1020                                          GNUNET_NO,
1021                                          GNUNET_CORE_PRIO_BEST_EFFORT,
1022                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
1023                                          &peer->id,
1024                                          msize,
1025                                          &core_transmit_notify,
1026                                          peer);
1027     GNUNET_break (NULL != peer->th);
1028   }
1029   return off;
1030 }
1031
1032
1033 /**
1034  * Transmit all messages in the peer's message queue.
1035  *
1036  * @param peer message queue to process
1037  */
1038 static void
1039 process_peer_queue (struct PeerInfo *peer)
1040 {
1041   struct P2PPendingMessage *pending;
1042
1043   if (NULL == (pending = peer->head))
1044     return;
1045   if (NULL != peer->th)
1046   {
1047     GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
1048     peer->th = NULL;
1049   }
1050   GNUNET_STATISTICS_update (GDS_stats,
1051                             gettext_noop
1052                             ("# Bytes of bandwidth requested from core"),
1053                             ntohs (pending->msg->size), GNUNET_NO);
1054   peer->th =
1055       GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
1056                                          GNUNET_CORE_PRIO_BEST_EFFORT,
1057                                          GNUNET_TIME_absolute_get_remaining
1058                                          (pending->timeout),
1059                                          &peer->id,
1060                                          ntohs (pending->msg->size),
1061                                          &core_transmit_notify,
1062                                          peer);
1063   GNUNET_break (NULL != peer->th);
1064 }
1065
1066
1067 /**
1068  * To how many peers should we (on average) forward the request to
1069  * obtain the desired target_replication count (on average).
1070  *
1071  * @param hop_count number of hops the message has traversed
1072  * @param target_replication the number of total paths desired
1073  * @return Some number of peers to forward the message to
1074  */
1075 static unsigned int
1076 get_forward_count (uint32_t hop_count, uint32_t target_replication)
1077 {
1078   uint32_t random_value;
1079   uint32_t forward_count;
1080   float target_value;
1081
1082   if (hop_count > GDS_NSE_get () * 4.0)
1083   {
1084     /* forcefully terminate */
1085     GNUNET_STATISTICS_update (GDS_stats,
1086                               gettext_noop ("# requests TTL-dropped"),
1087                               1, GNUNET_NO);
1088     return 0;
1089   }
1090   if (hop_count > GDS_NSE_get () * 2.0)
1091   {
1092     /* Once we have reached our ideal number of hops, only forward to 1 peer */
1093     return 1;
1094   }
1095   /* bound by system-wide maximum */
1096   target_replication =
1097       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
1098   target_value =
1099       1 + (target_replication - 1.0) / (GDS_NSE_get () +
1100                                         ((float) (target_replication - 1.0) *
1101                                          hop_count));
1102   /* Set forward count to floor of target_value */
1103   forward_count = (uint32_t) target_value;
1104   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1105   target_value = target_value - forward_count;
1106   random_value =
1107       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1108   if (random_value < (target_value * UINT32_MAX))
1109     forward_count++;
1110   return forward_count;
1111 }
1112
1113
1114 /**
1115  * Compute the distance between have and target as a 32-bit value.
1116  * Differences in the lower bits must count stronger than differences
1117  * in the higher bits.
1118  *
1119  * @param target
1120  * @param have
1121  * @return 0 if have==target, otherwise a number
1122  *           that is larger as the distance between
1123  *           the two hash codes increases
1124  */
1125 static unsigned int
1126 get_distance (const struct GNUNET_HashCode *target,
1127               const struct GNUNET_HashCode *have)
1128 {
1129   unsigned int bucket;
1130   unsigned int msb;
1131   unsigned int lsb;
1132   unsigned int i;
1133
1134   /* We have to represent the distance between two 2^9 (=512)-bit
1135    * numbers as a 2^5 (=32)-bit number with "0" being used for the
1136    * two numbers being identical; furthermore, we need to
1137    * guarantee that a difference in the number of matching
1138    * bits is always represented in the result.
1139    *
1140    * We use 2^32/2^9 numerical values to distinguish between
1141    * hash codes that have the same LSB bit distance and
1142    * use the highest 2^9 bits of the result to signify the
1143    * number of (mis)matching LSB bits; if we have 0 matching
1144    * and hence 512 mismatching LSB bits we return -1 (since
1145    * 512 itself cannot be represented with 9 bits) */
1146
1147   /* first, calculate the most significant 9 bits of our
1148    * result, aka the number of LSBs */
1149   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1150   /* bucket is now a value between 0 and 512 */
1151   if (bucket == 512)
1152     return 0;                   /* perfect match */
1153   if (bucket == 0)
1154     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1155                                  * below, we'd end up with max+1 (overflow)) */
1156
1157   /* calculate the most significant bits of the final result */
1158   msb = (512 - bucket) << (32 - 9);
1159   /* calculate the 32-9 least significant bits of the final result by
1160    * looking at the differences in the 32-9 bits following the
1161    * mismatching bit at 'bucket' */
1162   lsb = 0;
1163   for (i = bucket + 1;
1164        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1165   {
1166     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
1167         GNUNET_CRYPTO_hash_get_bit (have, i))
1168       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
1169                                                  * last bit set will be 31 -- if
1170                                                  * i does not reach 512 first... */
1171   }
1172   return msb | lsb;
1173 }
1174
1175
1176 /**
1177  * Check whether my identity is closer than any known peers.  If a
1178  * non-null bloomfilter is given, check if this is the closest peer
1179  * that hasn't already been routed to.
1180  *
1181  * @param key hash code to check closeness to
1182  * @param bloom bloomfilter, exclude these entries from the decision
1183  * @return #GNUNET_YES if node location is closest,
1184  *         #GNUNET_NO otherwise.
1185  */
1186 static int
1187 am_closest_peer (const struct GNUNET_HashCode *key,
1188                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
1189 {
1190   int bits;
1191   int other_bits;
1192   int bucket_num;
1193   int count;
1194   struct PeerInfo *pos;
1195   struct GNUNET_HashCode phash;
1196
1197   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
1198     return GNUNET_YES;
1199   bucket_num = find_bucket (key);
1200   GNUNET_assert (bucket_num >= 0);
1201   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key);
1202   pos = k_buckets[bucket_num].head;
1203   count = 0;
1204   while ((NULL != pos) && (count < bucket_size))
1205   {
1206     GNUNET_CRYPTO_hash (&pos->id,
1207                         sizeof (struct GNUNET_PeerIdentity),
1208                         &phash);
1209     if ((NULL != bloom) &&
1210         (GNUNET_YES ==
1211          GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1212     {
1213       pos = pos->next;
1214       continue;                 /* Skip already checked entries */
1215     }
1216     other_bits = GNUNET_CRYPTO_hash_matching_bits (&phash, key);
1217     if (other_bits > bits)
1218       return GNUNET_NO;
1219     if (other_bits == bits)     /* We match the same number of bits */
1220       return GNUNET_YES;
1221     pos = pos->next;
1222   }
1223   /* No peers closer, we are the closest! */
1224   return GNUNET_YES;
1225 }
1226
1227
1228 /**
1229  * Select a peer from the routing table that would be a good routing
1230  * destination for sending a message for "key".  The resulting peer
1231  * must not be in the set of blocked peers.<p>
1232  *
1233  * Note that we should not ALWAYS select the closest peer to the
1234  * target, peers further away from the target should be chosen with
1235  * exponentially declining probability.
1236  *
1237  * FIXME: double-check that this is fine
1238  *
1239  *
1240  * @param key the key we are selecting a peer to route to
1241  * @param bloom a bloomfilter containing entries this request has seen already
1242  * @param hops how many hops has this message traversed thus far
1243  * @return Peer to route to, or NULL on error
1244  */
1245 static struct PeerInfo *
1246 select_peer (const struct GNUNET_HashCode *key,
1247              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1248              uint32_t hops)
1249 {
1250   unsigned int bc;
1251   unsigned int count;
1252   unsigned int selected;
1253   struct PeerInfo *pos;
1254   unsigned int dist;
1255   unsigned int smallest_distance;
1256   struct PeerInfo *chosen;
1257   struct GNUNET_HashCode phash;
1258
1259   if (hops >= GDS_NSE_get ())
1260   {
1261     /* greedy selection (closest peer that is not in bloomfilter) */
1262     smallest_distance = UINT_MAX;
1263     chosen = NULL;
1264     for (bc = 0; bc <= closest_bucket; bc++)
1265     {
1266       pos = k_buckets[bc].head;
1267       count = 0;
1268       while ((pos != NULL) && (count < bucket_size))
1269       {
1270         GNUNET_CRYPTO_hash (&pos->id,
1271                             sizeof (struct GNUNET_PeerIdentity),
1272                             &phash);
1273         if ((bloom == NULL) ||
1274             (GNUNET_NO ==
1275              GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1276         {
1277           dist = get_distance (key, &phash);
1278           if (dist < smallest_distance)
1279           {
1280             chosen = pos;
1281             smallest_distance = dist;
1282           }
1283         }
1284         else
1285         {
1286           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1288                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1289           GNUNET_STATISTICS_update (GDS_stats,
1290                                     gettext_noop
1291                                     ("# Peers excluded from routing due to Bloomfilter"),
1292                                     1, GNUNET_NO);
1293           dist = get_distance (key, &phash);
1294           if (dist < smallest_distance)
1295           {
1296             chosen = NULL;
1297             smallest_distance = dist;
1298           }
1299         }
1300         count++;
1301         pos = pos->next;
1302       }
1303     }
1304     if (NULL == chosen)
1305       GNUNET_STATISTICS_update (GDS_stats,
1306                                 gettext_noop ("# Peer selection failed"), 1,
1307                                 GNUNET_NO);
1308     return chosen;
1309   }
1310
1311   /* select "random" peer */
1312   /* count number of peers that are available and not filtered */
1313   count = 0;
1314   for (bc = 0; bc <= closest_bucket; bc++)
1315   {
1316     pos = k_buckets[bc].head;
1317     while ((pos != NULL) && (count < bucket_size))
1318     {
1319       GNUNET_CRYPTO_hash (&pos->id,
1320                           sizeof (struct GNUNET_PeerIdentity),
1321                           &phash);
1322       if ((bloom != NULL) &&
1323           (GNUNET_YES ==
1324            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1325       {
1326         GNUNET_STATISTICS_update (GDS_stats,
1327                                   gettext_noop
1328                                   ("# Peers excluded from routing due to Bloomfilter"),
1329                                   1, GNUNET_NO);
1330         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1332                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1333         pos = pos->next;
1334         continue;               /* Ignore bloomfiltered peers */
1335       }
1336       count++;
1337       pos = pos->next;
1338     }
1339   }
1340   if (0 == count)               /* No peers to select from! */
1341   {
1342     GNUNET_STATISTICS_update (GDS_stats,
1343                               gettext_noop ("# Peer selection failed"), 1,
1344                               GNUNET_NO);
1345     return NULL;
1346   }
1347   /* Now actually choose a peer */
1348   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1349   count = 0;
1350   for (bc = 0; bc <= closest_bucket; bc++)
1351   {
1352     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1353     {
1354       GNUNET_CRYPTO_hash (&pos->id,
1355                           sizeof (struct GNUNET_PeerIdentity),
1356                           &phash);
1357       if ((bloom != NULL) &&
1358           (GNUNET_YES ==
1359            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1360       {
1361         continue;               /* Ignore bloomfiltered peers */
1362       }
1363       if (0 == selected--)
1364         return pos;
1365     }
1366   }
1367   GNUNET_break (0);
1368   return NULL;
1369 }
1370
1371
1372 /**
1373  * Compute the set of peers that the given request should be
1374  * forwarded to.
1375  *
1376  * @param key routing key
1377  * @param bloom bloom filter excluding peers as targets, all selected
1378  *        peers will be added to the bloom filter
1379  * @param hop_count number of hops the request has traversed so far
1380  * @param target_replication desired number of replicas
1381  * @param targets where to store an array of target peers (to be
1382  *         free'd by the caller)
1383  * @return number of peers returned in 'targets'.
1384  */
1385 static unsigned int
1386 get_target_peers (const struct GNUNET_HashCode *key,
1387                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1388                   uint32_t hop_count, uint32_t target_replication,
1389                   struct PeerInfo ***targets)
1390 {
1391   unsigned int ret;
1392   unsigned int off;
1393   struct PeerInfo **rtargets;
1394   struct PeerInfo *nxt;
1395   struct GNUNET_HashCode nhash;
1396
1397   GNUNET_assert (NULL != bloom);
1398   ret = get_forward_count (hop_count, target_replication);
1399   if (0 == ret)
1400   {
1401     *targets = NULL;
1402     return 0;
1403   }
1404   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1405   for (off = 0; off < ret; off++)
1406   {
1407     nxt = select_peer (key, bloom, hop_count);
1408     if (NULL == nxt)
1409       break;
1410     rtargets[off] = nxt;
1411     GNUNET_CRYPTO_hash (&nxt->id,
1412                         sizeof (struct GNUNET_PeerIdentity),
1413                         &nhash);
1414     GNUNET_break (GNUNET_NO ==
1415                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1416                                                      &nhash));
1417     GNUNET_CONTAINER_bloomfilter_add (bloom, &nhash);
1418   }
1419   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1420               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1421               off,
1422               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1423               (unsigned int) hop_count,
1424               GNUNET_h2s (key),
1425               ret);
1426   if (0 == off)
1427   {
1428     GNUNET_free (rtargets);
1429     *targets = NULL;
1430     return 0;
1431   }
1432   *targets = rtargets;
1433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1434               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1435               GNUNET_h2s (key),
1436               off,
1437               ret);
1438   return off;
1439 }
1440
1441
1442 /**
1443  * Perform a PUT operation.   Forwards the given request to other
1444  * peers.   Does not store the data locally.  Does not give the
1445  * data to local clients.  May do nothing if this is the only
1446  * peer in the network (or if we are the closest peer in the
1447  * network).
1448  *
1449  * @param type type of the block
1450  * @param options routing options
1451  * @param desired_replication_level desired replication count
1452  * @param expiration_time when does the content expire
1453  * @param hop_count how many hops has this message traversed so far
1454  * @param bf Bloom filter of peers this PUT has already traversed
1455  * @param key key for the content
1456  * @param put_path_length number of entries in @a put_path
1457  * @param put_path peers this request has traversed so far (if tracked)
1458  * @param data payload to store
1459  * @param data_size number of bytes in @a data
1460  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1461  */
1462 int
1463 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1464                            enum GNUNET_DHT_RouteOption options,
1465                            uint32_t desired_replication_level,
1466                            struct GNUNET_TIME_Absolute expiration_time,
1467                            uint32_t hop_count,
1468                            struct GNUNET_CONTAINER_BloomFilter *bf,
1469                            const struct GNUNET_HashCode *key,
1470                            unsigned int put_path_length,
1471                            struct GNUNET_PeerIdentity *put_path,
1472                            const void *data, size_t data_size)
1473 {
1474   unsigned int target_count;
1475   unsigned int i;
1476   struct PeerInfo **targets;
1477   struct PeerInfo *target;
1478   struct P2PPendingMessage *pending;
1479   size_t msize;
1480   struct PeerPutMessage *ppm;
1481   struct GNUNET_PeerIdentity *pp;
1482   struct GNUNET_HashCode thash;
1483   unsigned int skip_count;
1484
1485   GNUNET_assert (NULL != bf);
1486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487               "Adding myself (%s) to PUT bloomfilter for %s\n",
1488               GNUNET_i2s (&my_identity),
1489               GNUNET_h2s (key));
1490   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1491   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1492                             1, GNUNET_NO);
1493   target_count =
1494       get_target_peers (key, bf, hop_count, desired_replication_level,
1495                         &targets);
1496   if (0 == target_count)
1497   {
1498     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1499                 "Routing PUT for %s terminates after %u hops at %s\n",
1500                 GNUNET_h2s (key), (unsigned int) hop_count,
1501                 GNUNET_i2s (&my_identity));
1502     return GNUNET_NO;
1503   }
1504   msize =
1505       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1506       sizeof (struct PeerPutMessage);
1507   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1508   {
1509     put_path_length = 0;
1510     msize = data_size + sizeof (struct PeerPutMessage);
1511   }
1512   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1513   {
1514     GNUNET_break (0);
1515     GNUNET_free (targets);
1516     return GNUNET_NO;
1517   }
1518   GNUNET_STATISTICS_update (GDS_stats,
1519                             gettext_noop
1520                             ("# PUT messages queued for transmission"),
1521                             target_count, GNUNET_NO);
1522   skip_count = 0;
1523   for (i = 0; i < target_count; i++)
1524   {
1525     target = targets[i];
1526     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1527     {
1528       /* skip */
1529       GNUNET_STATISTICS_update (GDS_stats,
1530                                 gettext_noop ("# P2P messages dropped due to full queue"),
1531                                 1, GNUNET_NO);
1532       skip_count++;
1533       continue;
1534     }
1535     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536                 "Routing PUT for %s after %u hops to %s\n",
1537                 GNUNET_h2s (key),
1538                 (unsigned int) hop_count,
1539                 GNUNET_i2s (&target->id));
1540     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1541     pending->importance = 0;    /* FIXME */
1542     pending->timeout = expiration_time;
1543     ppm = (struct PeerPutMessage *) &pending[1];
1544     pending->msg = &ppm->header;
1545     ppm->header.size = htons (msize);
1546     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1547     ppm->options = htonl (options);
1548     ppm->type = htonl (type);
1549     ppm->hop_count = htonl (hop_count + 1);
1550     ppm->desired_replication_level = htonl (desired_replication_level);
1551     ppm->put_path_length = htonl (put_path_length);
1552     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1553     GNUNET_CRYPTO_hash (&target->id,
1554                         sizeof (struct GNUNET_PeerIdentity),
1555                         &thash);
1556     GNUNET_break (GNUNET_YES ==
1557                   GNUNET_CONTAINER_bloomfilter_test (bf,
1558                                                      &thash));
1559     GNUNET_assert (GNUNET_OK ==
1560                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1561                                                               ppm->bloomfilter,
1562                                                               DHT_BLOOM_SIZE));
1563     ppm->key = *key;
1564     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1565     memcpy (pp, put_path,
1566             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1567     memcpy (&pp[put_path_length],
1568             data,
1569             data_size);
1570     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1571                                       target->tail,
1572                                       pending);
1573     target->pending_count++;
1574     if (pending == target->head)
1575       process_peer_queue (target);
1576   }
1577   GNUNET_free (targets);
1578   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1579 }
1580
1581
1582 /**
1583  * Perform a GET operation.  Forwards the given request to other
1584  * peers.  Does not lookup the key locally.  May do nothing if this is
1585  * the only peer in the network (or if we are the closest peer in the
1586  * network).
1587  *
1588  * @param type type of the block
1589  * @param options routing options
1590  * @param desired_replication_level desired replication count
1591  * @param hop_count how many hops did this request traverse so far?
1592  * @param key key for the content
1593  * @param xquery extended query
1594  * @param xquery_size number of bytes in @a xquery
1595  * @param reply_bf bloomfilter to filter duplicates
1596  * @param reply_bf_mutator mutator for @a reply_bf
1597  * @param peer_bf filter for peers not to select (again)
1598  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1599  */
1600 int
1601 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1602                            enum GNUNET_DHT_RouteOption options,
1603                            uint32_t desired_replication_level,
1604                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1605                            const void *xquery, size_t xquery_size,
1606                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1607                            uint32_t reply_bf_mutator,
1608                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1609 {
1610   unsigned int target_count;
1611   unsigned int i;
1612   struct PeerInfo **targets;
1613   struct PeerInfo *target;
1614   struct P2PPendingMessage *pending;
1615   size_t msize;
1616   struct PeerGetMessage *pgm;
1617   char *xq;
1618   size_t reply_bf_size;
1619   struct GNUNET_HashCode thash;
1620   unsigned int skip_count;
1621
1622   GNUNET_assert (NULL != peer_bf);
1623   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1624                             1, GNUNET_NO);
1625   target_count =
1626       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1627                         &targets);
1628   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1629               "Adding myself (%s) to GET bloomfilter for %s\n",
1630               GNUNET_i2s (&my_identity),
1631               GNUNET_h2s (key));
1632   GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1633                                     &my_identity_hash);
1634   if (0 == target_count)
1635   {
1636     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637                 "Routing GET for %s terminates after %u hops at %s\n",
1638                 GNUNET_h2s (key), (unsigned int) hop_count,
1639                 GNUNET_i2s (&my_identity));
1640     return GNUNET_NO;
1641   }
1642   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1643   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1644   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1645   {
1646     GNUNET_break (0);
1647     GNUNET_free (targets);
1648     return GNUNET_NO;
1649   }
1650   GNUNET_STATISTICS_update (GDS_stats,
1651                             gettext_noop
1652                             ("# GET messages queued for transmission"),
1653                             target_count, GNUNET_NO);
1654   /* forward request */
1655   skip_count = 0;
1656   for (i = 0; i < target_count; i++)
1657   {
1658     target = targets[i];
1659     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1660     {
1661       /* skip */
1662       GNUNET_STATISTICS_update (GDS_stats,
1663                                 gettext_noop ("# P2P messages dropped due to full queue"),
1664                                 1, GNUNET_NO);
1665       skip_count++;
1666       continue;
1667     }
1668     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1669                 "Routing GET for %s after %u hops to %s\n",
1670                 GNUNET_h2s (key),
1671                 (unsigned int) hop_count,
1672                 GNUNET_i2s (&target->id));
1673     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1674     pending->importance = 0;    /* FIXME */
1675     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1676     pgm = (struct PeerGetMessage *) &pending[1];
1677     pending->msg = &pgm->header;
1678     pgm->header.size = htons (msize);
1679     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1680     pgm->options = htonl (options);
1681     pgm->type = htonl (type);
1682     pgm->hop_count = htonl (hop_count + 1);
1683     pgm->desired_replication_level = htonl (desired_replication_level);
1684     pgm->xquery_size = htonl (xquery_size);
1685     pgm->bf_mutator = reply_bf_mutator;
1686     GNUNET_CRYPTO_hash (&target->id,
1687                         sizeof (struct GNUNET_PeerIdentity),
1688                         &thash);
1689     GNUNET_break (GNUNET_YES ==
1690                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1691                                                      &thash));
1692     GNUNET_assert (GNUNET_OK ==
1693                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1694                                                               pgm->bloomfilter,
1695                                                               DHT_BLOOM_SIZE));
1696     pgm->key = *key;
1697     xq = (char *) &pgm[1];
1698     memcpy (xq, xquery, xquery_size);
1699     if (NULL != reply_bf)
1700       GNUNET_assert (GNUNET_OK ==
1701                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1702                                                                 &xq
1703                                                                 [xquery_size],
1704                                                                 reply_bf_size));
1705     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1706                                       target->tail,
1707                                       pending);
1708     target->pending_count++;
1709     if (pending == target->head)
1710       process_peer_queue (target);
1711   }
1712   GNUNET_free (targets);
1713   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1714 }
1715
1716
1717 /**
1718  * Handle a reply (route to origin).  Only forwards the reply back to
1719  * the given peer.  Does not do local caching or forwarding to local
1720  * clients.
1721  *
1722  * @param target neighbour that should receive the block (if still connected)
1723  * @param type type of the block
1724  * @param expiration_time when does the content expire
1725  * @param key key for the content
1726  * @param put_path_length number of entries in @a put_path
1727  * @param put_path peers the original PUT traversed (if tracked)
1728  * @param get_path_length number of entries in @a get_path
1729  * @param get_path peers this reply has traversed so far (if tracked)
1730  * @param data payload of the reply
1731  * @param data_size number of bytes in @a data
1732  */
1733 void
1734 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1735                              enum GNUNET_BLOCK_Type type,
1736                              struct GNUNET_TIME_Absolute expiration_time,
1737                              const struct GNUNET_HashCode *key,
1738                              unsigned int put_path_length,
1739                              const struct GNUNET_PeerIdentity *put_path,
1740                              unsigned int get_path_length,
1741                              const struct GNUNET_PeerIdentity *get_path,
1742                              const void *data,
1743                              size_t data_size)
1744 {
1745   struct PeerInfo *pi;
1746   struct P2PPendingMessage *pending;
1747   size_t msize;
1748   struct PeerResultMessage *prm;
1749   struct GNUNET_PeerIdentity *paths;
1750
1751   msize =
1752       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1753                                                        put_path_length) *
1754       sizeof (struct GNUNET_PeerIdentity);
1755   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1756       (get_path_length >
1757        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1758       (put_path_length >
1759        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1760       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1761   {
1762     GNUNET_break (0);
1763     return;
1764   }
1765   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1766                                           target);
1767   if (NULL == pi)
1768   {
1769     /* peer disconnected in the meantime, drop reply */
1770     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771                 "No matching peer for reply for key %s\n",
1772                 GNUNET_h2s (key));
1773     return;
1774   }
1775   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
1776   {
1777     /* skip */
1778     GNUNET_STATISTICS_update (GDS_stats,
1779                               gettext_noop ("# P2P messages dropped due to full queue"),
1780                               1,
1781                               GNUNET_NO);
1782     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783                 "Peer queue full, ignoring reply for key %s\n",
1784                 GNUNET_h2s (key));
1785     return;
1786   }
1787
1788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789               "Forwarding reply for key %s to peer %s\n",
1790               GNUNET_h2s (key),
1791               GNUNET_i2s (target));
1792   GNUNET_STATISTICS_update (GDS_stats,
1793                             gettext_noop
1794                             ("# RESULT messages queued for transmission"), 1,
1795                             GNUNET_NO);
1796   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1797   pending->importance = 0;      /* FIXME */
1798   pending->timeout = expiration_time;
1799   prm = (struct PeerResultMessage *) &pending[1];
1800   pending->msg = &prm->header;
1801   prm->header.size = htons (msize);
1802   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1803   prm->type = htonl (type);
1804   prm->put_path_length = htonl (put_path_length);
1805   prm->get_path_length = htonl (get_path_length);
1806   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1807   prm->key = *key;
1808   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1809   memcpy (paths,
1810           put_path,
1811           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1812   memcpy (&paths[put_path_length],
1813           get_path,
1814           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1815   memcpy (&paths[put_path_length + get_path_length],
1816           data,
1817           data_size);
1818   GNUNET_CONTAINER_DLL_insert (pi->head,
1819                                pi->tail,
1820                                pending);
1821   pi->pending_count++;
1822   process_peer_queue (pi);
1823 }
1824
1825
1826 /**
1827  * To be called on core init/fail.
1828  *
1829  * @param cls service closure
1830  * @param identity the public identity of this peer
1831  */
1832 static void
1833 core_init (void *cls,
1834            const struct GNUNET_PeerIdentity *identity)
1835 {
1836   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1837               "CORE called, I am %s\n",
1838               GNUNET_i2s (identity));
1839   my_identity = *identity;
1840   GNUNET_CRYPTO_hash (identity,
1841                       sizeof (struct GNUNET_PeerIdentity),
1842                       &my_identity_hash);
1843   GDS_CLIENTS_init ();
1844 }
1845
1846
1847 /**
1848  * Core handler for p2p put requests.
1849  *
1850  * @param cls closure
1851  * @param peer sender of the request
1852  * @param message message
1853  * @param peer peer identity this notification is about
1854  * @return #GNUNET_OK to keep the connection open,
1855  *         #GNUNET_SYSERR to close it (signal serious error)
1856  */
1857 static int
1858 handle_dht_p2p_put (void *cls,
1859                     const struct GNUNET_PeerIdentity *peer,
1860                     const struct GNUNET_MessageHeader *message)
1861 {
1862   const struct PeerPutMessage *put;
1863   const struct GNUNET_PeerIdentity *put_path;
1864   const void *payload;
1865   uint32_t putlen;
1866   uint16_t msize;
1867   size_t payload_size;
1868   enum GNUNET_DHT_RouteOption options;
1869   struct GNUNET_CONTAINER_BloomFilter *bf;
1870   struct GNUNET_HashCode test_key;
1871   struct GNUNET_HashCode phash;
1872   int forwarded;
1873
1874   msize = ntohs (message->size);
1875   if (msize < sizeof (struct PeerPutMessage))
1876   {
1877     GNUNET_break_op (0);
1878     return GNUNET_YES;
1879   }
1880   put = (const struct PeerPutMessage *) message;
1881   putlen = ntohl (put->put_path_length);
1882   if ((msize <
1883        sizeof (struct PeerPutMessage) +
1884        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1885       (putlen >
1886        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1887   {
1888     GNUNET_break_op (0);
1889     return GNUNET_YES;
1890   }
1891   GNUNET_STATISTICS_update (GDS_stats,
1892                             gettext_noop ("# P2P PUT requests received"),
1893                             1,
1894                             GNUNET_NO);
1895   GNUNET_STATISTICS_update (GDS_stats,
1896                             gettext_noop ("# P2P PUT bytes received"),
1897                             msize,
1898                             GNUNET_NO);
1899   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1900   payload = &put_path[putlen];
1901   options = ntohl (put->options);
1902   payload_size =
1903       msize - (sizeof (struct PeerPutMessage) +
1904                putlen * sizeof (struct GNUNET_PeerIdentity));
1905
1906   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1907               GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1908   GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash);
1909   if (GNUNET_YES == log_route_details_stderr)
1910   {
1911     char *tmp;
1912
1913     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1914     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1915                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1916                  GNUNET_h2s (&put->key),
1917                  GNUNET_i2s (peer),
1918                  tmp,
1919                  ntohl(put->hop_count),
1920                  GNUNET_CRYPTO_hash_matching_bits (&phash,
1921                                                    &put->key),
1922                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1923                                                    &put->key)
1924                 );
1925     GNUNET_free (tmp);
1926   }
1927   switch (GNUNET_BLOCK_get_key
1928           (GDS_block_context, ntohl (put->type), payload, payload_size,
1929            &test_key))
1930   {
1931   case GNUNET_YES:
1932     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
1933     {
1934       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1935
1936       GNUNET_break_op (0);
1937       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1938                   "PUT with key `%s' for block with key %s\n",
1939                   put_s, GNUNET_h2s_full (&test_key));
1940       GNUNET_free (put_s);
1941       return GNUNET_YES;
1942     }
1943     break;
1944   case GNUNET_NO:
1945     GNUNET_break_op (0);
1946     return GNUNET_YES;
1947   case GNUNET_SYSERR:
1948     /* cannot verify, good luck */
1949     break;
1950   }
1951   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1952   {
1953     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1954                                    ntohl (put->type),
1955                                    GNUNET_BLOCK_EO_NONE,
1956                                    NULL,    /* query */
1957                                    NULL, 0, /* bloom filer */
1958                                    NULL, 0, /* xquery */
1959                                    payload, payload_size))
1960     {
1961     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1962     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1963       break;
1964
1965     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1966     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1967     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1968     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1969     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1970     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1971     default:
1972       GNUNET_break_op (0);
1973       return GNUNET_OK;
1974     }
1975   }
1976
1977   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1978                                           DHT_BLOOM_SIZE,
1979                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1980   GNUNET_break_op (GNUNET_YES ==
1981                    GNUNET_CONTAINER_bloomfilter_test (bf, &phash));
1982   {
1983     struct GNUNET_PeerIdentity pp[putlen + 1];
1984
1985     /* extend 'put path' by sender */
1986     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1987     {
1988       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1989       pp[putlen] = *peer;
1990       putlen++;
1991     }
1992     else
1993       putlen = 0;
1994
1995     /* give to local clients */
1996     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1997                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1998                               payload_size, payload);
1999     /* store locally */
2000     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2001         (am_closest_peer (&put->key, bf)))
2002       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
2003                                 (put->expiration_time), &put->key, putlen, pp,
2004                                 ntohl (put->type), payload_size, payload);
2005     /* route to other peers */
2006     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
2007                                            ntohl (put->desired_replication_level),
2008                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
2009                                            ntohl (put->hop_count), bf,
2010                                            &put->key,
2011                                            putlen,
2012                                            pp,
2013                                            payload,
2014                                            payload_size);
2015     /* notify monitoring clients */
2016     GDS_CLIENTS_process_put (options
2017                              | ( (GNUNET_OK == forwarded)
2018                                  ? GNUNET_DHT_RO_LAST_HOP
2019                                  : 0 ),
2020                              ntohl (put->type),
2021                              ntohl (put->hop_count),
2022                              ntohl (put->desired_replication_level),
2023                              putlen, pp,
2024                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
2025                              &put->key,
2026                              payload,
2027                              payload_size);
2028   }
2029   GNUNET_CONTAINER_bloomfilter_free (bf);
2030   return GNUNET_YES;
2031 }
2032
2033
2034 /**
2035  * We have received a FIND PEER request.  Send matching
2036  * HELLOs back.
2037  *
2038  * @param sender sender of the FIND PEER request
2039  * @param key peers close to this key are desired
2040  * @param bf peers matching this bf are excluded
2041  * @param bf_mutator mutator for bf
2042  */
2043 static void
2044 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
2045                   const struct GNUNET_HashCode * key,
2046                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
2047 {
2048   int bucket_idx;
2049   struct PeerBucket *bucket;
2050   struct PeerInfo *peer;
2051   unsigned int choice;
2052   struct GNUNET_HashCode phash;
2053   struct GNUNET_HashCode mhash;
2054   const struct GNUNET_HELLO_Message *hello;
2055
2056   /* first, check about our own HELLO */
2057   if (NULL != GDS_my_hello)
2058   {
2059     GNUNET_BLOCK_mingle_hash (&my_identity_hash, bf_mutator, &mhash);
2060     if ((NULL == bf) ||
2061         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
2062     {
2063       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
2064                                    GNUNET_TIME_relative_to_absolute
2065                                    (hello_expiration),
2066                                    key, 0, NULL, 0, NULL, GDS_my_hello,
2067                                    GNUNET_HELLO_size ((const struct
2068                                                        GNUNET_HELLO_Message *)
2069                                                       GDS_my_hello));
2070     }
2071     else
2072     {
2073       GNUNET_STATISTICS_update (GDS_stats,
2074                                 gettext_noop
2075                                 ("# FIND PEER requests ignored due to Bloomfilter"),
2076                                 1, GNUNET_NO);
2077     }
2078   }
2079   else
2080   {
2081     GNUNET_STATISTICS_update (GDS_stats,
2082                               gettext_noop
2083                               ("# FIND PEER requests ignored due to lack of HELLO"),
2084                               1, GNUNET_NO);
2085   }
2086
2087   /* then, also consider sending a random HELLO from the closest bucket */
2088   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
2089     bucket_idx = closest_bucket;
2090   else
2091     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
2092   if (bucket_idx == GNUNET_SYSERR)
2093     return;
2094   bucket = &k_buckets[bucket_idx];
2095   if (bucket->peers_size == 0)
2096     return;
2097   choice =
2098       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
2099   peer = bucket->head;
2100   while (choice > 0)
2101   {
2102     GNUNET_assert (NULL != peer);
2103     peer = peer->next;
2104     choice--;
2105   }
2106   choice = bucket->peers_size;
2107   do
2108   {
2109     peer = peer->next;
2110     if (choice-- == 0)
2111       return;                   /* no non-masked peer available */
2112     if (peer == NULL)
2113       peer = bucket->head;
2114     GNUNET_CRYPTO_hash (&peer->id,
2115                         sizeof (struct GNUNET_PeerIdentity),
2116                         &phash);
2117     GNUNET_BLOCK_mingle_hash (&phash,
2118                               bf_mutator,
2119                               &mhash);
2120     hello = GDS_HELLO_get (&peer->id);
2121   }
2122   while ((hello == NULL) ||
2123          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
2124   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
2125                                GNUNET_TIME_relative_to_absolute
2126                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
2127                                0, NULL, 0, NULL, hello,
2128                                GNUNET_HELLO_size (hello));
2129 }
2130
2131
2132 /**
2133  * Core handler for p2p get requests.
2134  *
2135  * @param cls closure
2136  * @param peer sender of the request
2137  * @param message message
2138  * @return #GNUNET_OK to keep the connection open,
2139  *         #GNUNET_SYSERR to close it (signal serious error)
2140  */
2141 static int
2142 handle_dht_p2p_get (void *cls,
2143                     const struct GNUNET_PeerIdentity *peer,
2144                     const struct GNUNET_MessageHeader *message)
2145 {
2146   struct PeerGetMessage *get;
2147   uint32_t xquery_size;
2148   size_t reply_bf_size;
2149   uint16_t msize;
2150   enum GNUNET_BLOCK_Type type;
2151   enum GNUNET_DHT_RouteOption options;
2152   enum GNUNET_BLOCK_EvaluationResult eval;
2153   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
2154   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2155   const char *xquery;
2156   struct GNUNET_HashCode phash;
2157   int forwarded;
2158
2159   GNUNET_break (0 !=
2160                 memcmp (peer,
2161                         &my_identity,
2162                         sizeof (struct GNUNET_PeerIdentity)));
2163   /* parse and validate message */
2164   msize = ntohs (message->size);
2165   if (msize < sizeof (struct PeerGetMessage))
2166   {
2167     GNUNET_break_op (0);
2168     return GNUNET_YES;
2169   }
2170   get = (struct PeerGetMessage *) message;
2171   xquery_size = ntohl (get->xquery_size);
2172   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
2173   {
2174     GNUNET_break_op (0);
2175     return GNUNET_YES;
2176   }
2177   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2178   type = ntohl (get->type);
2179   options = ntohl (get->options);
2180   xquery = (const char *) &get[1];
2181   reply_bf = NULL;
2182   GNUNET_STATISTICS_update (GDS_stats,
2183                             gettext_noop ("# P2P GET requests received"), 1,
2184                             GNUNET_NO);
2185   GNUNET_STATISTICS_update (GDS_stats,
2186                             gettext_noop ("# P2P GET bytes received"), msize,
2187                             GNUNET_NO);
2188   GNUNET_CRYPTO_hash (peer,
2189                       sizeof (struct GNUNET_PeerIdentity),
2190                       &phash);
2191   if (GNUNET_YES == log_route_details_stderr)
2192   {
2193     char *tmp;
2194
2195     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2196     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2197                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2198                  GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp,
2199                  ntohl(get->hop_count),
2200                  GNUNET_CRYPTO_hash_matching_bits (&phash, &get->key),
2201                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key),
2202                  ntohl(get->xquery_size), xquery);
2203     GNUNET_free (tmp);
2204   }
2205
2206   if (reply_bf_size > 0)
2207     reply_bf =
2208         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
2209                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
2210   eval =
2211       GNUNET_BLOCK_evaluate (GDS_block_context,
2212                              type,
2213                              GNUNET_BLOCK_EO_NONE,
2214                              &get->key,
2215                              &reply_bf,
2216                              get->bf_mutator,
2217                              xquery,
2218                              xquery_size,
2219                              NULL,
2220                              0);
2221   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2222   {
2223     /* request invalid or block type not supported */
2224     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2225     if (NULL != reply_bf)
2226       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2227     return GNUNET_YES;
2228   }
2229   peer_bf =
2230       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
2231                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
2232   GNUNET_break_op (GNUNET_YES ==
2233                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2234                                                       &phash));
2235   /* remember request for routing replies */
2236   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
2237                    reply_bf, get->bf_mutator);
2238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2239               "GET for %s at %s after %u hops\n",
2240               GNUNET_h2s (&get->key),
2241               GNUNET_i2s (&my_identity),
2242               (unsigned int) ntohl (get->hop_count));
2243   /* local lookup (this may update the reply_bf) */
2244   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2245       (am_closest_peer (&get->key, peer_bf)))
2246   {
2247     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2248     {
2249       GNUNET_STATISTICS_update (GDS_stats,
2250                                 gettext_noop
2251                                 ("# P2P FIND PEER requests processed"), 1,
2252                                 GNUNET_NO);
2253       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
2254     }
2255     else
2256     {
2257       eval =
2258           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
2259                                     &reply_bf, get->bf_mutator);
2260     }
2261   }
2262   else
2263   {
2264     GNUNET_STATISTICS_update (GDS_stats,
2265                               gettext_noop ("# P2P GET requests ONLY routed"),
2266                               1, GNUNET_NO);
2267   }
2268
2269   /* P2P forwarding */
2270   forwarded = GNUNET_NO;
2271   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2272     forwarded = GDS_NEIGHBOURS_handle_get (type, options,
2273                                            ntohl (get->desired_replication_level),
2274                                            ntohl (get->hop_count),
2275                                            &get->key,
2276                                            xquery,
2277                                            xquery_size,
2278                                            reply_bf,
2279                                            get->bf_mutator, peer_bf);
2280   GDS_CLIENTS_process_get (options
2281                            | (GNUNET_OK == forwarded)
2282                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2283                            type,
2284                            ntohl (get->hop_count),
2285                            ntohl (get->desired_replication_level),
2286                            0, NULL,
2287                            &get->key);
2288
2289
2290   /* clean up */
2291   if (NULL != reply_bf)
2292     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2293   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2294   return GNUNET_YES;
2295 }
2296
2297
2298 /**
2299  * Core handler for p2p result messages.
2300  *
2301  * @param cls closure
2302  * @param message message
2303  * @param peer peer identity this notification is about
2304  * @return #GNUNET_YES (do not cut p2p connection)
2305  */
2306 static int
2307 handle_dht_p2p_result (void *cls,
2308                        const struct GNUNET_PeerIdentity *peer,
2309                        const struct GNUNET_MessageHeader *message)
2310 {
2311   const struct PeerResultMessage *prm;
2312   const struct GNUNET_PeerIdentity *put_path;
2313   const struct GNUNET_PeerIdentity *get_path;
2314   const void *data;
2315   uint32_t get_path_length;
2316   uint32_t put_path_length;
2317   uint16_t msize;
2318   size_t data_size;
2319   enum GNUNET_BLOCK_Type type;
2320
2321   /* parse and validate message */
2322   msize = ntohs (message->size);
2323   if (msize < sizeof (struct PeerResultMessage))
2324   {
2325     GNUNET_break_op (0);
2326     return GNUNET_YES;
2327   }
2328   prm = (struct PeerResultMessage *) message;
2329   put_path_length = ntohl (prm->put_path_length);
2330   get_path_length = ntohl (prm->get_path_length);
2331   if ((msize <
2332        sizeof (struct PeerResultMessage) + (get_path_length +
2333                                             put_path_length) *
2334        sizeof (struct GNUNET_PeerIdentity)) ||
2335       (get_path_length >
2336        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2337       (put_path_length >
2338        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2339   {
2340     GNUNET_break_op (0);
2341     return GNUNET_YES;
2342   }
2343   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2344   get_path = &put_path[put_path_length];
2345   type = ntohl (prm->type);
2346   data = (const void *) &get_path[get_path_length];
2347   data_size =
2348       msize - (sizeof (struct PeerResultMessage) +
2349                (get_path_length +
2350                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2351   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
2352                             1, GNUNET_NO);
2353   GNUNET_STATISTICS_update (GDS_stats,
2354                             gettext_noop ("# P2P RESULT bytes received"),
2355                             msize, GNUNET_NO);
2356   if (GNUNET_YES == log_route_details_stderr)
2357   {
2358     char *tmp;
2359
2360     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2361     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2362                  "R5N RESULT %s: %s->%s (%u)\n",
2363                  GNUNET_h2s (&prm->key),
2364                  GNUNET_i2s (peer),
2365                  tmp,
2366                  get_path_length + 1);
2367     GNUNET_free (tmp);
2368   }
2369   /* if we got a HELLO, consider it for our own routing table */
2370   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2371   {
2372     const struct GNUNET_MessageHeader *h;
2373     struct GNUNET_PeerIdentity pid;
2374
2375     /* Should be a HELLO, validate and consider using it! */
2376     if (data_size < sizeof (struct GNUNET_MessageHeader))
2377     {
2378       GNUNET_break_op (0);
2379       return GNUNET_YES;
2380     }
2381     h = data;
2382     if (data_size != ntohs (h->size))
2383     {
2384       GNUNET_break_op (0);
2385       return GNUNET_YES;
2386     }
2387     if (GNUNET_OK !=
2388         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2389                              &pid))
2390     {
2391       GNUNET_break_op (0);
2392       return GNUNET_YES;
2393     }
2394     if ( (GNUNET_YES != disable_try_connect) &&
2395          (0 != memcmp (&my_identity,
2396                        &pid,
2397                        sizeof (struct GNUNET_PeerIdentity))) )
2398       try_connect (&pid,
2399                    h);
2400   }
2401
2402   /* append 'peer' to 'get_path' */
2403   {
2404     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2405
2406     memcpy (xget_path,
2407             get_path,
2408             get_path_length * sizeof (struct GNUNET_PeerIdentity));
2409     xget_path[get_path_length] = *peer;
2410     get_path_length++;
2411
2412     /* forward to local clients */
2413     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2414                               &prm->key,
2415                               get_path_length,
2416                               xget_path,
2417                               put_path_length,
2418                               put_path,
2419                               type,
2420                               data_size,
2421                               data);
2422     GDS_CLIENTS_process_get_resp (type,
2423                                   xget_path,
2424                                   get_path_length,
2425                                   put_path, put_path_length,
2426                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2427                                   &prm->key,
2428                                   data,
2429                                   data_size);
2430     if (GNUNET_YES == cache_results)
2431     {
2432       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2433
2434       memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
2435       memcpy (&xput_path[put_path_length],
2436               xget_path,
2437               get_path_length * sizeof (struct GNUNET_PeerIdentity));
2438
2439       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2440                                 &prm->key,
2441                                 get_path_length + put_path_length,
2442                                 xput_path,
2443                                 type,
2444                                 data_size,
2445                                 data);
2446     }
2447     /* forward to other peers */
2448     GDS_ROUTING_process (type,
2449                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2450                          &prm->key,
2451                          put_path_length,
2452                          put_path,
2453                          get_path_length,
2454                          xget_path,
2455                          data,
2456                          data_size);
2457   }
2458
2459   return GNUNET_YES;
2460 }
2461
2462
2463 /**
2464  * Initialize neighbours subsystem.
2465  *
2466  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2467  */
2468 int
2469 GDS_NEIGHBOURS_init ()
2470 {
2471   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2472     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2473     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2474     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2475     {NULL, 0, 0}
2476   };
2477   unsigned long long temp_config_num;
2478
2479   disable_try_connect
2480     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT");
2481   if (GNUNET_OK ==
2482       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2483                                              &temp_config_num))
2484     bucket_size = (unsigned int) temp_config_num;
2485   cache_results
2486     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS");
2487
2488   log_route_details_stderr =
2489     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2490   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2491   core_api =
2492       GNUNET_CORE_connect (GDS_cfg, NULL,
2493                            &core_init,
2494                            &handle_core_connect,
2495                            &handle_core_disconnect,
2496                            NULL, GNUNET_NO,
2497                            NULL, GNUNET_NO,
2498                            core_handlers);
2499   if (core_api == NULL)
2500     return GNUNET_SYSERR;
2501   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2502                                                               GNUNET_NO);
2503   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2504                                                             GNUNET_NO);
2505   return GNUNET_OK;
2506 }
2507
2508
2509 /**
2510  * Shutdown neighbours subsystem.
2511  */
2512 void
2513 GDS_NEIGHBOURS_done ()
2514 {
2515   if (NULL == core_api)
2516     return;
2517   GNUNET_CORE_disconnect (core_api);
2518   core_api = NULL;
2519   GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2520   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2521   all_connected_peers = NULL;
2522   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2523                                          &free_connect_info,
2524                                          NULL);
2525   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2526   all_desired_peers = NULL;
2527   GNUNET_ATS_connectivity_done (ats_ch);
2528   ats_ch = NULL;
2529   GNUNET_assert (NULL == find_peer_task);
2530 }
2531
2532
2533 /**
2534  * Get the ID of the local node.
2535  *
2536  * @return identity of the local node
2537  */
2538 struct GNUNET_PeerIdentity *
2539 GDS_NEIGHBOURS_get_id ()
2540 {
2541   return &my_identity;
2542 }
2543
2544
2545 /* end of gnunet-service-dht_neighbours.c */