-fixing #2397
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_ats_service.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50 #include "dht.h"
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * Maximum allowed number of pending messages per peer.
74  */
75 #define MAXIMUM_PENDING_PER_PEER 64
76
77 /**
78  * How often to update our preference levels for peers in our routing tables.
79  */
80 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
81
82 /**
83  * How long at least to wait before sending another find peer request.
84  */
85 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
86
87 /**
88  * How long at most to wait before sending another find peer request.
89  */
90 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
91
92 /**
93  * How long at most to wait for transmission of a GET request to another peer?
94  */
95 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
96
97
98 GNUNET_NETWORK_STRUCT_BEGIN
99
100 /**
101  * P2P PUT message
102  */
103 struct PeerPutMessage
104 {
105   /**
106    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
107    */
108   struct GNUNET_MessageHeader header;
109
110   /**
111    * Processing options
112    */
113   uint32_t options GNUNET_PACKED;
114
115   /**
116    * Content type.
117    */
118   uint32_t type GNUNET_PACKED;
119
120   /**
121    * Hop count
122    */
123   uint32_t hop_count GNUNET_PACKED;
124
125   /**
126    * Replication level for this message
127    */
128   uint32_t desired_replication_level GNUNET_PACKED;
129
130   /**
131    * Length of the PUT path that follows (if tracked).
132    */
133   uint32_t put_path_length GNUNET_PACKED;
134
135   /**
136    * When does the content expire?
137    */
138   struct GNUNET_TIME_AbsoluteNBO expiration_time;
139
140   /**
141    * Bloomfilter (for peer identities) to stop circular routes
142    */
143   char bloomfilter[DHT_BLOOM_SIZE];
144
145   /**
146    * The key we are storing under.
147    */
148   struct GNUNET_HashCode key;
149
150   /* put path (if tracked) */
151
152   /* Payload */
153
154 };
155
156
157 /**
158  * P2P Result message
159  */
160 struct PeerResultMessage
161 {
162   /**
163    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Content type.
169    */
170   uint32_t type GNUNET_PACKED;
171
172   /**
173    * Length of the PUT path that follows (if tracked).
174    */
175   uint32_t put_path_length GNUNET_PACKED;
176
177   /**
178    * Length of the GET path that follows (if tracked).
179    */
180   uint32_t get_path_length GNUNET_PACKED;
181
182   /**
183    * When does the content expire?
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration_time;
186
187   /**
188    * The key of the corresponding GET request.
189    */
190   struct GNUNET_HashCode key;
191
192   /* put path (if tracked) */
193
194   /* get path (if tracked) */
195
196   /* Payload */
197
198 };
199
200
201 /**
202  * P2P GET message
203  */
204 struct PeerGetMessage
205 {
206   /**
207    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
208    */
209   struct GNUNET_MessageHeader header;
210
211   /**
212    * Processing options
213    */
214   uint32_t options GNUNET_PACKED;
215
216   /**
217    * Desired content type.
218    */
219   uint32_t type GNUNET_PACKED;
220
221   /**
222    * Hop count
223    */
224   uint32_t hop_count GNUNET_PACKED;
225
226   /**
227    * Desired replication level for this request.
228    */
229   uint32_t desired_replication_level GNUNET_PACKED;
230
231   /**
232    * Size of the extended query.
233    */
234   uint32_t xquery_size;
235
236   /**
237    * Bloomfilter mutator.
238    */
239   uint32_t bf_mutator;
240
241   /**
242    * Bloomfilter (for peer identities) to stop circular routes
243    */
244   char bloomfilter[DHT_BLOOM_SIZE];
245
246   /**
247    * The key we are looking for.
248    */
249   struct GNUNET_HashCode key;
250
251   /* xquery */
252
253   /* result bloomfilter */
254
255 };
256 GNUNET_NETWORK_STRUCT_END
257
258 /**
259  * Linked list of messages to send to a particular other peer.
260  */
261 struct P2PPendingMessage
262 {
263   /**
264    * Pointer to next item in the list
265    */
266   struct P2PPendingMessage *next;
267
268   /**
269    * Pointer to previous item in the list
270    */
271   struct P2PPendingMessage *prev;
272
273   /**
274    * Message importance level.  FIXME: used? useful?
275    */
276   unsigned int importance;
277
278   /**
279    * When does this message time out?
280    */
281   struct GNUNET_TIME_Absolute timeout;
282
283   /**
284    * Actual message to be sent, allocated at the end of the struct:
285    * // msg = (cast) &pm[1];
286    * // memcpy (&pm[1], data, len);
287    */
288   const struct GNUNET_MessageHeader *msg;
289
290 };
291
292
293 /**
294  * Entry for a peer in a bucket.
295  */
296 struct PeerInfo
297 {
298   /**
299    * Next peer entry (DLL)
300    */
301   struct PeerInfo *next;
302
303   /**
304    *  Prev peer entry (DLL)
305    */
306   struct PeerInfo *prev;
307
308   /**
309    * Count of outstanding messages for peer. 
310    */
311   unsigned int pending_count;
312
313   /**
314    * Head of pending messages to be sent to this peer.
315    */
316   struct P2PPendingMessage *head;
317
318   /**
319    * Tail of pending messages to be sent to this peer.
320    */
321   struct P2PPendingMessage *tail;
322
323   /**
324    * Core handle for sending messages to this peer.
325    */
326   struct GNUNET_CORE_TransmitHandle *th;
327
328   /**
329    * Task for scheduling preference updates
330    */
331   GNUNET_SCHEDULER_TaskIdentifier preference_task;
332
333   /**
334    * What is the identity of the peer?
335    */
336   struct GNUNET_PeerIdentity id;
337
338 #if 0
339   /**
340    * What is the average latency for replies received?
341    */
342   struct GNUNET_TIME_Relative latency;
343
344   /**
345    * Transport level distance to peer.
346    */
347   unsigned int distance;
348 #endif
349
350 };
351
352
353 /**
354  * Peers are grouped into buckets.
355  */
356 struct PeerBucket
357 {
358   /**
359    * Head of DLL
360    */
361   struct PeerInfo *head;
362
363   /**
364    * Tail of DLL
365    */
366   struct PeerInfo *tail;
367
368   /**
369    * Number of peers in the bucket.
370    */
371   unsigned int peers_size;
372 };
373
374
375 /**
376  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
377  */
378 static unsigned int closest_bucket;
379
380 /**
381  * How many peers have we added since we sent out our last
382  * find peer request?
383  */
384 static unsigned int newly_found_peers;
385
386 /**
387  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
388  */
389 static struct PeerBucket k_buckets[MAX_BUCKETS];
390
391 /**
392  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
393  */
394 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
395
396 /**
397  * Maximum size for each bucket.
398  */
399 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
400
401 /**
402  * Task that sends FIND PEER requests.
403  */
404 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
405
406 /**
407  * Identity of this peer.
408  */
409 static struct GNUNET_PeerIdentity my_identity;
410
411 /**
412  * Handle to CORE.
413  */
414 static struct GNUNET_CORE_Handle *coreAPI;
415
416 /**
417  * Handle to ATS.
418  */
419 static struct GNUNET_ATS_PerformanceHandle *atsAPI;
420
421
422
423 /**
424  * Find the optimal bucket for this key.
425  *
426  * @param hc the hashcode to compare our identity to
427  * @return the proper bucket index, or GNUNET_SYSERR
428  *         on error (same hashcode)
429  */
430 static int
431 find_bucket (const struct GNUNET_HashCode * hc)
432 {
433   unsigned int bits;
434
435   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
436   if (bits == MAX_BUCKETS)
437   {
438     /* How can all bits match? Got my own ID? */
439     GNUNET_break (0);
440     return GNUNET_SYSERR;
441   }
442   return MAX_BUCKETS - bits - 1;
443 }
444
445
446 /**
447  * Let GNUnet core know that we like the given peer.
448  *
449  * @param cls the 'struct PeerInfo' of the peer
450  * @param tc scheduler context.
451  */
452 static void
453 update_core_preference (void *cls,
454                         const struct GNUNET_SCHEDULER_TaskContext *tc)
455 {
456   struct PeerInfo *peer = cls;
457   uint64_t preference;
458   unsigned int matching;
459   int bucket;
460
461   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
462   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
463     return;
464   matching =
465       GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
466                                         &peer->id.hashPubKey);
467   if (matching >= 64)
468     matching = 63;
469   bucket = find_bucket (&peer->id.hashPubKey);
470   if (bucket == GNUNET_SYSERR)
471     preference = 0;
472   else
473   {
474     GNUNET_assert (k_buckets[bucket].peers_size != 0);
475     preference = (1LL << matching) / k_buckets[bucket].peers_size;
476   }
477   if (preference == 0)
478   {
479     peer->preference_task =
480         GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
481                                       &update_core_preference, peer);
482     return;
483   }
484   GNUNET_STATISTICS_update (GDS_stats,
485                             gettext_noop ("# Preference updates given to core"),
486                             1, GNUNET_NO);
487   GNUNET_ATS_change_preference (atsAPI, &peer->id,
488                                 GNUNET_ATS_PREFERENCE_BANDWIDTH,
489                                 (double) preference, GNUNET_ATS_PREFERENCE_END);
490   peer->preference_task =
491       GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
492                                     &update_core_preference, peer);
493
494
495 }
496
497
498 /**
499  * Closure for 'add_known_to_bloom'.
500  */
501 struct BloomConstructorContext
502 {
503   /**
504    * Bloom filter under construction.
505    */
506   struct GNUNET_CONTAINER_BloomFilter *bloom;
507
508   /**
509    * Mutator to use.
510    */
511   uint32_t bf_mutator;
512 };
513
514
515 /**
516  * Add each of the peers we already know to the bloom filter of
517  * the request so that we don't get duplicate HELLOs.
518  *
519  * @param cls the 'struct BloomConstructorContext'.
520  * @param key peer identity to add to the bloom filter
521  * @param value value the peer information (unused)
522  * @return GNUNET_YES (we should continue to iterate)
523  */
524 static int
525 add_known_to_bloom (void *cls, const struct GNUNET_HashCode * key, void *value)
526 {
527   struct BloomConstructorContext *ctx = cls;
528   struct GNUNET_HashCode mh;
529
530   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
533               GNUNET_h2s (key), ctx->bf_mutator);
534   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Task to send a find peer message for our own peer identifier
541  * so that we can find the closest peers in the network to ourselves
542  * and attempt to connect to them.
543  *
544  * @param cls closure for this task
545  * @param tc the context under which the task is running
546  */
547 static void
548 send_find_peer_message (void *cls,
549                         const struct GNUNET_SCHEDULER_TaskContext *tc)
550 {
551   struct GNUNET_TIME_Relative next_send_time;
552   struct BloomConstructorContext bcc;
553   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
554
555   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
556   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
557     return;
558   if (newly_found_peers > bucket_size)
559   {
560     /* If we are finding many peers already, no need to send out our request right now! */
561     find_peer_task =
562         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
563                                       &send_find_peer_message, NULL);
564     newly_found_peers = 0;
565     return;
566   }
567   bcc.bf_mutator =
568       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
569   bcc.bloom =
570       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
571                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
572   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
573                                          &bcc);
574   GNUNET_STATISTICS_update (GDS_stats,
575                             gettext_noop ("# FIND PEER messages initiated"), 1,
576                             GNUNET_NO);
577   peer_bf =
578       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
579                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
580   // FIXME: pass priority!?
581   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
582                              GNUNET_DHT_RO_FIND_PEER,
583                              FIND_PEER_REPLICATION_LEVEL, 0,
584                              &my_identity.hashPubKey, NULL, 0, bcc.bloom,
585                              bcc.bf_mutator, peer_bf);
586   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
587   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
588   /* schedule next round */
589   next_send_time.rel_value =
590       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
591       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
592                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
593                                 (newly_found_peers + 1));
594   newly_found_peers = 0;
595   find_peer_task =
596       GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
597                                     NULL);
598 }
599
600
601 /**
602  * Method called whenever a peer connects.
603  *
604  * @param cls closure
605  * @param peer peer identity this notification is about
606  * @param atsi performance data
607  * @param atsi_count number of records in 'atsi'
608  */
609 static void
610 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
611                      const struct GNUNET_ATS_Information *atsi,
612                      unsigned int atsi_count)
613 {
614   struct PeerInfo *ret;
615   int peer_bucket;
616
617   /* Check for connect to self message */
618   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
619     return;
620   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n",
621               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
622   if (GNUNET_YES ==
623       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
624                                               &peer->hashPubKey))
625   {
626     GNUNET_break (0);
627     return;
628   }
629   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1,
630                             GNUNET_NO);
631   peer_bucket = find_bucket (&peer->hashPubKey);
632   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
633   ret = GNUNET_malloc (sizeof (struct PeerInfo));
634 #if 0
635   ret->latency = latency;
636   ret->distance = distance;
637 #endif
638   ret->id = *peer;
639   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
640                                     k_buckets[peer_bucket].tail, ret);
641   k_buckets[peer_bucket].peers_size++;
642   closest_bucket = GNUNET_MAX (closest_bucket, peer_bucket);
643   if ((peer_bucket > 0) && (k_buckets[peer_bucket].peers_size <= bucket_size))
644   {
645     ret->preference_task =
646         GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
647     newly_found_peers++;
648   }
649   GNUNET_assert (GNUNET_OK ==
650                  GNUNET_CONTAINER_multihashmap_put (all_known_peers,
651                                                     &peer->hashPubKey, ret,
652                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
653   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
654   {
655     /* got a first connection, good time to start with FIND PEER requests... */
656     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL);
657   }
658 }
659
660
661 /**
662  * Method called whenever a peer disconnects.
663  *
664  * @param cls closure
665  * @param peer peer identity this notification is about
666  */
667 static void
668 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
669 {
670   struct PeerInfo *to_remove;
671   int current_bucket;
672   struct P2PPendingMessage *pos;
673   unsigned int discarded;
674
675   /* Check for disconnect from self message */
676   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
677     return;
678   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n",
679               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
680   to_remove =
681       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
682   if (NULL == to_remove)
683   {
684     GNUNET_break (0);
685     return;
686   }
687   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1,
688                             GNUNET_NO);
689   GNUNET_assert (GNUNET_YES ==
690                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
691                                                        &peer->hashPubKey,
692                                                        to_remove));
693   if (GNUNET_SCHEDULER_NO_TASK != to_remove->preference_task)
694   {
695     GNUNET_SCHEDULER_cancel (to_remove->preference_task);
696     to_remove->preference_task = GNUNET_SCHEDULER_NO_TASK;
697   }
698   current_bucket = find_bucket (&to_remove->id.hashPubKey);
699   GNUNET_assert (current_bucket >= 0);
700   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
701                                k_buckets[current_bucket].tail, to_remove);
702   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
703   k_buckets[current_bucket].peers_size--;
704   while ((closest_bucket > 0) && (k_buckets[closest_bucket].peers_size == 0))
705     closest_bucket--;
706
707   if (to_remove->th != NULL)
708   {
709     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
710     to_remove->th = NULL;
711   }
712   discarded = 0;
713   while (NULL != (pos = to_remove->head))
714   {
715     GNUNET_CONTAINER_DLL_remove (to_remove->head, to_remove->tail, pos);
716     discarded++;
717     GNUNET_free (pos);
718   }
719   GNUNET_STATISTICS_update (GDS_stats,
720                             gettext_noop
721                             ("# Queued messages discarded (peer disconnected)"),
722                             discarded, GNUNET_NO);
723   GNUNET_free (to_remove);
724 }
725
726
727 /**
728  * Called when core is ready to send a message we asked for
729  * out to the destination.
730  *
731  * @param cls the 'struct PeerInfo' of the target peer
732  * @param size number of bytes available in buf
733  * @param buf where the callee should write the message
734  * @return number of bytes written to buf
735  */
736 static size_t
737 core_transmit_notify (void *cls, size_t size, void *buf)
738 {
739   struct PeerInfo *peer = cls;
740   char *cbuf = buf;
741   struct P2PPendingMessage *pending;
742   size_t off;
743   size_t msize;
744
745   peer->th = NULL;
746   while ((NULL != (pending = peer->head)) &&
747          (GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value == 0))
748   {
749     peer->pending_count--;
750     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
751     GNUNET_free (pending);
752   }
753   if (pending == NULL)
754   {
755     /* no messages pending */
756     return 0;
757   }
758   if (buf == NULL)
759   {
760     peer->th =
761         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
762                                            pending->importance,
763                                            GNUNET_TIME_absolute_get_remaining
764                                            (pending->timeout), &peer->id,
765                                            ntohs (pending->msg->size),
766                                            &core_transmit_notify, peer);
767     GNUNET_break (NULL != peer->th);
768     return 0;
769   }
770   off = 0;
771   while ((NULL != (pending = peer->head)) &&
772          (size - off >= (msize = ntohs (pending->msg->size))))
773   {
774     GNUNET_STATISTICS_update (GDS_stats,
775                               gettext_noop
776                               ("# Bytes transmitted to other peers"), msize,
777                               GNUNET_NO);
778     memcpy (&cbuf[off], pending->msg, msize);
779     off += msize;
780     peer->pending_count--;
781     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
782     GNUNET_free (pending);
783   }
784   if (peer->head != NULL)
785   {
786     peer->th =
787         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
788                                            pending->importance,
789                                            GNUNET_TIME_absolute_get_remaining
790                                            (pending->timeout), &peer->id, msize,
791                                            &core_transmit_notify, peer);
792     GNUNET_break (NULL != peer->th);
793   }
794   return off;
795 }
796
797
798 /**
799  * Transmit all messages in the peer's message queue.
800  *
801  * @param peer message queue to process
802  */
803 static void
804 process_peer_queue (struct PeerInfo *peer)
805 {
806   struct P2PPendingMessage *pending;
807
808   if (NULL == (pending = peer->head))
809     return;
810   if (NULL != peer->th)
811     return;
812   GNUNET_STATISTICS_update (GDS_stats,
813                             gettext_noop
814                             ("# Bytes of bandwdith requested from core"),
815                             ntohs (pending->msg->size), GNUNET_NO);
816   peer->th =
817       GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
818                                          pending->importance,
819                                          GNUNET_TIME_absolute_get_remaining
820                                          (pending->timeout), &peer->id,
821                                          ntohs (pending->msg->size),
822                                          &core_transmit_notify, peer);
823   GNUNET_break (NULL != peer->th);
824 }
825
826
827 /**
828  * To how many peers should we (on average) forward the request to
829  * obtain the desired target_replication count (on average).
830  *
831  * @param hop_count number of hops the message has traversed
832  * @param target_replication the number of total paths desired
833  * @return Some number of peers to forward the message to
834  */
835 static unsigned int
836 get_forward_count (uint32_t hop_count, uint32_t target_replication)
837 {
838   uint32_t random_value;
839   uint32_t forward_count;
840   float target_value;
841
842   if (hop_count > GDS_NSE_get () * 6.0)
843   {
844     /* forcefully terminate */
845     return 0;
846   }
847   if (hop_count > GDS_NSE_get () * 4.0)
848   {
849     /* Once we have reached our ideal number of hops, only forward to 1 peer */
850     return 1;
851   }
852   /* bound by system-wide maximum */
853   target_replication =
854       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
855   target_value =
856       1 + (target_replication - 1.0) / (GDS_NSE_get () +
857                                         ((float) (target_replication - 1.0) *
858                                          hop_count));
859   /* Set forward count to floor of target_value */
860   forward_count = (uint32_t) target_value;
861   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
862   target_value = target_value - forward_count;
863   random_value =
864       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
865   if (random_value < (target_value * UINT32_MAX))
866     forward_count++;
867   return forward_count;
868 }
869
870
871 /**
872  * Compute the distance between have and target as a 32-bit value.
873  * Differences in the lower bits must count stronger than differences
874  * in the higher bits.
875  *
876  * @return 0 if have==target, otherwise a number
877  *           that is larger as the distance between
878  *           the two hash codes increases
879  */
880 static unsigned int
881 get_distance (const struct GNUNET_HashCode * target, const struct GNUNET_HashCode * have)
882 {
883   unsigned int bucket;
884   unsigned int msb;
885   unsigned int lsb;
886   unsigned int i;
887
888   /* We have to represent the distance between two 2^9 (=512)-bit
889    * numbers as a 2^5 (=32)-bit number with "0" being used for the
890    * two numbers being identical; furthermore, we need to
891    * guarantee that a difference in the number of matching
892    * bits is always represented in the result.
893    *
894    * We use 2^32/2^9 numerical values to distinguish between
895    * hash codes that have the same LSB bit distance and
896    * use the highest 2^9 bits of the result to signify the
897    * number of (mis)matching LSB bits; if we have 0 matching
898    * and hence 512 mismatching LSB bits we return -1 (since
899    * 512 itself cannot be represented with 9 bits) */
900
901   /* first, calculate the most significant 9 bits of our
902    * result, aka the number of LSBs */
903   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
904   /* bucket is now a value between 0 and 512 */
905   if (bucket == 512)
906     return 0;                   /* perfect match */
907   if (bucket == 0)
908     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
909                                  * below, we'd end up with max+1 (overflow)) */
910
911   /* calculate the most significant bits of the final result */
912   msb = (512 - bucket) << (32 - 9);
913   /* calculate the 32-9 least significant bits of the final result by
914    * looking at the differences in the 32-9 bits following the
915    * mismatching bit at 'bucket' */
916   lsb = 0;
917   for (i = bucket + 1;
918        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
919   {
920     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
921         GNUNET_CRYPTO_hash_get_bit (have, i))
922       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
923                                                  * last bit set will be 31 -- if
924                                                  * i does not reach 512 first... */
925   }
926   return msb | lsb;
927 }
928
929
930 /**
931  * Check whether my identity is closer than any known peers.  If a
932  * non-null bloomfilter is given, check if this is the closest peer
933  * that hasn't already been routed to.
934  *
935  * @param key hash code to check closeness to
936  * @param bloom bloomfilter, exclude these entries from the decision
937  * @return GNUNET_YES if node location is closest,
938  *         GNUNET_NO otherwise.
939  */
940 static int
941 am_closest_peer (const struct GNUNET_HashCode * key,
942                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
943 {
944   int bits;
945   int other_bits;
946   int bucket_num;
947   int count;
948   struct PeerInfo *pos;
949
950   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
951     return GNUNET_YES;
952   bucket_num = find_bucket (key);
953   GNUNET_assert (bucket_num >= 0);
954   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
955   pos = k_buckets[bucket_num].head;
956   count = 0;
957   while ((pos != NULL) && (count < bucket_size))
958   {
959     if ((bloom != NULL) &&
960         (GNUNET_YES ==
961          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
962     {
963       pos = pos->next;
964       continue;                 /* Skip already checked entries */
965     }
966     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
967     if (other_bits > bits)
968       return GNUNET_NO;
969     if (other_bits == bits)     /* We match the same number of bits */
970       return GNUNET_YES;
971     pos = pos->next;
972   }
973   /* No peers closer, we are the closest! */
974   return GNUNET_YES;
975 }
976
977
978 /**
979  * Select a peer from the routing table that would be a good routing
980  * destination for sending a message for "key".  The resulting peer
981  * must not be in the set of blocked peers.<p>
982  *
983  * Note that we should not ALWAYS select the closest peer to the
984  * target, peers further away from the target should be chosen with
985  * exponentially declining probability.
986  *
987  * FIXME: double-check that this is fine
988  *
989  *
990  * @param key the key we are selecting a peer to route to
991  * @param bloom a bloomfilter containing entries this request has seen already
992  * @param hops how many hops has this message traversed thus far
993  * @return Peer to route to, or NULL on error
994  */
995 static struct PeerInfo *
996 select_peer (const struct GNUNET_HashCode * key,
997              const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops)
998 {
999   unsigned int bc;
1000   unsigned int count;
1001   unsigned int selected;
1002   struct PeerInfo *pos;
1003   unsigned int dist;
1004   unsigned int smallest_distance;
1005   struct PeerInfo *chosen;
1006
1007   if (hops >= GDS_NSE_get ())
1008   {
1009     /* greedy selection (closest peer that is not in bloomfilter) */
1010     smallest_distance = UINT_MAX;
1011     chosen = NULL;
1012     for (bc = 0; bc <= closest_bucket; bc++)
1013     {
1014       pos = k_buckets[bc].head;
1015       count = 0;
1016       while ((pos != NULL) && (count < bucket_size))
1017       {
1018         if ((bloom == NULL) ||
1019             (GNUNET_NO ==
1020              GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1021         {
1022           dist = get_distance (key, &pos->id.hashPubKey);
1023           if (dist < smallest_distance)
1024           {
1025             chosen = pos;
1026             smallest_distance = dist;
1027           }
1028         }
1029         else
1030         {
1031           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1033                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1034           GNUNET_STATISTICS_update (GDS_stats,
1035                                     gettext_noop
1036                                     ("# Peers excluded from routing due to Bloomfilter"),
1037                                     1, GNUNET_NO);
1038         }
1039         count++;
1040         pos = pos->next;
1041       }
1042     }
1043     if (NULL == chosen)
1044       GNUNET_STATISTICS_update (GDS_stats,
1045                                 gettext_noop ("# Peer selection failed"), 1,
1046                                 GNUNET_NO);
1047     return chosen;
1048   }
1049
1050   /* select "random" peer */
1051   /* count number of peers that are available and not filtered */
1052   count = 0;
1053   for (bc = 0; bc <= closest_bucket; bc++)
1054   {
1055     pos = k_buckets[bc].head;
1056     while ((pos != NULL) && (count < bucket_size))
1057     {
1058       if ((bloom != NULL) &&
1059           (GNUNET_YES ==
1060            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1061       {
1062         GNUNET_STATISTICS_update (GDS_stats,
1063                                   gettext_noop
1064                                   ("# Peers excluded from routing due to Bloomfilter"),
1065                                   1, GNUNET_NO);
1066         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1067                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1068                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1069         pos = pos->next;
1070         continue;               /* Ignore bloomfiltered peers */
1071       }
1072       count++;
1073       pos = pos->next;
1074     }
1075   }
1076   if (count == 0)               /* No peers to select from! */
1077   {
1078     GNUNET_STATISTICS_update (GDS_stats,
1079                               gettext_noop ("# Peer selection failed"), 1,
1080                               GNUNET_NO);
1081     return NULL;
1082   }
1083   /* Now actually choose a peer */
1084   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1085   count = 0;
1086   for (bc = 0; bc <= closest_bucket; bc++)
1087   {
1088     pos = k_buckets[bc].head;
1089     while ((pos != NULL) && (count < bucket_size))
1090     {
1091       if ((bloom != NULL) &&
1092           (GNUNET_YES ==
1093            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1094       {
1095         pos = pos->next;
1096         continue;               /* Ignore bloomfiltered peers */
1097       }
1098       if (0 == selected--)
1099         return pos;
1100       pos = pos->next;
1101     }
1102   }
1103   GNUNET_break (0);
1104   return NULL;
1105 }
1106
1107
1108 /**
1109  * Compute the set of peers that the given request should be
1110  * forwarded to.
1111  *
1112  * @param key routing key
1113  * @param bloom bloom filter excluding peers as targets, all selected
1114  *        peers will be added to the bloom filter
1115  * @param hop_count number of hops the request has traversed so far
1116  * @param target_replication desired number of replicas
1117  * @param targets where to store an array of target peers (to be
1118  *         free'd by the caller)
1119  * @return number of peers returned in 'targets'.
1120  */
1121 static unsigned int
1122 get_target_peers (const struct GNUNET_HashCode * key,
1123                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1124                   uint32_t hop_count, uint32_t target_replication,
1125                   struct PeerInfo ***targets)
1126 {
1127   unsigned int ret;
1128   unsigned int off;
1129   struct PeerInfo **rtargets;
1130   struct PeerInfo *nxt;
1131
1132   GNUNET_assert (NULL != bloom);
1133   ret = get_forward_count (hop_count, target_replication);
1134   if (ret == 0)
1135   {
1136     *targets = NULL;
1137     return 0;
1138   }
1139   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1140   for (off = 0; off < ret; off++)
1141   {
1142     nxt = select_peer (key, bloom, hop_count);
1143     if (nxt == NULL)
1144       break;
1145     rtargets[off] = nxt;
1146     GNUNET_break (GNUNET_NO ==
1147                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1148                                                      &nxt->id.hashPubKey));
1149     GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
1150   }
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Selected %u/%u peers at hop %u for %s (target was %u)\n", off,
1153               GNUNET_CONTAINER_multihashmap_size (all_known_peers),
1154               (unsigned int) hop_count, GNUNET_h2s (key), ret);
1155   if (0 == off)
1156   {
1157     GNUNET_free (rtargets);
1158     *targets = NULL;
1159     return 0;
1160   }
1161   *targets = rtargets;
1162   return off;
1163 }
1164
1165
1166 /**
1167  * Perform a PUT operation.   Forwards the given request to other
1168  * peers.   Does not store the data locally.  Does not give the
1169  * data to local clients.  May do nothing if this is the only
1170  * peer in the network (or if we are the closest peer in the
1171  * network).
1172  *
1173  * @param type type of the block
1174  * @param options routing options
1175  * @param desired_replication_level desired replication count
1176  * @param expiration_time when does the content expire
1177  * @param hop_count how many hops has this message traversed so far
1178  * @param bf Bloom filter of peers this PUT has already traversed
1179  * @param key key for the content
1180  * @param put_path_length number of entries in put_path
1181  * @param put_path peers this request has traversed so far (if tracked)
1182  * @param data payload to store
1183  * @param data_size number of bytes in data
1184  */
1185 void
1186 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1187                            enum GNUNET_DHT_RouteOption options,
1188                            uint32_t desired_replication_level,
1189                            struct GNUNET_TIME_Absolute expiration_time,
1190                            uint32_t hop_count,
1191                            struct GNUNET_CONTAINER_BloomFilter *bf,
1192                            const struct GNUNET_HashCode * key,
1193                            unsigned int put_path_length,
1194                            struct GNUNET_PeerIdentity *put_path,
1195                            const void *data, size_t data_size)
1196 {
1197   unsigned int target_count;
1198   unsigned int i;
1199   struct PeerInfo **targets;
1200   struct PeerInfo *target;
1201   struct P2PPendingMessage *pending;
1202   size_t msize;
1203   struct PeerPutMessage *ppm;
1204   struct GNUNET_PeerIdentity *pp;
1205
1206   GNUNET_assert (NULL != bf);
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208               "Adding myself (%s) to PUT bloomfilter for %s\n",
1209               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1210   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
1211   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1212                             1, GNUNET_NO);
1213   target_count =
1214       get_target_peers (key, bf, hop_count, desired_replication_level,
1215                         &targets);
1216   if (0 == target_count)
1217   {
1218     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219                 "Routing PUT for %s terminates after %u hops at %s\n",
1220                 GNUNET_h2s (key), (unsigned int) hop_count,
1221                 GNUNET_i2s (&my_identity));
1222     return;
1223   }
1224   msize =
1225       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1226       sizeof (struct PeerPutMessage);
1227   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1228   {
1229     put_path_length = 0;
1230     msize = data_size + sizeof (struct PeerPutMessage);
1231   }
1232   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1233   {
1234     GNUNET_break (0);
1235     GNUNET_free (targets);
1236     return;
1237   }
1238   GNUNET_STATISTICS_update (GDS_stats,
1239                             gettext_noop
1240                             ("# PUT messages queued for transmission"),
1241                             target_count, GNUNET_NO);
1242   for (i = 0; i < target_count; i++)
1243   {
1244     target = targets[i];
1245     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1246     {
1247       GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1248                                 1, GNUNET_NO);
1249       continue; /* skip */
1250     }
1251     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
1253                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1254     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1255     pending->importance = 0;    /* FIXME */
1256     pending->timeout = expiration_time;
1257     ppm = (struct PeerPutMessage *) &pending[1];
1258     pending->msg = &ppm->header;
1259     ppm->header.size = htons (msize);
1260     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1261     ppm->options = htonl (options);
1262     ppm->type = htonl (type);
1263     ppm->hop_count = htonl (hop_count + 1);
1264     ppm->desired_replication_level = htonl (desired_replication_level);
1265     ppm->put_path_length = htonl (put_path_length);
1266     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1267     GNUNET_break (GNUNET_YES ==
1268                   GNUNET_CONTAINER_bloomfilter_test (bf,
1269                                                      &target->id.hashPubKey));
1270     GNUNET_assert (GNUNET_OK ==
1271                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1272                                                               ppm->bloomfilter,
1273                                                               DHT_BLOOM_SIZE));
1274     ppm->key = *key;
1275     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1276     memcpy (pp, put_path,
1277             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1278     memcpy (&pp[put_path_length], data, data_size);
1279     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1280     target->pending_count++;
1281     process_peer_queue (target);
1282   }
1283   GNUNET_free (targets);
1284 }
1285
1286
1287 /**
1288  * Perform a GET operation.  Forwards the given request to other
1289  * peers.  Does not lookup the key locally.  May do nothing if this is
1290  * the only peer in the network (or if we are the closest peer in the
1291  * network).
1292  *
1293  * @param type type of the block
1294  * @param options routing options
1295  * @param desired_replication_level desired replication count
1296  * @param hop_count how many hops did this request traverse so far?
1297  * @param key key for the content
1298  * @param xquery extended query
1299  * @param xquery_size number of bytes in xquery
1300  * @param reply_bf bloomfilter to filter duplicates
1301  * @param reply_bf_mutator mutator for reply_bf
1302  * @param peer_bf filter for peers not to select (again)
1303  */
1304 void
1305 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1306                            enum GNUNET_DHT_RouteOption options,
1307                            uint32_t desired_replication_level,
1308                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1309                            const void *xquery, size_t xquery_size,
1310                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1311                            uint32_t reply_bf_mutator,
1312                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1313 {
1314   unsigned int target_count;
1315   unsigned int i;
1316   struct PeerInfo **targets;
1317   struct PeerInfo *target;
1318   struct P2PPendingMessage *pending;
1319   size_t msize;
1320   struct PeerGetMessage *pgm;
1321   char *xq;
1322   size_t reply_bf_size;
1323
1324   GNUNET_assert (NULL != peer_bf);
1325   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1326                             1, GNUNET_NO);
1327   target_count =
1328       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1329                         &targets);
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "Adding myself (%s) to GET bloomfilter for %s\n",
1332               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1333   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
1334   if (0 == target_count)
1335   {
1336     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337                 "Routing GET for %s terminates after %u hops at %s\n",
1338                 GNUNET_h2s (key), (unsigned int) hop_count,
1339                 GNUNET_i2s (&my_identity));
1340     return;
1341   }
1342   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1343   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1344   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1345   {
1346     GNUNET_break (0);
1347     GNUNET_free (targets);
1348     return;
1349   }
1350   GNUNET_STATISTICS_update (GDS_stats,
1351                             gettext_noop
1352                             ("# GET messages queued for transmission"),
1353                             target_count, GNUNET_NO);
1354   /* forward request */
1355   for (i = 0; i < target_count; i++)
1356   {
1357     target = targets[i];
1358     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1359     {
1360       GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1361                                 1, GNUNET_NO);
1362       continue; /* skip */
1363     }
1364     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1365                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
1366                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1367     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1368     pending->importance = 0;    /* FIXME */
1369     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1370     pgm = (struct PeerGetMessage *) &pending[1];
1371     pending->msg = &pgm->header;
1372     pgm->header.size = htons (msize);
1373     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1374     pgm->options = htonl (options);
1375     pgm->type = htonl (type);
1376     pgm->hop_count = htonl (hop_count + 1);
1377     pgm->desired_replication_level = htonl (desired_replication_level);
1378     pgm->xquery_size = htonl (xquery_size);
1379     pgm->bf_mutator = reply_bf_mutator;
1380     GNUNET_break (GNUNET_YES ==
1381                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1382                                                      &target->id.hashPubKey));
1383     GNUNET_assert (GNUNET_OK ==
1384                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1385                                                               pgm->bloomfilter,
1386                                                               DHT_BLOOM_SIZE));
1387     pgm->key = *key;
1388     xq = (char *) &pgm[1];
1389     memcpy (xq, xquery, xquery_size);
1390     if (NULL != reply_bf)
1391       GNUNET_assert (GNUNET_OK ==
1392                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1393                                                                 &xq
1394                                                                 [xquery_size],
1395                                                                 reply_bf_size));
1396     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1397     target->pending_count++;
1398     process_peer_queue (target);
1399   }
1400   GNUNET_free (targets);
1401 }
1402
1403
1404 /**
1405  * Handle a reply (route to origin).  Only forwards the reply back to
1406  * the given peer.  Does not do local caching or forwarding to local
1407  * clients.
1408  *
1409  * @param target neighbour that should receive the block (if still connected)
1410  * @param type type of the block
1411  * @param expiration_time when does the content expire
1412  * @param key key for the content
1413  * @param put_path_length number of entries in put_path
1414  * @param put_path peers the original PUT traversed (if tracked)
1415  * @param get_path_length number of entries in put_path
1416  * @param get_path peers this reply has traversed so far (if tracked)
1417  * @param data payload of the reply
1418  * @param data_size number of bytes in data
1419  */
1420 void
1421 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1422                              enum GNUNET_BLOCK_Type type,
1423                              struct GNUNET_TIME_Absolute expiration_time,
1424                              const struct GNUNET_HashCode * key,
1425                              unsigned int put_path_length,
1426                              const struct GNUNET_PeerIdentity *put_path,
1427                              unsigned int get_path_length,
1428                              const struct GNUNET_PeerIdentity *get_path,
1429                              const void *data, size_t data_size)
1430 {
1431   struct PeerInfo *pi;
1432   struct P2PPendingMessage *pending;
1433   size_t msize;
1434   struct PeerResultMessage *prm;
1435   struct GNUNET_PeerIdentity *paths;
1436
1437   msize =
1438       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1439                                                        put_path_length) *
1440       sizeof (struct GNUNET_PeerIdentity);
1441   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1442       (get_path_length >
1443        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1444       (put_path_length >
1445        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1446       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1447   {
1448     GNUNET_break (0);
1449     return;
1450   }
1451   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers, &target->hashPubKey);
1452   if (NULL == pi)
1453   {
1454     /* peer disconnected in the meantime, drop reply */
1455     return;
1456   }
1457   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
1458   {
1459     /* skip */
1460     GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1461                               1, GNUNET_NO);
1462     return;
1463   }
1464
1465   GNUNET_STATISTICS_update (GDS_stats,
1466                             gettext_noop
1467                             ("# RESULT messages queued for transmission"), 1,
1468                             GNUNET_NO);
1469   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1470   pending->importance = 0;      /* FIXME */
1471   pending->timeout = expiration_time;
1472   prm = (struct PeerResultMessage *) &pending[1];
1473   pending->msg = &prm->header;
1474   prm->header.size = htons (msize);
1475   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1476   prm->type = htonl (type);
1477   prm->put_path_length = htonl (put_path_length);
1478   prm->get_path_length = htonl (get_path_length);
1479   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1480   prm->key = *key;
1481   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1482   memcpy (paths, put_path,
1483           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1484   memcpy (&paths[put_path_length], get_path,
1485           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1486   memcpy (&paths[put_path_length + get_path_length], data, data_size);
1487   GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
1488   pi->pending_count++;
1489   process_peer_queue (pi);
1490 }
1491
1492
1493 /**
1494  * To be called on core init/fail.
1495  *
1496  * @param cls service closure
1497  * @param server handle to the server for this service
1498  * @param identity the public identity of this peer
1499  */
1500 static void
1501 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1502            const struct GNUNET_PeerIdentity *identity)
1503 {
1504   GNUNET_assert (server != NULL);
1505   my_identity = *identity;
1506 }
1507
1508
1509 /**
1510  * Core handler for p2p put requests.
1511  *
1512  * @param cls closure
1513  * @param peer sender of the request
1514  * @param message message
1515  * @param peer peer identity this notification is about
1516  * @param atsi performance data
1517  * @param atsi_count number of records in 'atsi'
1518  * @return GNUNET_OK to keep the connection open,
1519  *         GNUNET_SYSERR to close it (signal serious error)
1520  */
1521 static int
1522 handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1523                     const struct GNUNET_MessageHeader *message,
1524                     const struct GNUNET_ATS_Information *atsi,
1525                     unsigned int atsi_count)
1526 {
1527   const struct PeerPutMessage *put;
1528   const struct GNUNET_PeerIdentity *put_path;
1529   const void *payload;
1530   uint32_t putlen;
1531   uint16_t msize;
1532   size_t payload_size;
1533   enum GNUNET_DHT_RouteOption options;
1534   struct GNUNET_CONTAINER_BloomFilter *bf;
1535   struct GNUNET_HashCode test_key;
1536
1537   msize = ntohs (message->size);
1538   if (msize < sizeof (struct PeerPutMessage))
1539   {
1540     GNUNET_break_op (0);
1541     return GNUNET_YES;
1542   }
1543   put = (const struct PeerPutMessage *) message;
1544   putlen = ntohl (put->put_path_length);
1545   if ((msize <
1546        sizeof (struct PeerPutMessage) +
1547        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1548       (putlen >
1549        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1550   {
1551     GNUNET_break_op (0);
1552     return GNUNET_YES;
1553   }
1554   GNUNET_STATISTICS_update (GDS_stats,
1555                             gettext_noop ("# P2P PUT requests received"), 1,
1556                             GNUNET_NO);
1557   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1558   payload = &put_path[putlen];
1559   options = ntohl (put->options);
1560   payload_size =
1561       msize - (sizeof (struct PeerPutMessage) +
1562                putlen * sizeof (struct GNUNET_PeerIdentity));
1563   switch (GNUNET_BLOCK_get_key
1564           (GDS_block_context, ntohl (put->type), payload, payload_size,
1565            &test_key))
1566   {
1567   case GNUNET_YES:
1568     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
1569     {
1570       GNUNET_break_op (0);
1571       return GNUNET_YES;
1572     }
1573     break;
1574   case GNUNET_NO:
1575     GNUNET_break_op (0);
1576     return GNUNET_YES;
1577   case GNUNET_SYSERR:
1578     /* cannot verify, good luck */
1579     break;
1580   }
1581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1582               GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1583   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
1584                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1585   GNUNET_break_op (GNUNET_YES ==
1586                    GNUNET_CONTAINER_bloomfilter_test (bf, &peer->hashPubKey));
1587   {
1588     struct GNUNET_PeerIdentity pp[putlen + 1];
1589
1590     /* extend 'put path' by sender */
1591     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1592     {
1593       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1594       pp[putlen] = *peer;
1595       putlen++;
1596     }
1597     else
1598       putlen = 0;
1599
1600     /* give to local clients */
1601     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1602                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1603                               payload_size, payload);
1604     /* store locally */
1605     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1606         (am_closest_peer (&put->key, bf)))
1607       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1608                                 (put->expiration_time), &put->key, putlen, pp,
1609                                 ntohl (put->type), payload_size, payload);
1610     /* route to other peers */
1611     GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
1612                                ntohl (put->desired_replication_level),
1613                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1614                                ntohl (put->hop_count), bf, &put->key, putlen,
1615                                pp, payload, payload_size);
1616   }
1617   GNUNET_CONTAINER_bloomfilter_free (bf);
1618   GDS_CLIENTS_process_put (options,
1619                            ntohl (put->type),
1620                            ntohl (put->hop_count),
1621                            ntohl (put->desired_replication_level),
1622                            putlen, put_path,
1623                            GNUNET_TIME_absolute_ntoh (put->expiration_time),
1624                            &put->key,
1625                            payload,
1626                            payload_size);
1627   return GNUNET_YES;
1628 }
1629
1630
1631 /**
1632  * We have received a FIND PEER request.  Send matching
1633  * HELLOs back.
1634  *
1635  * @param sender sender of the FIND PEER request
1636  * @param key peers close to this key are desired
1637  * @param bf peers matching this bf are excluded
1638  * @param bf_mutator mutator for bf
1639  */
1640 static void
1641 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1642                   const struct GNUNET_HashCode * key,
1643                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
1644 {
1645   int bucket_idx;
1646   struct PeerBucket *bucket;
1647   struct PeerInfo *peer;
1648   unsigned int choice;
1649   struct GNUNET_HashCode mhash;
1650   const struct GNUNET_HELLO_Message *hello;
1651
1652   /* first, check about our own HELLO */
1653   if (NULL != GDS_my_hello)
1654   {
1655     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1656     if ((NULL == bf) ||
1657         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1658     {
1659       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1660                                    GNUNET_TIME_relative_to_absolute
1661                                    (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1662                                    key, 0, NULL, 0, NULL, GDS_my_hello,
1663                                    GNUNET_HELLO_size ((const struct
1664                                                        GNUNET_HELLO_Message *)
1665                                                       GDS_my_hello));
1666     }
1667     else
1668     {
1669       GNUNET_STATISTICS_update (GDS_stats,
1670                                 gettext_noop
1671                                 ("# FIND PEER requests ignored due to Bloomfilter"),
1672                                 1, GNUNET_NO);
1673     }
1674   }
1675   else
1676   {
1677     GNUNET_STATISTICS_update (GDS_stats,
1678                               gettext_noop
1679                               ("# FIND PEER requests ignored due to lack of HELLO"),
1680                               1, GNUNET_NO);
1681   }
1682
1683   /* then, also consider sending a random HELLO from the closest bucket */
1684   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
1685     bucket_idx = closest_bucket;
1686   else
1687     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
1688   if (bucket_idx == GNUNET_SYSERR)
1689     return;
1690   bucket = &k_buckets[bucket_idx];
1691   if (bucket->peers_size == 0)
1692     return;
1693   choice =
1694       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
1695   peer = bucket->head;
1696   while (choice > 0)
1697   {
1698     GNUNET_assert (peer != NULL);
1699     peer = peer->next;
1700     choice--;
1701   }
1702   choice = bucket->peers_size;
1703   do
1704   {
1705     peer = peer->next;
1706     if (choice-- == 0)
1707       return;                   /* no non-masked peer available */
1708     if (peer == NULL)
1709       peer = bucket->head;
1710     GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1711     hello = GDS_HELLO_get (&peer->id);
1712   }
1713   while ((hello == NULL) ||
1714          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
1715   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1716                                GNUNET_TIME_relative_to_absolute
1717                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
1718                                0, NULL, 0, NULL, hello,
1719                                GNUNET_HELLO_size (hello));
1720 }
1721
1722
1723 /**
1724  * Core handler for p2p get requests.
1725  *
1726  * @param cls closure
1727  * @param peer sender of the request
1728  * @param message message
1729  * @param peer peer identity this notification is about
1730  * @param atsi performance data
1731  * @param atsi_count number of records in 'atsi'
1732  * @return GNUNET_OK to keep the connection open,
1733  *         GNUNET_SYSERR to close it (signal serious error)
1734  */
1735 static int
1736 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1737                     const struct GNUNET_MessageHeader *message,
1738                     const struct GNUNET_ATS_Information *atsi,
1739                     unsigned int atsi_count)
1740 {
1741   struct PeerGetMessage *get;
1742   uint32_t xquery_size;
1743   size_t reply_bf_size;
1744   uint16_t msize;
1745   enum GNUNET_BLOCK_Type type;
1746   enum GNUNET_DHT_RouteOption options;
1747   enum GNUNET_BLOCK_EvaluationResult eval;
1748   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1749   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1750   const char *xquery;
1751
1752   GNUNET_break (0 !=
1753                 memcmp (peer, &my_identity,
1754                         sizeof (struct GNUNET_PeerIdentity)));
1755   /* parse and validate message */
1756   msize = ntohs (message->size);
1757   if (msize < sizeof (struct PeerGetMessage))
1758   {
1759     GNUNET_break_op (0);
1760     return GNUNET_YES;
1761   }
1762   get = (struct PeerGetMessage *) message;
1763   xquery_size = ntohl (get->xquery_size);
1764   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1765   {
1766     GNUNET_break_op (0);
1767     return GNUNET_YES;
1768   }
1769   GNUNET_STATISTICS_update (GDS_stats,
1770                             gettext_noop ("# P2P GET requests received"), 1,
1771                             GNUNET_NO);
1772   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1773   type = ntohl (get->type);
1774   options = ntohl (get->options);
1775   xquery = (const char *) &get[1];
1776   reply_bf = NULL;
1777   if (reply_bf_size > 0)
1778     reply_bf =
1779         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
1780                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
1781   eval =
1782       GNUNET_BLOCK_evaluate (GDS_block_context, type, &get->key, &reply_bf,
1783                              get->bf_mutator, xquery, xquery_size, NULL, 0);
1784   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1785   {
1786     /* request invalid or block type not supported */
1787     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1788     if (NULL != reply_bf)
1789       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1790     return GNUNET_YES;
1791   }
1792   peer_bf =
1793       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
1794                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
1795   GNUNET_break_op (GNUNET_YES ==
1796                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1797                                                       &peer->hashPubKey));
1798   /* remember request for routing replies */
1799   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
1800                    reply_bf, get->bf_mutator);
1801   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n",
1802               GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity),
1803               (unsigned int) ntohl (get->hop_count));
1804   /* local lookup (this may update the reply_bf) */
1805   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1806       (am_closest_peer (&get->key, peer_bf)))
1807   {
1808     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1809     {
1810       GNUNET_STATISTICS_update (GDS_stats,
1811                                 gettext_noop
1812                                 ("# P2P FIND PEER requests processed"), 1,
1813                                 GNUNET_NO);
1814       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
1815     }
1816     else
1817     {
1818       eval =
1819           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
1820                                     &reply_bf, get->bf_mutator);
1821     }
1822   }
1823   else
1824   {
1825     GNUNET_STATISTICS_update (GDS_stats,
1826                               gettext_noop ("# P2P GET requests ONLY routed"),
1827                               1, GNUNET_NO);
1828   }
1829
1830   /* FIXME Path */
1831   GDS_CLIENTS_process_get (options,
1832                            type,
1833                            ntohl(get->hop_count),
1834                            ntohl(get->desired_replication_level),
1835                            0, NULL,
1836                            &get->key);
1837
1838   /* P2P forwarding */
1839   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1840     GDS_NEIGHBOURS_handle_get (type, options,
1841                                ntohl (get->desired_replication_level),
1842                                ntohl (get->hop_count), &get->key, xquery,
1843                                xquery_size, reply_bf, get->bf_mutator, peer_bf);
1844   /* clean up */
1845   if (NULL != reply_bf)
1846     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1847   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
1848   return GNUNET_YES;
1849 }
1850
1851
1852 /**
1853  * Core handler for p2p result messages.
1854  *
1855  * @param cls closure
1856  * @param message message
1857  * @param peer peer identity this notification is about
1858  * @param atsi performance data
1859  * @param atsi_count number of records in 'atsi'
1860  * @return GNUNET_YES (do not cut p2p connection)
1861  */
1862 static int
1863 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1864                        const struct GNUNET_MessageHeader *message,
1865                        const struct GNUNET_ATS_Information *atsi,
1866                        unsigned int atsi_count)
1867 {
1868   const struct PeerResultMessage *prm;
1869   const struct GNUNET_PeerIdentity *put_path;
1870   const struct GNUNET_PeerIdentity *get_path;
1871   const void *data;
1872   uint32_t get_path_length;
1873   uint32_t put_path_length;
1874   uint16_t msize;
1875   size_t data_size;
1876   enum GNUNET_BLOCK_Type type;
1877
1878   /* parse and validate message */
1879   msize = ntohs (message->size);
1880   if (msize < sizeof (struct PeerResultMessage))
1881   {
1882     GNUNET_break_op (0);
1883     return GNUNET_YES;
1884   }
1885   prm = (struct PeerResultMessage *) message;
1886   put_path_length = ntohl (prm->put_path_length);
1887   get_path_length = ntohl (prm->get_path_length);
1888   if ((msize <
1889        sizeof (struct PeerResultMessage) + (get_path_length +
1890                                             put_path_length) *
1891        sizeof (struct GNUNET_PeerIdentity)) ||
1892       (get_path_length >
1893        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1894       (put_path_length >
1895        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1896   {
1897     GNUNET_break_op (0);
1898     return GNUNET_YES;
1899   }
1900   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
1901                             1, GNUNET_NO);
1902   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
1903   get_path = &put_path[put_path_length];
1904   type = ntohl (prm->type);
1905   data = (const void *) &get_path[get_path_length];
1906   data_size =
1907       msize - (sizeof (struct PeerResultMessage) +
1908                (get_path_length +
1909                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1910
1911   /* if we got a HELLO, consider it for our own routing table */
1912   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1913   {
1914     const struct GNUNET_MessageHeader *h;
1915     struct GNUNET_PeerIdentity pid;
1916     int bucket;
1917
1918     /* Should be a HELLO, validate and consider using it! */
1919     if (data_size < sizeof (struct GNUNET_MessageHeader))
1920     {
1921       GNUNET_break_op (0);
1922       return GNUNET_YES;
1923     }
1924     h = data;
1925     if (data_size != ntohs (h->size))
1926     {
1927       GNUNET_break_op (0);
1928       return GNUNET_YES;
1929     }
1930     if (GNUNET_OK !=
1931         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid))
1932     {
1933       GNUNET_break_op (0);
1934       return GNUNET_YES;
1935     }
1936     if (0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity)))
1937     {
1938       bucket = find_bucket (&pid.hashPubKey);
1939       if ((bucket >= 0) && (k_buckets[bucket].peers_size < bucket_size))
1940       {
1941         if (NULL != GDS_transport_handle)
1942         {
1943           GNUNET_TRANSPORT_offer_hello (GDS_transport_handle, h, NULL, NULL);
1944           GNUNET_TRANSPORT_try_connect (GDS_transport_handle, &pid);
1945         }
1946       }
1947     }
1948   }
1949
1950   /* append 'peer' to 'get_path' */
1951   {
1952     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
1953
1954     memcpy (xget_path, get_path,
1955             get_path_length * sizeof (struct GNUNET_PeerIdentity));
1956     xget_path[get_path_length] = *peer;
1957     get_path_length++;
1958
1959     /* forward to local clients */
1960     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1961                               &prm->key, get_path_length, xget_path,
1962                               put_path_length, put_path, type, data_size, data);
1963
1964     /* forward to other peers */
1965     GDS_ROUTING_process (type, GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1966                          &prm->key, put_path_length, put_path, get_path_length,
1967                          xget_path, data, data_size);
1968   }
1969
1970   GDS_CLIENTS_process_get_resp (type,
1971                                 get_path,
1972                                 get_path_length,
1973                                 put_path,
1974                                 put_path_length,
1975                                 GNUNET_TIME_absolute_ntoh (
1976                                   prm->expiration_time),
1977                                 &prm->key,
1978                                 data,
1979                                 data_size);
1980
1981   return GNUNET_YES;
1982 }
1983
1984
1985 /**
1986  * Initialize neighbours subsystem.
1987  *
1988  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1989  */
1990 int
1991 GDS_NEIGHBOURS_init ()
1992 {
1993   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1994     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1995     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1996     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1997     {NULL, 0, 0}
1998   };
1999   unsigned long long temp_config_num;
2000
2001   if (GNUNET_OK ==
2002       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2003                                              &temp_config_num))
2004     bucket_size = (unsigned int) temp_config_num;
2005   atsAPI = GNUNET_ATS_performance_init (GDS_cfg, NULL, NULL);
2006   coreAPI =
2007       GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect,
2008                            &handle_core_disconnect, NULL, GNUNET_NO, NULL,
2009                            GNUNET_NO, core_handlers);
2010   if (coreAPI == NULL)
2011     return GNUNET_SYSERR;
2012   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
2013   return GNUNET_OK;
2014 }
2015
2016
2017 /**
2018  * Shutdown neighbours subsystem.
2019  */
2020 void
2021 GDS_NEIGHBOURS_done ()
2022 {
2023   if (coreAPI == NULL)
2024     return;
2025   GNUNET_CORE_disconnect (coreAPI);
2026   coreAPI = NULL;
2027   GNUNET_ATS_performance_done (atsAPI);
2028   atsAPI = NULL;
2029   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
2030   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
2031   all_known_peers = NULL;
2032   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
2033   {
2034     GNUNET_SCHEDULER_cancel (find_peer_task);
2035     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
2036   }
2037 }
2038
2039 /**
2040  * Get the ID of the local node.
2041  * 
2042  * @return identity of the local node
2043  */
2044 struct GNUNET_PeerIdentity *
2045 GDS_NEIGHBOURS_get_id ()
2046 {
2047     return &my_identity;
2048 }
2049
2050
2051 /* end of gnunet-service-dht_neighbours.c */