wild hxing
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht.h"
40 #include "gnunet-service-dht_datacache.h"
41 #include <fenv.h>
42
43 /**
44  * How many buckets will we allow total.
45  */
46 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
47
48 /**
49  * What is the maximum number of peers in a given bucket.
50  */
51 #define DEFAULT_BUCKET_SIZE 4
52
53 /**
54  * Size of the bloom filter the DHT uses to filter peers.
55  */
56 #define DHT_BLOOM_SIZE 128
57
58
59 /**
60  * P2P PUT message
61  */
62 struct PeerPutMessage
63 {
64   /**
65    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
66    */
67   struct GNUNET_MessageHeader header;
68
69   /**
70    * Processing options
71    */
72   uint32_t options GNUNET_PACKED;
73
74   /**
75    * Content type.
76    */
77   uint32_t type GNUNET_PACKED;
78
79   /**
80    * Hop count
81    */
82   uint32_t hop_count GNUNET_PACKED;
83
84   /**
85    * Replication level for this message
86    */
87   uint32_t desired_replication_level GNUNET_PACKED;
88
89   /**
90    * Length of the PUT path that follows (if tracked).
91    */
92   uint32_t put_path_length GNUNET_PACKED;
93
94   /**
95    * When does the content expire?
96    */
97   struct GNUNET_TIME_AbsoluteNBO expiration_time;
98
99   /**
100    * Bloomfilter (for peer identities) to stop circular routes
101    */
102   char bloomfilter[DHT_BLOOM_SIZE];
103
104   /**
105    * The key we are storing under.
106    */
107   GNUNET_HashCode key;
108
109   /* put path (if tracked) */
110
111   /* Payload */
112
113 };
114
115
116 /**
117  * P2P Result message
118  */
119 struct PeerResultMessage
120 {
121   /**
122    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
123    */
124   struct GNUNET_MessageHeader header;
125
126   /**
127    * Content type.
128    */
129   uint32_t type GNUNET_PACKED;
130
131   /**
132    * Length of the PUT path that follows (if tracked).
133    */
134   uint32_t put_path_length GNUNET_PACKED;
135
136   /**
137    * Length of the GET path that follows (if tracked).
138    */
139   uint32_t get_path_length GNUNET_PACKED;
140
141   /**
142    * The key of the corresponding GET request.
143    */
144   GNUNET_HashCode key;
145
146   /* put path (if tracked) */
147
148   /* get path (if tracked) */
149
150   /* Payload */
151
152 };
153
154
155 /**
156  * P2P GET message
157  */
158 struct PeerGetMessage
159 {
160   /**
161    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
162    */
163   struct GNUNET_MessageHeader header;
164
165   /**
166    * Processing options
167    */
168   uint32_t options GNUNET_PACKED;
169
170   /**
171    * Desired content type.
172    */
173   uint32_t type GNUNET_PACKED;
174
175   /**
176    * Hop count
177    */
178   uint32_t hop_count GNUNET_PACKED;
179
180   /**
181    * Desired replication level for this request.
182    */
183   uint32_t desired_replication_level GNUNET_PACKED;
184
185   /**
186    * Size of the extended query.
187    */
188   uint32_t xquery_size;
189
190   /**
191    * Bloomfilter mutator.
192    */
193   uint32_t bf_mutator;
194
195   /**
196    * Bloomfilter (for peer identities) to stop circular routes
197    */
198   char bloomfilter[DHT_BLOOM_SIZE];
199
200   /**
201    * The key we are looking for.
202    */
203   GNUNET_HashCode key;
204
205   /* xquery */
206
207   /* result bloomfilter */
208
209 };
210
211
212 /**
213  * Linked list of messages to send to a particular other peer.
214  */
215 struct P2PPendingMessage
216 {
217   /**
218    * Pointer to next item in the list
219    */
220   struct P2PPendingMessage *next;
221
222   /**
223    * Pointer to previous item in the list
224    */
225   struct P2PPendingMessage *prev;
226
227   /**
228    * Message importance level.  FIXME: used? useful?
229    */
230   unsigned int importance;
231
232   /**
233    * When does this message time out?
234    */
235   struct GNUNET_TIME_Absolute timeout;
236
237   /**
238    * Actual message to be sent, allocated at the end of the struct:
239    * // msg = (cast) &pm[1]; 
240    * // memcpy (&pm[1], data, len);
241    */
242   const struct GNUNET_MessageHeader *msg;
243
244 };
245
246
247 /**
248  * Entry for a peer in a bucket.
249  */
250 struct PeerInfo
251 {
252   /**
253    * Next peer entry (DLL)
254    */
255   struct PeerInfo *next;
256
257   /**
258    *  Prev peer entry (DLL)
259    */
260   struct PeerInfo *prev;
261
262   /**
263    * Count of outstanding messages for peer.  FIXME: NEEDED?
264    * FIXME: bound queue size!?
265    */
266   unsigned int pending_count;
267
268   /**
269    * Head of pending messages to be sent to this peer.
270    */
271   struct P2PPendingMessage *head;
272
273   /**
274    * Tail of pending messages to be sent to this peer.
275    */
276   struct P2PPendingMessage *tail;
277
278   /**
279    * Core handle for sending messages to this peer.
280    */
281   struct GNUNET_CORE_TransmitHandle *th;
282
283   /**
284    * Preference update context
285    */
286   struct GNUNET_CORE_InformationRequestContext *info_ctx;
287
288   /**
289    * Task for scheduling message sends.
290    */
291   GNUNET_SCHEDULER_TaskIdentifier send_task;
292
293   /**
294    * Task for scheduling preference updates
295    */
296   GNUNET_SCHEDULER_TaskIdentifier preference_task;
297
298   /**
299    * What is the identity of the peer?
300    */
301   struct GNUNET_PeerIdentity id;
302
303 #if 0
304   /**
305    * What is the average latency for replies received?
306    */
307   struct GNUNET_TIME_Relative latency;
308
309   /**
310    * Transport level distance to peer.
311    */
312   unsigned int distance;
313 #endif
314
315 };
316
317
318 /**
319  * Peers are grouped into buckets.
320  */
321 struct PeerBucket
322 {
323   /**
324    * Head of DLL
325    */
326   struct PeerInfo *head;
327
328   /**
329    * Tail of DLL
330    */
331   struct PeerInfo *tail;
332
333   /**
334    * Number of peers in the bucket.
335    */
336   unsigned int peers_size;
337 };
338
339
340 /**
341  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
342  */
343 static unsigned int closest_bucket;
344
345 /**
346  * How many peers have we added since we sent out our last
347  * find peer request?
348  */
349 static unsigned int newly_found_peers;
350
351 /**
352  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
353  */
354 static struct PeerBucket k_buckets[MAX_BUCKETS];
355
356 /**
357  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
358  */
359 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
360
361 /**
362  * Maximum size for each bucket.
363  */
364 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
365
366 /**
367  * Task that sends FIND PEER requests.
368  */
369 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
370
371
372 /**
373  * Find the optimal bucket for this key.
374  *
375  * @param hc the hashcode to compare our identity to
376  * @return the proper bucket index, or GNUNET_SYSERR
377  *         on error (same hashcode)
378  */
379 static int
380 find_bucket (const GNUNET_HashCode * hc)
381 {
382   unsigned int bits;
383
384   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
385   if (bits == MAX_BUCKETS)
386     {
387       /* How can all bits match? Got my own ID? */
388       GNUNET_break (0);
389       return GNUNET_SYSERR; 
390     }
391   return MAX_BUCKETS - bits - 1;
392 }
393
394
395 /**
396  * Method called whenever a peer connects.
397  *
398  * @param cls closure
399  * @param peer peer identity this notification is about
400  * @param atsi performance data
401  */
402 static void
403 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
404                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
405 {
406   struct PeerInfo *ret;
407   int peer_bucket;
408
409   /* Check for connect to self message */
410   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
411     return;
412   if (GNUNET_YES ==
413       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
414                                               &peer->hashPubKey))
415   {
416     GNUNET_break (0);
417     return;
418   }
419   peer_bucket = find_bucket (&peer->hashPubKey);
420   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
421   ret = GNUNET_malloc (sizeof (struct PeerInfo));
422 #if 0
423   ret->latency = latency;
424   ret->distance = distance;
425 #endif
426   ret->id = *peer;
427   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
428                                      k_buckets[peer_bucket].tail,
429                                      k_buckets[peer_bucket].tail, ret);
430   k_buckets[peer_bucket].peers_size++;
431   closest_bucket = GNUNET_MAX (closest_bucket,
432                                peer_bucket);
433   if ( (peer_bucket > 0) &&
434        (k_buckets[peer_bucket].peers_size <= bucket_size) )
435     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
436   newly_found_peers++;
437   GNUNET_assert (GNUNET_OK ==
438                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
439                                                     &peer->hashPubKey, ret,
440                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
441   increment_stats (STAT_PEERS_KNOWN);
442 }
443
444
445 /**
446  * Method called whenever a peer disconnects.
447  *
448  * @param cls closure
449  * @param peer peer identity this notification is about
450  */
451 static void
452 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
453 {
454   struct PeerInfo *to_remove;
455   int current_bucket;
456   struct P2PPendingMessage *pos;
457   struct P2PPendingMessage *next;
458
459   /* Check for disconnect from self message */
460   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
461     return;
462   to_remove =
463       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
464   if (NULL == to_remove)
465     {
466       GNUNET_break (0);
467       return;
468     }
469   GNUNET_assert (GNUNET_YES ==
470                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
471                                                        &peer->hashPubKey,
472                                                        to_remove));
473   if (NULL != to_remove->info_ctx)
474   {
475     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
476     to_remove->info_ctx = NULL;
477   }
478   current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
479   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
480                                k_buckets[current_bucket].tail,
481                                to_remove);
482   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
483   k_buckets[current_bucket].peers_size--;
484   while ( (lowest_bucket > 0) &&
485           (k_buckets[lowest_bucket].peers_size == 0) )
486     lowest_bucket--;
487
488   if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
489   {
490     GNUNET_SCHEDULER_cancel (peer->send_task);
491     peer->send_task = GNUNET_SCHEDULER_NO_TASK;
492   }
493   if (to_remove->th != NULL) 
494   {
495     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
496     to_remove->th = NULL;
497   }
498   while (NULL != (pos = to_remove->head))
499   {
500     GNUNET_CONTAINER_DLL_remove (to_remove->head,
501                                  to_remove->tail,
502                                  pos);
503     GNUNET_free (pos);
504   }
505 }
506
507
508 /**
509  * Called when core is ready to send a message we asked for
510  * out to the destination.
511  *
512  * @param cls the 'struct PeerInfo' of the target peer
513  * @param size number of bytes available in buf
514  * @param buf where the callee should write the message
515  * @return number of bytes written to buf
516  */
517 static size_t
518 core_transmit_notify (void *cls, size_t size, void *buf)
519 {
520   struct PeerInfo *peer = cls;
521   char *cbuf = buf;
522   struct P2PPendingMessage *pending;
523   size_t off;
524   size_t msize;
525
526   peer->th = NULL;
527   if (buf == NULL)
528   {
529     /* client disconnected */
530     return 0;
531   }
532   if (peer->head == NULL)
533   {
534     /* no messages pending */
535     return 0;
536   }
537   off = 0;
538   while ( (NULL != (pending = peer->head)) &&
539           (size - off >= (msize = ntohs (pending->msg->size))) )
540   {
541     memcpy (&cbuf[off], pending->msg, msize);
542     off += msize;
543     peer->pending_count--;
544     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
545     GNUNET_free (pending);
546   }
547   if (peer->head != NULL)
548     peer->th 
549       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
550                                            pending->importance,
551                                            pending->timeout, &peer->id, msize,
552                                            &core_transmit_notify, peer);
553
554   return off;
555 }
556
557
558 /**
559  * Transmit all messages in the peer's message queue.
560  *
561  * @param peer message queue to process
562  */
563 static void
564 process_peer_queue (struct PeerInfo *peer)
565 {
566   struct P2PPendingMessage *pending;
567
568   if (NULL != (pending = peer->head))
569     return;
570   if (NULL != peer->th)
571     return;
572   peer->th 
573     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
574                                          pending->importance,
575                                          pending->timeout, &peer->id,
576                                          ntohs (pending->msg->size),
577                                          &core_transmit_notify, peer);
578 }
579
580
581 /**
582  * To how many peers should we (on average) forward the request to
583  * obtain the desired target_replication count (on average).
584  *
585  * FIXME: double-check that this is fine
586  * 
587  * @param hop_count number of hops the message has traversed
588  * @param target_replication the number of total paths desired
589  * @return Some number of peers to forward the message to
590  */
591 static unsigned int
592 get_forward_count (uint32_t hop_count, 
593                    uint32_t target_replication)
594 {
595   uint32_t random_value;
596   uint32_t forward_count;
597   float target_value;
598
599   /* bound by system-wide maximum */
600   target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
601                                    target_replication);
602   if (hop_count > log_of_network_size_estimate * 2.0)
603   {
604     /* Once we have reached our ideal number of hops, only forward to 1 peer */
605     return 1;
606   }
607   target_value =
608     1 + (target_replication - 1.0) / (log_of_network_size_estimate +
609                                       ((float) (target_replication - 1.0) *
610                                        hop_count));
611   /* Set forward count to floor of target_value */
612   forward_count = (uint32_t) target_value;
613   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
614   target_value = target_value - forward_count;
615   random_value =
616     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
617   if (random_value < (target_value * UINT32_MAX))
618     forward_count++;
619   return forward_count;
620 }
621
622
623 /**
624  * Check whether my identity is closer than any known peers.  If a
625  * non-null bloomfilter is given, check if this is the closest peer
626  * that hasn't already been routed to.
627  * FIXME: needed?
628  *
629  * @param key hash code to check closeness to
630  * @param bloom bloomfilter, exclude these entries from the decision
631  * @return GNUNET_YES if node location is closest,
632  *         GNUNET_NO otherwise.
633  */
634 static int
635 am_closest_peer (const GNUNET_HashCode *key,
636                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
637 {
638   int bits;
639   int other_bits;
640   int bucket_num;
641   int count;
642   struct PeerInfo *pos;
643   unsigned int my_distance;
644
645   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
646     return GNUNET_YES;
647   bucket_num = find_current_bucket (key);
648   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
649   my_distance = distance (&my_identity.hashPubKey, key);
650   pos = k_buckets[bucket_num].head;
651   count = 0;
652   while ((pos != NULL) && (count < bucket_size))
653   {
654     if ((bloom != NULL) &&
655         (GNUNET_YES ==
656          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
657     {
658       pos = pos->next;
659       continue;                 /* Skip already checked entries */
660     }
661     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
662     if (other_bits > bits)
663       return GNUNET_NO;
664     if (other_bits == bits)        /* We match the same number of bits */
665       return GNUNET_YES;
666     pos = pos->next;
667   }
668   /* No peers closer, we are the closest! */
669   return GNUNET_YES;
670 }
671
672
673 /**
674  * Select a peer from the routing table that would be a good routing
675  * destination for sending a message for "key".  The resulting peer
676  * must not be in the set of blocked peers.<p>
677  *
678  * Note that we should not ALWAYS select the closest peer to the
679  * target, peers further away from the target should be chosen with
680  * exponentially declining probability.
681  *
682  * FIXME: double-check that this is fine
683  * 
684  *
685  * @param key the key we are selecting a peer to route to
686  * @param bloom a bloomfilter containing entries this request has seen already
687  * @param hops how many hops has this message traversed thus far
688  * @return Peer to route to, or NULL on error
689  */
690 static struct PeerInfo *
691 select_peer (const GNUNET_HashCode *key,
692              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
693              uint32_t hops)
694 {
695   unsigned int bc;
696   unsigned int count;
697   unsigned int selected;
698   struct PeerInfo *pos;
699   unsigned int distance;
700   unsigned int largest_distance;
701   struct PeerInfo *chosen;
702
703   if (hops >= log_of_network_size_estimate)
704   {
705     /* greedy selection (closest peer that is not in bloomfilter) */
706     largest_distance = 0;
707     chosen = NULL;
708     for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
709     {
710       pos = k_buckets[bc].head;
711       count = 0;
712       while ((pos != NULL) && (count < bucket_size))
713       {
714         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
715         if (GNUNET_NO ==
716             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
717         {
718           distance = inverse_distance (key, &pos->id.hashPubKey);
719           if (distance > largest_distance)
720           {
721             chosen = pos;
722             largest_distance = distance;
723           }
724         }
725         count++;
726         pos = pos->next;
727       }
728     }
729     return chosen;
730   }
731
732   /* select "random" peer */
733   /* count number of peers that are available and not filtered */
734   count = 0;
735   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
736   {
737     pos = k_buckets[bc].head;
738     while ((pos != NULL) && (count < bucket_size))
739     {
740       if (GNUNET_YES ==
741           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
742       {
743         pos = pos->next;
744         continue;               /* Ignore bloomfiltered peers */
745       }
746       count++;
747       pos = pos->next;
748     }
749   }
750   if (count == 0)               /* No peers to select from! */
751   {
752     return NULL;
753   }
754   /* Now actually choose a peer */
755   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
756   count = 0;
757   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
758   {
759     pos = k_buckets[bc].head;
760     while ((pos != NULL) && (count < bucket_size))
761     {
762       if (GNUNET_YES ==
763           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
764       {
765         pos = pos->next;
766         continue;               /* Ignore bloomfiltered peers */
767       }
768       if (0 == selected--)
769         return pos;
770       pos = pos->next;
771     }
772   }
773   GNUNET_break (0);
774   return NULL;
775 }
776
777
778 /**
779  * Compute the set of peers that the given request should be
780  * forwarded to.
781  *
782  * @param key routing key
783  * @param bloom bloom filter excluding peers as targets, all selected
784  *        peers will be added to the bloom filter
785  * @param hop_count number of hops the request has traversed so far
786  * @param target_replication desired number of replicas
787  * @param targets where to store an array of target peers (to be
788  *         free'd by the caller)
789  * @return number of peers returned in 'targets'.
790  */
791 static unsigned int
792 get_target_peers (const GNUNET_HashCode *key,
793                   struct GNUNET_CONTAINER_BloomFilter *bloom,
794                   uint32_t hop_count,
795                   uint32_t target_replication,
796                   struct PeerInfo ***targets)
797 {
798   unsigned int ret;
799   unsigned int off;
800   struct PeerInfo **rtargets;
801   struct PeerInfo *nxt;
802
803   ret = get_forward_count (hop_count, target_replication);
804   if (ret == 0)
805   {
806     *targets = NULL;
807     return 0;
808   }
809   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
810   off = 0;
811   while (ret-- > 0)
812   {
813     nxt = select_peer (key, bloom, hop_count);
814     if (nxt == NULL)
815       break;
816     rtargets[off++] = nxt;
817     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
818   }
819   if (0 == off)
820   {
821     GNUNET_free (rtargets);
822     *targets = NULL;
823     return 0;
824   }
825   *targets = rtargets;
826   return off;
827 }
828
829
830 /**
831  * Perform a PUT operation.   Forwards the given request to other
832  * peers.   Does not store the data locally.  Does not give the
833  * data to local clients.  May do nothing if this is the only
834  * peer in the network (or if we are the closest peer in the
835  * network).
836  *
837  * @param type type of the block
838  * @param options routing options
839  * @param desired_replication_level desired replication count
840  * @param expiration_time when does the content expire
841  * @param hop_count how many hops has this message traversed so far
842  * @param bf Bloom filter of peers this PUT has already traversed
843  * @param key key for the content
844  * @param put_path_length number of entries in put_path
845  * @param put_path peers this request has traversed so far (if tracked)
846  * @param data payload to store
847  * @param data_size number of bytes in data
848  */
849 void
850 GDS_NEIGHBOURS_handle_put (uint32_t type,
851                            uint32_t options,
852                            uint32_t desired_replication_level,
853                            GNUNET_TIME_Absolute expiration_time,
854                            uint32_t hop_count,
855                            struct GNUNET_CONTAINER_BloomFilter *bf,
856                            const GNUNET_HashCode *key,
857                            unsigned int put_path_length,
858                            struct GNUNET_PeerIdentity *put_path,
859                            const void *data,
860                            size_t data_size)
861 {
862   unsigned int target_count;
863   unsigned int i;
864   struct PeerInfo **targets;
865   struct PeerInfo *target;
866   struct P2PPendingMessage *pending;
867   size_t msize;
868   struct PeerPutMessage *ppm;
869   struct GNUNET_PeerIdentity *pp;
870   
871   target_count = get_target_peers (key, bf, hop_count,
872                                    desired_replication_level,
873                                    &targets);
874   if (0 == target_count)
875     return;
876   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
877   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
878   {
879     put_path_length = 0;
880     msize = data_size + sizeof (struct PeerPutMessage);
881   }
882   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
883   {
884     GNUNET_break (0);
885     return;
886   }
887   for (i=0;i<target_count;i++)
888   {
889     target = targets[i];
890     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
891     pending->importance = 0; /* FIXME */
892     pending->timeout = expiration_time;   
893     ppm = (struct PeerPutMessage*) &pending[1];
894     pending->msg = &ppm->header;
895     ppm->header.size = htons (msize);
896     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
897     ppm->options = htonl (options);
898     ppm->type = htonl (type);
899     ppm->hop_count = htonl (hop_count + 1);
900     ppm->desired_replication_level = htonl (desired_replication_level);
901     ppm->put_path_length = htonl (put_path_length);
902     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
903     GNUNET_assert (GNUNET_OK ==
904                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
905                                                               ppm->bloomfilter,
906                                                               DHT_BLOOM_SIZE));
907     ppm->key = *key;
908     pp = (const struct GNUNET_PeerIdentity*) &ppm[1];
909     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
910     memcpy (&pp[put_path_length], data, data_size);
911     GNUNET_CONTAINER_DLL_insert (target->head,
912                                  target->tail,
913                                  pending);
914     target->pending_count++;
915     process_peer_queue (target);
916   }
917   GNUNET_free (targets);
918 }
919
920
921 /**
922  * Perform a GET operation.  Forwards the given request to other
923  * peers.  Does not lookup the key locally.  May do nothing if this is
924  * the only peer in the network (or if we are the closest peer in the
925  * network).
926  *
927  * @param type type of the block
928  * @param options routing options
929  * @param desired_replication_level desired replication count
930  * @param hop_count how many hops did this request traverse so far?
931  * @param key key for the content
932  * @param xquery extended query
933  * @param xquery_size number of bytes in xquery
934  * @param reply_bf bloomfilter to filter duplicates
935  * @param reply_bf_mutator mutator for reply_bf
936  * @param peer_bf filter for peers not to select (again)
937  */
938 void
939 GDS_NEIGHBOURS_handle_get (uint32_t type,
940                            uint32_t options,
941                            uint32_t desired_replication_level,
942                            uint32_t hop_count,
943                            const GNUNET_HashCode *key,
944                            const void *xquery,
945                            size_t xquery_size,
946                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
947                            uint32_t reply_bf_mutator,
948                            const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
949 {
950   unsigned int target_count;
951   unsigned int i;
952   struct PeerInfo **targets;
953   struct PeerInfo *target;
954   struct P2PPendingMessage *pending;
955   size_t msize;
956   struct PeerGetMessage *pgm;
957   char *xq;
958   size_t reply_bf_size;
959   
960   target_count = get_target_peers (key, peer_bf, hop_count,
961                                    desired_replication_level,
962                                    &targets);
963   if (0 == target_count)
964     return;
965   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
966   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
967   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
968   {
969     GNUNET_break (0);
970     return;
971   }
972   for (i=0;i<target_count;i++)
973   {
974     target = targets[i];
975     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
976     pending->importance = 0; /* FIXME */
977     pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
978     pgm = (struct PeerGetMessage*) &pending[1];
979     pending->msg = &pgm->header;
980     pgm->header.size = htons (msize);
981     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
982     pgm->options = htonl (options);
983     pgm->type = htonl (type);
984     pgm->hop_count = htonl (hop_count + 1);
985     pgm->desired_replication_level = htonl (desired_replication_level);
986     pgm->xquery_size = htonl (xquery_size);
987     pgm->bf_mutator = reply_bf_mutator; 
988     GNUNET_assert (GNUNET_OK ==
989                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
990                                                               pgm->bloomfilter,
991                                                               DHT_BLOOM_SIZE));
992     pgm->key = *key;
993     xq = (const struct GNUNET_PeerIdentity*) &ppm[1];
994     memcpy (xq, xquery, xquery_size);
995     GNUNET_assert (GNUNET_OK ==
996                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
997                                                               &xq[xquery_size],
998                                                               reply_bf_size));
999     GNUNET_CONTAINER_DLL_insert (target->head,
1000                                  target->tail,
1001                                  pending);
1002     target->pending_count++;
1003     process_peer_queue (target);
1004   }
1005   GNUNET_free (targets);
1006 }
1007
1008
1009 /**
1010  * Handle a reply (route to origin).  Only forwards the reply back to
1011  * other peers waiting for it.  Does not do local caching or
1012  * forwarding to local clients.
1013  *
1014  * @param type type of the block
1015  * @param options routing options
1016  * @param expiration_time when does the content expire
1017  * @param key key for the content
1018  * @param put_path_length number of entries in put_path
1019  * @param put_path peers the original PUT traversed (if tracked)
1020  * @param get_path_length number of entries in put_path
1021  * @param get_path peers this reply has traversed so far (if tracked)
1022  * @param data payload of the reply
1023  * @param data_size number of bytes in data
1024  */
1025 void
1026 GDS_NEIGHBOURS_handle_reply (uint32_t type,
1027                              uint32_t options,
1028                              GNUNET_TIME_Absolute expiration_time,
1029                              const GNUNET_HashCode *key,
1030                              unsigned int put_path_length,
1031                              struct GNUNET_PeerIdentity *put_path,
1032                              unsigned int get_path_length,
1033                              struct GNUNET_PeerIdentity *get_path,
1034                              const void *data,
1035                              size_t data_size)
1036 {
1037   // FIXME
1038 }
1039
1040
1041 /**
1042  * Closure for 'add_known_to_bloom'.
1043  */
1044 struct BloomConstructorContext
1045 {
1046   /**
1047    * Bloom filter under construction.
1048    */
1049   struct GNUNET_CONTAINER_BloomFilter *bloom;
1050
1051   /**
1052    * Mutator to use.
1053    */
1054   uint32_t bf_mutator;
1055 };
1056
1057
1058 /**
1059  * Add each of the peers we already know to the bloom filter of
1060  * the request so that we don't get duplicate HELLOs.
1061  *
1062  * @param cls the 'struct BloomConstructorContext'.
1063  * @param key peer identity to add to the bloom filter
1064  * @param value value the peer information (unused)
1065  * @return GNUNET_YES (we should continue to iterate)
1066  */
1067 static int
1068 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1069 {
1070   struct BloomConstructorContext *ctx = cls;
1071   GNUNET_HashCode mh;
1072
1073   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1074   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1075   return GNUNET_YES;
1076 }
1077
1078
1079 /**
1080  * Task to send a find peer message for our own peer identifier
1081  * so that we can find the closest peers in the network to ourselves
1082  * and attempt to connect to them.
1083  *
1084  * @param cls closure for this task
1085  * @param tc the context under which the task is running
1086  */
1087 static void
1088 send_find_peer_message (void *cls,
1089                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1090 {
1091   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
1092   struct DHT_MessageContext msg_ctx;
1093   struct GNUNET_TIME_Relative next_send_time;
1094   struct BloomConstructorContext bcc;
1095
1096   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1097   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1098     return;
1099   if (newly_found_peers > bucket_size) 
1100   {
1101     /* If we are finding many peers already, no need to send out our request right now! */
1102     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1103                                                    &send_find_peer_message, NULL);
1104     newly_found_peers = 0;
1105     return;
1106   }
1107   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1108   bcc.bloom =
1109     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1110   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1111                                          &add_known_to_bloom,
1112                                          &bcc);
1113   // FIXME: pass priority!?
1114   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1115                              GNUNET_DHT_RO_FIND_PEER,
1116                              16 /* FIXME: replication level? */,
1117                              0,
1118                              &my_identity.hashPubKey,
1119                              NULL, 0,
1120                              bcc.bloom, bcc.bf_mutator, NULL);
1121   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1122   /* schedule next round */
1123   newly_found_peers = 0;
1124   next_send_time.rel_value =
1125     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1126     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1127                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1128   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1129                                                  &send_find_peer_message,
1130                                                  NULL);  
1131 }
1132
1133
1134 /**
1135  * To be called on core init/fail.
1136  *
1137  * @param cls service closure
1138  * @param server handle to the server for this service
1139  * @param identity the public identity of this peer
1140  * @param publicKey the public key of this peer
1141  */
1142 static void
1143 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1144            const struct GNUNET_PeerIdentity *identity,
1145            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1146 {
1147   GNUNET_assert (server != NULL);
1148   my_identity = *identity;
1149   next_send_time.rel_value =
1150     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1151     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1152                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1153                                2) -
1154                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1155   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1156                                                  &send_find_peer_message,
1157                                                  NULL);
1158 }
1159
1160
1161 /**
1162  * Core handler for p2p put requests.
1163  *
1164  * @param cls closure
1165  * @param peer sender of the request
1166  * @param message message
1167  * @param peer peer identity this notification is about
1168  * @param atsi performance data
1169  * @return GNUNET_OK to keep the connection open,
1170  *         GNUNET_SYSERR to close it (signal serious error)
1171  */
1172 static int
1173 handle_dht_p2p_put (void *cls,
1174                     const struct GNUNET_PeerIdentity *peer,
1175                     const struct GNUNET_MessageHeader *message,
1176                     const struct GNUNET_TRANSPORT_ATS_Information
1177                     *atsi)
1178 {
1179   const struct PeerPutMessage *put;
1180   const struct GNUNET_PeerIdentity *put_path;
1181   const void *payload;
1182   uint32_t putlen;
1183   uint16_t msize;
1184   size_t payload_size;
1185   struct GNUNET_CONTAINER_BloomFilter *bf;
1186   GNUNET_HashCode test_key;
1187   
1188   msize = ntohs (message->size);
1189   if (msize < sizeof (struct PeerPutMessage))
1190   {
1191     GNUNET_break_op (0);
1192     return GNUNET_YES;
1193   }
1194   put = (const struct PeerPutMessage*) message;
1195   putlen = ntohl (put->put_path_length);
1196   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1197        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1198     {
1199       GNUNET_break_op (0);
1200       return GNUNET_YES;
1201     }
1202   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1203   payload = &put_path[putlen];
1204   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1205                           putlen * sizeof (struct GNUNET_PeerIdentity));
1206   switch (GNUNET_BLOCK_get_key (block_context,
1207                                 ntohl (put->type),
1208                                 payload, payload_size,
1209                                 &test_key))
1210   {
1211   case GNUNET_YES:
1212     if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode)))
1213     {
1214       GNUNET_break_op (0);
1215       return GNUNET_YES;
1216     }
1217     break;
1218   case GNUNET_NO:
1219     GNUNET_break_op (0);
1220     return GNUNET_YES;
1221   case GNUNET_SYSERR:
1222     /* cannot verify, good luck */
1223     break;
1224   }
1225   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1226                                           DHT_BLOOM_SIZE,
1227                                           DHT_BLOOM_K);
1228   {
1229     struct GNUNET_PeerIdentity pp[putlen+1];
1230   
1231     /* extend 'put path' by sender */
1232     memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1233     pp[putlen] = *sender;
1234
1235     /* give to local clients */
1236     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1237                              &put->key,
1238                              0, NULL,
1239                              putlen + 1,
1240                              pp,
1241                              ntohl (put->type),
1242                              payload_size,
1243                              payload);
1244     /* store locally */
1245     GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1246                               &put->key,
1247                               putlen + 1, pp,
1248                               ntohl (put->type),
1249                               payload_size,
1250                               payload);
1251     /* route to other peers */
1252     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1253                                ntohl (put->options),
1254                                ntohl (put->desired_replication_level),
1255                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1256                                ntohl (put->hop_count),
1257                                bf,
1258                                putlen + 1, pp,
1259                                payload,
1260                                payload_size);
1261   }
1262   GNUNET_CONTAINER_bloomfilter_free (bf);
1263   return GNUNET_YES;
1264 }
1265
1266
1267 /**
1268  * Core handler for p2p get requests.
1269  *
1270  * @param cls closure
1271  * @param peer sender of the request
1272  * @param message message
1273  * @param peer peer identity this notification is about
1274  * @param atsi performance data
1275  * @return GNUNET_OK to keep the connection open,
1276  *         GNUNET_SYSERR to close it (signal serious error)
1277  */
1278 static int
1279 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1280                     const struct GNUNET_MessageHeader *message,
1281                     const struct GNUNET_TRANSPORT_ATS_Information
1282                     *atsi)
1283 {
1284   // 1) validate GET
1285   // 2) store in routing table
1286   // 3) check options (i.e. FIND PEER)
1287   // 4) local lookup (=> need eval result!)
1288   // 5) p2p forwarding
1289
1290
1291   struct GNUNET_DHT_P2PRouteMessage *incoming =
1292       (struct GNUNET_DHT_P2PRouteMessage *) message;
1293   struct GNUNET_MessageHeader *enc_msg =
1294       (struct GNUNET_MessageHeader *) &incoming[1];
1295   struct DHT_MessageContext *msg_ctx;
1296   char *route_path;
1297   int path_size;
1298
1299   // FIXME
1300   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
1301   {
1302     GNUNET_break_op (0);
1303     return GNUNET_YES;
1304   }
1305
1306   if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
1307   {
1308     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309                 "Sending of previous replies took too long, backing off!\n");
1310     increment_stats ("# route requests dropped due to high load");
1311     decrease_max_send_delay (get_max_send_delay ());
1312     return GNUNET_YES;
1313   }
1314   msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
1315   msg_ctx->bloom =
1316       GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
1317                                          DHT_BLOOM_K);
1318   GNUNET_assert (msg_ctx->bloom != NULL);
1319   msg_ctx->hop_count = ntohl (incoming->hop_count);
1320   memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
1321   msg_ctx->replication = ntohl (incoming->desired_replication_level);
1322   msg_ctx->msg_options = ntohl (incoming->options);
1323   if (GNUNET_DHT_RO_RECORD_ROUTE ==
1324       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1325   {
1326     path_size =
1327         ntohl (incoming->outgoing_path_length) *
1328         sizeof (struct GNUNET_PeerIdentity);
1329     if (ntohs (message->size) !=
1330         (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
1331          path_size))
1332     {
1333       GNUNET_break_op (0);
1334       GNUNET_free (msg_ctx);
1335       return GNUNET_YES;
1336     }
1337     route_path = (char *) &incoming[1];
1338     route_path = route_path + ntohs (enc_msg->size);
1339     msg_ctx->path_history =
1340         GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
1341     memcpy (msg_ctx->path_history, route_path, path_size);
1342     memcpy (&msg_ctx->path_history[path_size], &my_identity,
1343             sizeof (struct GNUNET_PeerIdentity));
1344     msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
1345   }
1346   msg_ctx->network_size = ntohl (incoming->network_size);
1347   msg_ctx->peer = *peer;
1348   msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
1349   msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
1350   demultiplex_message (enc_msg, msg_ctx);
1351   if (msg_ctx->bloom != NULL)
1352   {
1353     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
1354     msg_ctx->bloom = NULL;
1355   }
1356   GNUNET_free (msg_ctx);
1357   return GNUNET_YES;
1358 }
1359
1360
1361 /**
1362  * Core handler for p2p result messages.
1363  *
1364  * @param cls closure
1365  * @param message message
1366  * @param peer peer identity this notification is about
1367  * @param atsi performance data
1368  *
1369  */
1370 static int
1371 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1372                        const struct GNUNET_MessageHeader *message,
1373                        const struct GNUNET_TRANSPORT_ATS_Information
1374                        *atsi)
1375 {
1376   // 1) validate result format
1377   // 2) append 'peer' to put path
1378   // 3) forward to local clients
1379   // 4) p2p routing
1380   const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
1381       (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
1382   struct GNUNET_MessageHeader *enc_msg =
1383       (struct GNUNET_MessageHeader *) &incoming[1];
1384   struct DHT_MessageContext msg_ctx;
1385
1386   // FIXME
1387   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
1388   {
1389     GNUNET_break_op (0);
1390     return GNUNET_YES;
1391   }
1392
1393   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
1394   memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
1395   msg_ctx.msg_options = ntohl (incoming->options);
1396   msg_ctx.hop_count = ntohl (incoming->hop_count);
1397   msg_ctx.peer = *peer;
1398   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;  /* Make result routing a higher priority */
1399   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1400   if ((GNUNET_DHT_RO_RECORD_ROUTE ==
1401        (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
1402       (ntohl (incoming->outgoing_path_length) > 0))
1403   {
1404     if (ntohs (message->size) -
1405         sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
1406         ntohs (enc_msg->size) !=
1407         ntohl (incoming->outgoing_path_length) *
1408         sizeof (struct GNUNET_PeerIdentity))
1409     {
1410       GNUNET_break_op (0);
1411       return GNUNET_NO;
1412     }
1413     msg_ctx.path_history = (char *) &incoming[1];
1414     msg_ctx.path_history += ntohs (enc_msg->size);
1415     msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
1416   }
1417   route_result_message (enc_msg, &msg_ctx);
1418   return GNUNET_YES;
1419 }
1420
1421
1422 /**
1423  * Initialize neighbours subsystem.
1424  */
1425 int
1426 GDS_NEIGHBOURS_init ()
1427 {
1428   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1429     {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1430     {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1431     {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1432     {NULL, 0, 0}
1433   };
1434   unsigned long long temp_config_num;
1435   struct GNUNET_TIME_Relative next_send_time;
1436  
1437   if (GNUNET_OK ==
1438       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
1439                                              &temp_config_num))
1440     bucket_size = (unsigned int) temp_config_num;  
1441   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1442                                  DEFAULT_CORE_QUEUE_SIZE,
1443                                  NULL,
1444                                  &core_init,
1445                                  &handle_core_connect,
1446                                  &handle_core_disconnect, 
1447                                  NULL,  /* Do we care about "status" updates? */
1448                                  NULL, GNUNET_NO,
1449                                  NULL, GNUNET_NO,
1450                                  core_handlers);
1451   if (coreAPI == NULL)
1452     return GNUNET_SYSERR;
1453   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1454   return GNUNET_OK;
1455 }
1456
1457
1458 /**
1459  * Shutdown neighbours subsystem.
1460  */
1461 void
1462 GDS_NEIGHBOURS_done ()
1463 {
1464   GNUNET_assert (coreAPI != NULL);
1465   GNUNET_CORE_disconnect (coreAPI);
1466   coreAPI = NULL;    
1467   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
1468   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1469   all_known_peers = NULL;
1470   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1471   {
1472     GNUNET_SCHEDULER_cancel (find_peer_task);
1473     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1474   }
1475 }
1476
1477
1478 /* end of gnunet-service-dht_neighbours.c */