-fix (C) notices
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_ats_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_hello.h"
45 #include "gnunet-service-dht_neighbours.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include "dht.h"
49
50 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * Maximum allowed number of pending messages per peer.
74  */
75 #define MAXIMUM_PENDING_PER_PEER 64
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92 /**
93  * Hello address expiration
94  */
95 extern struct GNUNET_TIME_Relative hello_expiration;
96
97
98 GNUNET_NETWORK_STRUCT_BEGIN
99
100 /**
101  * P2P PUT message
102  */
103 struct PeerPutMessage
104 {
105   /**
106    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
107    */
108   struct GNUNET_MessageHeader header;
109
110   /**
111    * Processing options
112    */
113   uint32_t options GNUNET_PACKED;
114
115   /**
116    * Content type.
117    */
118   uint32_t type GNUNET_PACKED;
119
120   /**
121    * Hop count
122    */
123   uint32_t hop_count GNUNET_PACKED;
124
125   /**
126    * Replication level for this message
127    */
128   uint32_t desired_replication_level GNUNET_PACKED;
129
130   /**
131    * Length of the PUT path that follows (if tracked).
132    */
133   uint32_t put_path_length GNUNET_PACKED;
134
135   /**
136    * When does the content expire?
137    */
138   struct GNUNET_TIME_AbsoluteNBO expiration_time;
139
140   /**
141    * Bloomfilter (for peer identities) to stop circular routes
142    */
143   char bloomfilter[DHT_BLOOM_SIZE];
144
145   /**
146    * The key we are storing under.
147    */
148   struct GNUNET_HashCode key;
149
150   /* put path (if tracked) */
151
152   /* Payload */
153
154 };
155
156
157 /**
158  * P2P Result message
159  */
160 struct PeerResultMessage
161 {
162   /**
163    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Content type.
169    */
170   uint32_t type GNUNET_PACKED;
171
172   /**
173    * Length of the PUT path that follows (if tracked).
174    */
175   uint32_t put_path_length GNUNET_PACKED;
176
177   /**
178    * Length of the GET path that follows (if tracked).
179    */
180   uint32_t get_path_length GNUNET_PACKED;
181
182   /**
183    * When does the content expire?
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration_time;
186
187   /**
188    * The key of the corresponding GET request.
189    */
190   struct GNUNET_HashCode key;
191
192   /* put path (if tracked) */
193
194   /* get path (if tracked) */
195
196   /* Payload */
197
198 };
199
200
201 /**
202  * P2P GET message
203  */
204 struct PeerGetMessage
205 {
206   /**
207    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
208    */
209   struct GNUNET_MessageHeader header;
210
211   /**
212    * Processing options
213    */
214   uint32_t options GNUNET_PACKED;
215
216   /**
217    * Desired content type.
218    */
219   uint32_t type GNUNET_PACKED;
220
221   /**
222    * Hop count
223    */
224   uint32_t hop_count GNUNET_PACKED;
225
226   /**
227    * Desired replication level for this request.
228    */
229   uint32_t desired_replication_level GNUNET_PACKED;
230
231   /**
232    * Size of the extended query.
233    */
234   uint32_t xquery_size;
235
236   /**
237    * Bloomfilter mutator.
238    */
239   uint32_t bf_mutator;
240
241   /**
242    * Bloomfilter (for peer identities) to stop circular routes
243    */
244   char bloomfilter[DHT_BLOOM_SIZE];
245
246   /**
247    * The key we are looking for.
248    */
249   struct GNUNET_HashCode key;
250
251   /* xquery */
252
253   /* result bloomfilter */
254
255 };
256 GNUNET_NETWORK_STRUCT_END
257
258 /**
259  * Linked list of messages to send to a particular other peer.
260  */
261 struct P2PPendingMessage
262 {
263   /**
264    * Pointer to next item in the list
265    */
266   struct P2PPendingMessage *next;
267
268   /**
269    * Pointer to previous item in the list
270    */
271   struct P2PPendingMessage *prev;
272
273   /**
274    * Message importance level.  FIXME: used? useful?
275    */
276   unsigned int importance;
277
278   /**
279    * When does this message time out?
280    */
281   struct GNUNET_TIME_Absolute timeout;
282
283   /**
284    * Actual message to be sent, allocated at the end of the struct:
285    * // msg = (cast) &pm[1];
286    * // memcpy (&pm[1], data, len);
287    */
288   const struct GNUNET_MessageHeader *msg;
289
290 };
291
292
293 /**
294  * Entry for a peer in a bucket.
295  */
296 struct PeerInfo
297 {
298   /**
299    * Next peer entry (DLL)
300    */
301   struct PeerInfo *next;
302
303   /**
304    *  Prev peer entry (DLL)
305    */
306   struct PeerInfo *prev;
307
308   /**
309    * Count of outstanding messages for peer.
310    */
311   unsigned int pending_count;
312
313   /**
314    * Head of pending messages to be sent to this peer.
315    */
316   struct P2PPendingMessage *head;
317
318   /**
319    * Tail of pending messages to be sent to this peer.
320    */
321   struct P2PPendingMessage *tail;
322
323   /**
324    * Core handle for sending messages to this peer.
325    */
326   struct GNUNET_CORE_TransmitHandle *th;
327
328   /**
329    * What is the identity of the peer?
330    */
331   struct GNUNET_PeerIdentity id;
332
333 #if 0
334   /**
335    * What is the average latency for replies received?
336    */
337   struct GNUNET_TIME_Relative latency;
338
339   /**
340    * Transport level distance to peer.
341    */
342   unsigned int distance;
343 #endif
344
345 };
346
347
348 /**
349  * Peers are grouped into buckets.
350  */
351 struct PeerBucket
352 {
353   /**
354    * Head of DLL
355    */
356   struct PeerInfo *head;
357
358   /**
359    * Tail of DLL
360    */
361   struct PeerInfo *tail;
362
363   /**
364    * Number of peers in the bucket.
365    */
366   unsigned int peers_size;
367 };
368
369
370 /**
371  * Information about a peer that we would like to connect to.
372  */
373 struct ConnectInfo
374 {
375
376   /**
377    * Handle to active HELLO offer operation, or NULL.
378    */
379   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
380
381   /**
382    * Handle to active connectivity suggestion operation, or NULL.
383    */
384   struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
385
386   /**
387    * How much would we like to connect to this peer?
388    */
389   uint32_t strength;
390 };
391
392
393 /**
394  * Do we cache all results that we are routing in the local datacache?
395  */
396 static int cache_results;
397
398 /**
399  * Should routing details be logged to stderr (for debugging)?
400  */
401 static int log_route_details_stderr;
402
403 /**
404  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
405  */
406 static unsigned int closest_bucket;
407
408 /**
409  * How many peers have we added since we sent out our last
410  * find peer request?
411  */
412 static unsigned int newly_found_peers;
413
414 /**
415  * Option for testing that disables the 'connect' function of the DHT.
416  */
417 static int disable_try_connect;
418
419 /**
420  * The buckets.  Array of size #MAX_BUCKETS.  Offset 0 means 0 bits matching.
421  */
422 static struct PeerBucket k_buckets[MAX_BUCKETS];
423
424 /**
425  * Hash map of all CORE-connected peers, for easy removal from
426  * #k_buckets on disconnect.  Values are of type `struct PeerInfo`.
427  */
428 static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
429
430 /**
431  * Hash map of all peers we would like to be connected to.
432  * Values are of type `struct ConnectInfo`.
433  */
434 static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
435
436 /**
437  * Maximum size for each bucket.
438  */
439 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
440
441 /**
442  * Task that sends FIND PEER requests.
443  */
444 static struct GNUNET_SCHEDULER_Task *find_peer_task;
445
446 /**
447  * Identity of this peer.
448  */
449 static struct GNUNET_PeerIdentity my_identity;
450
451 /**
452  * Hash of the identity of this peer.
453  */
454 static struct GNUNET_HashCode my_identity_hash;
455
456 /**
457  * Handle to CORE.
458  */
459 static struct GNUNET_CORE_Handle *core_api;
460
461 /**
462  * Handle to ATS connectivity.
463  */
464 static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
465
466
467 /**
468  * Find the optimal bucket for this key.
469  *
470  * @param hc the hashcode to compare our identity to
471  * @return the proper bucket index, or GNUNET_SYSERR
472  *         on error (same hashcode)
473  */
474 static int
475 find_bucket (const struct GNUNET_HashCode *hc)
476 {
477   unsigned int bits;
478
479   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
480   if (bits == MAX_BUCKETS)
481   {
482     /* How can all bits match? Got my own ID? */
483     GNUNET_break (0);
484     return GNUNET_SYSERR;
485   }
486   return MAX_BUCKETS - bits - 1;
487 }
488
489
490 /**
491  * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
492  * Clean up the "oh" field in the @a cls
493  *
494  * @param cls a `struct ConnectInfo`
495  * @param tc unused
496  */
497 static void
498 offer_hello_done (void *cls,
499                   const struct GNUNET_SCHEDULER_TaskContext *tc)
500 {
501   struct ConnectInfo *ci = cls;
502
503   ci->oh = NULL;
504 }
505
506
507 /**
508  * Function called for all entries in #all_desired_peers to clean up.
509  *
510  * @param cls NULL
511  * @param peer peer the entry is for
512  * @param value the value to remove
513  * @return #GNUNET_YES
514  */
515 static int
516 free_connect_info (void *cls,
517                    const struct GNUNET_PeerIdentity *peer,
518                    void *value)
519 {
520   struct ConnectInfo *ci = value;
521
522   GNUNET_assert (GNUNET_YES ==
523                  GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
524                                                        peer,
525                                                        ci));
526   if (NULL != ci->sh)
527   {
528     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
529     ci->sh = NULL;
530   }
531   if (NULL != ci->oh)
532   {
533     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
534     ci->oh = NULL;
535   }
536   GNUNET_free (ci);
537   return GNUNET_YES;
538 }
539
540
541 /**
542  * Consider if we want to connect to a given peer, and if so
543  * let ATS know.  If applicable, the HELLO is offered to the
544  * TRANSPORT service.
545  *
546  * @param pid peer to consider connectivity requirements for
547  * @param h a HELLO message, or NULL
548  */
549 static void
550 try_connect (const struct GNUNET_PeerIdentity *pid,
551              const struct GNUNET_MessageHeader *h)
552 {
553   int bucket;
554   struct GNUNET_HashCode pid_hash;
555   struct ConnectInfo *ci;
556   uint32_t strength;
557
558   GNUNET_CRYPTO_hash (pid,
559                       sizeof (struct GNUNET_PeerIdentity),
560                       &pid_hash);
561   bucket = find_bucket (&pid_hash);
562   if (bucket < 0)
563     return; /* self? */
564   ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
565                                           pid);
566
567   if (k_buckets[bucket].peers_size < bucket_size)
568     strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
569   else
570     strength = bucket; /* minimum value of connectivity */
571   if (GNUNET_YES ==
572       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
573                                               pid))
574     strength *= 2; /* double for connected peers */
575   else if (k_buckets[bucket].peers_size > bucket_size)
576     strength = 0; /* bucket full, we really do not care about more */
577
578   if ( (0 == strength) &&
579        (NULL != ci) )
580   {
581     /* release request */
582     GNUNET_assert (GNUNET_YES ==
583                    free_connect_info (NULL,
584                                       pid,
585                                       ci));
586     return;
587   }
588   if (NULL == ci)
589   {
590     ci = GNUNET_new (struct ConnectInfo);
591     GNUNET_assert (GNUNET_OK ==
592                    GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
593                                                       pid,
594                                                       ci,
595                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
596   }
597   if ( (NULL != GDS_transport_handle) &&
598        (NULL != ci->oh) &&
599        (NULL != h) )
600     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
601   if ( (NULL != GDS_transport_handle) &&
602        (NULL != h) )
603     ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
604                                            h,
605                                            &offer_hello_done,
606                                            ci);
607   if ( (NULL != ci->sh) &&
608        (ci->strength != strength) )
609     GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
610   if (ci->strength != strength)
611     ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
612                                               pid,
613                                               strength);
614   ci->strength = strength;
615 }
616
617
618 /**
619  * Function called for each peer in #all_desired_peers during
620  * #update_connect_preferences() if we have reason to adjust
621  * the strength of our desire to keep connections to certain
622  * peers.  Calls #try_connect() to update the calculations for
623  * the given @a pid.
624  *
625  * @param cls NULL
626  * @param pid peer to update
627  * @param value unused
628  * @return #GNUNET_YES (continue to iterate)
629  */
630 static int
631 update_desire_strength (void *cls,
632                         const struct GNUNET_PeerIdentity *pid,
633                         void *value)
634 {
635   try_connect (pid, NULL);
636   return GNUNET_YES;
637 }
638
639
640 /**
641  * Update our preferences for connectivity as given to ATS.
642  *
643  * @param cls the `struct PeerInfo` of the peer
644  * @param tc scheduler context.
645  */
646 static void
647 update_connect_preferences ()
648 {
649   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
650                                          &update_desire_strength,
651                                          NULL);
652 }
653
654
655 /**
656  * Closure for #add_known_to_bloom().
657  */
658 struct BloomConstructorContext
659 {
660   /**
661    * Bloom filter under construction.
662    */
663   struct GNUNET_CONTAINER_BloomFilter *bloom;
664
665   /**
666    * Mutator to use.
667    */
668   uint32_t bf_mutator;
669 };
670
671
672 /**
673  * Add each of the peers we already know to the bloom filter of
674  * the request so that we don't get duplicate HELLOs.
675  *
676  * @param cls the 'struct BloomConstructorContext'.
677  * @param key peer identity to add to the bloom filter
678  * @param value value the peer information (unused)
679  * @return #GNUNET_YES (we should continue to iterate)
680  */
681 static int
682 add_known_to_bloom (void *cls,
683                     const struct GNUNET_PeerIdentity *key,
684                     void *value)
685 {
686   struct BloomConstructorContext *ctx = cls;
687   struct GNUNET_HashCode key_hash;
688   struct GNUNET_HashCode mh;
689
690   GNUNET_CRYPTO_hash (key,
691                       sizeof (struct GNUNET_PeerIdentity),
692                       &key_hash);
693   GNUNET_BLOCK_mingle_hash (&key_hash,
694                             ctx->bf_mutator,
695                             &mh);
696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
698               GNUNET_i2s (key), ctx->bf_mutator);
699   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
700   return GNUNET_YES;
701 }
702
703
704 /**
705  * Task to send a find peer message for our own peer identifier
706  * so that we can find the closest peers in the network to ourselves
707  * and attempt to connect to them.
708  *
709  * @param cls closure for this task
710  * @param tc the context under which the task is running
711  */
712 static void
713 send_find_peer_message (void *cls,
714                         const struct GNUNET_SCHEDULER_TaskContext *tc)
715 {
716   struct GNUNET_TIME_Relative next_send_time;
717   struct BloomConstructorContext bcc;
718   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
719
720   find_peer_task = NULL;
721   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
722     return;
723   if (newly_found_peers > bucket_size)
724   {
725     /* If we are finding many peers already, no need to send out our request right now! */
726     find_peer_task =
727         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
728                                       &send_find_peer_message, NULL);
729     newly_found_peers = 0;
730     return;
731   }
732   bcc.bf_mutator =
733       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
734   bcc.bloom =
735       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
736                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
737   GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
738                                          &add_known_to_bloom,
739                                          &bcc);
740   GNUNET_STATISTICS_update (GDS_stats,
741                             gettext_noop ("# FIND PEER messages initiated"), 1,
742                             GNUNET_NO);
743   peer_bf =
744       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
745                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
746   // FIXME: pass priority!?
747   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
748                              GNUNET_DHT_RO_FIND_PEER,
749                              FIND_PEER_REPLICATION_LEVEL, 0,
750                              &my_identity_hash, NULL, 0, bcc.bloom,
751                              bcc.bf_mutator, peer_bf);
752   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
753   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
754   /* schedule next round */
755   next_send_time.rel_value_us =
756       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
757       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
758                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
759                                 (newly_found_peers + 1));
760   newly_found_peers = 0;
761   find_peer_task =
762       GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
763                                     NULL);
764 }
765
766
767 /**
768  * Method called whenever a peer connects.
769  *
770  * @param cls closure
771  * @param peer peer identity this notification is about
772  */
773 static void
774 handle_core_connect (void *cls,
775                      const struct GNUNET_PeerIdentity *peer)
776 {
777   struct PeerInfo *ret;
778   struct GNUNET_HashCode phash;
779   int peer_bucket;
780
781   /* Check for connect to self message */
782   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
783     return;
784   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785               "Connected to %s\n",
786               GNUNET_i2s (peer));
787   if (GNUNET_YES ==
788       GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
789                                               peer))
790   {
791     GNUNET_break (0);
792     return;
793   }
794   GNUNET_STATISTICS_update (GDS_stats,
795                             gettext_noop ("# peers connected"),
796                             1,
797                             GNUNET_NO);
798   GNUNET_CRYPTO_hash (peer,
799                       sizeof (struct GNUNET_PeerIdentity),
800                       &phash);
801   peer_bucket = find_bucket (&phash);
802   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
803   ret = GNUNET_new (struct PeerInfo);
804 #if 0
805   ret->latency = latency;
806   ret->distance = distance;
807 #endif
808   ret->id = *peer;
809   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
810                                     k_buckets[peer_bucket].tail,
811                                     ret);
812   k_buckets[peer_bucket].peers_size++;
813   closest_bucket = GNUNET_MAX (closest_bucket,
814                                peer_bucket);
815   GNUNET_assert (GNUNET_OK ==
816                  GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
817                                                     peer,
818                                                     ret,
819                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
820   if ( (peer_bucket > 0) &&
821        (k_buckets[peer_bucket].peers_size <= bucket_size))
822   {
823     update_connect_preferences ();
824     newly_found_peers++;
825   }
826   if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
827       (GNUNET_YES != disable_try_connect))
828   {
829     /* got a first connection, good time to start with FIND PEER requests... */
830     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
831                                                NULL);
832   }
833 }
834
835
836 /**
837  * Method called whenever a peer disconnects.
838  *
839  * @param cls closure
840  * @param peer peer identity this notification is about
841  */
842 static void
843 handle_core_disconnect (void *cls,
844                         const struct GNUNET_PeerIdentity *peer)
845 {
846   struct PeerInfo *to_remove;
847   int current_bucket;
848   struct P2PPendingMessage *pos;
849   unsigned int discarded;
850   struct GNUNET_HashCode phash;
851
852   /* Check for disconnect from self message */
853   if (0 == memcmp (&my_identity,
854                    peer,
855                    sizeof (struct GNUNET_PeerIdentity)))
856     return;
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "Disconnected %s\n",
859               GNUNET_i2s (peer));
860   to_remove =
861       GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
862                                          peer);
863   if (NULL == to_remove)
864   {
865     GNUNET_break (0);
866     return;
867   }
868   GNUNET_STATISTICS_update (GDS_stats,
869                             gettext_noop ("# peers connected"),
870                             -1,
871                             GNUNET_NO);
872   GNUNET_assert (GNUNET_YES ==
873                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
874                                                        peer,
875                                                        to_remove));
876   GNUNET_CRYPTO_hash (peer,
877                       sizeof (struct GNUNET_PeerIdentity),
878                       &phash);
879   current_bucket = find_bucket (&phash);
880   GNUNET_assert (current_bucket >= 0);
881   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
882                                k_buckets[current_bucket].tail,
883                                to_remove);
884   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
885   k_buckets[current_bucket].peers_size--;
886   while ( (closest_bucket > 0) &&
887           (0 == k_buckets[closest_bucket].peers_size) )
888     closest_bucket--;
889   if (NULL != to_remove->th)
890   {
891     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
892     to_remove->th = NULL;
893   }
894   discarded = 0;
895   while (NULL != (pos = to_remove->head))
896   {
897     GNUNET_CONTAINER_DLL_remove (to_remove->head,
898                                  to_remove->tail,
899                                  pos);
900     discarded++;
901     GNUNET_free (pos);
902   }
903   if (k_buckets[current_bucket].peers_size < bucket_size)
904     update_connect_preferences ();
905   GNUNET_STATISTICS_update (GDS_stats,
906                             gettext_noop ("# Queued messages discarded (peer disconnected)"),
907                             discarded,
908                             GNUNET_NO);
909   GNUNET_free (to_remove);
910 }
911
912
913 /**
914  * Called when core is ready to send a message we asked for
915  * out to the destination.
916  *
917  * @param cls the 'struct PeerInfo' of the target peer
918  * @param size number of bytes available in @a buf
919  * @param buf where the callee should write the message
920  * @return number of bytes written to @a buf
921  */
922 static size_t
923 core_transmit_notify (void *cls,
924                       size_t size,
925                       void *buf)
926 {
927   struct PeerInfo *peer = cls;
928   char *cbuf = buf;
929   struct P2PPendingMessage *pending;
930   size_t off;
931   size_t msize;
932
933   peer->th = NULL;
934   while ((NULL != (pending = peer->head)) &&
935          (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
936   {
937     GNUNET_STATISTICS_update (GDS_stats,
938                               gettext_noop
939                               ("# Messages dropped (CORE timeout)"),
940                               1,
941                               GNUNET_NO);
942     peer->pending_count--;
943     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
944     GNUNET_free (pending);
945   }
946   if (NULL == pending)
947   {
948     /* no messages pending */
949     return 0;
950   }
951   if (NULL == buf)
952   {
953     peer->th =
954         GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
955                                            GNUNET_CORE_PRIO_BEST_EFFORT,
956                                            GNUNET_TIME_absolute_get_remaining
957                                            (pending->timeout), &peer->id,
958                                            ntohs (pending->msg->size),
959                                            &core_transmit_notify, peer);
960     GNUNET_break (NULL != peer->th);
961     return 0;
962   }
963   off = 0;
964   while ((NULL != (pending = peer->head)) &&
965          (size - off >= (msize = ntohs (pending->msg->size))))
966   {
967     GNUNET_STATISTICS_update (GDS_stats,
968                               gettext_noop
969                               ("# Bytes transmitted to other peers"), msize,
970                               GNUNET_NO);
971     memcpy (&cbuf[off], pending->msg, msize);
972     off += msize;
973     peer->pending_count--;
974     GNUNET_CONTAINER_DLL_remove (peer->head,
975                                  peer->tail,
976                                  pending);
977     GNUNET_free (pending);
978   }
979   if (NULL != (pending = peer->head))
980   {
981     /* technically redundant, but easier to read and
982        avoids bogus gcc warning... */
983     msize = ntohs (pending->msg->size);
984     peer->th =
985       GNUNET_CORE_notify_transmit_ready (core_api,
986                                          GNUNET_NO,
987                                          GNUNET_CORE_PRIO_BEST_EFFORT,
988                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
989                                          &peer->id,
990                                          msize,
991                                          &core_transmit_notify,
992                                          peer);
993     GNUNET_break (NULL != peer->th);
994   }
995   return off;
996 }
997
998
999 /**
1000  * Transmit all messages in the peer's message queue.
1001  *
1002  * @param peer message queue to process
1003  */
1004 static void
1005 process_peer_queue (struct PeerInfo *peer)
1006 {
1007   struct P2PPendingMessage *pending;
1008
1009   if (NULL == (pending = peer->head))
1010     return;
1011   if (NULL != peer->th)
1012     return;
1013   GNUNET_STATISTICS_update (GDS_stats,
1014                             gettext_noop
1015                             ("# Bytes of bandwidth requested from core"),
1016                             ntohs (pending->msg->size), GNUNET_NO);
1017   peer->th =
1018       GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
1019                                          GNUNET_CORE_PRIO_BEST_EFFORT,
1020                                          GNUNET_TIME_absolute_get_remaining
1021                                          (pending->timeout),
1022                                          &peer->id,
1023                                          ntohs (pending->msg->size),
1024                                          &core_transmit_notify,
1025                                          peer);
1026   GNUNET_break (NULL != peer->th);
1027 }
1028
1029
1030 /**
1031  * To how many peers should we (on average) forward the request to
1032  * obtain the desired target_replication count (on average).
1033  *
1034  * @param hop_count number of hops the message has traversed
1035  * @param target_replication the number of total paths desired
1036  * @return Some number of peers to forward the message to
1037  */
1038 static unsigned int
1039 get_forward_count (uint32_t hop_count, uint32_t target_replication)
1040 {
1041   uint32_t random_value;
1042   uint32_t forward_count;
1043   float target_value;
1044
1045   if (hop_count > GDS_NSE_get () * 4.0)
1046   {
1047     /* forcefully terminate */
1048     GNUNET_STATISTICS_update (GDS_stats,
1049                               gettext_noop ("# requests TTL-dropped"),
1050                               1, GNUNET_NO);
1051     return 0;
1052   }
1053   if (hop_count > GDS_NSE_get () * 2.0)
1054   {
1055     /* Once we have reached our ideal number of hops, only forward to 1 peer */
1056     return 1;
1057   }
1058   /* bound by system-wide maximum */
1059   target_replication =
1060       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
1061   target_value =
1062       1 + (target_replication - 1.0) / (GDS_NSE_get () +
1063                                         ((float) (target_replication - 1.0) *
1064                                          hop_count));
1065   /* Set forward count to floor of target_value */
1066   forward_count = (uint32_t) target_value;
1067   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1068   target_value = target_value - forward_count;
1069   random_value =
1070       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1071   if (random_value < (target_value * UINT32_MAX))
1072     forward_count++;
1073   return forward_count;
1074 }
1075
1076
1077 /**
1078  * Compute the distance between have and target as a 32-bit value.
1079  * Differences in the lower bits must count stronger than differences
1080  * in the higher bits.
1081  *
1082  * @param target
1083  * @param have
1084  * @return 0 if have==target, otherwise a number
1085  *           that is larger as the distance between
1086  *           the two hash codes increases
1087  */
1088 static unsigned int
1089 get_distance (const struct GNUNET_HashCode *target,
1090               const struct GNUNET_HashCode *have)
1091 {
1092   unsigned int bucket;
1093   unsigned int msb;
1094   unsigned int lsb;
1095   unsigned int i;
1096
1097   /* We have to represent the distance between two 2^9 (=512)-bit
1098    * numbers as a 2^5 (=32)-bit number with "0" being used for the
1099    * two numbers being identical; furthermore, we need to
1100    * guarantee that a difference in the number of matching
1101    * bits is always represented in the result.
1102    *
1103    * We use 2^32/2^9 numerical values to distinguish between
1104    * hash codes that have the same LSB bit distance and
1105    * use the highest 2^9 bits of the result to signify the
1106    * number of (mis)matching LSB bits; if we have 0 matching
1107    * and hence 512 mismatching LSB bits we return -1 (since
1108    * 512 itself cannot be represented with 9 bits) */
1109
1110   /* first, calculate the most significant 9 bits of our
1111    * result, aka the number of LSBs */
1112   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
1113   /* bucket is now a value between 0 and 512 */
1114   if (bucket == 512)
1115     return 0;                   /* perfect match */
1116   if (bucket == 0)
1117     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
1118                                  * below, we'd end up with max+1 (overflow)) */
1119
1120   /* calculate the most significant bits of the final result */
1121   msb = (512 - bucket) << (32 - 9);
1122   /* calculate the 32-9 least significant bits of the final result by
1123    * looking at the differences in the 32-9 bits following the
1124    * mismatching bit at 'bucket' */
1125   lsb = 0;
1126   for (i = bucket + 1;
1127        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
1128   {
1129     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
1130         GNUNET_CRYPTO_hash_get_bit (have, i))
1131       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
1132                                                  * last bit set will be 31 -- if
1133                                                  * i does not reach 512 first... */
1134   }
1135   return msb | lsb;
1136 }
1137
1138
1139 /**
1140  * Check whether my identity is closer than any known peers.  If a
1141  * non-null bloomfilter is given, check if this is the closest peer
1142  * that hasn't already been routed to.
1143  *
1144  * @param key hash code to check closeness to
1145  * @param bloom bloomfilter, exclude these entries from the decision
1146  * @return #GNUNET_YES if node location is closest,
1147  *         #GNUNET_NO otherwise.
1148  */
1149 static int
1150 am_closest_peer (const struct GNUNET_HashCode *key,
1151                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
1152 {
1153   int bits;
1154   int other_bits;
1155   int bucket_num;
1156   int count;
1157   struct PeerInfo *pos;
1158   struct GNUNET_HashCode phash;
1159
1160   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
1161     return GNUNET_YES;
1162   bucket_num = find_bucket (key);
1163   GNUNET_assert (bucket_num >= 0);
1164   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key);
1165   pos = k_buckets[bucket_num].head;
1166   count = 0;
1167   while ((NULL != pos) && (count < bucket_size))
1168   {
1169     GNUNET_CRYPTO_hash (&pos->id,
1170                         sizeof (struct GNUNET_PeerIdentity),
1171                         &phash);
1172     if ((NULL != bloom) &&
1173         (GNUNET_YES ==
1174          GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1175     {
1176       pos = pos->next;
1177       continue;                 /* Skip already checked entries */
1178     }
1179     other_bits = GNUNET_CRYPTO_hash_matching_bits (&phash, key);
1180     if (other_bits > bits)
1181       return GNUNET_NO;
1182     if (other_bits == bits)     /* We match the same number of bits */
1183       return GNUNET_YES;
1184     pos = pos->next;
1185   }
1186   /* No peers closer, we are the closest! */
1187   return GNUNET_YES;
1188 }
1189
1190
1191 /**
1192  * Select a peer from the routing table that would be a good routing
1193  * destination for sending a message for "key".  The resulting peer
1194  * must not be in the set of blocked peers.<p>
1195  *
1196  * Note that we should not ALWAYS select the closest peer to the
1197  * target, peers further away from the target should be chosen with
1198  * exponentially declining probability.
1199  *
1200  * FIXME: double-check that this is fine
1201  *
1202  *
1203  * @param key the key we are selecting a peer to route to
1204  * @param bloom a bloomfilter containing entries this request has seen already
1205  * @param hops how many hops has this message traversed thus far
1206  * @return Peer to route to, or NULL on error
1207  */
1208 static struct PeerInfo *
1209 select_peer (const struct GNUNET_HashCode *key,
1210              const struct GNUNET_CONTAINER_BloomFilter *bloom,
1211              uint32_t hops)
1212 {
1213   unsigned int bc;
1214   unsigned int count;
1215   unsigned int selected;
1216   struct PeerInfo *pos;
1217   unsigned int dist;
1218   unsigned int smallest_distance;
1219   struct PeerInfo *chosen;
1220   struct GNUNET_HashCode phash;
1221
1222   if (hops >= GDS_NSE_get ())
1223   {
1224     /* greedy selection (closest peer that is not in bloomfilter) */
1225     smallest_distance = UINT_MAX;
1226     chosen = NULL;
1227     for (bc = 0; bc <= closest_bucket; bc++)
1228     {
1229       pos = k_buckets[bc].head;
1230       count = 0;
1231       while ((pos != NULL) && (count < bucket_size))
1232       {
1233         GNUNET_CRYPTO_hash (&pos->id,
1234                             sizeof (struct GNUNET_PeerIdentity),
1235                             &phash);
1236         if ((bloom == NULL) ||
1237             (GNUNET_NO ==
1238              GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1239         {
1240           dist = get_distance (key, &phash);
1241           if (dist < smallest_distance)
1242           {
1243             chosen = pos;
1244             smallest_distance = dist;
1245           }
1246         }
1247         else
1248         {
1249           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1251                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1252           GNUNET_STATISTICS_update (GDS_stats,
1253                                     gettext_noop
1254                                     ("# Peers excluded from routing due to Bloomfilter"),
1255                                     1, GNUNET_NO);
1256           dist = get_distance (key, &phash);
1257           if (dist < smallest_distance)
1258           {
1259             chosen = NULL;
1260             smallest_distance = dist;
1261           }
1262         }
1263         count++;
1264         pos = pos->next;
1265       }
1266     }
1267     if (NULL == chosen)
1268       GNUNET_STATISTICS_update (GDS_stats,
1269                                 gettext_noop ("# Peer selection failed"), 1,
1270                                 GNUNET_NO);
1271     return chosen;
1272   }
1273
1274   /* select "random" peer */
1275   /* count number of peers that are available and not filtered */
1276   count = 0;
1277   for (bc = 0; bc <= closest_bucket; bc++)
1278   {
1279     pos = k_buckets[bc].head;
1280     while ((pos != NULL) && (count < bucket_size))
1281     {
1282       GNUNET_CRYPTO_hash (&pos->id,
1283                           sizeof (struct GNUNET_PeerIdentity),
1284                           &phash);
1285       if ((bloom != NULL) &&
1286           (GNUNET_YES ==
1287            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1288       {
1289         GNUNET_STATISTICS_update (GDS_stats,
1290                                   gettext_noop
1291                                   ("# Peers excluded from routing due to Bloomfilter"),
1292                                   1, GNUNET_NO);
1293         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1295                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1296         pos = pos->next;
1297         continue;               /* Ignore bloomfiltered peers */
1298       }
1299       count++;
1300       pos = pos->next;
1301     }
1302   }
1303   if (0 == count)               /* No peers to select from! */
1304   {
1305     GNUNET_STATISTICS_update (GDS_stats,
1306                               gettext_noop ("# Peer selection failed"), 1,
1307                               GNUNET_NO);
1308     return NULL;
1309   }
1310   /* Now actually choose a peer */
1311   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1312   count = 0;
1313   for (bc = 0; bc <= closest_bucket; bc++)
1314   {
1315     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1316     {
1317       GNUNET_CRYPTO_hash (&pos->id,
1318                           sizeof (struct GNUNET_PeerIdentity),
1319                           &phash);
1320       if ((bloom != NULL) &&
1321           (GNUNET_YES ==
1322            GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1323       {
1324         continue;               /* Ignore bloomfiltered peers */
1325       }
1326       if (0 == selected--)
1327         return pos;
1328     }
1329   }
1330   GNUNET_break (0);
1331   return NULL;
1332 }
1333
1334
1335 /**
1336  * Compute the set of peers that the given request should be
1337  * forwarded to.
1338  *
1339  * @param key routing key
1340  * @param bloom bloom filter excluding peers as targets, all selected
1341  *        peers will be added to the bloom filter
1342  * @param hop_count number of hops the request has traversed so far
1343  * @param target_replication desired number of replicas
1344  * @param targets where to store an array of target peers (to be
1345  *         free'd by the caller)
1346  * @return number of peers returned in 'targets'.
1347  */
1348 static unsigned int
1349 get_target_peers (const struct GNUNET_HashCode *key,
1350                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1351                   uint32_t hop_count, uint32_t target_replication,
1352                   struct PeerInfo ***targets)
1353 {
1354   unsigned int ret;
1355   unsigned int off;
1356   struct PeerInfo **rtargets;
1357   struct PeerInfo *nxt;
1358   struct GNUNET_HashCode nhash;
1359
1360   GNUNET_assert (NULL != bloom);
1361   ret = get_forward_count (hop_count, target_replication);
1362   if (0 == ret)
1363   {
1364     *targets = NULL;
1365     return 0;
1366   }
1367   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1368   for (off = 0; off < ret; off++)
1369   {
1370     nxt = select_peer (key, bloom, hop_count);
1371     if (NULL == nxt)
1372       break;
1373     rtargets[off] = nxt;
1374     GNUNET_CRYPTO_hash (&nxt->id,
1375                         sizeof (struct GNUNET_PeerIdentity),
1376                         &nhash);
1377     GNUNET_break (GNUNET_NO ==
1378                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1379                                                      &nhash));
1380     GNUNET_CONTAINER_bloomfilter_add (bloom, &nhash);
1381   }
1382   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1383               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1384               off,
1385               GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1386               (unsigned int) hop_count,
1387               GNUNET_h2s (key),
1388               ret);
1389   if (0 == off)
1390   {
1391     GNUNET_free (rtargets);
1392     *targets = NULL;
1393     return 0;
1394   }
1395   *targets = rtargets;
1396   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1397               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1398               GNUNET_h2s (key),
1399               off,
1400               ret);
1401   return off;
1402 }
1403
1404
1405 /**
1406  * Perform a PUT operation.   Forwards the given request to other
1407  * peers.   Does not store the data locally.  Does not give the
1408  * data to local clients.  May do nothing if this is the only
1409  * peer in the network (or if we are the closest peer in the
1410  * network).
1411  *
1412  * @param type type of the block
1413  * @param options routing options
1414  * @param desired_replication_level desired replication count
1415  * @param expiration_time when does the content expire
1416  * @param hop_count how many hops has this message traversed so far
1417  * @param bf Bloom filter of peers this PUT has already traversed
1418  * @param key key for the content
1419  * @param put_path_length number of entries in @a put_path
1420  * @param put_path peers this request has traversed so far (if tracked)
1421  * @param data payload to store
1422  * @param data_size number of bytes in @a data
1423  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1424  */
1425 int
1426 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1427                            enum GNUNET_DHT_RouteOption options,
1428                            uint32_t desired_replication_level,
1429                            struct GNUNET_TIME_Absolute expiration_time,
1430                            uint32_t hop_count,
1431                            struct GNUNET_CONTAINER_BloomFilter *bf,
1432                            const struct GNUNET_HashCode *key,
1433                            unsigned int put_path_length,
1434                            struct GNUNET_PeerIdentity *put_path,
1435                            const void *data, size_t data_size)
1436 {
1437   unsigned int target_count;
1438   unsigned int i;
1439   struct PeerInfo **targets;
1440   struct PeerInfo *target;
1441   struct P2PPendingMessage *pending;
1442   size_t msize;
1443   struct PeerPutMessage *ppm;
1444   struct GNUNET_PeerIdentity *pp;
1445   struct GNUNET_HashCode thash;
1446   unsigned int skip_count;
1447
1448   GNUNET_assert (NULL != bf);
1449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1450               "Adding myself (%s) to PUT bloomfilter for %s\n",
1451               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1452   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1453   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1454                             1, GNUNET_NO);
1455   target_count =
1456       get_target_peers (key, bf, hop_count, desired_replication_level,
1457                         &targets);
1458   if (0 == target_count)
1459   {
1460     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461                 "Routing PUT for %s terminates after %u hops at %s\n",
1462                 GNUNET_h2s (key), (unsigned int) hop_count,
1463                 GNUNET_i2s (&my_identity));
1464     return GNUNET_NO;
1465   }
1466   msize =
1467       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1468       sizeof (struct PeerPutMessage);
1469   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1470   {
1471     put_path_length = 0;
1472     msize = data_size + sizeof (struct PeerPutMessage);
1473   }
1474   if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1475   {
1476     GNUNET_break (0);
1477     GNUNET_free (targets);
1478     return GNUNET_NO;
1479   }
1480   GNUNET_STATISTICS_update (GDS_stats,
1481                             gettext_noop
1482                             ("# PUT messages queued for transmission"),
1483                             target_count, GNUNET_NO);
1484   skip_count = 0;
1485   for (i = 0; i < target_count; i++)
1486   {
1487     target = targets[i];
1488     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1489     {
1490       /* skip */
1491       GNUNET_STATISTICS_update (GDS_stats,
1492                                 gettext_noop ("# P2P messages dropped due to full queue"),
1493                                 1, GNUNET_NO);
1494       skip_count++;
1495       continue;
1496     }
1497     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1498                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
1499                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1500     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1501     pending->importance = 0;    /* FIXME */
1502     pending->timeout = expiration_time;
1503     ppm = (struct PeerPutMessage *) &pending[1];
1504     pending->msg = &ppm->header;
1505     ppm->header.size = htons (msize);
1506     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1507     ppm->options = htonl (options);
1508     ppm->type = htonl (type);
1509     ppm->hop_count = htonl (hop_count + 1);
1510     ppm->desired_replication_level = htonl (desired_replication_level);
1511     ppm->put_path_length = htonl (put_path_length);
1512     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1513     GNUNET_CRYPTO_hash (&target->id,
1514                         sizeof (struct GNUNET_PeerIdentity),
1515                         &thash);
1516     GNUNET_break (GNUNET_YES ==
1517                   GNUNET_CONTAINER_bloomfilter_test (bf,
1518                                                      &thash));
1519     GNUNET_assert (GNUNET_OK ==
1520                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1521                                                               ppm->bloomfilter,
1522                                                               DHT_BLOOM_SIZE));
1523     ppm->key = *key;
1524     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1525     memcpy (pp, put_path,
1526             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1527     memcpy (&pp[put_path_length], data, data_size);
1528     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1529     target->pending_count++;
1530     process_peer_queue (target);
1531   }
1532   GNUNET_free (targets);
1533   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1534 }
1535
1536
1537 /**
1538  * Perform a GET operation.  Forwards the given request to other
1539  * peers.  Does not lookup the key locally.  May do nothing if this is
1540  * the only peer in the network (or if we are the closest peer in the
1541  * network).
1542  *
1543  * @param type type of the block
1544  * @param options routing options
1545  * @param desired_replication_level desired replication count
1546  * @param hop_count how many hops did this request traverse so far?
1547  * @param key key for the content
1548  * @param xquery extended query
1549  * @param xquery_size number of bytes in @a xquery
1550  * @param reply_bf bloomfilter to filter duplicates
1551  * @param reply_bf_mutator mutator for @a reply_bf
1552  * @param peer_bf filter for peers not to select (again)
1553  * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1554  */
1555 int
1556 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1557                            enum GNUNET_DHT_RouteOption options,
1558                            uint32_t desired_replication_level,
1559                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1560                            const void *xquery, size_t xquery_size,
1561                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1562                            uint32_t reply_bf_mutator,
1563                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1564 {
1565   unsigned int target_count;
1566   unsigned int i;
1567   struct PeerInfo **targets;
1568   struct PeerInfo *target;
1569   struct P2PPendingMessage *pending;
1570   size_t msize;
1571   struct PeerGetMessage *pgm;
1572   char *xq;
1573   size_t reply_bf_size;
1574   struct GNUNET_HashCode thash;
1575   unsigned int skip_count;
1576
1577   GNUNET_assert (NULL != peer_bf);
1578   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1579                             1, GNUNET_NO);
1580   target_count =
1581       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1582                         &targets);
1583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1584               "Adding myself (%s) to GET bloomfilter for %s\n",
1585               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1586   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity_hash);
1587   if (0 == target_count)
1588   {
1589     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590                 "Routing GET for %s terminates after %u hops at %s\n",
1591                 GNUNET_h2s (key), (unsigned int) hop_count,
1592                 GNUNET_i2s (&my_identity));
1593     return GNUNET_NO;
1594   }
1595   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1596   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1597   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1598   {
1599     GNUNET_break (0);
1600     GNUNET_free (targets);
1601     return GNUNET_NO;
1602   }
1603   GNUNET_STATISTICS_update (GDS_stats,
1604                             gettext_noop
1605                             ("# GET messages queued for transmission"),
1606                             target_count, GNUNET_NO);
1607   /* forward request */
1608   skip_count = 0;
1609   for (i = 0; i < target_count; i++)
1610   {
1611     target = targets[i];
1612     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1613     {
1614       /* skip */
1615       GNUNET_STATISTICS_update (GDS_stats,
1616                                 gettext_noop ("# P2P messages dropped due to full queue"),
1617                                 1, GNUNET_NO);
1618       skip_count++;
1619       continue;
1620     }
1621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1622                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
1623                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1624     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1625     pending->importance = 0;    /* FIXME */
1626     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1627     pgm = (struct PeerGetMessage *) &pending[1];
1628     pending->msg = &pgm->header;
1629     pgm->header.size = htons (msize);
1630     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1631     pgm->options = htonl (options);
1632     pgm->type = htonl (type);
1633     pgm->hop_count = htonl (hop_count + 1);
1634     pgm->desired_replication_level = htonl (desired_replication_level);
1635     pgm->xquery_size = htonl (xquery_size);
1636     pgm->bf_mutator = reply_bf_mutator;
1637     GNUNET_CRYPTO_hash (&target->id,
1638                         sizeof (struct GNUNET_PeerIdentity),
1639                         &thash);
1640     GNUNET_break (GNUNET_YES ==
1641                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1642                                                      &thash));
1643     GNUNET_assert (GNUNET_OK ==
1644                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1645                                                               pgm->bloomfilter,
1646                                                               DHT_BLOOM_SIZE));
1647     pgm->key = *key;
1648     xq = (char *) &pgm[1];
1649     memcpy (xq, xquery, xquery_size);
1650     if (NULL != reply_bf)
1651       GNUNET_assert (GNUNET_OK ==
1652                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1653                                                                 &xq
1654                                                                 [xquery_size],
1655                                                                 reply_bf_size));
1656     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1657     target->pending_count++;
1658     process_peer_queue (target);
1659   }
1660   GNUNET_free (targets);
1661   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1662 }
1663
1664
1665 /**
1666  * Handle a reply (route to origin).  Only forwards the reply back to
1667  * the given peer.  Does not do local caching or forwarding to local
1668  * clients.
1669  *
1670  * @param target neighbour that should receive the block (if still connected)
1671  * @param type type of the block
1672  * @param expiration_time when does the content expire
1673  * @param key key for the content
1674  * @param put_path_length number of entries in @a put_path
1675  * @param put_path peers the original PUT traversed (if tracked)
1676  * @param get_path_length number of entries in @a get_path
1677  * @param get_path peers this reply has traversed so far (if tracked)
1678  * @param data payload of the reply
1679  * @param data_size number of bytes in @a data
1680  */
1681 void
1682 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1683                              enum GNUNET_BLOCK_Type type,
1684                              struct GNUNET_TIME_Absolute expiration_time,
1685                              const struct GNUNET_HashCode *key,
1686                              unsigned int put_path_length,
1687                              const struct GNUNET_PeerIdentity *put_path,
1688                              unsigned int get_path_length,
1689                              const struct GNUNET_PeerIdentity *get_path,
1690                              const void *data,
1691                              size_t data_size)
1692 {
1693   struct PeerInfo *pi;
1694   struct P2PPendingMessage *pending;
1695   size_t msize;
1696   struct PeerResultMessage *prm;
1697   struct GNUNET_PeerIdentity *paths;
1698
1699   msize =
1700       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1701                                                        put_path_length) *
1702       sizeof (struct GNUNET_PeerIdentity);
1703   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1704       (get_path_length >
1705        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1706       (put_path_length >
1707        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1708       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1709   {
1710     GNUNET_break (0);
1711     return;
1712   }
1713   pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1714                                           target);
1715   if (NULL == pi)
1716   {
1717     /* peer disconnected in the meantime, drop reply */
1718     return;
1719   }
1720   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
1721   {
1722     /* skip */
1723     GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1724                               1, GNUNET_NO);
1725     return;
1726   }
1727
1728   GNUNET_STATISTICS_update (GDS_stats,
1729                             gettext_noop
1730                             ("# RESULT messages queued for transmission"), 1,
1731                             GNUNET_NO);
1732   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1733   pending->importance = 0;      /* FIXME */
1734   pending->timeout = expiration_time;
1735   prm = (struct PeerResultMessage *) &pending[1];
1736   pending->msg = &prm->header;
1737   prm->header.size = htons (msize);
1738   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1739   prm->type = htonl (type);
1740   prm->put_path_length = htonl (put_path_length);
1741   prm->get_path_length = htonl (get_path_length);
1742   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1743   prm->key = *key;
1744   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1745   memcpy (paths, put_path,
1746           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1747   memcpy (&paths[put_path_length], get_path,
1748           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1749   memcpy (&paths[put_path_length + get_path_length], data, data_size);
1750   GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
1751   pi->pending_count++;
1752   process_peer_queue (pi);
1753 }
1754
1755
1756 /**
1757  * To be called on core init/fail.
1758  *
1759  * @param cls service closure
1760  * @param identity the public identity of this peer
1761  */
1762 static void
1763 core_init (void *cls,
1764            const struct GNUNET_PeerIdentity *identity)
1765 {
1766   my_identity = *identity;
1767   GNUNET_CRYPTO_hash (identity,
1768                       sizeof (struct GNUNET_PeerIdentity),
1769                       &my_identity_hash);
1770 }
1771
1772
1773 /**
1774  * Core handler for p2p put requests.
1775  *
1776  * @param cls closure
1777  * @param peer sender of the request
1778  * @param message message
1779  * @param peer peer identity this notification is about
1780  * @return #GNUNET_OK to keep the connection open,
1781  *         #GNUNET_SYSERR to close it (signal serious error)
1782  */
1783 static int
1784 handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1785                     const struct GNUNET_MessageHeader *message)
1786 {
1787   const struct PeerPutMessage *put;
1788   const struct GNUNET_PeerIdentity *put_path;
1789   const void *payload;
1790   uint32_t putlen;
1791   uint16_t msize;
1792   size_t payload_size;
1793   enum GNUNET_DHT_RouteOption options;
1794   struct GNUNET_CONTAINER_BloomFilter *bf;
1795   struct GNUNET_HashCode test_key;
1796   struct GNUNET_HashCode phash;
1797   int forwarded;
1798
1799   msize = ntohs (message->size);
1800   if (msize < sizeof (struct PeerPutMessage))
1801   {
1802     GNUNET_break_op (0);
1803     return GNUNET_YES;
1804   }
1805   put = (const struct PeerPutMessage *) message;
1806   putlen = ntohl (put->put_path_length);
1807   if ((msize <
1808        sizeof (struct PeerPutMessage) +
1809        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1810       (putlen >
1811        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1812   {
1813     GNUNET_break_op (0);
1814     return GNUNET_YES;
1815   }
1816   GNUNET_STATISTICS_update (GDS_stats,
1817                             gettext_noop ("# P2P PUT requests received"), 1,
1818                             GNUNET_NO);
1819   GNUNET_STATISTICS_update (GDS_stats,
1820                             gettext_noop ("# P2P PUT bytes received"), msize,
1821                             GNUNET_NO);
1822   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1823   payload = &put_path[putlen];
1824   options = ntohl (put->options);
1825   payload_size =
1826       msize - (sizeof (struct PeerPutMessage) +
1827                putlen * sizeof (struct GNUNET_PeerIdentity));
1828
1829   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1830               GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1831   GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash);
1832   if (GNUNET_YES == log_route_details_stderr)
1833   {
1834     char *tmp;
1835
1836     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1837     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1838                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1839                  GNUNET_h2s (&put->key), GNUNET_i2s (peer), tmp,
1840                  ntohl(put->hop_count),
1841                  GNUNET_CRYPTO_hash_matching_bits (&phash, &put->key),
1842                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &put->key)
1843                 );
1844     GNUNET_free (tmp);
1845   }
1846   switch (GNUNET_BLOCK_get_key
1847           (GDS_block_context, ntohl (put->type), payload, payload_size,
1848            &test_key))
1849   {
1850   case GNUNET_YES:
1851     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
1852     {
1853       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1854       GNUNET_break_op (0);
1855       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1856                   "PUT with key `%s' for block with key %s\n",
1857                   put_s, GNUNET_h2s_full (&test_key));
1858       GNUNET_free (put_s);
1859       return GNUNET_YES;
1860     }
1861     break;
1862   case GNUNET_NO:
1863     GNUNET_break_op (0);
1864     return GNUNET_YES;
1865   case GNUNET_SYSERR:
1866     /* cannot verify, good luck */
1867     break;
1868   }
1869   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1870   {
1871     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1872                                    ntohl (put->type),
1873                                    GNUNET_BLOCK_EO_NONE,
1874                                    NULL,    /* query */
1875                                    NULL, 0, /* bloom filer */
1876                                    NULL, 0, /* xquery */
1877                                    payload, payload_size))
1878     {
1879     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1880     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1881       break;
1882
1883     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1884     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1885     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1886     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1887     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1888     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1889     default:
1890       GNUNET_break_op (0);
1891       return GNUNET_OK;
1892     }
1893   }
1894
1895   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
1896                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1897   GNUNET_break_op (GNUNET_YES ==
1898                    GNUNET_CONTAINER_bloomfilter_test (bf, &phash));
1899   {
1900     struct GNUNET_PeerIdentity pp[putlen + 1];
1901
1902     /* extend 'put path' by sender */
1903     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1904     {
1905       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1906       pp[putlen] = *peer;
1907       putlen++;
1908     }
1909     else
1910       putlen = 0;
1911
1912     /* give to local clients */
1913     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1914                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1915                               payload_size, payload);
1916     /* store locally */
1917     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1918         (am_closest_peer (&put->key, bf)))
1919       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1920                                 (put->expiration_time), &put->key, putlen, pp,
1921                                 ntohl (put->type), payload_size, payload);
1922     /* route to other peers */
1923     forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
1924                                            ntohl (put->desired_replication_level),
1925                                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1926                                            ntohl (put->hop_count), bf,
1927                                            &put->key,
1928                                            putlen,
1929                                            pp,
1930                                            payload,
1931                                            payload_size);
1932     /* notify monitoring clients */
1933     GDS_CLIENTS_process_put (options
1934                              | ( (GNUNET_OK == forwarded)
1935                                  ? GNUNET_DHT_RO_LAST_HOP
1936                                  : 0 ),
1937                              ntohl (put->type),
1938                              ntohl (put->hop_count),
1939                              ntohl (put->desired_replication_level),
1940                              putlen, pp,
1941                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1942                              &put->key,
1943                              payload,
1944                              payload_size);
1945   }
1946   GNUNET_CONTAINER_bloomfilter_free (bf);
1947   return GNUNET_YES;
1948 }
1949
1950
1951 /**
1952  * We have received a FIND PEER request.  Send matching
1953  * HELLOs back.
1954  *
1955  * @param sender sender of the FIND PEER request
1956  * @param key peers close to this key are desired
1957  * @param bf peers matching this bf are excluded
1958  * @param bf_mutator mutator for bf
1959  */
1960 static void
1961 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1962                   const struct GNUNET_HashCode * key,
1963                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
1964 {
1965   int bucket_idx;
1966   struct PeerBucket *bucket;
1967   struct PeerInfo *peer;
1968   unsigned int choice;
1969   struct GNUNET_HashCode phash;
1970   struct GNUNET_HashCode mhash;
1971   const struct GNUNET_HELLO_Message *hello;
1972
1973   /* first, check about our own HELLO */
1974   if (NULL != GDS_my_hello)
1975   {
1976     GNUNET_BLOCK_mingle_hash (&my_identity_hash, bf_mutator, &mhash);
1977     if ((NULL == bf) ||
1978         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1979     {
1980       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1981                                    GNUNET_TIME_relative_to_absolute
1982                                    (hello_expiration),
1983                                    key, 0, NULL, 0, NULL, GDS_my_hello,
1984                                    GNUNET_HELLO_size ((const struct
1985                                                        GNUNET_HELLO_Message *)
1986                                                       GDS_my_hello));
1987     }
1988     else
1989     {
1990       GNUNET_STATISTICS_update (GDS_stats,
1991                                 gettext_noop
1992                                 ("# FIND PEER requests ignored due to Bloomfilter"),
1993                                 1, GNUNET_NO);
1994     }
1995   }
1996   else
1997   {
1998     GNUNET_STATISTICS_update (GDS_stats,
1999                               gettext_noop
2000                               ("# FIND PEER requests ignored due to lack of HELLO"),
2001                               1, GNUNET_NO);
2002   }
2003
2004   /* then, also consider sending a random HELLO from the closest bucket */
2005   if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode)))
2006     bucket_idx = closest_bucket;
2007   else
2008     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
2009   if (bucket_idx == GNUNET_SYSERR)
2010     return;
2011   bucket = &k_buckets[bucket_idx];
2012   if (bucket->peers_size == 0)
2013     return;
2014   choice =
2015       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
2016   peer = bucket->head;
2017   while (choice > 0)
2018   {
2019     GNUNET_assert (NULL != peer);
2020     peer = peer->next;
2021     choice--;
2022   }
2023   choice = bucket->peers_size;
2024   do
2025   {
2026     peer = peer->next;
2027     if (choice-- == 0)
2028       return;                   /* no non-masked peer available */
2029     if (peer == NULL)
2030       peer = bucket->head;
2031     GNUNET_CRYPTO_hash (&peer->id, sizeof (struct GNUNET_PeerIdentity), &phash);
2032     GNUNET_BLOCK_mingle_hash (&phash, bf_mutator, &mhash);
2033     hello = GDS_HELLO_get (&peer->id);
2034   }
2035   while ((hello == NULL) ||
2036          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
2037   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
2038                                GNUNET_TIME_relative_to_absolute
2039                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
2040                                0, NULL, 0, NULL, hello,
2041                                GNUNET_HELLO_size (hello));
2042 }
2043
2044
2045 /**
2046  * Core handler for p2p get requests.
2047  *
2048  * @param cls closure
2049  * @param peer sender of the request
2050  * @param message message
2051  * @return #GNUNET_OK to keep the connection open,
2052  *         #GNUNET_SYSERR to close it (signal serious error)
2053  */
2054 static int
2055 handle_dht_p2p_get (void *cls,
2056                     const struct GNUNET_PeerIdentity *peer,
2057                     const struct GNUNET_MessageHeader *message)
2058 {
2059   struct PeerGetMessage *get;
2060   uint32_t xquery_size;
2061   size_t reply_bf_size;
2062   uint16_t msize;
2063   enum GNUNET_BLOCK_Type type;
2064   enum GNUNET_DHT_RouteOption options;
2065   enum GNUNET_BLOCK_EvaluationResult eval;
2066   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
2067   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2068   const char *xquery;
2069   struct GNUNET_HashCode phash;
2070   int forwarded;
2071
2072   GNUNET_break (0 !=
2073                 memcmp (peer, &my_identity,
2074                         sizeof (struct GNUNET_PeerIdentity)));
2075   /* parse and validate message */
2076   msize = ntohs (message->size);
2077   if (msize < sizeof (struct PeerGetMessage))
2078   {
2079     GNUNET_break_op (0);
2080     return GNUNET_YES;
2081   }
2082   get = (struct PeerGetMessage *) message;
2083   xquery_size = ntohl (get->xquery_size);
2084   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
2085   {
2086     GNUNET_break_op (0);
2087     return GNUNET_YES;
2088   }
2089   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2090   type = ntohl (get->type);
2091   options = ntohl (get->options);
2092   xquery = (const char *) &get[1];
2093   reply_bf = NULL;
2094   GNUNET_STATISTICS_update (GDS_stats,
2095                             gettext_noop ("# P2P GET requests received"), 1,
2096                             GNUNET_NO);
2097   GNUNET_STATISTICS_update (GDS_stats,
2098                             gettext_noop ("# P2P GET bytes received"), msize,
2099                             GNUNET_NO);
2100   GNUNET_CRYPTO_hash (peer,
2101                       sizeof (struct GNUNET_PeerIdentity),
2102                       &phash);
2103   if (GNUNET_YES == log_route_details_stderr)
2104   {
2105     char *tmp;
2106
2107     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2108     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2109                  "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2110                  GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp,
2111                  ntohl(get->hop_count),
2112                  GNUNET_CRYPTO_hash_matching_bits (&phash, &get->key),
2113                  GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key),
2114                  ntohl(get->xquery_size), xquery);
2115     GNUNET_free (tmp);
2116   }
2117
2118   if (reply_bf_size > 0)
2119     reply_bf =
2120         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
2121                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
2122   eval =
2123       GNUNET_BLOCK_evaluate (GDS_block_context,
2124                              type,
2125                              GNUNET_BLOCK_EO_NONE,
2126                              &get->key,
2127                              &reply_bf,
2128                              get->bf_mutator,
2129                              xquery,
2130                              xquery_size,
2131                              NULL,
2132                              0);
2133   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2134   {
2135     /* request invalid or block type not supported */
2136     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2137     if (NULL != reply_bf)
2138       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2139     return GNUNET_YES;
2140   }
2141   peer_bf =
2142       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
2143                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
2144   GNUNET_break_op (GNUNET_YES ==
2145                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2146                                                       &phash));
2147   /* remember request for routing replies */
2148   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
2149                    reply_bf, get->bf_mutator);
2150   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2151               "GET for %s at %s after %u hops\n",
2152               GNUNET_h2s (&get->key),
2153               GNUNET_i2s (&my_identity),
2154               (unsigned int) ntohl (get->hop_count));
2155   /* local lookup (this may update the reply_bf) */
2156   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2157       (am_closest_peer (&get->key, peer_bf)))
2158   {
2159     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2160     {
2161       GNUNET_STATISTICS_update (GDS_stats,
2162                                 gettext_noop
2163                                 ("# P2P FIND PEER requests processed"), 1,
2164                                 GNUNET_NO);
2165       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
2166     }
2167     else
2168     {
2169       eval =
2170           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
2171                                     &reply_bf, get->bf_mutator);
2172     }
2173   }
2174   else
2175   {
2176     GNUNET_STATISTICS_update (GDS_stats,
2177                               gettext_noop ("# P2P GET requests ONLY routed"),
2178                               1, GNUNET_NO);
2179   }
2180
2181   /* P2P forwarding */
2182   forwarded = GNUNET_NO;
2183   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2184     forwarded = GDS_NEIGHBOURS_handle_get (type, options,
2185                                            ntohl (get->desired_replication_level),
2186                                            ntohl (get->hop_count),
2187                                            &get->key,
2188                                            xquery,
2189                                            xquery_size,
2190                                            reply_bf,
2191                                            get->bf_mutator, peer_bf);
2192   GDS_CLIENTS_process_get (options
2193                            | (GNUNET_OK == forwarded)
2194                            ? GNUNET_DHT_RO_LAST_HOP : 0,
2195                            type,
2196                            ntohl (get->hop_count),
2197                            ntohl (get->desired_replication_level),
2198                            0, NULL,
2199                            &get->key);
2200
2201
2202   /* clean up */
2203   if (NULL != reply_bf)
2204     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2205   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2206   return GNUNET_YES;
2207 }
2208
2209
2210 /**
2211  * Core handler for p2p result messages.
2212  *
2213  * @param cls closure
2214  * @param message message
2215  * @param peer peer identity this notification is about
2216  * @return #GNUNET_YES (do not cut p2p connection)
2217  */
2218 static int
2219 handle_dht_p2p_result (void *cls,
2220                        const struct GNUNET_PeerIdentity *peer,
2221                        const struct GNUNET_MessageHeader *message)
2222 {
2223   const struct PeerResultMessage *prm;
2224   const struct GNUNET_PeerIdentity *put_path;
2225   const struct GNUNET_PeerIdentity *get_path;
2226   const void *data;
2227   uint32_t get_path_length;
2228   uint32_t put_path_length;
2229   uint16_t msize;
2230   size_t data_size;
2231   enum GNUNET_BLOCK_Type type;
2232
2233   /* parse and validate message */
2234   msize = ntohs (message->size);
2235   if (msize < sizeof (struct PeerResultMessage))
2236   {
2237     GNUNET_break_op (0);
2238     return GNUNET_YES;
2239   }
2240   prm = (struct PeerResultMessage *) message;
2241   put_path_length = ntohl (prm->put_path_length);
2242   get_path_length = ntohl (prm->get_path_length);
2243   if ((msize <
2244        sizeof (struct PeerResultMessage) + (get_path_length +
2245                                             put_path_length) *
2246        sizeof (struct GNUNET_PeerIdentity)) ||
2247       (get_path_length >
2248        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
2249       (put_path_length >
2250        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2251   {
2252     GNUNET_break_op (0);
2253     return GNUNET_YES;
2254   }
2255   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2256   get_path = &put_path[put_path_length];
2257   type = ntohl (prm->type);
2258   data = (const void *) &get_path[get_path_length];
2259   data_size =
2260       msize - (sizeof (struct PeerResultMessage) +
2261                (get_path_length +
2262                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2263   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
2264                             1, GNUNET_NO);
2265   GNUNET_STATISTICS_update (GDS_stats,
2266                             gettext_noop ("# P2P RESULT bytes received"),
2267                             msize, GNUNET_NO);
2268   if (GNUNET_YES == log_route_details_stderr)
2269   {
2270     char *tmp;
2271
2272     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2273     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2274                  "R5N RESULT %s: %s->%s (%u)\n",
2275                  GNUNET_h2s (&prm->key),
2276                  GNUNET_i2s (peer),
2277                  tmp,
2278                  get_path_length + 1);
2279     GNUNET_free (tmp);
2280   }
2281   /* if we got a HELLO, consider it for our own routing table */
2282   if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2283   {
2284     const struct GNUNET_MessageHeader *h;
2285     struct GNUNET_PeerIdentity pid;
2286
2287     /* Should be a HELLO, validate and consider using it! */
2288     if (data_size < sizeof (struct GNUNET_MessageHeader))
2289     {
2290       GNUNET_break_op (0);
2291       return GNUNET_YES;
2292     }
2293     h = data;
2294     if (data_size != ntohs (h->size))
2295     {
2296       GNUNET_break_op (0);
2297       return GNUNET_YES;
2298     }
2299     if (GNUNET_OK !=
2300         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2301                              &pid))
2302     {
2303       GNUNET_break_op (0);
2304       return GNUNET_YES;
2305     }
2306     if ( (GNUNET_YES != disable_try_connect) &&
2307          (0 != memcmp (&my_identity,
2308                        &pid,
2309                        sizeof (struct GNUNET_PeerIdentity))) )
2310       try_connect (&pid,
2311                    h);
2312   }
2313
2314   /* append 'peer' to 'get_path' */
2315   {
2316     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2317
2318     memcpy (xget_path,
2319             get_path,
2320             get_path_length * sizeof (struct GNUNET_PeerIdentity));
2321     xget_path[get_path_length] = *peer;
2322     get_path_length++;
2323
2324     /* forward to local clients */
2325     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2326                               &prm->key,
2327                               get_path_length,
2328                               xget_path,
2329                               put_path_length,
2330                               put_path,
2331                               type,
2332                               data_size,
2333                               data);
2334     GDS_CLIENTS_process_get_resp (type,
2335                                   xget_path,
2336                                   get_path_length,
2337                                   put_path, put_path_length,
2338                                   GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2339                                   &prm->key,
2340                                   data,
2341                                   data_size);
2342     if (GNUNET_YES == cache_results)
2343     {
2344       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2345
2346       memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
2347       memcpy (&xput_path[put_path_length],
2348               xget_path,
2349               get_path_length * sizeof (struct GNUNET_PeerIdentity));
2350
2351       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2352                                 &prm->key,
2353                                 get_path_length + put_path_length,
2354                                 xput_path,
2355                                 type,
2356                                 data_size,
2357                                 data);
2358     }
2359     /* forward to other peers */
2360     GDS_ROUTING_process (type,
2361                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2362                          &prm->key,
2363                          put_path_length,
2364                          put_path,
2365                          get_path_length,
2366                          xget_path,
2367                          data,
2368                          data_size);
2369   }
2370
2371   return GNUNET_YES;
2372 }
2373
2374
2375 /**
2376  * Initialize neighbours subsystem.
2377  *
2378  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2379  */
2380 int
2381 GDS_NEIGHBOURS_init ()
2382 {
2383   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2384     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2385     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2386     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2387     {NULL, 0, 0}
2388   };
2389   unsigned long long temp_config_num;
2390
2391   disable_try_connect
2392     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT");
2393   if (GNUNET_OK ==
2394       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2395                                              &temp_config_num))
2396     bucket_size = (unsigned int) temp_config_num;
2397   cache_results
2398     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS");
2399
2400   log_route_details_stderr =
2401     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2402   ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2403   core_api =
2404       GNUNET_CORE_connect (GDS_cfg, NULL,
2405                            &core_init,
2406                            &handle_core_connect,
2407                            &handle_core_disconnect,
2408                            NULL, GNUNET_NO,
2409                            NULL, GNUNET_NO,
2410                            core_handlers);
2411   if (core_api == NULL)
2412     return GNUNET_SYSERR;
2413   all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2414                                                               GNUNET_NO);
2415   all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2416                                                             GNUNET_NO);
2417   return GNUNET_OK;
2418 }
2419
2420
2421 /**
2422  * Shutdown neighbours subsystem.
2423  */
2424 void
2425 GDS_NEIGHBOURS_done ()
2426 {
2427   if (NULL == core_api)
2428     return;
2429   GNUNET_CORE_disconnect (core_api);
2430   core_api = NULL;
2431   GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2432   GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2433   all_connected_peers = NULL;
2434   GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2435                                          &free_connect_info,
2436                                          NULL);
2437   GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2438   all_desired_peers = NULL;
2439   GNUNET_ATS_connectivity_done (ats_ch);
2440   ats_ch = NULL;
2441   if (NULL != find_peer_task)
2442   {
2443     GNUNET_SCHEDULER_cancel (find_peer_task);
2444     find_peer_task = NULL;
2445   }
2446 }
2447
2448 /**
2449  * Get the ID of the local node.
2450  *
2451  * @return identity of the local node
2452  */
2453 struct GNUNET_PeerIdentity *
2454 GDS_NEIGHBOURS_get_id ()
2455 {
2456   return &my_identity;
2457 }
2458
2459
2460 /* end of gnunet-service-dht_neighbours.c */