added ATS addresstype information to unix
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_ats_service.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50 #include "dht.h"
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * How often to update our preference levels for peers in our routing tables.
74  */
75 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92
93 /**
94  * P2P PUT message
95  */
96 struct PeerPutMessage
97 {
98   /**
99    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
100    */
101   struct GNUNET_MessageHeader header;
102
103   /**
104    * Processing options
105    */
106   uint32_t options GNUNET_PACKED;
107
108   /**
109    * Content type.
110    */
111   uint32_t type GNUNET_PACKED;
112
113   /**
114    * Hop count
115    */
116   uint32_t hop_count GNUNET_PACKED;
117
118   /**
119    * Replication level for this message
120    */
121   uint32_t desired_replication_level GNUNET_PACKED;
122
123   /**
124    * Length of the PUT path that follows (if tracked).
125    */
126   uint32_t put_path_length GNUNET_PACKED;
127
128   /**
129    * When does the content expire?
130    */
131   struct GNUNET_TIME_AbsoluteNBO expiration_time;
132
133   /**
134    * Bloomfilter (for peer identities) to stop circular routes
135    */
136   char bloomfilter[DHT_BLOOM_SIZE];
137
138   /**
139    * The key we are storing under.
140    */
141   GNUNET_HashCode key;
142
143   /* put path (if tracked) */
144
145   /* Payload */
146
147 };
148
149
150 /**
151  * P2P Result message
152  */
153 struct PeerResultMessage
154 {
155   /**
156    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
157    */
158   struct GNUNET_MessageHeader header;
159
160   /**
161    * Content type.
162    */
163   uint32_t type GNUNET_PACKED;
164
165   /**
166    * Length of the PUT path that follows (if tracked).
167    */
168   uint32_t put_path_length GNUNET_PACKED;
169
170   /**
171    * Length of the GET path that follows (if tracked).
172    */
173   uint32_t get_path_length GNUNET_PACKED;
174
175   /**
176    * When does the content expire?
177    */
178   struct GNUNET_TIME_AbsoluteNBO expiration_time;
179
180   /**
181    * The key of the corresponding GET request.
182    */
183   GNUNET_HashCode key;
184
185   /* put path (if tracked) */
186
187   /* get path (if tracked) */
188
189   /* Payload */
190
191 };
192
193
194 /**
195  * P2P GET message
196  */
197 struct PeerGetMessage
198 {
199   /**
200    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
201    */
202   struct GNUNET_MessageHeader header;
203
204   /**
205    * Processing options
206    */
207   uint32_t options GNUNET_PACKED;
208
209   /**
210    * Desired content type.
211    */
212   uint32_t type GNUNET_PACKED;
213
214   /**
215    * Hop count
216    */
217   uint32_t hop_count GNUNET_PACKED;
218
219   /**
220    * Desired replication level for this request.
221    */
222   uint32_t desired_replication_level GNUNET_PACKED;
223
224   /**
225    * Size of the extended query.
226    */
227   uint32_t xquery_size;
228
229   /**
230    * Bloomfilter mutator.
231    */
232   uint32_t bf_mutator;
233
234   /**
235    * Bloomfilter (for peer identities) to stop circular routes
236    */
237   char bloomfilter[DHT_BLOOM_SIZE];
238
239   /**
240    * The key we are looking for.
241    */
242   GNUNET_HashCode key;
243
244   /* xquery */
245
246   /* result bloomfilter */
247
248 };
249
250
251 /**
252  * Linked list of messages to send to a particular other peer.
253  */
254 struct P2PPendingMessage
255 {
256   /**
257    * Pointer to next item in the list
258    */
259   struct P2PPendingMessage *next;
260
261   /**
262    * Pointer to previous item in the list
263    */
264   struct P2PPendingMessage *prev;
265
266   /**
267    * Message importance level.  FIXME: used? useful?
268    */
269   unsigned int importance;
270
271   /**
272    * When does this message time out?
273    */
274   struct GNUNET_TIME_Absolute timeout;
275
276   /**
277    * Actual message to be sent, allocated at the end of the struct:
278    * // msg = (cast) &pm[1];
279    * // memcpy (&pm[1], data, len);
280    */
281   const struct GNUNET_MessageHeader *msg;
282
283 };
284
285
286 /**
287  * Entry for a peer in a bucket.
288  */
289 struct PeerInfo
290 {
291   /**
292    * Next peer entry (DLL)
293    */
294   struct PeerInfo *next;
295
296   /**
297    *  Prev peer entry (DLL)
298    */
299   struct PeerInfo *prev;
300
301   /**
302    * Count of outstanding messages for peer.  FIXME: NEEDED?
303    * FIXME: bound queue size!?
304    */
305   unsigned int pending_count;
306
307   /**
308    * Head of pending messages to be sent to this peer.
309    */
310   struct P2PPendingMessage *head;
311
312   /**
313    * Tail of pending messages to be sent to this peer.
314    */
315   struct P2PPendingMessage *tail;
316
317   /**
318    * Core handle for sending messages to this peer.
319    */
320   struct GNUNET_CORE_TransmitHandle *th;
321
322   /**
323    * Task for scheduling preference updates
324    */
325   GNUNET_SCHEDULER_TaskIdentifier preference_task;
326
327   /**
328    * What is the identity of the peer?
329    */
330   struct GNUNET_PeerIdentity id;
331
332 #if 0
333   /**
334    * What is the average latency for replies received?
335    */
336   struct GNUNET_TIME_Relative latency;
337
338   /**
339    * Transport level distance to peer.
340    */
341   unsigned int distance;
342 #endif
343
344 };
345
346
347 /**
348  * Peers are grouped into buckets.
349  */
350 struct PeerBucket
351 {
352   /**
353    * Head of DLL
354    */
355   struct PeerInfo *head;
356
357   /**
358    * Tail of DLL
359    */
360   struct PeerInfo *tail;
361
362   /**
363    * Number of peers in the bucket.
364    */
365   unsigned int peers_size;
366 };
367
368
369 /**
370  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
371  */
372 static unsigned int closest_bucket;
373
374 /**
375  * How many peers have we added since we sent out our last
376  * find peer request?
377  */
378 static unsigned int newly_found_peers;
379
380 /**
381  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
382  */
383 static struct PeerBucket k_buckets[MAX_BUCKETS];
384
385 /**
386  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
387  */
388 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
389
390 /**
391  * Maximum size for each bucket.
392  */
393 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
394
395 /**
396  * Task that sends FIND PEER requests.
397  */
398 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
399
400 /**
401  * Identity of this peer.
402  */
403 static struct GNUNET_PeerIdentity my_identity;
404
405 /**
406  * Handle to CORE.
407  */
408 static struct GNUNET_CORE_Handle *coreAPI;
409
410 /**
411  * Handle to ATS.
412  */
413 static struct GNUNET_ATS_PerformanceHandle *atsAPI;
414
415
416
417 /**
418  * Find the optimal bucket for this key.
419  *
420  * @param hc the hashcode to compare our identity to
421  * @return the proper bucket index, or GNUNET_SYSERR
422  *         on error (same hashcode)
423  */
424 static int
425 find_bucket (const GNUNET_HashCode * hc)
426 {
427   unsigned int bits;
428
429   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
430   if (bits == MAX_BUCKETS)
431   {
432     /* How can all bits match? Got my own ID? */
433     GNUNET_break (0);
434     return GNUNET_SYSERR;
435   }
436   return MAX_BUCKETS - bits - 1;
437 }
438
439
440 /**
441  * Let GNUnet core know that we like the given peer.
442  *
443  * @param cls the 'struct PeerInfo' of the peer
444  * @param tc scheduler context.
445  */
446 static void
447 update_core_preference (void *cls,
448                         const struct GNUNET_SCHEDULER_TaskContext *tc)
449 {
450   struct PeerInfo *peer = cls;
451   uint64_t preference;
452   unsigned int matching;
453   int bucket;
454
455   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
456   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
457     return;
458   matching =
459       GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
460                                         &peer->id.hashPubKey);
461   if (matching >= 64)
462     matching = 63;
463   bucket = find_bucket (&peer->id.hashPubKey);
464   if (bucket == GNUNET_SYSERR)
465     preference = 0;
466   else
467   {
468     GNUNET_assert (k_buckets[bucket].peers_size != 0);
469     preference = (1LL << matching) / k_buckets[bucket].peers_size;
470   }
471   if (preference == 0)
472   {
473     peer->preference_task =
474         GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
475                                       &update_core_preference, peer);
476     return;
477   }
478   GNUNET_STATISTICS_update (GDS_stats,
479                             gettext_noop ("# Preference updates given to core"),
480                             1, GNUNET_NO);
481   GNUNET_ATS_change_preference (atsAPI, &peer->id,
482                                 GNUNET_ATS_PREFERENCE_BANDWIDTH,
483                                 (double) preference, GNUNET_ATS_PREFERENCE_END);
484   peer->preference_task =
485       GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
486                                     &update_core_preference, peer);
487
488
489 }
490
491
492 /**
493  * Closure for 'add_known_to_bloom'.
494  */
495 struct BloomConstructorContext
496 {
497   /**
498    * Bloom filter under construction.
499    */
500   struct GNUNET_CONTAINER_BloomFilter *bloom;
501
502   /**
503    * Mutator to use.
504    */
505   uint32_t bf_mutator;
506 };
507
508
509 /**
510  * Add each of the peers we already know to the bloom filter of
511  * the request so that we don't get duplicate HELLOs.
512  *
513  * @param cls the 'struct BloomConstructorContext'.
514  * @param key peer identity to add to the bloom filter
515  * @param value value the peer information (unused)
516  * @return GNUNET_YES (we should continue to iterate)
517  */
518 static int
519 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
520 {
521   struct BloomConstructorContext *ctx = cls;
522   GNUNET_HashCode mh;
523
524   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
525 #if DEBUG_DHT
526   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
527               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
528               GNUNET_h2s (key), ctx->bf_mutator);
529 #endif
530   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
531   return GNUNET_YES;
532 }
533
534
535 /**
536  * Task to send a find peer message for our own peer identifier
537  * so that we can find the closest peers in the network to ourselves
538  * and attempt to connect to them.
539  *
540  * @param cls closure for this task
541  * @param tc the context under which the task is running
542  */
543 static void
544 send_find_peer_message (void *cls,
545                         const struct GNUNET_SCHEDULER_TaskContext *tc)
546 {
547   struct GNUNET_TIME_Relative next_send_time;
548   struct BloomConstructorContext bcc;
549   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
550
551   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
552   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
553     return;
554   if (newly_found_peers > bucket_size)
555   {
556     /* If we are finding many peers already, no need to send out our request right now! */
557     find_peer_task =
558         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
559                                       &send_find_peer_message, NULL);
560     newly_found_peers = 0;
561     return;
562   }
563   bcc.bf_mutator =
564       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
565   bcc.bloom =
566       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
567                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
568   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
569                                          &bcc);
570   GNUNET_STATISTICS_update (GDS_stats,
571                             gettext_noop ("# FIND PEER messages initiated"), 1,
572                             GNUNET_NO);
573   peer_bf =
574       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
575                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
576   // FIXME: pass priority!?
577   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
578                              GNUNET_DHT_RO_FIND_PEER,
579                              FIND_PEER_REPLICATION_LEVEL, 0,
580                              &my_identity.hashPubKey, NULL, 0, bcc.bloom,
581                              bcc.bf_mutator, peer_bf);
582   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
583   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
584   /* schedule next round */
585   next_send_time.rel_value =
586       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
587       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
588                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
589                                 (newly_found_peers + 1));
590   newly_found_peers = 0;
591   find_peer_task =
592       GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
593                                     NULL);
594 }
595
596
597 /**
598  * Method called whenever a peer connects.
599  *
600  * @param cls closure
601  * @param peer peer identity this notification is about
602  * @param atsi performance data
603  * @param atsi_count number of records in 'atsi'
604  */
605 static void
606 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
607                      const struct GNUNET_ATS_Information *atsi,
608                      unsigned int atsi_count)
609 {
610   struct PeerInfo *ret;
611   int peer_bucket;
612
613   /* Check for connect to self message */
614   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
615     return;
616 #if DEBUG_DHT
617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n",
618               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
619 #endif
620   if (GNUNET_YES ==
621       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
622                                               &peer->hashPubKey))
623   {
624     GNUNET_break (0);
625     return;
626   }
627   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), 1,
628                             GNUNET_NO);
629   peer_bucket = find_bucket (&peer->hashPubKey);
630   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
631   ret = GNUNET_malloc (sizeof (struct PeerInfo));
632 #if 0
633   ret->latency = latency;
634   ret->distance = distance;
635 #endif
636   ret->id = *peer;
637   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
638                                     k_buckets[peer_bucket].tail, ret);
639   k_buckets[peer_bucket].peers_size++;
640   closest_bucket = GNUNET_MAX (closest_bucket, peer_bucket);
641   if ((peer_bucket > 0) && (k_buckets[peer_bucket].peers_size <= bucket_size))
642   {
643     ret->preference_task =
644         GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
645     newly_found_peers++;
646   }
647   GNUNET_assert (GNUNET_OK ==
648                  GNUNET_CONTAINER_multihashmap_put (all_known_peers,
649                                                     &peer->hashPubKey, ret,
650                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
651   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
652   {
653     /* got a first connection, good time to start with FIND PEER requests... */
654     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL);
655   }
656 }
657
658
659 /**
660  * Method called whenever a peer disconnects.
661  *
662  * @param cls closure
663  * @param peer peer identity this notification is about
664  */
665 static void
666 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
667 {
668   struct PeerInfo *to_remove;
669   int current_bucket;
670   struct P2PPendingMessage *pos;
671   unsigned int discarded;
672
673   /* Check for disconnect from self message */
674   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
675     return;
676 #if DEBUG_DHT
677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n",
678               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
679 #endif
680   to_remove =
681       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
682   if (NULL == to_remove)
683   {
684     GNUNET_break (0);
685     return;
686   }
687   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), -1,
688                             GNUNET_NO);
689   GNUNET_assert (GNUNET_YES ==
690                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
691                                                        &peer->hashPubKey,
692                                                        to_remove));
693   if (GNUNET_SCHEDULER_NO_TASK != to_remove->preference_task)
694   {
695     GNUNET_SCHEDULER_cancel (to_remove->preference_task);
696     to_remove->preference_task = GNUNET_SCHEDULER_NO_TASK;
697   }
698   current_bucket = find_bucket (&to_remove->id.hashPubKey);
699   GNUNET_assert (current_bucket >= 0);
700   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
701                                k_buckets[current_bucket].tail, to_remove);
702   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
703   k_buckets[current_bucket].peers_size--;
704   while ((closest_bucket > 0) && (k_buckets[closest_bucket].peers_size == 0))
705     closest_bucket--;
706
707   if (to_remove->th != NULL)
708   {
709     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
710     to_remove->th = NULL;
711   }
712   discarded = 0;
713   while (NULL != (pos = to_remove->head))
714   {
715     GNUNET_CONTAINER_DLL_remove (to_remove->head, to_remove->tail, pos);
716     discarded++;
717     GNUNET_free (pos);
718   }
719   GNUNET_STATISTICS_update (GDS_stats,
720                             gettext_noop
721                             ("# Queued messages discarded (peer disconnected)"),
722                             discarded, GNUNET_NO);
723   GNUNET_free (to_remove);
724 }
725
726
727 /**
728  * Called when core is ready to send a message we asked for
729  * out to the destination.
730  *
731  * @param cls the 'struct PeerInfo' of the target peer
732  * @param size number of bytes available in buf
733  * @param buf where the callee should write the message
734  * @return number of bytes written to buf
735  */
736 static size_t
737 core_transmit_notify (void *cls, size_t size, void *buf)
738 {
739   struct PeerInfo *peer = cls;
740   char *cbuf = buf;
741   struct P2PPendingMessage *pending;
742   size_t off;
743   size_t msize;
744
745   peer->th = NULL;
746   while ((NULL != (pending = peer->head)) &&
747          (GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value == 0))
748   {
749     peer->pending_count--;
750     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
751     GNUNET_free (pending);
752   }
753   if (pending == NULL)
754   {
755     /* no messages pending */
756     return 0;
757   }
758   if (buf == NULL)
759   {
760     peer->th =
761         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
762                                            pending->importance,
763                                            GNUNET_TIME_absolute_get_remaining
764                                            (pending->timeout), &peer->id,
765                                            ntohs (pending->msg->size),
766                                            &core_transmit_notify, peer);
767     GNUNET_break (NULL != peer->th);
768     return 0;
769   }
770   off = 0;
771   while ((NULL != (pending = peer->head)) &&
772          (size - off >= (msize = ntohs (pending->msg->size))))
773   {
774     GNUNET_STATISTICS_update (GDS_stats,
775                               gettext_noop
776                               ("# Bytes transmitted to other peers"), msize,
777                               GNUNET_NO);
778     memcpy (&cbuf[off], pending->msg, msize);
779     off += msize;
780     peer->pending_count--;
781     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
782     GNUNET_free (pending);
783   }
784   if (peer->head != NULL)
785   {
786     peer->th =
787         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
788                                            pending->importance,
789                                            GNUNET_TIME_absolute_get_remaining
790                                            (pending->timeout), &peer->id, msize,
791                                            &core_transmit_notify, peer);
792     GNUNET_break (NULL != peer->th);
793   }
794   return off;
795 }
796
797
798 /**
799  * Transmit all messages in the peer's message queue.
800  *
801  * @param peer message queue to process
802  */
803 static void
804 process_peer_queue (struct PeerInfo *peer)
805 {
806   struct P2PPendingMessage *pending;
807
808   if (NULL == (pending = peer->head))
809     return;
810   if (NULL != peer->th)
811     return;
812   GNUNET_STATISTICS_update (GDS_stats,
813                             gettext_noop
814                             ("# Bytes of bandwdith requested from core"),
815                             ntohs (pending->msg->size), GNUNET_NO);
816   peer->th =
817       GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
818                                          pending->importance,
819                                          GNUNET_TIME_absolute_get_remaining
820                                          (pending->timeout), &peer->id,
821                                          ntohs (pending->msg->size),
822                                          &core_transmit_notify, peer);
823   GNUNET_break (NULL != peer->th);
824 }
825
826
827 /**
828  * To how many peers should we (on average) forward the request to
829  * obtain the desired target_replication count (on average).
830  *
831  * @param hop_count number of hops the message has traversed
832  * @param target_replication the number of total paths desired
833  * @return Some number of peers to forward the message to
834  */
835 static unsigned int
836 get_forward_count (uint32_t hop_count, uint32_t target_replication)
837 {
838   uint32_t random_value;
839   uint32_t forward_count;
840   float target_value;
841
842   if (hop_count > GDS_NSE_get () * 6.0)
843   {
844     /* forcefully terminate */
845     return 0;
846   }
847   if (hop_count > GDS_NSE_get () * 4.0)
848   {
849     /* Once we have reached our ideal number of hops, only forward to 1 peer */
850     return 1;
851   }
852   /* bound by system-wide maximum */
853   target_replication =
854       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
855   target_value =
856       1 + (target_replication - 1.0) / (GDS_NSE_get () +
857                                         ((float) (target_replication - 1.0) *
858                                          hop_count));
859   /* Set forward count to floor of target_value */
860   forward_count = (uint32_t) target_value;
861   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
862   target_value = target_value - forward_count;
863   random_value =
864       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
865   if (random_value < (target_value * UINT32_MAX))
866     forward_count++;
867   return forward_count;
868 }
869
870
871 /**
872  * Compute the distance between have and target as a 32-bit value.
873  * Differences in the lower bits must count stronger than differences
874  * in the higher bits.
875  *
876  * @return 0 if have==target, otherwise a number
877  *           that is larger as the distance between
878  *           the two hash codes increases
879  */
880 static unsigned int
881 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
882 {
883   unsigned int bucket;
884   unsigned int msb;
885   unsigned int lsb;
886   unsigned int i;
887
888   /* We have to represent the distance between two 2^9 (=512)-bit
889    * numbers as a 2^5 (=32)-bit number with "0" being used for the
890    * two numbers being identical; furthermore, we need to
891    * guarantee that a difference in the number of matching
892    * bits is always represented in the result.
893    *
894    * We use 2^32/2^9 numerical values to distinguish between
895    * hash codes that have the same LSB bit distance and
896    * use the highest 2^9 bits of the result to signify the
897    * number of (mis)matching LSB bits; if we have 0 matching
898    * and hence 512 mismatching LSB bits we return -1 (since
899    * 512 itself cannot be represented with 9 bits) */
900
901   /* first, calculate the most significant 9 bits of our
902    * result, aka the number of LSBs */
903   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
904   /* bucket is now a value between 0 and 512 */
905   if (bucket == 512)
906     return 0;                   /* perfect match */
907   if (bucket == 0)
908     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
909                                  * below, we'd end up with max+1 (overflow)) */
910
911   /* calculate the most significant bits of the final result */
912   msb = (512 - bucket) << (32 - 9);
913   /* calculate the 32-9 least significant bits of the final result by
914    * looking at the differences in the 32-9 bits following the
915    * mismatching bit at 'bucket' */
916   lsb = 0;
917   for (i = bucket + 1;
918        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
919   {
920     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
921         GNUNET_CRYPTO_hash_get_bit (have, i))
922       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
923                                                  * last bit set will be 31 -- if
924                                                  * i does not reach 512 first... */
925   }
926   return msb | lsb;
927 }
928
929
930 /**
931  * Check whether my identity is closer than any known peers.  If a
932  * non-null bloomfilter is given, check if this is the closest peer
933  * that hasn't already been routed to.
934  *
935  * @param key hash code to check closeness to
936  * @param bloom bloomfilter, exclude these entries from the decision
937  * @return GNUNET_YES if node location is closest,
938  *         GNUNET_NO otherwise.
939  */
940 static int
941 am_closest_peer (const GNUNET_HashCode * key,
942                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
943 {
944   int bits;
945   int other_bits;
946   int bucket_num;
947   int count;
948   struct PeerInfo *pos;
949
950   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
951     return GNUNET_YES;
952   bucket_num = find_bucket (key);
953   GNUNET_assert (bucket_num >= 0);
954   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
955   pos = k_buckets[bucket_num].head;
956   count = 0;
957   while ((pos != NULL) && (count < bucket_size))
958   {
959     if ((bloom != NULL) &&
960         (GNUNET_YES ==
961          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
962     {
963       pos = pos->next;
964       continue;                 /* Skip already checked entries */
965     }
966     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
967     if (other_bits > bits)
968       return GNUNET_NO;
969     if (other_bits == bits)     /* We match the same number of bits */
970       return GNUNET_YES;
971     pos = pos->next;
972   }
973   /* No peers closer, we are the closest! */
974   return GNUNET_YES;
975 }
976
977
978 /**
979  * Select a peer from the routing table that would be a good routing
980  * destination for sending a message for "key".  The resulting peer
981  * must not be in the set of blocked peers.<p>
982  *
983  * Note that we should not ALWAYS select the closest peer to the
984  * target, peers further away from the target should be chosen with
985  * exponentially declining probability.
986  *
987  * FIXME: double-check that this is fine
988  *
989  *
990  * @param key the key we are selecting a peer to route to
991  * @param bloom a bloomfilter containing entries this request has seen already
992  * @param hops how many hops has this message traversed thus far
993  * @return Peer to route to, or NULL on error
994  */
995 static struct PeerInfo *
996 select_peer (const GNUNET_HashCode * key,
997              const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops)
998 {
999   unsigned int bc;
1000   unsigned int count;
1001   unsigned int selected;
1002   struct PeerInfo *pos;
1003   unsigned int dist;
1004   unsigned int smallest_distance;
1005   struct PeerInfo *chosen;
1006
1007   if (hops >= GDS_NSE_get ())
1008   {
1009     /* greedy selection (closest peer that is not in bloomfilter) */
1010     smallest_distance = UINT_MAX;
1011     chosen = NULL;
1012     for (bc = 0; bc <= closest_bucket; bc++)
1013     {
1014       pos = k_buckets[bc].head;
1015       count = 0;
1016       while ((pos != NULL) && (count < bucket_size))
1017       {
1018         if ((bloom == NULL) ||
1019             (GNUNET_NO ==
1020              GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1021         {
1022           dist = get_distance (key, &pos->id.hashPubKey);
1023           if (dist < smallest_distance)
1024           {
1025             chosen = pos;
1026             smallest_distance = dist;
1027           }
1028         }
1029         else
1030         {
1031 #if DEBUG_DHT
1032           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1034                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1035 #endif
1036           GNUNET_STATISTICS_update (GDS_stats,
1037                                     gettext_noop
1038                                     ("# Peers excluded from routing due to Bloomfilter"),
1039                                     1, GNUNET_NO);
1040         }
1041         count++;
1042         pos = pos->next;
1043       }
1044     }
1045     if (NULL == chosen)
1046       GNUNET_STATISTICS_update (GDS_stats,
1047                                 gettext_noop ("# Peer selection failed"), 1,
1048                                 GNUNET_NO);
1049     return chosen;
1050   }
1051
1052   /* select "random" peer */
1053   /* count number of peers that are available and not filtered */
1054   count = 0;
1055   for (bc = 0; bc <= closest_bucket; bc++)
1056   {
1057     pos = k_buckets[bc].head;
1058     while ((pos != NULL) && (count < bucket_size))
1059     {
1060       if ((bloom != NULL) &&
1061           (GNUNET_YES ==
1062            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1063       {
1064         GNUNET_STATISTICS_update (GDS_stats,
1065                                   gettext_noop
1066                                   ("# Peers excluded from routing due to Bloomfilter"),
1067                                   1, GNUNET_NO);
1068 #if DEBUG_DHT
1069         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1071                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1072 #endif
1073         pos = pos->next;
1074         continue;               /* Ignore bloomfiltered peers */
1075       }
1076       count++;
1077       pos = pos->next;
1078     }
1079   }
1080   if (count == 0)               /* No peers to select from! */
1081   {
1082     GNUNET_STATISTICS_update (GDS_stats,
1083                               gettext_noop ("# Peer selection failed"), 1,
1084                               GNUNET_NO);
1085     return NULL;
1086   }
1087   /* Now actually choose a peer */
1088   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1089   count = 0;
1090   for (bc = 0; bc <= closest_bucket; bc++)
1091   {
1092     pos = k_buckets[bc].head;
1093     while ((pos != NULL) && (count < bucket_size))
1094     {
1095       if ((bloom != NULL) &&
1096           (GNUNET_YES ==
1097            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1098       {
1099         pos = pos->next;
1100         continue;               /* Ignore bloomfiltered peers */
1101       }
1102       if (0 == selected--)
1103         return pos;
1104       pos = pos->next;
1105     }
1106   }
1107   GNUNET_break (0);
1108   return NULL;
1109 }
1110
1111
1112 /**
1113  * Compute the set of peers that the given request should be
1114  * forwarded to.
1115  *
1116  * @param key routing key
1117  * @param bloom bloom filter excluding peers as targets, all selected
1118  *        peers will be added to the bloom filter
1119  * @param hop_count number of hops the request has traversed so far
1120  * @param target_replication desired number of replicas
1121  * @param targets where to store an array of target peers (to be
1122  *         free'd by the caller)
1123  * @return number of peers returned in 'targets'.
1124  */
1125 static unsigned int
1126 get_target_peers (const GNUNET_HashCode * key,
1127                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1128                   uint32_t hop_count, uint32_t target_replication,
1129                   struct PeerInfo ***targets)
1130 {
1131   unsigned int ret;
1132   unsigned int off;
1133   struct PeerInfo **rtargets;
1134   struct PeerInfo *nxt;
1135
1136   GNUNET_assert (NULL != bloom);
1137   ret = get_forward_count (hop_count, target_replication);
1138   if (ret == 0)
1139   {
1140     *targets = NULL;
1141     return 0;
1142   }
1143   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1144   for (off = 0; off < ret; off++)
1145   {
1146     nxt = select_peer (key, bloom, hop_count);
1147     if (nxt == NULL)
1148       break;
1149     rtargets[off] = nxt;
1150     GNUNET_break (GNUNET_NO ==
1151                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1152                                                      &nxt->id.hashPubKey));
1153     GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
1154   }
1155 #if DEBUG_DHT
1156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157               "Selected %u/%u peers at hop %u for %s (target was %u)\n", off,
1158               GNUNET_CONTAINER_multihashmap_size (all_known_peers),
1159               (unsigned int) hop_count, GNUNET_h2s (key), ret);
1160 #endif
1161   if (0 == off)
1162   {
1163     GNUNET_free (rtargets);
1164     *targets = NULL;
1165     return 0;
1166   }
1167   *targets = rtargets;
1168   return off;
1169 }
1170
1171
1172 /**
1173  * Perform a PUT operation.   Forwards the given request to other
1174  * peers.   Does not store the data locally.  Does not give the
1175  * data to local clients.  May do nothing if this is the only
1176  * peer in the network (or if we are the closest peer in the
1177  * network).
1178  *
1179  * @param type type of the block
1180  * @param options routing options
1181  * @param desired_replication_level desired replication count
1182  * @param expiration_time when does the content expire
1183  * @param hop_count how many hops has this message traversed so far
1184  * @param bf Bloom filter of peers this PUT has already traversed
1185  * @param key key for the content
1186  * @param put_path_length number of entries in put_path
1187  * @param put_path peers this request has traversed so far (if tracked)
1188  * @param data payload to store
1189  * @param data_size number of bytes in data
1190  */
1191 void
1192 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1193                            enum GNUNET_DHT_RouteOption options,
1194                            uint32_t desired_replication_level,
1195                            struct GNUNET_TIME_Absolute expiration_time,
1196                            uint32_t hop_count,
1197                            struct GNUNET_CONTAINER_BloomFilter *bf,
1198                            const GNUNET_HashCode * key,
1199                            unsigned int put_path_length,
1200                            struct GNUNET_PeerIdentity *put_path,
1201                            const void *data, size_t data_size)
1202 {
1203   unsigned int target_count;
1204   unsigned int i;
1205   struct PeerInfo **targets;
1206   struct PeerInfo *target;
1207   struct P2PPendingMessage *pending;
1208   size_t msize;
1209   struct PeerPutMessage *ppm;
1210   struct GNUNET_PeerIdentity *pp;
1211
1212   GNUNET_assert (NULL != bf);
1213 #if DEBUG_DHT
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215               "Adding myself (%s) to PUT bloomfilter for %s\n",
1216               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1217 #endif
1218   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
1219   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1220                             1, GNUNET_NO);
1221   target_count =
1222       get_target_peers (key, bf, hop_count, desired_replication_level,
1223                         &targets);
1224   if (0 == target_count)
1225   {
1226 #if DEBUG_DHT
1227     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228                 "Routing PUT for %s terminates after %u hops at %s\n",
1229                 GNUNET_h2s (key), (unsigned int) hop_count,
1230                 GNUNET_i2s (&my_identity));
1231 #endif
1232     return;
1233   }
1234   msize =
1235       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1236       sizeof (struct PeerPutMessage);
1237   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1238   {
1239     put_path_length = 0;
1240     msize = data_size + sizeof (struct PeerPutMessage);
1241   }
1242   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1243   {
1244     GNUNET_break (0);
1245     GNUNET_free (targets);
1246     return;
1247   }
1248   GNUNET_STATISTICS_update (GDS_stats,
1249                             gettext_noop
1250                             ("# PUT messages queued for transmission"),
1251                             target_count, GNUNET_NO);
1252   for (i = 0; i < target_count; i++)
1253   {
1254     target = targets[i];
1255 #if DEBUG_DHT
1256     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
1258                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1259 #endif
1260     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1261     pending->importance = 0;    /* FIXME */
1262     pending->timeout = expiration_time;
1263     ppm = (struct PeerPutMessage *) &pending[1];
1264     pending->msg = &ppm->header;
1265     ppm->header.size = htons (msize);
1266     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1267     ppm->options = htonl (options);
1268     ppm->type = htonl (type);
1269     ppm->hop_count = htonl (hop_count + 1);
1270     ppm->desired_replication_level = htonl (desired_replication_level);
1271     ppm->put_path_length = htonl (put_path_length);
1272     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1273     GNUNET_break (GNUNET_YES ==
1274                   GNUNET_CONTAINER_bloomfilter_test (bf,
1275                                                      &target->id.hashPubKey));
1276     GNUNET_assert (GNUNET_OK ==
1277                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1278                                                               ppm->bloomfilter,
1279                                                               DHT_BLOOM_SIZE));
1280     ppm->key = *key;
1281     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1282     memcpy (pp, put_path,
1283             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1284     memcpy (&pp[put_path_length], data, data_size);
1285     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1286     target->pending_count++;
1287     process_peer_queue (target);
1288   }
1289   GNUNET_free (targets);
1290 }
1291
1292
1293 /**
1294  * Perform a GET operation.  Forwards the given request to other
1295  * peers.  Does not lookup the key locally.  May do nothing if this is
1296  * the only peer in the network (or if we are the closest peer in the
1297  * network).
1298  *
1299  * @param type type of the block
1300  * @param options routing options
1301  * @param desired_replication_level desired replication count
1302  * @param hop_count how many hops did this request traverse so far?
1303  * @param key key for the content
1304  * @param xquery extended query
1305  * @param xquery_size number of bytes in xquery
1306  * @param reply_bf bloomfilter to filter duplicates
1307  * @param reply_bf_mutator mutator for reply_bf
1308  * @param peer_bf filter for peers not to select (again)
1309  */
1310 void
1311 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1312                            enum GNUNET_DHT_RouteOption options,
1313                            uint32_t desired_replication_level,
1314                            uint32_t hop_count, const GNUNET_HashCode * key,
1315                            const void *xquery, size_t xquery_size,
1316                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1317                            uint32_t reply_bf_mutator,
1318                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1319 {
1320   unsigned int target_count;
1321   unsigned int i;
1322   struct PeerInfo **targets;
1323   struct PeerInfo *target;
1324   struct P2PPendingMessage *pending;
1325   size_t msize;
1326   struct PeerGetMessage *pgm;
1327   char *xq;
1328   size_t reply_bf_size;
1329
1330   GNUNET_assert (NULL != peer_bf);
1331   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1332                             1, GNUNET_NO);
1333   target_count =
1334       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1335                         &targets);
1336 #if DEBUG_DHT
1337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1338               "Adding myself (%s) to GET bloomfilter for %s\n",
1339               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1340 #endif
1341   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
1342   if (0 == target_count)
1343   {
1344 #if DEBUG_DHT
1345     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1346                 "Routing GET for %s terminates after %u hops at %s\n",
1347                 GNUNET_h2s (key), (unsigned int) hop_count,
1348                 GNUNET_i2s (&my_identity));
1349 #endif
1350     return;
1351   }
1352   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1353   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1354   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1355   {
1356     GNUNET_break (0);
1357     GNUNET_free (targets);
1358     return;
1359   }
1360   GNUNET_STATISTICS_update (GDS_stats,
1361                             gettext_noop
1362                             ("# GET messages queued for transmission"),
1363                             target_count, GNUNET_NO);
1364   /* forward request */
1365   for (i = 0; i < target_count; i++)
1366   {
1367     target = targets[i];
1368 #if DEBUG_DHT
1369     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
1371                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1372 #endif
1373     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1374     pending->importance = 0;    /* FIXME */
1375     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1376     pgm = (struct PeerGetMessage *) &pending[1];
1377     pending->msg = &pgm->header;
1378     pgm->header.size = htons (msize);
1379     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1380     pgm->options = htonl (options);
1381     pgm->type = htonl (type);
1382     pgm->hop_count = htonl (hop_count + 1);
1383     pgm->desired_replication_level = htonl (desired_replication_level);
1384     pgm->xquery_size = htonl (xquery_size);
1385     pgm->bf_mutator = reply_bf_mutator;
1386     GNUNET_break (GNUNET_YES ==
1387                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1388                                                      &target->id.hashPubKey));
1389     GNUNET_assert (GNUNET_OK ==
1390                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1391                                                               pgm->bloomfilter,
1392                                                               DHT_BLOOM_SIZE));
1393     pgm->key = *key;
1394     xq = (char *) &pgm[1];
1395     memcpy (xq, xquery, xquery_size);
1396     if (NULL != reply_bf)
1397       GNUNET_assert (GNUNET_OK ==
1398                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1399                                                                 &xq
1400                                                                 [xquery_size],
1401                                                                 reply_bf_size));
1402     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1403     target->pending_count++;
1404     process_peer_queue (target);
1405   }
1406   GNUNET_free (targets);
1407 }
1408
1409
1410 /**
1411  * Handle a reply (route to origin).  Only forwards the reply back to
1412  * the given peer.  Does not do local caching or forwarding to local
1413  * clients.
1414  *
1415  * @param target neighbour that should receive the block (if still connected)
1416  * @param type type of the block
1417  * @param expiration_time when does the content expire
1418  * @param key key for the content
1419  * @param put_path_length number of entries in put_path
1420  * @param put_path peers the original PUT traversed (if tracked)
1421  * @param get_path_length number of entries in put_path
1422  * @param get_path peers this reply has traversed so far (if tracked)
1423  * @param data payload of the reply
1424  * @param data_size number of bytes in data
1425  */
1426 void
1427 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1428                              enum GNUNET_BLOCK_Type type,
1429                              struct GNUNET_TIME_Absolute expiration_time,
1430                              const GNUNET_HashCode * key,
1431                              unsigned int put_path_length,
1432                              const struct GNUNET_PeerIdentity *put_path,
1433                              unsigned int get_path_length,
1434                              const struct GNUNET_PeerIdentity *get_path,
1435                              const void *data, size_t data_size)
1436 {
1437   struct PeerInfo *pi;
1438   struct P2PPendingMessage *pending;
1439   size_t msize;
1440   struct PeerResultMessage *prm;
1441   struct GNUNET_PeerIdentity *paths;
1442
1443   msize =
1444       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1445                                                        put_path_length) *
1446       sizeof (struct GNUNET_PeerIdentity);
1447   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1448       (get_path_length >
1449        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1450       (put_path_length >
1451        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1452       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1453   {
1454     GNUNET_break (0);
1455     return;
1456   }
1457   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers, &target->hashPubKey);
1458   if (NULL == pi)
1459   {
1460     /* peer disconnected in the meantime, drop reply */
1461     return;
1462   }
1463   GNUNET_STATISTICS_update (GDS_stats,
1464                             gettext_noop
1465                             ("# RESULT messages queued for transmission"), 1,
1466                             GNUNET_NO);
1467   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1468   pending->importance = 0;      /* FIXME */
1469   pending->timeout = expiration_time;
1470   prm = (struct PeerResultMessage *) &pending[1];
1471   pending->msg = &prm->header;
1472   prm->header.size = htons (msize);
1473   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1474   prm->type = htonl (type);
1475   prm->put_path_length = htonl (put_path_length);
1476   prm->get_path_length = htonl (get_path_length);
1477   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1478   prm->key = *key;
1479   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1480   memcpy (paths, put_path,
1481           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1482   memcpy (&paths[put_path_length], get_path,
1483           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1484   memcpy (&paths[put_path_length + get_path_length], data, data_size);
1485   GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
1486   pi->pending_count++;
1487   process_peer_queue (pi);
1488 }
1489
1490
1491 /**
1492  * To be called on core init/fail.
1493  *
1494  * @param cls service closure
1495  * @param server handle to the server for this service
1496  * @param identity the public identity of this peer
1497  */
1498 static void
1499 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1500            const struct GNUNET_PeerIdentity *identity)
1501 {
1502   GNUNET_assert (server != NULL);
1503   my_identity = *identity;
1504 }
1505
1506
1507 /**
1508  * Core handler for p2p put requests.
1509  *
1510  * @param cls closure
1511  * @param peer sender of the request
1512  * @param message message
1513  * @param peer peer identity this notification is about
1514  * @param atsi performance data
1515  * @param atsi_count number of records in 'atsi'
1516  * @return GNUNET_OK to keep the connection open,
1517  *         GNUNET_SYSERR to close it (signal serious error)
1518  */
1519 static int
1520 handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1521                     const struct GNUNET_MessageHeader *message,
1522                     const struct GNUNET_ATS_Information *atsi,
1523                     unsigned int atsi_count)
1524 {
1525   const struct PeerPutMessage *put;
1526   const struct GNUNET_PeerIdentity *put_path;
1527   const void *payload;
1528   uint32_t putlen;
1529   uint16_t msize;
1530   size_t payload_size;
1531   enum GNUNET_DHT_RouteOption options;
1532   struct GNUNET_CONTAINER_BloomFilter *bf;
1533   GNUNET_HashCode test_key;
1534
1535   msize = ntohs (message->size);
1536   if (msize < sizeof (struct PeerPutMessage))
1537   {
1538     GNUNET_break_op (0);
1539     return GNUNET_YES;
1540   }
1541   put = (const struct PeerPutMessage *) message;
1542   putlen = ntohl (put->put_path_length);
1543   if ((msize <
1544        sizeof (struct PeerPutMessage) +
1545        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1546       (putlen >
1547        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1548   {
1549     GNUNET_break_op (0);
1550     return GNUNET_YES;
1551   }
1552   GNUNET_STATISTICS_update (GDS_stats,
1553                             gettext_noop ("# P2P PUT requests received"), 1,
1554                             GNUNET_NO);
1555   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1556   payload = &put_path[putlen];
1557   options = ntohl (put->options);
1558   payload_size =
1559       msize - (sizeof (struct PeerPutMessage) +
1560                putlen * sizeof (struct GNUNET_PeerIdentity));
1561   switch (GNUNET_BLOCK_get_key
1562           (GDS_block_context, ntohl (put->type), payload, payload_size,
1563            &test_key))
1564   {
1565   case GNUNET_YES:
1566     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1567     {
1568       GNUNET_break_op (0);
1569       return GNUNET_YES;
1570     }
1571     break;
1572   case GNUNET_NO:
1573     GNUNET_break_op (0);
1574     return GNUNET_YES;
1575   case GNUNET_SYSERR:
1576     /* cannot verify, good luck */
1577     break;
1578   }
1579 #if DEBUG_DHT
1580   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for %s at %s\n",
1581               GNUNET_h2s (&put->key), GNUNET_i2s (&my_identity));
1582 #endif
1583   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
1584                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1585   GNUNET_break_op (GNUNET_YES ==
1586                    GNUNET_CONTAINER_bloomfilter_test (bf, &peer->hashPubKey));
1587   {
1588     struct GNUNET_PeerIdentity pp[putlen + 1];
1589
1590     /* extend 'put path' by sender */
1591     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1592     {
1593       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1594       pp[putlen] = *peer;
1595       putlen++;
1596     }
1597     else
1598       putlen = 0;
1599
1600     /* give to local clients */
1601     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1602                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1603                               payload_size, payload);
1604     /* store locally */
1605     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1606         (am_closest_peer (&put->key, bf)))
1607       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1608                                 (put->expiration_time), &put->key, putlen, pp,
1609                                 ntohl (put->type), payload_size, payload);
1610     /* route to other peers */
1611     GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
1612                                ntohl (put->desired_replication_level),
1613                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1614                                ntohl (put->hop_count), bf, &put->key, putlen,
1615                                pp, payload, payload_size);
1616   }
1617   GNUNET_CONTAINER_bloomfilter_free (bf);
1618   return GNUNET_YES;
1619 }
1620
1621
1622 /**
1623  * We have received a FIND PEER request.  Send matching
1624  * HELLOs back.
1625  *
1626  * @param sender sender of the FIND PEER request
1627  * @param key peers close to this key are desired
1628  * @param bf peers matching this bf are excluded
1629  * @param bf_mutator mutator for bf
1630  */
1631 static void
1632 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1633                   const GNUNET_HashCode * key,
1634                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
1635 {
1636   int bucket_idx;
1637   struct PeerBucket *bucket;
1638   struct PeerInfo *peer;
1639   unsigned int choice;
1640   GNUNET_HashCode mhash;
1641   const struct GNUNET_HELLO_Message *hello;
1642
1643   /* first, check about our own HELLO */
1644   if (NULL != GDS_my_hello)
1645   {
1646     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1647     if ((NULL == bf) ||
1648         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1649     {
1650       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1651                                    GNUNET_TIME_relative_to_absolute
1652                                    (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1653                                    key, 0, NULL, 0, NULL, GDS_my_hello,
1654                                    GNUNET_HELLO_size ((const struct
1655                                                        GNUNET_HELLO_Message *)
1656                                                       GDS_my_hello));
1657     }
1658     else
1659     {
1660       GNUNET_STATISTICS_update (GDS_stats,
1661                                 gettext_noop
1662                                 ("# FIND PEER requests ignored due to Bloomfilter"),
1663                                 1, GNUNET_NO);
1664     }
1665   }
1666   else
1667   {
1668     GNUNET_STATISTICS_update (GDS_stats,
1669                               gettext_noop
1670                               ("# FIND PEER requests ignored due to lack of HELLO"),
1671                               1, GNUNET_NO);
1672   }
1673
1674   /* then, also consider sending a random HELLO from the closest bucket */
1675   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
1676     bucket_idx = closest_bucket;
1677   else
1678     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
1679   if (bucket_idx == GNUNET_SYSERR)
1680     return;
1681   bucket = &k_buckets[bucket_idx];
1682   if (bucket->peers_size == 0)
1683     return;
1684   choice =
1685       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
1686   peer = bucket->head;
1687   while (choice > 0)
1688   {
1689     GNUNET_assert (peer != NULL);
1690     peer = peer->next;
1691     choice--;
1692   }
1693   choice = bucket->peers_size;
1694   do
1695   {
1696     peer = peer->next;
1697     if (choice-- == 0)
1698       return;                   /* no non-masked peer available */
1699     if (peer == NULL)
1700       peer = bucket->head;
1701     GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1702     hello = GDS_HELLO_get (&peer->id);
1703   }
1704   while ((hello == NULL) ||
1705          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
1706   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1707                                GNUNET_TIME_relative_to_absolute
1708                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
1709                                0, NULL, 0, NULL, hello,
1710                                GNUNET_HELLO_size (hello));
1711 }
1712
1713
1714 /**
1715  * Core handler for p2p get requests.
1716  *
1717  * @param cls closure
1718  * @param peer sender of the request
1719  * @param message message
1720  * @param peer peer identity this notification is about
1721  * @param atsi performance data
1722  * @param atsi_count number of records in 'atsi'
1723  * @return GNUNET_OK to keep the connection open,
1724  *         GNUNET_SYSERR to close it (signal serious error)
1725  */
1726 static int
1727 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1728                     const struct GNUNET_MessageHeader *message,
1729                     const struct GNUNET_ATS_Information *atsi,
1730                     unsigned int atsi_count)
1731 {
1732   struct PeerGetMessage *get;
1733   uint32_t xquery_size;
1734   size_t reply_bf_size;
1735   uint16_t msize;
1736   enum GNUNET_BLOCK_Type type;
1737   enum GNUNET_DHT_RouteOption options;
1738   enum GNUNET_BLOCK_EvaluationResult eval;
1739   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1740   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1741   const char *xquery;
1742
1743   GNUNET_break (0 !=
1744                 memcmp (peer, &my_identity,
1745                         sizeof (struct GNUNET_PeerIdentity)));
1746   /* parse and validate message */
1747   msize = ntohs (message->size);
1748   if (msize < sizeof (struct PeerGetMessage))
1749   {
1750     GNUNET_break_op (0);
1751     return GNUNET_YES;
1752   }
1753   get = (struct PeerGetMessage *) message;
1754   xquery_size = ntohl (get->xquery_size);
1755   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1756   {
1757     GNUNET_break_op (0);
1758     return GNUNET_YES;
1759   }
1760   GNUNET_STATISTICS_update (GDS_stats,
1761                             gettext_noop ("# P2P GET requests received"), 1,
1762                             GNUNET_NO);
1763   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1764   type = ntohl (get->type);
1765   options = ntohl (get->options);
1766   xquery = (const char *) &get[1];
1767   reply_bf = NULL;
1768   if (reply_bf_size > 0)
1769     reply_bf =
1770         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
1771                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
1772   eval =
1773       GNUNET_BLOCK_evaluate (GDS_block_context, type, &get->key, &reply_bf,
1774                              get->bf_mutator, xquery, xquery_size, NULL, 0);
1775   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1776   {
1777     /* request invalid or block type not supported */
1778     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1779     if (NULL != reply_bf)
1780       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1781     return GNUNET_YES;
1782   }
1783   peer_bf =
1784       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
1785                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
1786   GNUNET_break_op (GNUNET_YES ==
1787                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1788                                                       &peer->hashPubKey));
1789   /* remember request for routing replies */
1790   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
1791                    reply_bf, get->bf_mutator);
1792 #if DEBUG_DHT
1793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n",
1794               GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity),
1795               (unsigned int) ntohl (get->hop_count));
1796 #endif
1797   /* local lookup (this may update the reply_bf) */
1798   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1799       (am_closest_peer (&get->key, peer_bf)))
1800   {
1801     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1802     {
1803       GNUNET_STATISTICS_update (GDS_stats,
1804                                 gettext_noop
1805                                 ("# P2P FIND PEER requests processed"), 1,
1806                                 GNUNET_NO);
1807       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
1808     }
1809     else
1810     {
1811       eval =
1812           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
1813                                     &reply_bf, get->bf_mutator);
1814     }
1815   }
1816   else
1817   {
1818     GNUNET_STATISTICS_update (GDS_stats,
1819                               gettext_noop ("# P2P GET requests ONLY routed"),
1820                               1, GNUNET_NO);
1821   }
1822
1823   /* P2P forwarding */
1824   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1825     GDS_NEIGHBOURS_handle_get (type, options,
1826                                ntohl (get->desired_replication_level),
1827                                ntohl (get->hop_count), &get->key, xquery,
1828                                xquery_size, reply_bf, get->bf_mutator, peer_bf);
1829   /* clean up */
1830   if (NULL != reply_bf)
1831     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1832   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
1833   return GNUNET_YES;
1834 }
1835
1836
1837 /**
1838  * Core handler for p2p result messages.
1839  *
1840  * @param cls closure
1841  * @param message message
1842  * @param peer peer identity this notification is about
1843  * @param atsi performance data
1844  * @param atsi_count number of records in 'atsi'
1845  * @return GNUNET_YES (do not cut p2p connection)
1846  */
1847 static int
1848 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1849                        const struct GNUNET_MessageHeader *message,
1850                        const struct GNUNET_ATS_Information *atsi,
1851                        unsigned int atsi_count)
1852 {
1853   const struct PeerResultMessage *prm;
1854   const struct GNUNET_PeerIdentity *put_path;
1855   const struct GNUNET_PeerIdentity *get_path;
1856   const void *data;
1857   uint32_t get_path_length;
1858   uint32_t put_path_length;
1859   uint16_t msize;
1860   size_t data_size;
1861   enum GNUNET_BLOCK_Type type;
1862
1863   /* parse and validate message */
1864   msize = ntohs (message->size);
1865   if (msize < sizeof (struct PeerResultMessage))
1866   {
1867     GNUNET_break_op (0);
1868     return GNUNET_YES;
1869   }
1870   prm = (struct PeerResultMessage *) message;
1871   put_path_length = ntohl (prm->put_path_length);
1872   get_path_length = ntohl (prm->get_path_length);
1873   if ((msize <
1874        sizeof (struct PeerResultMessage) + (get_path_length +
1875                                             put_path_length) *
1876        sizeof (struct GNUNET_PeerIdentity)) ||
1877       (get_path_length >
1878        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1879       (put_path_length >
1880        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1881   {
1882     GNUNET_break_op (0);
1883     return GNUNET_YES;
1884   }
1885   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
1886                             1, GNUNET_NO);
1887   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
1888   get_path = &put_path[put_path_length];
1889   type = ntohl (prm->type);
1890   data = (const void *) &get_path[get_path_length];
1891   data_size =
1892       msize - (sizeof (struct PeerResultMessage) +
1893                (get_path_length +
1894                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1895
1896   /* if we got a HELLO, consider it for our own routing table */
1897   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1898   {
1899     const struct GNUNET_MessageHeader *h;
1900     struct GNUNET_PeerIdentity pid;
1901     int bucket;
1902
1903     /* Should be a HELLO, validate and consider using it! */
1904     if (data_size < sizeof (struct GNUNET_MessageHeader))
1905     {
1906       GNUNET_break_op (0);
1907       return GNUNET_YES;
1908     }
1909     h = data;
1910     if (data_size != ntohs (h->size))
1911     {
1912       GNUNET_break_op (0);
1913       return GNUNET_YES;
1914     }
1915     if (GNUNET_OK !=
1916         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid))
1917     {
1918       GNUNET_break_op (0);
1919       return GNUNET_YES;
1920     }
1921     if (0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity)))
1922     {
1923       bucket = find_bucket (&pid.hashPubKey);
1924       if ((bucket >= 0) && (k_buckets[bucket].peers_size < bucket_size))
1925       {
1926         if (NULL != GDS_transport_handle)
1927         {
1928           GNUNET_TRANSPORT_offer_hello (GDS_transport_handle, h, NULL, NULL);
1929           GNUNET_TRANSPORT_try_connect (GDS_transport_handle, &pid);
1930         }
1931       }
1932     }
1933   }
1934
1935   /* append 'peer' to 'get_path' */
1936   {
1937     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
1938
1939     memcpy (xget_path, get_path,
1940             get_path_length * sizeof (struct GNUNET_PeerIdentity));
1941     xget_path[get_path_length] = *peer;
1942     get_path_length++;
1943
1944     /* forward to local clients */
1945     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1946                               &prm->key, get_path_length, xget_path,
1947                               put_path_length, put_path, type, data_size, data);
1948
1949     /* forward to other peers */
1950     GDS_ROUTING_process (type, GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1951                          &prm->key, put_path_length, put_path, get_path_length,
1952                          xget_path, data, data_size);
1953   }
1954   return GNUNET_YES;
1955 }
1956
1957
1958 /**
1959  * Initialize neighbours subsystem.
1960  *
1961  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1962  */
1963 int
1964 GDS_NEIGHBOURS_init ()
1965 {
1966   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1967     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1968     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1969     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1970     {NULL, 0, 0}
1971   };
1972   unsigned long long temp_config_num;
1973
1974   if (GNUNET_OK ==
1975       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1976                                              &temp_config_num))
1977     bucket_size = (unsigned int) temp_config_num;
1978   atsAPI = GNUNET_ATS_performance_init (GDS_cfg, NULL, NULL);
1979   coreAPI =
1980       GNUNET_CORE_connect (GDS_cfg, 1, NULL, &core_init, &handle_core_connect,
1981                            &handle_core_disconnect, NULL, GNUNET_NO, NULL,
1982                            GNUNET_NO, core_handlers);
1983   if (coreAPI == NULL)
1984     return GNUNET_SYSERR;
1985   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1986   return GNUNET_OK;
1987 }
1988
1989
1990 /**
1991  * Shutdown neighbours subsystem.
1992  */
1993 void
1994 GDS_NEIGHBOURS_done ()
1995 {
1996   if (coreAPI == NULL)
1997     return;
1998   GNUNET_CORE_disconnect (coreAPI);
1999   coreAPI = NULL;
2000   GNUNET_ATS_performance_done (atsAPI);
2001   atsAPI = NULL;
2002   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
2003   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
2004   all_known_peers = NULL;
2005   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
2006   {
2007     GNUNET_SCHEDULER_cancel (find_peer_task);
2008     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
2009   }
2010 }
2011
2012
2013 /* end of gnunet-service-dht_neighbours.c */