stuff
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_datacache_lib.h"
36 #include "gnunet_transport_service.h"
37 #include "gnunet_hello_lib.h"
38 #include "gnunet_dht_service.h"
39 #include "gnunet_statistics_service.h"
40 #include "dht.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_nse.h"
45 #include "gnunet-service-dht_routing.h"
46 #include <fenv.h>
47
48 /**
49  * How many buckets will we allow total.
50  */
51 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
52
53 /**
54  * What is the maximum number of peers in a given bucket.
55  */
56 #define DEFAULT_BUCKET_SIZE 4
57
58 /**
59  * Size of the bloom filter the DHT uses to filter peers.
60  */
61 #define DHT_BLOOM_SIZE 128
62
63 /**
64  * How often to update our preference levels for peers in our routing tables.
65  */
66 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
67
68 /**
69  * How long at least to wait before sending another find peer request.
70  */
71 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
72
73 /**
74  * How long at most to wait before sending another find peer request.
75  */
76 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
77
78
79 /**
80  * P2P PUT message
81  */
82 struct PeerPutMessage
83 {
84   /**
85    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
86    */
87   struct GNUNET_MessageHeader header;
88
89   /**
90    * Processing options
91    */
92   uint32_t options GNUNET_PACKED;
93
94   /**
95    * Content type.
96    */
97   uint32_t type GNUNET_PACKED;
98
99   /**
100    * Hop count
101    */
102   uint32_t hop_count GNUNET_PACKED;
103
104   /**
105    * Replication level for this message
106    */
107   uint32_t desired_replication_level GNUNET_PACKED;
108
109   /**
110    * Length of the PUT path that follows (if tracked).
111    */
112   uint32_t put_path_length GNUNET_PACKED;
113
114   /**
115    * When does the content expire?
116    */
117   struct GNUNET_TIME_AbsoluteNBO expiration_time;
118
119   /**
120    * Bloomfilter (for peer identities) to stop circular routes
121    */
122   char bloomfilter[DHT_BLOOM_SIZE];
123
124   /**
125    * The key we are storing under.
126    */
127   GNUNET_HashCode key;
128
129   /* put path (if tracked) */
130
131   /* Payload */
132
133 };
134
135
136 /**
137  * P2P Result message
138  */
139 struct PeerResultMessage
140 {
141   /**
142    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
143    */
144   struct GNUNET_MessageHeader header;
145
146   /**
147    * Content type.
148    */
149   uint32_t type GNUNET_PACKED;
150
151   /**
152    * Length of the PUT path that follows (if tracked).
153    */
154   uint32_t put_path_length GNUNET_PACKED;
155
156   /**
157    * Length of the GET path that follows (if tracked).
158    */
159   uint32_t get_path_length GNUNET_PACKED;
160
161   /**
162    * When does the content expire?
163    */
164   struct GNUNET_TIME_AbsoluteNBO expiration_time;
165
166   /**
167    * The key of the corresponding GET request.
168    */
169   GNUNET_HashCode key;
170
171   /* put path (if tracked) */
172
173   /* get path (if tracked) */
174
175   /* Payload */
176
177 };
178
179
180 /**
181  * P2P GET message
182  */
183 struct PeerGetMessage
184 {
185   /**
186    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
187    */
188   struct GNUNET_MessageHeader header;
189
190   /**
191    * Processing options
192    */
193   uint32_t options GNUNET_PACKED;
194
195   /**
196    * Desired content type.
197    */
198   uint32_t type GNUNET_PACKED;
199
200   /**
201    * Hop count
202    */
203   uint32_t hop_count GNUNET_PACKED;
204
205   /**
206    * Desired replication level for this request.
207    */
208   uint32_t desired_replication_level GNUNET_PACKED;
209
210   /**
211    * Size of the extended query.
212    */
213   uint32_t xquery_size;
214
215   /**
216    * Bloomfilter mutator.
217    */
218   uint32_t bf_mutator;
219
220   /**
221    * Bloomfilter (for peer identities) to stop circular routes
222    */
223   char bloomfilter[DHT_BLOOM_SIZE];
224
225   /**
226    * The key we are looking for.
227    */
228   GNUNET_HashCode key;
229
230   /* xquery */
231
232   /* result bloomfilter */
233
234 };
235
236
237 /**
238  * Linked list of messages to send to a particular other peer.
239  */
240 struct P2PPendingMessage
241 {
242   /**
243    * Pointer to next item in the list
244    */
245   struct P2PPendingMessage *next;
246
247   /**
248    * Pointer to previous item in the list
249    */
250   struct P2PPendingMessage *prev;
251
252   /**
253    * Message importance level.  FIXME: used? useful?
254    */
255   unsigned int importance;
256
257   /**
258    * When does this message time out?
259    */
260   struct GNUNET_TIME_Absolute timeout;
261
262   /**
263    * Actual message to be sent, allocated at the end of the struct:
264    * // msg = (cast) &pm[1]; 
265    * // memcpy (&pm[1], data, len);
266    */
267   const struct GNUNET_MessageHeader *msg;
268
269 };
270
271
272 /**
273  * Entry for a peer in a bucket.
274  */
275 struct PeerInfo
276 {
277   /**
278    * Next peer entry (DLL)
279    */
280   struct PeerInfo *next;
281
282   /**
283    *  Prev peer entry (DLL)
284    */
285   struct PeerInfo *prev;
286
287   /**
288    * Count of outstanding messages for peer.  FIXME: NEEDED?
289    * FIXME: bound queue size!?
290    */
291   unsigned int pending_count;
292
293   /**
294    * Head of pending messages to be sent to this peer.
295    */
296   struct P2PPendingMessage *head;
297
298   /**
299    * Tail of pending messages to be sent to this peer.
300    */
301   struct P2PPendingMessage *tail;
302
303   /**
304    * Core handle for sending messages to this peer.
305    */
306   struct GNUNET_CORE_TransmitHandle *th;
307
308   /**
309    * Preference update context
310    */
311   struct GNUNET_CORE_InformationRequestContext *info_ctx;
312
313   /**
314    * Task for scheduling message sends.
315    */
316   GNUNET_SCHEDULER_TaskIdentifier send_task;
317
318   /**
319    * Task for scheduling preference updates
320    */
321   GNUNET_SCHEDULER_TaskIdentifier preference_task;
322
323   /**
324    * What is the identity of the peer?
325    */
326   struct GNUNET_PeerIdentity id;
327
328 #if 0
329   /**
330    * What is the average latency for replies received?
331    */
332   struct GNUNET_TIME_Relative latency;
333
334   /**
335    * Transport level distance to peer.
336    */
337   unsigned int distance;
338 #endif
339
340 };
341
342
343 /**
344  * Peers are grouped into buckets.
345  */
346 struct PeerBucket
347 {
348   /**
349    * Head of DLL
350    */
351   struct PeerInfo *head;
352
353   /**
354    * Tail of DLL
355    */
356   struct PeerInfo *tail;
357
358   /**
359    * Number of peers in the bucket.
360    */
361   unsigned int peers_size;
362 };
363
364
365 /**
366  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
367  */
368 static unsigned int closest_bucket;
369
370 /**
371  * How many peers have we added since we sent out our last
372  * find peer request?
373  */
374 static unsigned int newly_found_peers;
375
376 /**
377  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
378  */
379 static struct PeerBucket k_buckets[MAX_BUCKETS];
380
381 /**
382  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
383  */
384 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
385
386 /**
387  * Maximum size for each bucket.
388  */
389 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
390
391 /**
392  * Task that sends FIND PEER requests.
393  */
394 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
395
396 /**
397  * Identity of this peer.
398  */ 
399 static struct GNUNET_PeerIdentity my_identity;
400
401 /**
402  * Handle to GNUnet core.
403  */
404 static struct GNUNET_CORE_Handle *coreAPI;
405
406
407 /**
408  * Find the optimal bucket for this key.
409  *
410  * @param hc the hashcode to compare our identity to
411  * @return the proper bucket index, or GNUNET_SYSERR
412  *         on error (same hashcode)
413  */
414 static int
415 find_bucket (const GNUNET_HashCode * hc)
416 {
417   unsigned int bits;
418
419   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
420   if (bits == MAX_BUCKETS)
421     {
422       /* How can all bits match? Got my own ID? */
423       GNUNET_break (0);
424       return GNUNET_SYSERR; 
425     }
426   return MAX_BUCKETS - bits - 1;
427 }
428
429
430 /**
431  * Let GNUnet core know that we like the given peer.
432  *
433  * @param cls the 'struct PeerInfo' of the peer
434  * @param tc scheduler context.
435  */ 
436 static void
437 update_core_preference (void *cls,
438                         const struct GNUNET_SCHEDULER_TaskContext *tc);
439
440
441 /**
442  * Function called with statistics about the given peer.
443  *
444  * @param cls closure
445  * @param peer identifies the peer
446  * @param bpm_out set to the current bandwidth limit (sending) for this peer
447  * @param amount set to the amount that was actually reserved or unreserved;
448  *               either the full requested amount or zero (no partial reservations)
449  * @param res_delay if the reservation could not be satisfied (amount was 0), how
450  *        long should the client wait until re-trying?
451  * @param preference current traffic preference for the given peer
452  */
453 static void
454 update_core_preference_finish (void *cls,
455                                const struct GNUNET_PeerIdentity *peer,
456                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
457                                int32_t amount,
458                                struct GNUNET_TIME_Relative res_delay,
459                                uint64_t preference)
460 {
461   struct PeerInfo *peer_info = cls;
462
463   peer_info->info_ctx = NULL;
464   peer_info->preference_task
465     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
466                                     &update_core_preference, peer_info);
467 }
468
469
470 /**
471  * Let GNUnet core know that we like the given peer.
472  *
473  * @param cls the 'struct PeerInfo' of the peer
474  * @param tc scheduler context.
475  */ 
476 static void
477 update_core_preference (void *cls,
478                         const struct GNUNET_SCHEDULER_TaskContext *tc)
479 {
480   struct PeerInfo *peer = cls;
481   uint64_t preference;
482   unsigned int matching;
483   int bucket;
484
485   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
486   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
487     return;  
488   matching =
489     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
490                                       &peer->id.hashPubKey);
491   if (matching >= 64)
492     matching = 63;
493   bucket = find_bucket(&peer->id.hashPubKey);
494   if (bucket == GNUNET_SYSERR)
495     preference = 0;
496   else
497     preference = (1LL << matching) / k_buckets[bucket].peers_size;
498   if (preference == 0)
499     {
500       peer->preference_task
501         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
502                                         &update_core_preference, peer);
503       return;
504     }
505   peer->info_ctx =
506     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
507                                         GNUNET_TIME_UNIT_FOREVER_REL,
508                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
509                                         preference,
510                                         &update_core_preference_finish, peer);
511 }
512
513
514 /**
515  * Method called whenever a peer connects.
516  *
517  * @param cls closure
518  * @param peer peer identity this notification is about
519  * @param atsi performance data
520  */
521 static void
522 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
523                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
524 {
525   struct PeerInfo *ret;
526   int peer_bucket;
527
528   /* Check for connect to self message */
529   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
530     return;
531   if (GNUNET_YES ==
532       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
533                                               &peer->hashPubKey))
534   {
535     GNUNET_break (0);
536     return;
537   }
538   peer_bucket = find_bucket (&peer->hashPubKey);
539   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
540   ret = GNUNET_malloc (sizeof (struct PeerInfo));
541 #if 0
542   ret->latency = latency;
543   ret->distance = distance;
544 #endif
545   ret->id = *peer;
546   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
547                                      k_buckets[peer_bucket].tail,
548                                      k_buckets[peer_bucket].tail, ret);
549   k_buckets[peer_bucket].peers_size++;
550   closest_bucket = GNUNET_MAX (closest_bucket,
551                                peer_bucket);
552   if ( (peer_bucket > 0) &&
553        (k_buckets[peer_bucket].peers_size <= bucket_size) )
554     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
555   newly_found_peers++;
556   GNUNET_assert (GNUNET_OK ==
557                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
558                                                     &peer->hashPubKey, ret,
559                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
560 }
561
562
563 /**
564  * Method called whenever a peer disconnects.
565  *
566  * @param cls closure
567  * @param peer peer identity this notification is about
568  */
569 static void
570 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
571 {
572   struct PeerInfo *to_remove;
573   int current_bucket;
574   struct P2PPendingMessage *pos;
575
576   /* Check for disconnect from self message */
577   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
578     return;
579   to_remove =
580       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
581   if (NULL == to_remove)
582     {
583       GNUNET_break (0);
584       return;
585     }
586   GNUNET_assert (GNUNET_YES ==
587                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
588                                                        &peer->hashPubKey,
589                                                        to_remove));
590   if (NULL != to_remove->info_ctx)
591   {
592     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
593     to_remove->info_ctx = NULL;
594   }
595   current_bucket = find_bucket (&to_remove->id.hashPubKey);
596   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
597                                k_buckets[current_bucket].tail,
598                                to_remove);
599   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
600   k_buckets[current_bucket].peers_size--;
601   while ( (closest_bucket > 0) &&
602           (k_buckets[closest_bucket].peers_size == 0) )
603     closest_bucket--;
604
605   if (to_remove->th != NULL) 
606   {
607     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
608     to_remove->th = NULL;
609   }
610   while (NULL != (pos = to_remove->head))
611   {
612     GNUNET_CONTAINER_DLL_remove (to_remove->head,
613                                  to_remove->tail,
614                                  pos);
615     GNUNET_free (pos);
616   }
617 }
618
619
620 /**
621  * Called when core is ready to send a message we asked for
622  * out to the destination.
623  *
624  * @param cls the 'struct PeerInfo' of the target peer
625  * @param size number of bytes available in buf
626  * @param buf where the callee should write the message
627  * @return number of bytes written to buf
628  */
629 static size_t
630 core_transmit_notify (void *cls, size_t size, void *buf)
631 {
632   struct PeerInfo *peer = cls;
633   char *cbuf = buf;
634   struct P2PPendingMessage *pending;
635   size_t off;
636   size_t msize;
637
638   peer->th = NULL;
639   if (buf == NULL)
640   {
641     /* client disconnected */
642     return 0;
643   }
644   if (peer->head == NULL)
645   {
646     /* no messages pending */
647     return 0;
648   }
649   off = 0;
650   while ( (NULL != (pending = peer->head)) &&
651           (size - off >= (msize = ntohs (pending->msg->size))) )
652   {
653     memcpy (&cbuf[off], pending->msg, msize);
654     off += msize;
655     peer->pending_count--;
656     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
657     GNUNET_free (pending);
658   }
659   if (peer->head != NULL)
660     peer->th 
661       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
662                                            pending->importance,
663                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
664                                            &peer->id, msize,
665                                            &core_transmit_notify, peer);
666
667   return off;
668 }
669
670
671 /**
672  * Transmit all messages in the peer's message queue.
673  *
674  * @param peer message queue to process
675  */
676 static void
677 process_peer_queue (struct PeerInfo *peer)
678 {
679   struct P2PPendingMessage *pending;
680
681   if (NULL != (pending = peer->head))
682     return;
683   if (NULL != peer->th)
684     return;
685   peer->th 
686     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
687                                          pending->importance,
688                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
689                                          &peer->id,
690                                          ntohs (pending->msg->size),
691                                          &core_transmit_notify, peer);
692 }
693
694
695 /**
696  * To how many peers should we (on average) forward the request to
697  * obtain the desired target_replication count (on average).
698  *
699  * @param hop_count number of hops the message has traversed
700  * @param target_replication the number of total paths desired
701  * @return Some number of peers to forward the message to
702  */
703 static unsigned int
704 get_forward_count (uint32_t hop_count, 
705                    uint32_t target_replication)
706 {
707   uint32_t random_value;
708   uint32_t forward_count;
709   float target_value;
710
711   if (hop_count > GDS_NSE_get () * 4.0)
712   {
713     /* forcefully terminate */
714     return 0;
715   }
716   if (hop_count > GDS_NSE_get () * 2.0)
717   {
718     /* Once we have reached our ideal number of hops, only forward to 1 peer */
719     return 1;
720   }
721   /* bound by system-wide maximum */
722   target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
723                                    target_replication);
724   target_value =
725     1 + (target_replication - 1.0) / (GDS_NSE_get () +
726                                       ((float) (target_replication - 1.0) *
727                                        hop_count));
728   /* Set forward count to floor of target_value */
729   forward_count = (uint32_t) target_value;
730   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
731   target_value = target_value - forward_count;
732   random_value =
733     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
734   if (random_value < (target_value * UINT32_MAX))
735     forward_count++;
736   return forward_count;
737 }
738
739
740 /**
741  * Compute the distance between have and target as a 32-bit value.
742  * Differences in the lower bits must count stronger than differences
743  * in the higher bits.
744  *
745  * @return 0 if have==target, otherwise a number
746  *           that is larger as the distance between
747  *           the two hash codes increases
748  */
749 static unsigned int
750 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
751 {
752   unsigned int bucket;
753   unsigned int msb;
754   unsigned int lsb;
755   unsigned int i;
756
757   /* We have to represent the distance between two 2^9 (=512)-bit
758    * numbers as a 2^5 (=32)-bit number with "0" being used for the
759    * two numbers being identical; furthermore, we need to
760    * guarantee that a difference in the number of matching
761    * bits is always represented in the result.
762    *
763    * We use 2^32/2^9 numerical values to distinguish between
764    * hash codes that have the same LSB bit distance and
765    * use the highest 2^9 bits of the result to signify the
766    * number of (mis)matching LSB bits; if we have 0 matching
767    * and hence 512 mismatching LSB bits we return -1 (since
768    * 512 itself cannot be represented with 9 bits) */
769
770   /* first, calculate the most significant 9 bits of our
771    * result, aka the number of LSBs */
772   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
773   /* bucket is now a value between 0 and 512 */
774   if (bucket == 512)
775     return 0;                   /* perfect match */
776   if (bucket == 0)
777     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
778                                  * below, we'd end up with max+1 (overflow)) */
779
780   /* calculate the most significant bits of the final result */
781   msb = (512 - bucket) << (32 - 9);
782   /* calculate the 32-9 least significant bits of the final result by
783    * looking at the differences in the 32-9 bits following the
784    * mismatching bit at 'bucket' */
785   lsb = 0;
786   for (i = bucket + 1;
787        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
788   {
789     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
790         GNUNET_CRYPTO_hash_get_bit (have, i))
791       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
792                                                  * last bit set will be 31 -- if
793                                                  * i does not reach 512 first... */
794   }
795   return msb | lsb;
796 }
797
798
799 /**
800  * Return a number that is larger the closer the
801  * "have" GNUNET_hash code is to the "target".
802  *
803  * @return inverse distance metric, non-zero.
804  *         Must fudge the value if NO bits match.
805  */
806 static unsigned int
807 inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
808 {
809   if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
810     return 1;                   /* Never return 0! */
811   return ((unsigned int) -1) - distance (target, have);
812 }
813
814
815 /**
816  * Check whether my identity is closer than any known peers.  If a
817  * non-null bloomfilter is given, check if this is the closest peer
818  * that hasn't already been routed to.
819  * FIXME: needed?
820  *
821  * @param key hash code to check closeness to
822  * @param bloom bloomfilter, exclude these entries from the decision
823  * @return GNUNET_YES if node location is closest,
824  *         GNUNET_NO otherwise.
825  */
826 /* static */
827 int
828 am_closest_peer (const GNUNET_HashCode *key,
829                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
830 {
831   int bits;
832   int other_bits;
833   int bucket_num;
834   int count;
835   struct PeerInfo *pos;
836   unsigned int my_distance;
837
838   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
839     return GNUNET_YES;
840   bucket_num = find_bucket (key);
841   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
842   my_distance = distance (&my_identity.hashPubKey, key);
843   pos = k_buckets[bucket_num].head;
844   count = 0;
845   while ((pos != NULL) && (count < bucket_size))
846   {
847     if ((bloom != NULL) &&
848         (GNUNET_YES ==
849          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
850     {
851       pos = pos->next;
852       continue;                 /* Skip already checked entries */
853     }
854     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
855     if (other_bits > bits)
856       return GNUNET_NO;
857     if (other_bits == bits)        /* We match the same number of bits */
858       return GNUNET_YES;
859     pos = pos->next;
860   }
861   /* No peers closer, we are the closest! */
862   return GNUNET_YES;
863 }
864
865
866 /**
867  * Select a peer from the routing table that would be a good routing
868  * destination for sending a message for "key".  The resulting peer
869  * must not be in the set of blocked peers.<p>
870  *
871  * Note that we should not ALWAYS select the closest peer to the
872  * target, peers further away from the target should be chosen with
873  * exponentially declining probability.
874  *
875  * FIXME: double-check that this is fine
876  * 
877  *
878  * @param key the key we are selecting a peer to route to
879  * @param bloom a bloomfilter containing entries this request has seen already
880  * @param hops how many hops has this message traversed thus far
881  * @return Peer to route to, or NULL on error
882  */
883 static struct PeerInfo *
884 select_peer (const GNUNET_HashCode *key,
885              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
886              uint32_t hops)
887 {
888   unsigned int bc;
889   unsigned int count;
890   unsigned int selected;
891   struct PeerInfo *pos;
892   unsigned int distance;
893   unsigned int largest_distance;
894   struct PeerInfo *chosen;
895
896   if (hops >= GDS_NSE_get ())
897   {
898     /* greedy selection (closest peer that is not in bloomfilter) */
899     largest_distance = 0;
900     chosen = NULL;
901     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
902     {
903       pos = k_buckets[bc].head;
904       count = 0;
905       while ((pos != NULL) && (count < bucket_size))
906       {
907         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
908         if (GNUNET_NO ==
909             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
910         {
911           distance = inverse_distance (key, &pos->id.hashPubKey);
912           if (distance > largest_distance)
913           {
914             chosen = pos;
915             largest_distance = distance;
916           }
917         }
918         count++;
919         pos = pos->next;
920       }
921     }
922     return chosen;
923   }
924
925   /* select "random" peer */
926   /* count number of peers that are available and not filtered */
927   count = 0;
928   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
929   {
930     pos = k_buckets[bc].head;
931     while ((pos != NULL) && (count < bucket_size))
932     {
933       if (GNUNET_YES ==
934           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
935       {
936         pos = pos->next;
937         continue;               /* Ignore bloomfiltered peers */
938       }
939       count++;
940       pos = pos->next;
941     }
942   }
943   if (count == 0)               /* No peers to select from! */
944   {
945     return NULL;
946   }
947   /* Now actually choose a peer */
948   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
949   count = 0;
950   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
951   {
952     pos = k_buckets[bc].head;
953     while ((pos != NULL) && (count < bucket_size))
954     {
955       if (GNUNET_YES ==
956           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
957       {
958         pos = pos->next;
959         continue;               /* Ignore bloomfiltered peers */
960       }
961       if (0 == selected--)
962         return pos;
963       pos = pos->next;
964     }
965   }
966   GNUNET_break (0);
967   return NULL;
968 }
969
970
971 /**
972  * Compute the set of peers that the given request should be
973  * forwarded to.
974  *
975  * @param key routing key
976  * @param bloom bloom filter excluding peers as targets, all selected
977  *        peers will be added to the bloom filter
978  * @param hop_count number of hops the request has traversed so far
979  * @param target_replication desired number of replicas
980  * @param targets where to store an array of target peers (to be
981  *         free'd by the caller)
982  * @return number of peers returned in 'targets'.
983  */
984 static unsigned int
985 get_target_peers (const GNUNET_HashCode *key,
986                   struct GNUNET_CONTAINER_BloomFilter *bloom,
987                   uint32_t hop_count,
988                   uint32_t target_replication,
989                   struct PeerInfo ***targets)
990 {
991   unsigned int ret;
992   unsigned int off;
993   struct PeerInfo **rtargets;
994   struct PeerInfo *nxt;
995
996   ret = get_forward_count (hop_count, target_replication);
997   if (ret == 0)
998   {
999     *targets = NULL;
1000     return 0;
1001   }
1002   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1003   off = 0;
1004   while (ret-- > 0)
1005   {
1006     nxt = select_peer (key, bloom, hop_count);
1007     if (nxt == NULL)
1008       break;
1009     rtargets[off++] = nxt;
1010     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1011   }
1012   if (0 == off)
1013   {
1014     GNUNET_free (rtargets);
1015     *targets = NULL;
1016     return 0;
1017   }
1018   *targets = rtargets;
1019   return off;
1020 }
1021
1022
1023 /**
1024  * Perform a PUT operation.   Forwards the given request to other
1025  * peers.   Does not store the data locally.  Does not give the
1026  * data to local clients.  May do nothing if this is the only
1027  * peer in the network (or if we are the closest peer in the
1028  * network).
1029  *
1030  * @param type type of the block
1031  * @param options routing options
1032  * @param desired_replication_level desired replication count
1033  * @param expiration_time when does the content expire
1034  * @param hop_count how many hops has this message traversed so far
1035  * @param bf Bloom filter of peers this PUT has already traversed
1036  * @param key key for the content
1037  * @param put_path_length number of entries in put_path
1038  * @param put_path peers this request has traversed so far (if tracked)
1039  * @param data payload to store
1040  * @param data_size number of bytes in data
1041  */
1042 void
1043 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1044                            enum GNUNET_DHT_RouteOption options,
1045                            uint32_t desired_replication_level,
1046                            struct GNUNET_TIME_Absolute expiration_time,
1047                            uint32_t hop_count,
1048                            struct GNUNET_CONTAINER_BloomFilter *bf,
1049                            const GNUNET_HashCode *key,
1050                            unsigned int put_path_length,
1051                            struct GNUNET_PeerIdentity *put_path,
1052                            const void *data,
1053                            size_t data_size)
1054 {
1055   unsigned int target_count;
1056   unsigned int i;
1057   struct PeerInfo **targets;
1058   struct PeerInfo *target;
1059   struct P2PPendingMessage *pending;
1060   size_t msize;
1061   struct PeerPutMessage *ppm;
1062   struct GNUNET_PeerIdentity *pp;
1063   
1064   target_count = get_target_peers (key, bf, hop_count,
1065                                    desired_replication_level,
1066                                    &targets);
1067   if (0 == target_count)
1068     return;
1069   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1070   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1071   {
1072     put_path_length = 0;
1073     msize = data_size + sizeof (struct PeerPutMessage);
1074   }
1075   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1076   {
1077     GNUNET_break (0);
1078     return;
1079   }
1080   for (i=0;i<target_count;i++)
1081   {
1082     target = targets[i];
1083     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1084     pending->importance = 0; /* FIXME */
1085     pending->timeout = expiration_time;   
1086     ppm = (struct PeerPutMessage*) &pending[1];
1087     pending->msg = &ppm->header;
1088     ppm->header.size = htons (msize);
1089     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1090     ppm->options = htonl (options);
1091     ppm->type = htonl (type);
1092     ppm->hop_count = htonl (hop_count + 1);
1093     ppm->desired_replication_level = htonl (desired_replication_level);
1094     ppm->put_path_length = htonl (put_path_length);
1095     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1096     GNUNET_assert (GNUNET_OK ==
1097                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1098                                                               ppm->bloomfilter,
1099                                                               DHT_BLOOM_SIZE));
1100     ppm->key = *key;
1101     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1102     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1103     memcpy (&pp[put_path_length], data, data_size);
1104     GNUNET_CONTAINER_DLL_insert (target->head,
1105                                  target->tail,
1106                                  pending);
1107     target->pending_count++;
1108     process_peer_queue (target);
1109   }
1110   GNUNET_free (targets);
1111 }
1112
1113
1114 /**
1115  * Perform a GET operation.  Forwards the given request to other
1116  * peers.  Does not lookup the key locally.  May do nothing if this is
1117  * the only peer in the network (or if we are the closest peer in the
1118  * network).
1119  *
1120  * @param type type of the block
1121  * @param options routing options
1122  * @param desired_replication_level desired replication count
1123  * @param hop_count how many hops did this request traverse so far?
1124  * @param key key for the content
1125  * @param xquery extended query
1126  * @param xquery_size number of bytes in xquery
1127  * @param reply_bf bloomfilter to filter duplicates
1128  * @param reply_bf_mutator mutator for reply_bf
1129  * @param peer_bf filter for peers not to select (again)
1130  */
1131 void
1132 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1133                            enum GNUNET_DHT_RouteOption options,
1134                            uint32_t desired_replication_level,
1135                            uint32_t hop_count,
1136                            const GNUNET_HashCode *key,
1137                            const void *xquery,
1138                            size_t xquery_size,
1139                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1140                            uint32_t reply_bf_mutator,
1141                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1142 {
1143   unsigned int target_count;
1144   unsigned int i;
1145   struct PeerInfo **targets;
1146   struct PeerInfo *target;
1147   struct P2PPendingMessage *pending;
1148   size_t msize;
1149   struct PeerGetMessage *pgm;
1150   char *xq;
1151   size_t reply_bf_size;
1152   
1153   target_count = get_target_peers (key, peer_bf, hop_count,
1154                                    desired_replication_level,
1155                                    &targets);
1156   if (0 == target_count)
1157     return;
1158   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1159   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1160   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1161   {
1162     GNUNET_break (0);
1163     return;
1164   }
1165   /* forward request */
1166   for (i=0;i<target_count;i++)
1167   {
1168     target = targets[i];
1169     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1170     pending->importance = 0; /* FIXME */
1171     pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
1172     pgm = (struct PeerGetMessage*) &pending[1];
1173     pending->msg = &pgm->header;
1174     pgm->header.size = htons (msize);
1175     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1176     pgm->options = htonl (options);
1177     pgm->type = htonl (type);
1178     pgm->hop_count = htonl (hop_count + 1);
1179     pgm->desired_replication_level = htonl (desired_replication_level);
1180     pgm->xquery_size = htonl (xquery_size);
1181     pgm->bf_mutator = reply_bf_mutator; 
1182     GNUNET_assert (GNUNET_OK ==
1183                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1184                                                               pgm->bloomfilter,
1185                                                               DHT_BLOOM_SIZE));
1186     pgm->key = *key;
1187     xq = (char *) &pgm[1];
1188     memcpy (xq, xquery, xquery_size);
1189     GNUNET_assert (GNUNET_OK ==
1190                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1191                                                               &xq[xquery_size],
1192                                                               reply_bf_size));
1193     GNUNET_CONTAINER_DLL_insert (target->head,
1194                                  target->tail,
1195                                  pending);
1196     target->pending_count++;
1197     process_peer_queue (target);
1198   }
1199   GNUNET_free (targets);
1200 }
1201
1202
1203 /**
1204  * Handle a reply (route to origin).  Only forwards the reply back to
1205  * the given peer.  Does not do local caching or forwarding to local
1206  * clients.
1207  *
1208  * @param target neighbour that should receive the block (if still connected)
1209  * @param type type of the block
1210  * @param expiration_time when does the content expire
1211  * @param key key for the content
1212  * @param put_path_length number of entries in put_path
1213  * @param put_path peers the original PUT traversed (if tracked)
1214  * @param get_path_length number of entries in put_path
1215  * @param get_path peers this reply has traversed so far (if tracked)
1216  * @param data payload of the reply
1217  * @param data_size number of bytes in data
1218  */
1219 void
1220 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1221                              enum GNUNET_BLOCK_Type type,
1222                              struct GNUNET_TIME_Absolute expiration_time,
1223                              const GNUNET_HashCode *key,
1224                              unsigned int put_path_length,
1225                              struct GNUNET_PeerIdentity *put_path,
1226                              unsigned int get_path_length,
1227                              struct GNUNET_PeerIdentity *get_path,
1228                              const void *data,
1229                              size_t data_size)
1230 {
1231   struct PeerInfo *pi;
1232   struct P2PPendingMessage *pending;
1233   size_t msize;
1234   struct PeerResultMessage *prm;
1235   struct GNUNET_PeerIdentity *paths;
1236   
1237   msize = data_size + sizeof (struct PeerResultMessage) + 
1238     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1239   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1240        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1241        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1242        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1243   {
1244     GNUNET_break (0);
1245     return;
1246   }
1247   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1248                                           &target->hashPubKey);
1249   if (NULL == pi)
1250   {
1251     /* peer disconnected in the meantime, drop reply */
1252     return;
1253   }
1254   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1255   pending->importance = 0; /* FIXME */
1256   pending->timeout = expiration_time;
1257   prm = (struct PeerResultMessage*) &pending[1];
1258   pending->msg = &prm->header;
1259   prm->header.size = htons (msize);
1260   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1261   prm->type = htonl (type);
1262   prm->put_path_length = htonl (put_path_length);
1263   prm->get_path_length = htonl (get_path_length);
1264   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1265   prm->key = *key;
1266   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1267   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1268   memcpy (&paths[put_path_length],
1269           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1270   memcpy (&paths[put_path_length + get_path_length],
1271           data, data_size);
1272   GNUNET_CONTAINER_DLL_insert (pi->head,
1273                                pi->tail,
1274                                pending);
1275   pi->pending_count++;
1276   process_peer_queue (pi);
1277 }
1278
1279
1280 /**
1281  * Closure for 'add_known_to_bloom'.
1282  */
1283 struct BloomConstructorContext
1284 {
1285   /**
1286    * Bloom filter under construction.
1287    */
1288   struct GNUNET_CONTAINER_BloomFilter *bloom;
1289
1290   /**
1291    * Mutator to use.
1292    */
1293   uint32_t bf_mutator;
1294 };
1295
1296
1297 /**
1298  * Add each of the peers we already know to the bloom filter of
1299  * the request so that we don't get duplicate HELLOs.
1300  *
1301  * @param cls the 'struct BloomConstructorContext'.
1302  * @param key peer identity to add to the bloom filter
1303  * @param value value the peer information (unused)
1304  * @return GNUNET_YES (we should continue to iterate)
1305  */
1306 static int
1307 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1308 {
1309   struct BloomConstructorContext *ctx = cls;
1310   GNUNET_HashCode mh;
1311
1312   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1313   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1314   return GNUNET_YES;
1315 }
1316
1317
1318 /**
1319  * Task to send a find peer message for our own peer identifier
1320  * so that we can find the closest peers in the network to ourselves
1321  * and attempt to connect to them.
1322  *
1323  * @param cls closure for this task
1324  * @param tc the context under which the task is running
1325  */
1326 static void
1327 send_find_peer_message (void *cls,
1328                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1329 {
1330   struct GNUNET_TIME_Relative next_send_time;
1331   struct BloomConstructorContext bcc;
1332
1333   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1334   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1335     return;
1336   if (newly_found_peers > bucket_size) 
1337   {
1338     /* If we are finding many peers already, no need to send out our request right now! */
1339     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1340                                                    &send_find_peer_message, NULL);
1341     newly_found_peers = 0;
1342     return;
1343   }
1344   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1345   bcc.bloom =
1346     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1347   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1348                                          &add_known_to_bloom,
1349                                          &bcc);
1350   // FIXME: pass priority!?
1351   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1352                              GNUNET_DHT_RO_FIND_PEER,
1353                              16 /* FIXME: replication level? */,
1354                              0,
1355                              &my_identity.hashPubKey,
1356                              NULL, 0,
1357                              bcc.bloom, bcc.bf_mutator, NULL);
1358   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1359   /* schedule next round */
1360   newly_found_peers = 0;
1361   next_send_time.rel_value =
1362     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1363     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1364                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1365   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1366                                                  &send_find_peer_message,
1367                                                  NULL);  
1368 }
1369
1370
1371 /**
1372  * To be called on core init/fail.
1373  *
1374  * @param cls service closure
1375  * @param server handle to the server for this service
1376  * @param identity the public identity of this peer
1377  * @param publicKey the public key of this peer
1378  */
1379 static void
1380 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1381            const struct GNUNET_PeerIdentity *identity,
1382            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1383 {
1384   struct GNUNET_TIME_Relative next_send_time;
1385
1386   GNUNET_assert (server != NULL);
1387   my_identity = *identity;
1388   /* FIXME: do upon 1st connect instead! */
1389   next_send_time.rel_value =
1390     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1391     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1392                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1393                                2) -
1394                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1395   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1396                                                  &send_find_peer_message,
1397                                                  NULL);
1398 }
1399
1400
1401 /**
1402  * Core handler for p2p put requests.
1403  *
1404  * @param cls closure
1405  * @param peer sender of the request
1406  * @param message message
1407  * @param peer peer identity this notification is about
1408  * @param atsi performance data
1409  * @return GNUNET_OK to keep the connection open,
1410  *         GNUNET_SYSERR to close it (signal serious error)
1411  */
1412 static int
1413 handle_dht_p2p_put (void *cls,
1414                     const struct GNUNET_PeerIdentity *peer,
1415                     const struct GNUNET_MessageHeader *message,
1416                     const struct GNUNET_TRANSPORT_ATS_Information
1417                     *atsi)
1418 {
1419   const struct PeerPutMessage *put;
1420   const struct GNUNET_PeerIdentity *put_path;
1421   const void *payload;
1422   uint32_t putlen;
1423   uint16_t msize;
1424   size_t payload_size;
1425   struct GNUNET_CONTAINER_BloomFilter *bf;
1426   GNUNET_HashCode test_key;
1427   
1428   msize = ntohs (message->size);
1429   if (msize < sizeof (struct PeerPutMessage))
1430   {
1431     GNUNET_break_op (0);
1432     return GNUNET_YES;
1433   }
1434   put = (const struct PeerPutMessage*) message;
1435   putlen = ntohl (put->put_path_length);
1436   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1437        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1438     {
1439       GNUNET_break_op (0);
1440       return GNUNET_YES;
1441     }
1442   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1443   payload = &put_path[putlen];
1444   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1445                           putlen * sizeof (struct GNUNET_PeerIdentity));
1446   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1447                                 ntohl (put->type),
1448                                 payload, payload_size,
1449                                 &test_key))
1450   {
1451   case GNUNET_YES:
1452     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1453     {
1454       GNUNET_break_op (0);
1455       return GNUNET_YES;
1456     }
1457     break;
1458   case GNUNET_NO:
1459     GNUNET_break_op (0);
1460     return GNUNET_YES;
1461   case GNUNET_SYSERR:
1462     /* cannot verify, good luck */
1463     break;
1464   }
1465   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1466                                           DHT_BLOOM_SIZE,
1467                                           DHT_BLOOM_K);
1468   {
1469     struct GNUNET_PeerIdentity pp[putlen+1];
1470   
1471     /* extend 'put path' by sender */
1472     memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1473     pp[putlen] = *peer;
1474
1475     /* give to local clients */
1476     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1477                              &put->key,
1478                              0, NULL,
1479                              putlen + 1,
1480                              pp,
1481                              ntohl (put->type),
1482                              payload_size,
1483                              payload);
1484     /* store locally */
1485     GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1486                               &put->key,
1487                               putlen + 1, pp,
1488                               ntohl (put->type),
1489                               payload_size,
1490                               payload);
1491     /* route to other peers */
1492     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1493                                ntohl (put->options),
1494                                ntohl (put->desired_replication_level),
1495                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1496                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1497                                bf,
1498                                &put->key,
1499                                putlen + 1, pp,
1500                                payload,
1501                                payload_size);
1502   }
1503   GNUNET_CONTAINER_bloomfilter_free (bf);
1504   return GNUNET_YES;
1505 }
1506
1507
1508 /**
1509  * Core handler for p2p get requests.
1510  *
1511  * @param cls closure
1512  * @param peer sender of the request
1513  * @param message message
1514  * @param peer peer identity this notification is about
1515  * @param atsi performance data
1516  * @return GNUNET_OK to keep the connection open,
1517  *         GNUNET_SYSERR to close it (signal serious error)
1518  */
1519 static int
1520 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1521                     const struct GNUNET_MessageHeader *message,
1522                     const struct GNUNET_TRANSPORT_ATS_Information
1523                     *atsi)
1524 {
1525   struct PeerGetMessage *get;
1526   uint32_t xquery_size;
1527   size_t reply_bf_size;
1528   uint16_t msize;
1529   enum GNUNET_BLOCK_Type type;
1530   enum GNUNET_DHT_RouteOption options;
1531   enum GNUNET_BLOCK_EvaluationResult eval;
1532   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1533   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1534   const char *xquery;
1535                       
1536   /* parse and validate message */
1537   msize = ntohs (message->size);
1538   if (msize < sizeof (struct PeerGetMessage))
1539   {
1540     GNUNET_break_op (0);
1541     return GNUNET_YES;
1542   }
1543   get = (struct PeerGetMessage *) message;
1544   xquery_size = ntohl (get->xquery_size);
1545   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1546   {
1547     GNUNET_break_op (0);
1548     return GNUNET_YES;
1549   }
1550   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1551   type = ntohl (get->type);
1552   options = ntohl (get->options);
1553   xquery = (const char*) &get[1];
1554   reply_bf = NULL;
1555   if (reply_bf_size > 0)
1556     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1557                                                   reply_bf_size,
1558                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1559   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1560                                 type,
1561                                 &get->key,
1562                                 &reply_bf,
1563                                 get->bf_mutator,
1564                                 xquery, xquery_size,
1565                                 NULL, 0);
1566   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1567   {
1568     /* request invalid or block type not supported */
1569     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1570     if (NULL != reply_bf)
1571       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1572     return GNUNET_YES;
1573   }
1574   peer_bf =
1575     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1576                                        DHT_BLOOM_SIZE,
1577                                        DHT_BLOOM_K);
1578
1579   /* remember request for routing replies */
1580   GDS_ROUTING_add (peer,
1581                    type,
1582                    &get->key,
1583                    xquery, xquery_size,
1584                    reply_bf, get->bf_mutator);
1585   /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1586
1587   /* local lookup (this may update the reply_bf) */
1588   GDS_DATACACHE_handle_get (&get->key,
1589                             type,
1590                             xquery, xquery_size,
1591                             &reply_bf, 
1592                             get->bf_mutator);
1593   /* FIXME: should track if the local lookup resulted in a
1594      definitive result and then NOT do P2P forwarding */
1595     
1596   /* P2P forwarding */
1597   GDS_NEIGHBOURS_handle_get (type,
1598                              options,
1599                              ntohl (get->desired_replication_level),
1600                              ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1601                              &get->key,
1602                              xquery, xquery_size,
1603                              reply_bf,
1604                              get->bf_mutator,
1605                              peer_bf);
1606   /* clean up */
1607   if (NULL != reply_bf)
1608     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1609   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1610   return GNUNET_YES;
1611 }
1612
1613
1614 /**
1615  * Core handler for p2p result messages.
1616  *
1617  * @param cls closure
1618  * @param message message
1619  * @param peer peer identity this notification is about
1620  * @param atsi performance data
1621  * @return GNUNET_YES (do not cut p2p connection)
1622  */
1623 static int
1624 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1625                        const struct GNUNET_MessageHeader *message,
1626                        const struct GNUNET_TRANSPORT_ATS_Information
1627                        *atsi)
1628 {
1629   const struct PeerResultMessage *prm;
1630   const struct GNUNET_PeerIdentity *put_path;
1631   const struct GNUNET_PeerIdentity *get_path;
1632   const void *data;
1633   uint32_t get_path_length;
1634   uint32_t put_path_length;
1635   uint16_t msize;
1636   size_t data_size;
1637   enum GNUNET_BLOCK_Type type;
1638                        
1639   /* parse and validate message */
1640   msize = ntohs (message->size);
1641   if (msize < sizeof (struct PeerResultMessage))
1642   {
1643     GNUNET_break_op (0);
1644     return GNUNET_YES;
1645   }
1646   prm = (struct PeerResultMessage *) message;
1647   put_path_length = ntohl (prm->put_path_length);
1648   get_path_length = ntohl (prm->get_path_length);
1649   if ( (msize < sizeof (struct PeerResultMessage) + 
1650         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1651        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1652        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1653   {
1654     GNUNET_break_op (0);
1655     return GNUNET_YES;
1656   } 
1657   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1658   get_path = &put_path[put_path_length];
1659   type = ntohl (prm->type);
1660   data = (const void*) &get_path[get_path_length];
1661   data_size = msize - (sizeof (struct PeerResultMessage) + 
1662                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1663   /* append 'peer' to 'get_path' */
1664   {    
1665     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1666     
1667     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1668     xget_path[get_path_length] = *peer;
1669
1670     /* forward to local clients */   
1671     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1672                              &prm->key,
1673                              get_path_length + 1,
1674                              xget_path,
1675                              put_path_length,
1676                              put_path,
1677                              type,
1678                              data_size, 
1679                              data);
1680
1681     /* forward to other peers */
1682     GDS_ROUTING_process (type,
1683                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1684                          &prm->key,
1685                          put_path_length,
1686                          put_path,
1687                          get_path_length + 1,
1688                          xget_path,
1689                          data,
1690                          data_size);                     
1691   }
1692   return GNUNET_YES;
1693 }
1694
1695
1696 /**
1697  * Initialize neighbours subsystem.
1698  *
1699  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1700  */
1701 int
1702 GDS_NEIGHBOURS_init ()
1703 {
1704   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1705     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1706     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1707     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1708     {NULL, 0, 0}
1709   };
1710   unsigned long long temp_config_num;
1711  
1712   if (GNUNET_OK ==
1713       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1714                                              &temp_config_num))
1715     bucket_size = (unsigned int) temp_config_num;  
1716   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1717                                  1,
1718                                  NULL,
1719                                  &core_init,
1720                                  &handle_core_connect,
1721                                  &handle_core_disconnect, 
1722                                  NULL,  /* Do we care about "status" updates? */
1723                                  NULL, GNUNET_NO,
1724                                  NULL, GNUNET_NO,
1725                                  core_handlers);
1726   if (coreAPI == NULL)
1727     return GNUNET_SYSERR;
1728   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1729 #if 0
1730   struct GNUNET_TIME_Relative next_send_time;
1731
1732   // FIXME!
1733   next_send_time.rel_value =
1734     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1735     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1736                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1737                                2) -
1738                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1739   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1740                                                  &send_find_peer_message,
1741                                                  &find_peer_context);  
1742 #endif
1743   return GNUNET_OK;
1744 }
1745
1746
1747 /**
1748  * Shutdown neighbours subsystem.
1749  */
1750 void
1751 GDS_NEIGHBOURS_done ()
1752 {
1753   if (coreAPI == NULL)
1754     return;
1755   GNUNET_CORE_disconnect (coreAPI);
1756   coreAPI = NULL;    
1757   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1758   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1759   all_known_peers = NULL;
1760   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1761   {
1762     GNUNET_SCHEDULER_cancel (find_peer_task);
1763     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1764   }
1765 }
1766
1767
1768 /* end of gnunet-service-dht_neighbours.c */
1769
1770
1771 #if 0
1772
1773
1774
1775 #endif