more wild hxing
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht.h"
40 #include "gnunet-service-dht_datacache.h"
41 #include "gnunet-service-dht_routing.h"
42 #include <fenv.h>
43
44 /**
45  * How many buckets will we allow total.
46  */
47 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
48
49 /**
50  * What is the maximum number of peers in a given bucket.
51  */
52 #define DEFAULT_BUCKET_SIZE 4
53
54 /**
55  * Size of the bloom filter the DHT uses to filter peers.
56  */
57 #define DHT_BLOOM_SIZE 128
58
59
60 /**
61  * P2P PUT message
62  */
63 struct PeerPutMessage
64 {
65   /**
66    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
67    */
68   struct GNUNET_MessageHeader header;
69
70   /**
71    * Processing options
72    */
73   uint32_t options GNUNET_PACKED;
74
75   /**
76    * Content type.
77    */
78   uint32_t type GNUNET_PACKED;
79
80   /**
81    * Hop count
82    */
83   uint32_t hop_count GNUNET_PACKED;
84
85   /**
86    * Replication level for this message
87    */
88   uint32_t desired_replication_level GNUNET_PACKED;
89
90   /**
91    * Length of the PUT path that follows (if tracked).
92    */
93   uint32_t put_path_length GNUNET_PACKED;
94
95   /**
96    * When does the content expire?
97    */
98   struct GNUNET_TIME_AbsoluteNBO expiration_time;
99
100   /**
101    * Bloomfilter (for peer identities) to stop circular routes
102    */
103   char bloomfilter[DHT_BLOOM_SIZE];
104
105   /**
106    * The key we are storing under.
107    */
108   GNUNET_HashCode key;
109
110   /* put path (if tracked) */
111
112   /* Payload */
113
114 };
115
116
117 /**
118  * P2P Result message
119  */
120 struct PeerResultMessage
121 {
122   /**
123    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
124    */
125   struct GNUNET_MessageHeader header;
126
127   /**
128    * Content type.
129    */
130   uint32_t type GNUNET_PACKED;
131
132   /**
133    * Length of the PUT path that follows (if tracked).
134    */
135   uint32_t put_path_length GNUNET_PACKED;
136
137   /**
138    * Length of the GET path that follows (if tracked).
139    */
140   uint32_t get_path_length GNUNET_PACKED;
141
142   /**
143    * When does the content expire?
144    */
145   struct GNUNET_TIME_AbsoluteNBO expiration_time;
146
147   /**
148    * The key of the corresponding GET request.
149    */
150   GNUNET_HashCode key;
151
152   /* put path (if tracked) */
153
154   /* get path (if tracked) */
155
156   /* Payload */
157
158 };
159
160
161 /**
162  * P2P GET message
163  */
164 struct PeerGetMessage
165 {
166   /**
167    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
168    */
169   struct GNUNET_MessageHeader header;
170
171   /**
172    * Processing options
173    */
174   uint32_t options GNUNET_PACKED;
175
176   /**
177    * Desired content type.
178    */
179   uint32_t type GNUNET_PACKED;
180
181   /**
182    * Hop count
183    */
184   uint32_t hop_count GNUNET_PACKED;
185
186   /**
187    * Desired replication level for this request.
188    */
189   uint32_t desired_replication_level GNUNET_PACKED;
190
191   /**
192    * Size of the extended query.
193    */
194   uint32_t xquery_size;
195
196   /**
197    * Bloomfilter mutator.
198    */
199   uint32_t bf_mutator;
200
201   /**
202    * Bloomfilter (for peer identities) to stop circular routes
203    */
204   char bloomfilter[DHT_BLOOM_SIZE];
205
206   /**
207    * The key we are looking for.
208    */
209   GNUNET_HashCode key;
210
211   /* xquery */
212
213   /* result bloomfilter */
214
215 };
216
217
218 /**
219  * Linked list of messages to send to a particular other peer.
220  */
221 struct P2PPendingMessage
222 {
223   /**
224    * Pointer to next item in the list
225    */
226   struct P2PPendingMessage *next;
227
228   /**
229    * Pointer to previous item in the list
230    */
231   struct P2PPendingMessage *prev;
232
233   /**
234    * Message importance level.  FIXME: used? useful?
235    */
236   unsigned int importance;
237
238   /**
239    * When does this message time out?
240    */
241   struct GNUNET_TIME_Absolute timeout;
242
243   /**
244    * Actual message to be sent, allocated at the end of the struct:
245    * // msg = (cast) &pm[1]; 
246    * // memcpy (&pm[1], data, len);
247    */
248   const struct GNUNET_MessageHeader *msg;
249
250 };
251
252
253 /**
254  * Entry for a peer in a bucket.
255  */
256 struct PeerInfo
257 {
258   /**
259    * Next peer entry (DLL)
260    */
261   struct PeerInfo *next;
262
263   /**
264    *  Prev peer entry (DLL)
265    */
266   struct PeerInfo *prev;
267
268   /**
269    * Count of outstanding messages for peer.  FIXME: NEEDED?
270    * FIXME: bound queue size!?
271    */
272   unsigned int pending_count;
273
274   /**
275    * Head of pending messages to be sent to this peer.
276    */
277   struct P2PPendingMessage *head;
278
279   /**
280    * Tail of pending messages to be sent to this peer.
281    */
282   struct P2PPendingMessage *tail;
283
284   /**
285    * Core handle for sending messages to this peer.
286    */
287   struct GNUNET_CORE_TransmitHandle *th;
288
289   /**
290    * Preference update context
291    */
292   struct GNUNET_CORE_InformationRequestContext *info_ctx;
293
294   /**
295    * Task for scheduling message sends.
296    */
297   GNUNET_SCHEDULER_TaskIdentifier send_task;
298
299   /**
300    * Task for scheduling preference updates
301    */
302   GNUNET_SCHEDULER_TaskIdentifier preference_task;
303
304   /**
305    * What is the identity of the peer?
306    */
307   struct GNUNET_PeerIdentity id;
308
309 #if 0
310   /**
311    * What is the average latency for replies received?
312    */
313   struct GNUNET_TIME_Relative latency;
314
315   /**
316    * Transport level distance to peer.
317    */
318   unsigned int distance;
319 #endif
320
321 };
322
323
324 /**
325  * Peers are grouped into buckets.
326  */
327 struct PeerBucket
328 {
329   /**
330    * Head of DLL
331    */
332   struct PeerInfo *head;
333
334   /**
335    * Tail of DLL
336    */
337   struct PeerInfo *tail;
338
339   /**
340    * Number of peers in the bucket.
341    */
342   unsigned int peers_size;
343 };
344
345
346 /**
347  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
348  */
349 static unsigned int closest_bucket;
350
351 /**
352  * How many peers have we added since we sent out our last
353  * find peer request?
354  */
355 static unsigned int newly_found_peers;
356
357 /**
358  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
359  */
360 static struct PeerBucket k_buckets[MAX_BUCKETS];
361
362 /**
363  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
364  */
365 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
366
367 /**
368  * Maximum size for each bucket.
369  */
370 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
371
372 /**
373  * Task that sends FIND PEER requests.
374  */
375 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
376
377
378 /**
379  * Find the optimal bucket for this key.
380  *
381  * @param hc the hashcode to compare our identity to
382  * @return the proper bucket index, or GNUNET_SYSERR
383  *         on error (same hashcode)
384  */
385 static int
386 find_bucket (const GNUNET_HashCode * hc)
387 {
388   unsigned int bits;
389
390   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
391   if (bits == MAX_BUCKETS)
392     {
393       /* How can all bits match? Got my own ID? */
394       GNUNET_break (0);
395       return GNUNET_SYSERR; 
396     }
397   return MAX_BUCKETS - bits - 1;
398 }
399
400
401 /**
402  * Method called whenever a peer connects.
403  *
404  * @param cls closure
405  * @param peer peer identity this notification is about
406  * @param atsi performance data
407  */
408 static void
409 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
410                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
411 {
412   struct PeerInfo *ret;
413   int peer_bucket;
414
415   /* Check for connect to self message */
416   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
417     return;
418   if (GNUNET_YES ==
419       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
420                                               &peer->hashPubKey))
421   {
422     GNUNET_break (0);
423     return;
424   }
425   peer_bucket = find_bucket (&peer->hashPubKey);
426   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
427   ret = GNUNET_malloc (sizeof (struct PeerInfo));
428 #if 0
429   ret->latency = latency;
430   ret->distance = distance;
431 #endif
432   ret->id = *peer;
433   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
434                                      k_buckets[peer_bucket].tail,
435                                      k_buckets[peer_bucket].tail, ret);
436   k_buckets[peer_bucket].peers_size++;
437   closest_bucket = GNUNET_MAX (closest_bucket,
438                                peer_bucket);
439   if ( (peer_bucket > 0) &&
440        (k_buckets[peer_bucket].peers_size <= bucket_size) )
441     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
442   newly_found_peers++;
443   GNUNET_assert (GNUNET_OK ==
444                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
445                                                     &peer->hashPubKey, ret,
446                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
447   increment_stats (STAT_PEERS_KNOWN);
448 }
449
450
451 /**
452  * Method called whenever a peer disconnects.
453  *
454  * @param cls closure
455  * @param peer peer identity this notification is about
456  */
457 static void
458 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
459 {
460   struct PeerInfo *to_remove;
461   int current_bucket;
462   struct P2PPendingMessage *pos;
463   struct P2PPendingMessage *next;
464
465   /* Check for disconnect from self message */
466   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
467     return;
468   to_remove =
469       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
470   if (NULL == to_remove)
471     {
472       GNUNET_break (0);
473       return;
474     }
475   GNUNET_assert (GNUNET_YES ==
476                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
477                                                        &peer->hashPubKey,
478                                                        to_remove));
479   if (NULL != to_remove->info_ctx)
480   {
481     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
482     to_remove->info_ctx = NULL;
483   }
484   current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
485   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
486                                k_buckets[current_bucket].tail,
487                                to_remove);
488   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
489   k_buckets[current_bucket].peers_size--;
490   while ( (lowest_bucket > 0) &&
491           (k_buckets[lowest_bucket].peers_size == 0) )
492     lowest_bucket--;
493
494   if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
495   {
496     GNUNET_SCHEDULER_cancel (peer->send_task);
497     peer->send_task = GNUNET_SCHEDULER_NO_TASK;
498   }
499   if (to_remove->th != NULL) 
500   {
501     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
502     to_remove->th = NULL;
503   }
504   while (NULL != (pos = to_remove->head))
505   {
506     GNUNET_CONTAINER_DLL_remove (to_remove->head,
507                                  to_remove->tail,
508                                  pos);
509     GNUNET_free (pos);
510   }
511 }
512
513
514 /**
515  * Called when core is ready to send a message we asked for
516  * out to the destination.
517  *
518  * @param cls the 'struct PeerInfo' of the target peer
519  * @param size number of bytes available in buf
520  * @param buf where the callee should write the message
521  * @return number of bytes written to buf
522  */
523 static size_t
524 core_transmit_notify (void *cls, size_t size, void *buf)
525 {
526   struct PeerInfo *peer = cls;
527   char *cbuf = buf;
528   struct P2PPendingMessage *pending;
529   size_t off;
530   size_t msize;
531
532   peer->th = NULL;
533   if (buf == NULL)
534   {
535     /* client disconnected */
536     return 0;
537   }
538   if (peer->head == NULL)
539   {
540     /* no messages pending */
541     return 0;
542   }
543   off = 0;
544   while ( (NULL != (pending = peer->head)) &&
545           (size - off >= (msize = ntohs (pending->msg->size))) )
546   {
547     memcpy (&cbuf[off], pending->msg, msize);
548     off += msize;
549     peer->pending_count--;
550     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
551     GNUNET_free (pending);
552   }
553   if (peer->head != NULL)
554     peer->th 
555       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
556                                            pending->importance,
557                                            pending->timeout, &peer->id, msize,
558                                            &core_transmit_notify, peer);
559
560   return off;
561 }
562
563
564 /**
565  * Transmit all messages in the peer's message queue.
566  *
567  * @param peer message queue to process
568  */
569 static void
570 process_peer_queue (struct PeerInfo *peer)
571 {
572   struct P2PPendingMessage *pending;
573
574   if (NULL != (pending = peer->head))
575     return;
576   if (NULL != peer->th)
577     return;
578   peer->th 
579     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
580                                          pending->importance,
581                                          pending->timeout, &peer->id,
582                                          ntohs (pending->msg->size),
583                                          &core_transmit_notify, peer);
584 }
585
586
587 /**
588  * To how many peers should we (on average) forward the request to
589  * obtain the desired target_replication count (on average).
590  *
591  * @param hop_count number of hops the message has traversed
592  * @param target_replication the number of total paths desired
593  * @return Some number of peers to forward the message to
594  */
595 static unsigned int
596 get_forward_count (uint32_t hop_count, 
597                    uint32_t target_replication)
598 {
599   uint32_t random_value;
600   uint32_t forward_count;
601   float target_value;
602
603   if (hop_count > log_of_network_size_estimate * 4.0)
604   {
605     /* forcefully terminate */
606     return 0;
607   }
608   if (hop_count > log_of_network_size_estimate * 2.0)
609   {
610     /* Once we have reached our ideal number of hops, only forward to 1 peer */
611     return 1;
612   }
613   /* bound by system-wide maximum */
614   target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
615                                    target_replication);
616   target_value =
617     1 + (target_replication - 1.0) / (log_of_network_size_estimate +
618                                       ((float) (target_replication - 1.0) *
619                                        hop_count));
620   /* Set forward count to floor of target_value */
621   forward_count = (uint32_t) target_value;
622   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
623   target_value = target_value - forward_count;
624   random_value =
625     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
626   if (random_value < (target_value * UINT32_MAX))
627     forward_count++;
628   return forward_count;
629 }
630
631
632 /**
633  * Check whether my identity is closer than any known peers.  If a
634  * non-null bloomfilter is given, check if this is the closest peer
635  * that hasn't already been routed to.
636  * FIXME: needed?
637  *
638  * @param key hash code to check closeness to
639  * @param bloom bloomfilter, exclude these entries from the decision
640  * @return GNUNET_YES if node location is closest,
641  *         GNUNET_NO otherwise.
642  */
643 static int
644 am_closest_peer (const GNUNET_HashCode *key,
645                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
646 {
647   int bits;
648   int other_bits;
649   int bucket_num;
650   int count;
651   struct PeerInfo *pos;
652   unsigned int my_distance;
653
654   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
655     return GNUNET_YES;
656   bucket_num = find_current_bucket (key);
657   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
658   my_distance = distance (&my_identity.hashPubKey, key);
659   pos = k_buckets[bucket_num].head;
660   count = 0;
661   while ((pos != NULL) && (count < bucket_size))
662   {
663     if ((bloom != NULL) &&
664         (GNUNET_YES ==
665          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
666     {
667       pos = pos->next;
668       continue;                 /* Skip already checked entries */
669     }
670     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
671     if (other_bits > bits)
672       return GNUNET_NO;
673     if (other_bits == bits)        /* We match the same number of bits */
674       return GNUNET_YES;
675     pos = pos->next;
676   }
677   /* No peers closer, we are the closest! */
678   return GNUNET_YES;
679 }
680
681
682 /**
683  * Select a peer from the routing table that would be a good routing
684  * destination for sending a message for "key".  The resulting peer
685  * must not be in the set of blocked peers.<p>
686  *
687  * Note that we should not ALWAYS select the closest peer to the
688  * target, peers further away from the target should be chosen with
689  * exponentially declining probability.
690  *
691  * FIXME: double-check that this is fine
692  * 
693  *
694  * @param key the key we are selecting a peer to route to
695  * @param bloom a bloomfilter containing entries this request has seen already
696  * @param hops how many hops has this message traversed thus far
697  * @return Peer to route to, or NULL on error
698  */
699 static struct PeerInfo *
700 select_peer (const GNUNET_HashCode *key,
701              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
702              uint32_t hops)
703 {
704   unsigned int bc;
705   unsigned int count;
706   unsigned int selected;
707   struct PeerInfo *pos;
708   unsigned int distance;
709   unsigned int largest_distance;
710   struct PeerInfo *chosen;
711
712   if (hops >= log_of_network_size_estimate)
713   {
714     /* greedy selection (closest peer that is not in bloomfilter) */
715     largest_distance = 0;
716     chosen = NULL;
717     for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
718     {
719       pos = k_buckets[bc].head;
720       count = 0;
721       while ((pos != NULL) && (count < bucket_size))
722       {
723         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
724         if (GNUNET_NO ==
725             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
726         {
727           distance = inverse_distance (key, &pos->id.hashPubKey);
728           if (distance > largest_distance)
729           {
730             chosen = pos;
731             largest_distance = distance;
732           }
733         }
734         count++;
735         pos = pos->next;
736       }
737     }
738     return chosen;
739   }
740
741   /* select "random" peer */
742   /* count number of peers that are available and not filtered */
743   count = 0;
744   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
745   {
746     pos = k_buckets[bc].head;
747     while ((pos != NULL) && (count < bucket_size))
748     {
749       if (GNUNET_YES ==
750           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
751       {
752         pos = pos->next;
753         continue;               /* Ignore bloomfiltered peers */
754       }
755       count++;
756       pos = pos->next;
757     }
758   }
759   if (count == 0)               /* No peers to select from! */
760   {
761     return NULL;
762   }
763   /* Now actually choose a peer */
764   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
765   count = 0;
766   for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
767   {
768     pos = k_buckets[bc].head;
769     while ((pos != NULL) && (count < bucket_size))
770     {
771       if (GNUNET_YES ==
772           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
773       {
774         pos = pos->next;
775         continue;               /* Ignore bloomfiltered peers */
776       }
777       if (0 == selected--)
778         return pos;
779       pos = pos->next;
780     }
781   }
782   GNUNET_break (0);
783   return NULL;
784 }
785
786
787 /**
788  * Compute the set of peers that the given request should be
789  * forwarded to.
790  *
791  * @param key routing key
792  * @param bloom bloom filter excluding peers as targets, all selected
793  *        peers will be added to the bloom filter
794  * @param hop_count number of hops the request has traversed so far
795  * @param target_replication desired number of replicas
796  * @param targets where to store an array of target peers (to be
797  *         free'd by the caller)
798  * @return number of peers returned in 'targets'.
799  */
800 static unsigned int
801 get_target_peers (const GNUNET_HashCode *key,
802                   struct GNUNET_CONTAINER_BloomFilter *bloom,
803                   uint32_t hop_count,
804                   uint32_t target_replication,
805                   struct PeerInfo ***targets)
806 {
807   unsigned int ret;
808   unsigned int off;
809   struct PeerInfo **rtargets;
810   struct PeerInfo *nxt;
811
812   ret = get_forward_count (hop_count, target_replication);
813   if (ret == 0)
814   {
815     *targets = NULL;
816     return 0;
817   }
818   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
819   off = 0;
820   while (ret-- > 0)
821   {
822     nxt = select_peer (key, bloom, hop_count);
823     if (nxt == NULL)
824       break;
825     rtargets[off++] = nxt;
826     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
827   }
828   if (0 == off)
829   {
830     GNUNET_free (rtargets);
831     *targets = NULL;
832     return 0;
833   }
834   *targets = rtargets;
835   return off;
836 }
837
838
839 /**
840  * Perform a PUT operation.   Forwards the given request to other
841  * peers.   Does not store the data locally.  Does not give the
842  * data to local clients.  May do nothing if this is the only
843  * peer in the network (or if we are the closest peer in the
844  * network).
845  *
846  * @param type type of the block
847  * @param options routing options
848  * @param desired_replication_level desired replication count
849  * @param expiration_time when does the content expire
850  * @param hop_count how many hops has this message traversed so far
851  * @param bf Bloom filter of peers this PUT has already traversed
852  * @param key key for the content
853  * @param put_path_length number of entries in put_path
854  * @param put_path peers this request has traversed so far (if tracked)
855  * @param data payload to store
856  * @param data_size number of bytes in data
857  */
858 void
859 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
860                            enum GNUNET_DHT_RouteOption options,
861                            uint32_t desired_replication_level,
862                            GNUNET_TIME_Absolute expiration_time,
863                            uint32_t hop_count,
864                            struct GNUNET_CONTAINER_BloomFilter *bf,
865                            const GNUNET_HashCode *key,
866                            unsigned int put_path_length,
867                            struct GNUNET_PeerIdentity *put_path,
868                            const void *data,
869                            size_t data_size)
870 {
871   unsigned int target_count;
872   unsigned int i;
873   struct PeerInfo **targets;
874   struct PeerInfo *target;
875   struct P2PPendingMessage *pending;
876   size_t msize;
877   struct PeerPutMessage *ppm;
878   struct GNUNET_PeerIdentity *pp;
879   
880   target_count = get_target_peers (key, bf, hop_count,
881                                    desired_replication_level,
882                                    &targets);
883   if (0 == target_count)
884     return;
885   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
886   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
887   {
888     put_path_length = 0;
889     msize = data_size + sizeof (struct PeerPutMessage);
890   }
891   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
892   {
893     GNUNET_break (0);
894     return;
895   }
896   for (i=0;i<target_count;i++)
897   {
898     target = targets[i];
899     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
900     pending->importance = 0; /* FIXME */
901     pending->timeout = expiration_time;   
902     ppm = (struct PeerPutMessage*) &pending[1];
903     pending->msg = &ppm->header;
904     ppm->header.size = htons (msize);
905     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
906     ppm->options = htonl (options);
907     ppm->type = htonl (type);
908     ppm->hop_count = htonl (hop_count + 1);
909     ppm->desired_replication_level = htonl (desired_replication_level);
910     ppm->put_path_length = htonl (put_path_length);
911     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
912     GNUNET_assert (GNUNET_OK ==
913                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
914                                                               ppm->bloomfilter,
915                                                               DHT_BLOOM_SIZE));
916     ppm->key = *key;
917     pp = (const struct GNUNET_PeerIdentity*) &ppm[1];
918     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
919     memcpy (&pp[put_path_length], data, data_size);
920     GNUNET_CONTAINER_DLL_insert (target->head,
921                                  target->tail,
922                                  pending);
923     target->pending_count++;
924     process_peer_queue (target);
925   }
926   GNUNET_free (targets);
927 }
928
929
930 /**
931  * Perform a GET operation.  Forwards the given request to other
932  * peers.  Does not lookup the key locally.  May do nothing if this is
933  * the only peer in the network (or if we are the closest peer in the
934  * network).
935  *
936  * @param type type of the block
937  * @param options routing options
938  * @param desired_replication_level desired replication count
939  * @param hop_count how many hops did this request traverse so far?
940  * @param key key for the content
941  * @param xquery extended query
942  * @param xquery_size number of bytes in xquery
943  * @param reply_bf bloomfilter to filter duplicates
944  * @param reply_bf_mutator mutator for reply_bf
945  * @param peer_bf filter for peers not to select (again)
946  */
947 void
948 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
949                            enum GNUNET_DHT_RouteOption options,
950                            uint32_t desired_replication_level,
951                            uint32_t hop_count,
952                            const GNUNET_HashCode *key,
953                            const void *xquery,
954                            size_t xquery_size,
955                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
956                            uint32_t reply_bf_mutator,
957                            const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
958 {
959   unsigned int target_count;
960   unsigned int i;
961   struct PeerInfo **targets;
962   struct PeerInfo *target;
963   struct P2PPendingMessage *pending;
964   size_t msize;
965   struct PeerGetMessage *pgm;
966   char *xq;
967   size_t reply_bf_size;
968   
969   target_count = get_target_peers (key, peer_bf, hop_count,
970                                    desired_replication_level,
971                                    &targets);
972   if (0 == target_count)
973     return;
974   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
975   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
976   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
977   {
978     GNUNET_break (0);
979     return;
980   }
981   /* forward request */
982   for (i=0;i<target_count;i++)
983   {
984     target = targets[i];
985     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
986     pending->importance = 0; /* FIXME */
987     pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
988     pgm = (struct PeerGetMessage*) &pending[1];
989     pending->msg = &pgm->header;
990     pgm->header.size = htons (msize);
991     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
992     pgm->options = htonl (options);
993     pgm->type = htonl (type);
994     pgm->hop_count = htonl (hop_count + 1);
995     pgm->desired_replication_level = htonl (desired_replication_level);
996     pgm->xquery_size = htonl (xquery_size);
997     pgm->bf_mutator = reply_bf_mutator; 
998     GNUNET_assert (GNUNET_OK ==
999                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1000                                                               pgm->bloomfilter,
1001                                                               DHT_BLOOM_SIZE));
1002     pgm->key = *key;
1003     xq = (const struct GNUNET_PeerIdentity*) &ppm[1];
1004     memcpy (xq, xquery, xquery_size);
1005     GNUNET_assert (GNUNET_OK ==
1006                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1007                                                               &xq[xquery_size],
1008                                                               reply_bf_size));
1009     GNUNET_CONTAINER_DLL_insert (target->head,
1010                                  target->tail,
1011                                  pending);
1012     target->pending_count++;
1013     process_peer_queue (target);
1014   }
1015   GNUNET_free (targets);
1016 }
1017
1018
1019 /**
1020  * Handle a reply (route to origin).  Only forwards the reply back to
1021  * the given peer.  Does not do local caching or forwarding to local
1022  * clients.
1023  *
1024  * @param target neighbour that should receive the block (if still connected)
1025  * @param type type of the block
1026  * @param expiration_time when does the content expire
1027  * @param key key for the content
1028  * @param put_path_length number of entries in put_path
1029  * @param put_path peers the original PUT traversed (if tracked)
1030  * @param get_path_length number of entries in put_path
1031  * @param get_path peers this reply has traversed so far (if tracked)
1032  * @param data payload of the reply
1033  * @param data_size number of bytes in data
1034  */
1035 void
1036 GDS_NEIGHBOURS_handle_reply (const GNUNET_PeerIdentity *target,
1037                              enum GNUNET_BLOCK_Type type,
1038                              GNUNET_TIME_Absolute expiration_time,
1039                              const GNUNET_HashCode *key,
1040                              unsigned int put_path_length,
1041                              struct GNUNET_PeerIdentity *put_path,
1042                              unsigned int get_path_length,
1043                              struct GNUNET_PeerIdentity *get_path,
1044                              const void *data,
1045                              size_t data_size)
1046 {
1047   struct PeerInfo *pi;
1048   struct P2PPendingMessage *pending;
1049   size_t msize;
1050   struct PeerResultMessage *prm;
1051   struct GNUNET_PeerIdentity *paths;
1052   
1053   msize = data_size + sizeof (struct PeerResultMessage) + 
1054     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1055   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1056        (get_path_length + put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1057        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1058   {
1059     GNUNET_break (0);
1060     return;
1061   }
1062   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1063                                           &target->hashPubKey);
1064   if (NULL == pi)
1065   {
1066     /* peer disconnected in the meantime, drop reply */
1067     return;
1068   }
1069   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1070   pending->importance = 0; /* FIXME */
1071   pending->timeout = expiration_time;
1072   prm = (struct PeerResultMessage*) &pending[1];
1073   pending->msg = &prm->header;
1074   prm->header.size = htons (msize);
1075   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1076   prm->type = htonl (type);
1077   prm->put_path_length = htonl (put_path_length);
1078   prm->get_path_length = htonl (get_path_length);
1079   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1080   prm->key = *key;
1081   paths = (struct GNUNET_PeerIdentity) &prm[1];
1082   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1083   memcpy (&paths[put_path_length],
1084           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1085   memcpy (&paths[put_path_length + get_path_length],
1086           data, data_size);
1087   GNUNET_CONTAINER_DLL_insert (target->head,
1088                                target->tail,
1089                                pending);
1090   target->pending_count++;
1091   process_peer_queue (target);
1092 }
1093
1094
1095 /**
1096  * Closure for 'add_known_to_bloom'.
1097  */
1098 struct BloomConstructorContext
1099 {
1100   /**
1101    * Bloom filter under construction.
1102    */
1103   struct GNUNET_CONTAINER_BloomFilter *bloom;
1104
1105   /**
1106    * Mutator to use.
1107    */
1108   uint32_t bf_mutator;
1109 };
1110
1111
1112 /**
1113  * Add each of the peers we already know to the bloom filter of
1114  * the request so that we don't get duplicate HELLOs.
1115  *
1116  * @param cls the 'struct BloomConstructorContext'.
1117  * @param key peer identity to add to the bloom filter
1118  * @param value value the peer information (unused)
1119  * @return GNUNET_YES (we should continue to iterate)
1120  */
1121 static int
1122 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1123 {
1124   struct BloomConstructorContext *ctx = cls;
1125   GNUNET_HashCode mh;
1126
1127   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1128   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1129   return GNUNET_YES;
1130 }
1131
1132
1133 /**
1134  * Task to send a find peer message for our own peer identifier
1135  * so that we can find the closest peers in the network to ourselves
1136  * and attempt to connect to them.
1137  *
1138  * @param cls closure for this task
1139  * @param tc the context under which the task is running
1140  */
1141 static void
1142 send_find_peer_message (void *cls,
1143                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1144 {
1145   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
1146   struct DHT_MessageContext msg_ctx;
1147   struct GNUNET_TIME_Relative next_send_time;
1148   struct BloomConstructorContext bcc;
1149
1150   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1151   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1152     return;
1153   if (newly_found_peers > bucket_size) 
1154   {
1155     /* If we are finding many peers already, no need to send out our request right now! */
1156     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1157                                                    &send_find_peer_message, NULL);
1158     newly_found_peers = 0;
1159     return;
1160   }
1161   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1162   bcc.bloom =
1163     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1164   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1165                                          &add_known_to_bloom,
1166                                          &bcc);
1167   // FIXME: pass priority!?
1168   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1169                              GNUNET_DHT_RO_FIND_PEER,
1170                              16 /* FIXME: replication level? */,
1171                              0,
1172                              &my_identity.hashPubKey,
1173                              NULL, 0,
1174                              bcc.bloom, bcc.bf_mutator, NULL);
1175   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1176   /* schedule next round */
1177   newly_found_peers = 0;
1178   next_send_time.rel_value =
1179     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1180     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1181                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1182   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1183                                                  &send_find_peer_message,
1184                                                  NULL);  
1185 }
1186
1187
1188 /**
1189  * To be called on core init/fail.
1190  *
1191  * @param cls service closure
1192  * @param server handle to the server for this service
1193  * @param identity the public identity of this peer
1194  * @param publicKey the public key of this peer
1195  */
1196 static void
1197 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1198            const struct GNUNET_PeerIdentity *identity,
1199            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1200 {
1201   GNUNET_assert (server != NULL);
1202   my_identity = *identity;
1203   next_send_time.rel_value =
1204     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1205     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1206                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1207                                2) -
1208                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1209   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1210                                                  &send_find_peer_message,
1211                                                  NULL);
1212 }
1213
1214
1215 /**
1216  * Core handler for p2p put requests.
1217  *
1218  * @param cls closure
1219  * @param peer sender of the request
1220  * @param message message
1221  * @param peer peer identity this notification is about
1222  * @param atsi performance data
1223  * @return GNUNET_OK to keep the connection open,
1224  *         GNUNET_SYSERR to close it (signal serious error)
1225  */
1226 static int
1227 handle_dht_p2p_put (void *cls,
1228                     const struct GNUNET_PeerIdentity *peer,
1229                     const struct GNUNET_MessageHeader *message,
1230                     const struct GNUNET_TRANSPORT_ATS_Information
1231                     *atsi)
1232 {
1233   const struct PeerPutMessage *put;
1234   const struct GNUNET_PeerIdentity *put_path;
1235   const void *payload;
1236   uint32_t putlen;
1237   uint16_t msize;
1238   size_t payload_size;
1239   struct GNUNET_CONTAINER_BloomFilter *bf;
1240   GNUNET_HashCode test_key;
1241   
1242   msize = ntohs (message->size);
1243   if (msize < sizeof (struct PeerPutMessage))
1244   {
1245     GNUNET_break_op (0);
1246     return GNUNET_YES;
1247   }
1248   put = (const struct PeerPutMessage*) message;
1249   putlen = ntohl (put->put_path_length);
1250   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1251        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1252     {
1253       GNUNET_break_op (0);
1254       return GNUNET_YES;
1255     }
1256   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1257   payload = &put_path[putlen];
1258   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1259                           putlen * sizeof (struct GNUNET_PeerIdentity));
1260   switch (GNUNET_BLOCK_get_key (block_context,
1261                                 ntohl (put->type),
1262                                 payload, payload_size,
1263                                 &test_key))
1264   {
1265   case GNUNET_YES:
1266     if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode)))
1267     {
1268       GNUNET_break_op (0);
1269       return GNUNET_YES;
1270     }
1271     break;
1272   case GNUNET_NO:
1273     GNUNET_break_op (0);
1274     return GNUNET_YES;
1275   case GNUNET_SYSERR:
1276     /* cannot verify, good luck */
1277     break;
1278   }
1279   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1280                                           DHT_BLOOM_SIZE,
1281                                           DHT_BLOOM_K);
1282   {
1283     struct GNUNET_PeerIdentity pp[putlen+1];
1284   
1285     /* extend 'put path' by sender */
1286     memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1287     pp[putlen] = *sender;
1288
1289     /* give to local clients */
1290     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1291                              &put->key,
1292                              0, NULL,
1293                              putlen + 1,
1294                              pp,
1295                              ntohl (put->type),
1296                              payload_size,
1297                              payload);
1298     /* store locally */
1299     GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1300                               &put->key,
1301                               putlen + 1, pp,
1302                               ntohl (put->type),
1303                               payload_size,
1304                               payload);
1305     /* route to other peers */
1306     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1307                                ntohl (put->options),
1308                                ntohl (put->desired_replication_level),
1309                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1310                                ntohl (put->hop_count),
1311                                bf,
1312                                putlen + 1, pp,
1313                                payload,
1314                                payload_size);
1315   }
1316   GNUNET_CONTAINER_bloomfilter_free (bf);
1317   return GNUNET_YES;
1318 }
1319
1320
1321 /**
1322  * Core handler for p2p get requests.
1323  *
1324  * @param cls closure
1325  * @param peer sender of the request
1326  * @param message message
1327  * @param peer peer identity this notification is about
1328  * @param atsi performance data
1329  * @return GNUNET_OK to keep the connection open,
1330  *         GNUNET_SYSERR to close it (signal serious error)
1331  */
1332 static int
1333 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1334                     const struct GNUNET_MessageHeader *message,
1335                     const struct GNUNET_TRANSPORT_ATS_Information
1336                     *atsi)
1337 {
1338   struct PeerGetMessage *get;
1339   uint32_t xquery_size;
1340   size_t reply_bf_size;
1341   uint16_t msize;
1342   enum GNUNET_BLOCK_Type type;
1343   enum GNUNET_DHT_RouteOption options;
1344   enum GNUNET_BLOCK_EvaluationResult eval;
1345   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1346   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1347   const char *xquery;
1348                       
1349   /* parse and validate message */
1350   msize = ntohs (message->size);
1351   if (msize < sizeof (struct PeerGetMessage))
1352   {
1353     GNUNET_break_op (0);
1354     return GNUNET_YES;
1355   }
1356   get = (struct PeerGetMessage *) message;
1357   xquery_size = ntohl (get->xquery_size);
1358   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1359   {
1360     GNUNET_break_op (0);
1361     return GNUNET_YES;
1362   }
1363   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1364   type = ntohl (get->type);
1365   options = ntohl (get->options);
1366   xquery = (const char*) &get[1];
1367   reply_bf = NULL;
1368   if (reply_bf_size > 0)
1369     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1370                                                   reply_bf_size,
1371                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1372   eval = GNUNET_BLOCK_evaluate (block_context,
1373                                 type,
1374                                 &get->key,
1375                                 &reply_bf,
1376                                 get->bf_mutator,
1377                                 xquery, xquery_size,
1378                                 NULL, 0);
1379   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1380   {
1381     /* request invalid or block type not supported */
1382     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1383     if (NULL != reply_bf)
1384       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1385     return GNUNET_YES;
1386   }
1387   peer_bf =
1388     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1389                                        DHT_BLOOM_SIZE,
1390                                        DHT_BLOOM_K);
1391
1392   /* remember request for routing replies */
1393   GDS_ROUTING_add (peer,
1394                    type,
1395                    &get->key,
1396                    xquery, xquery_size,
1397                    reply_bf, get->reply_bf_mutator);
1398   /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1399
1400   /* local lookup (this may update the reply_bf) */
1401   GDS_DATACACHE_handle_get (&get->key,
1402                             type,
1403                             xquery, xquery_size,
1404                             &reply_bf, 
1405                             get->reply_bf_mutator);
1406   /* FIXME: should track if the local lookup resulted in a
1407      definitive result and then NOT do P2P forwarding */
1408     
1409   /* P2P forwarding */
1410   GDS_NEIGHBOURS_handle_get (type,
1411                              options,
1412                              ntohl (get->desired_replication_level),
1413                              ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1414                              &get->key,
1415                              xquery, xquery_size,
1416                              reply_bf,
1417                              get->reply_bf_mutator,
1418                              peer_bf);
1419   /* clean up */
1420   if (NULL != reply_bf)
1421     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1422   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1423   return GNUNET_YES;
1424 }
1425
1426
1427 /**
1428  * Core handler for p2p result messages.
1429  *
1430  * @param cls closure
1431  * @param message message
1432  * @param peer peer identity this notification is about
1433  * @param atsi performance data
1434  * @return GNUNET_YES (do not cut p2p connection)
1435  */
1436 static int
1437 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1438                        const struct GNUNET_MessageHeader *message,
1439                        const struct GNUNET_TRANSPORT_ATS_Information
1440                        *atsi)
1441 {
1442   // FIXME!
1443   // 1) validate result format
1444   // 2) append 'peer' to put path
1445   // 3) forward to local clients
1446   // 4) p2p routing
1447   const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
1448       (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
1449   struct GNUNET_MessageHeader *enc_msg =
1450       (struct GNUNET_MessageHeader *) &incoming[1];
1451   struct DHT_MessageContext msg_ctx;
1452
1453
1454
1455   return GNUNET_YES;
1456 }
1457
1458
1459 /**
1460  * Initialize neighbours subsystem.
1461  */
1462 int
1463 GDS_NEIGHBOURS_init ()
1464 {
1465   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1466     {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1467     {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1468     {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1469     {NULL, 0, 0}
1470   };
1471   unsigned long long temp_config_num;
1472   struct GNUNET_TIME_Relative next_send_time;
1473  
1474   if (GNUNET_OK ==
1475       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
1476                                              &temp_config_num))
1477     bucket_size = (unsigned int) temp_config_num;  
1478   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1479                                  DEFAULT_CORE_QUEUE_SIZE,
1480                                  NULL,
1481                                  &core_init,
1482                                  &handle_core_connect,
1483                                  &handle_core_disconnect, 
1484                                  NULL,  /* Do we care about "status" updates? */
1485                                  NULL, GNUNET_NO,
1486                                  NULL, GNUNET_NO,
1487                                  core_handlers);
1488   if (coreAPI == NULL)
1489     return GNUNET_SYSERR;
1490   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1491   return GNUNET_OK;
1492 }
1493
1494
1495 /**
1496  * Shutdown neighbours subsystem.
1497  */
1498 void
1499 GDS_NEIGHBOURS_done ()
1500 {
1501   GNUNET_assert (coreAPI != NULL);
1502   GNUNET_CORE_disconnect (coreAPI);
1503   coreAPI = NULL;    
1504   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
1505   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1506   all_known_peers = NULL;
1507   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1508   {
1509     GNUNET_SCHEDULER_cancel (find_peer_task);
1510     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1511   }
1512 }
1513
1514
1515 /* end of gnunet-service-dht_neighbours.c */