making some files compile
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht.h"
40 #include "gnunet-service-dht.h"
41 #include "gnunet-service-dht_clients.h"
42 #include "gnunet-service-dht_datacache.h"
43 #include "gnunet-service-dht_nse.h"
44 #include "gnunet-service-dht_routing.h"
45 #include <fenv.h>
46
47 /**
48  * How many buckets will we allow total.
49  */
50 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
51
52 /**
53  * What is the maximum number of peers in a given bucket.
54  */
55 #define DEFAULT_BUCKET_SIZE 4
56
57 /**
58  * Size of the bloom filter the DHT uses to filter peers.
59  */
60 #define DHT_BLOOM_SIZE 128
61
62 /**
63  * How often to update our preference levels for peers in our routing tables.
64  */
65 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
66
67 /**
68  * How long at least to wait before sending another find peer request.
69  */
70 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
71
72 /**
73  * How long at most to wait before sending another find peer request.
74  */
75 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
76
77
78 /**
79  * P2P PUT message
80  */
81 struct PeerPutMessage
82 {
83   /**
84    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Processing options
90    */
91   uint32_t options GNUNET_PACKED;
92
93   /**
94    * Content type.
95    */
96   uint32_t type GNUNET_PACKED;
97
98   /**
99    * Hop count
100    */
101   uint32_t hop_count GNUNET_PACKED;
102
103   /**
104    * Replication level for this message
105    */
106   uint32_t desired_replication_level GNUNET_PACKED;
107
108   /**
109    * Length of the PUT path that follows (if tracked).
110    */
111   uint32_t put_path_length GNUNET_PACKED;
112
113   /**
114    * When does the content expire?
115    */
116   struct GNUNET_TIME_AbsoluteNBO expiration_time;
117
118   /**
119    * Bloomfilter (for peer identities) to stop circular routes
120    */
121   char bloomfilter[DHT_BLOOM_SIZE];
122
123   /**
124    * The key we are storing under.
125    */
126   GNUNET_HashCode key;
127
128   /* put path (if tracked) */
129
130   /* Payload */
131
132 };
133
134
135 /**
136  * P2P Result message
137  */
138 struct PeerResultMessage
139 {
140   /**
141    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
142    */
143   struct GNUNET_MessageHeader header;
144
145   /**
146    * Content type.
147    */
148   uint32_t type GNUNET_PACKED;
149
150   /**
151    * Length of the PUT path that follows (if tracked).
152    */
153   uint32_t put_path_length GNUNET_PACKED;
154
155   /**
156    * Length of the GET path that follows (if tracked).
157    */
158   uint32_t get_path_length GNUNET_PACKED;
159
160   /**
161    * When does the content expire?
162    */
163   struct GNUNET_TIME_AbsoluteNBO expiration_time;
164
165   /**
166    * The key of the corresponding GET request.
167    */
168   GNUNET_HashCode key;
169
170   /* put path (if tracked) */
171
172   /* get path (if tracked) */
173
174   /* Payload */
175
176 };
177
178
179 /**
180  * P2P GET message
181  */
182 struct PeerGetMessage
183 {
184   /**
185    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
186    */
187   struct GNUNET_MessageHeader header;
188
189   /**
190    * Processing options
191    */
192   uint32_t options GNUNET_PACKED;
193
194   /**
195    * Desired content type.
196    */
197   uint32_t type GNUNET_PACKED;
198
199   /**
200    * Hop count
201    */
202   uint32_t hop_count GNUNET_PACKED;
203
204   /**
205    * Desired replication level for this request.
206    */
207   uint32_t desired_replication_level GNUNET_PACKED;
208
209   /**
210    * Size of the extended query.
211    */
212   uint32_t xquery_size;
213
214   /**
215    * Bloomfilter mutator.
216    */
217   uint32_t bf_mutator;
218
219   /**
220    * Bloomfilter (for peer identities) to stop circular routes
221    */
222   char bloomfilter[DHT_BLOOM_SIZE];
223
224   /**
225    * The key we are looking for.
226    */
227   GNUNET_HashCode key;
228
229   /* xquery */
230
231   /* result bloomfilter */
232
233 };
234
235
236 /**
237  * Linked list of messages to send to a particular other peer.
238  */
239 struct P2PPendingMessage
240 {
241   /**
242    * Pointer to next item in the list
243    */
244   struct P2PPendingMessage *next;
245
246   /**
247    * Pointer to previous item in the list
248    */
249   struct P2PPendingMessage *prev;
250
251   /**
252    * Message importance level.  FIXME: used? useful?
253    */
254   unsigned int importance;
255
256   /**
257    * When does this message time out?
258    */
259   struct GNUNET_TIME_Absolute timeout;
260
261   /**
262    * Actual message to be sent, allocated at the end of the struct:
263    * // msg = (cast) &pm[1]; 
264    * // memcpy (&pm[1], data, len);
265    */
266   const struct GNUNET_MessageHeader *msg;
267
268 };
269
270
271 /**
272  * Entry for a peer in a bucket.
273  */
274 struct PeerInfo
275 {
276   /**
277    * Next peer entry (DLL)
278    */
279   struct PeerInfo *next;
280
281   /**
282    *  Prev peer entry (DLL)
283    */
284   struct PeerInfo *prev;
285
286   /**
287    * Count of outstanding messages for peer.  FIXME: NEEDED?
288    * FIXME: bound queue size!?
289    */
290   unsigned int pending_count;
291
292   /**
293    * Head of pending messages to be sent to this peer.
294    */
295   struct P2PPendingMessage *head;
296
297   /**
298    * Tail of pending messages to be sent to this peer.
299    */
300   struct P2PPendingMessage *tail;
301
302   /**
303    * Core handle for sending messages to this peer.
304    */
305   struct GNUNET_CORE_TransmitHandle *th;
306
307   /**
308    * Preference update context
309    */
310   struct GNUNET_CORE_InformationRequestContext *info_ctx;
311
312   /**
313    * Task for scheduling message sends.
314    */
315   GNUNET_SCHEDULER_TaskIdentifier send_task;
316
317   /**
318    * Task for scheduling preference updates
319    */
320   GNUNET_SCHEDULER_TaskIdentifier preference_task;
321
322   /**
323    * What is the identity of the peer?
324    */
325   struct GNUNET_PeerIdentity id;
326
327 #if 0
328   /**
329    * What is the average latency for replies received?
330    */
331   struct GNUNET_TIME_Relative latency;
332
333   /**
334    * Transport level distance to peer.
335    */
336   unsigned int distance;
337 #endif
338
339 };
340
341
342 /**
343  * Peers are grouped into buckets.
344  */
345 struct PeerBucket
346 {
347   /**
348    * Head of DLL
349    */
350   struct PeerInfo *head;
351
352   /**
353    * Tail of DLL
354    */
355   struct PeerInfo *tail;
356
357   /**
358    * Number of peers in the bucket.
359    */
360   unsigned int peers_size;
361 };
362
363
364 /**
365  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
366  */
367 static unsigned int closest_bucket;
368
369 /**
370  * How many peers have we added since we sent out our last
371  * find peer request?
372  */
373 static unsigned int newly_found_peers;
374
375 /**
376  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
377  */
378 static struct PeerBucket k_buckets[MAX_BUCKETS];
379
380 /**
381  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
382  */
383 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
384
385 /**
386  * Maximum size for each bucket.
387  */
388 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
389
390 /**
391  * Task that sends FIND PEER requests.
392  */
393 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
394
395 /**
396  * Identity of this peer.
397  */ 
398 static struct GNUNET_PeerIdentity my_identity;
399
400 /**
401  * Handle to GNUnet core.
402  */
403 static struct GNUNET_CORE_Handle *coreAPI;
404
405
406 /**
407  * Find the optimal bucket for this key.
408  *
409  * @param hc the hashcode to compare our identity to
410  * @return the proper bucket index, or GNUNET_SYSERR
411  *         on error (same hashcode)
412  */
413 static int
414 find_bucket (const GNUNET_HashCode * hc)
415 {
416   unsigned int bits;
417
418   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
419   if (bits == MAX_BUCKETS)
420     {
421       /* How can all bits match? Got my own ID? */
422       GNUNET_break (0);
423       return GNUNET_SYSERR; 
424     }
425   return MAX_BUCKETS - bits - 1;
426 }
427
428
429 /**
430  * Let GNUnet core know that we like the given peer.
431  *
432  * @param cls the 'struct PeerInfo' of the peer
433  * @param tc scheduler context.
434  */ 
435 static void
436 update_core_preference (void *cls,
437                         const struct GNUNET_SCHEDULER_TaskContext *tc);
438
439
440 /**
441  * Function called with statistics about the given peer.
442  *
443  * @param cls closure
444  * @param peer identifies the peer
445  * @param bpm_out set to the current bandwidth limit (sending) for this peer
446  * @param amount set to the amount that was actually reserved or unreserved;
447  *               either the full requested amount or zero (no partial reservations)
448  * @param res_delay if the reservation could not be satisfied (amount was 0), how
449  *        long should the client wait until re-trying?
450  * @param preference current traffic preference for the given peer
451  */
452 static void
453 update_core_preference_finish (void *cls,
454                                const struct GNUNET_PeerIdentity *peer,
455                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
456                                int32_t amount,
457                                struct GNUNET_TIME_Relative res_delay,
458                                uint64_t preference)
459 {
460   struct PeerInfo *peer_info = cls;
461
462   peer_info->info_ctx = NULL;
463   peer_info->preference_task
464     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
465                                     &update_core_preference, peer_info);
466 }
467
468
469 /**
470  * Let GNUnet core know that we like the given peer.
471  *
472  * @param cls the 'struct PeerInfo' of the peer
473  * @param tc scheduler context.
474  */ 
475 static void
476 update_core_preference (void *cls,
477                         const struct GNUNET_SCHEDULER_TaskContext *tc)
478 {
479   struct PeerInfo *peer = cls;
480   uint64_t preference;
481   unsigned int matching;
482   int bucket;
483
484   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
485   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
486     return;  
487   matching =
488     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
489                                       &peer->id.hashPubKey);
490   if (matching >= 64)
491     matching = 63;
492   bucket = find_bucket(&peer->id.hashPubKey);
493   if (bucket == GNUNET_SYSERR)
494     preference = 0;
495   else
496     preference = (1LL << matching) / k_buckets[bucket].peers_size;
497   if (preference == 0)
498     {
499       peer->preference_task
500         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
501                                         &update_core_preference, peer);
502       return;
503     }
504   peer->info_ctx =
505     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
506                                         GNUNET_TIME_UNIT_FOREVER_REL,
507                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
508                                         preference,
509                                         &update_core_preference_finish, peer);
510 }
511
512
513 /**
514  * Method called whenever a peer connects.
515  *
516  * @param cls closure
517  * @param peer peer identity this notification is about
518  * @param atsi performance data
519  */
520 static void
521 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
522                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
523 {
524   struct PeerInfo *ret;
525   int peer_bucket;
526
527   /* Check for connect to self message */
528   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
529     return;
530   if (GNUNET_YES ==
531       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
532                                               &peer->hashPubKey))
533   {
534     GNUNET_break (0);
535     return;
536   }
537   peer_bucket = find_bucket (&peer->hashPubKey);
538   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
539   ret = GNUNET_malloc (sizeof (struct PeerInfo));
540 #if 0
541   ret->latency = latency;
542   ret->distance = distance;
543 #endif
544   ret->id = *peer;
545   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
546                                      k_buckets[peer_bucket].tail,
547                                      k_buckets[peer_bucket].tail, ret);
548   k_buckets[peer_bucket].peers_size++;
549   closest_bucket = GNUNET_MAX (closest_bucket,
550                                peer_bucket);
551   if ( (peer_bucket > 0) &&
552        (k_buckets[peer_bucket].peers_size <= bucket_size) )
553     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
554   newly_found_peers++;
555   GNUNET_assert (GNUNET_OK ==
556                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
557                                                     &peer->hashPubKey, ret,
558                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
559 }
560
561
562 /**
563  * Method called whenever a peer disconnects.
564  *
565  * @param cls closure
566  * @param peer peer identity this notification is about
567  */
568 static void
569 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
570 {
571   struct PeerInfo *to_remove;
572   int current_bucket;
573   struct P2PPendingMessage *pos;
574
575   /* Check for disconnect from self message */
576   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
577     return;
578   to_remove =
579       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
580   if (NULL == to_remove)
581     {
582       GNUNET_break (0);
583       return;
584     }
585   GNUNET_assert (GNUNET_YES ==
586                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
587                                                        &peer->hashPubKey,
588                                                        to_remove));
589   if (NULL != to_remove->info_ctx)
590   {
591     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
592     to_remove->info_ctx = NULL;
593   }
594   current_bucket = find_bucket (&to_remove->id.hashPubKey);
595   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
596                                k_buckets[current_bucket].tail,
597                                to_remove);
598   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
599   k_buckets[current_bucket].peers_size--;
600   while ( (closest_bucket > 0) &&
601           (k_buckets[closest_bucket].peers_size == 0) )
602     closest_bucket--;
603
604   if (to_remove->th != NULL) 
605   {
606     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
607     to_remove->th = NULL;
608   }
609   while (NULL != (pos = to_remove->head))
610   {
611     GNUNET_CONTAINER_DLL_remove (to_remove->head,
612                                  to_remove->tail,
613                                  pos);
614     GNUNET_free (pos);
615   }
616 }
617
618
619 /**
620  * Called when core is ready to send a message we asked for
621  * out to the destination.
622  *
623  * @param cls the 'struct PeerInfo' of the target peer
624  * @param size number of bytes available in buf
625  * @param buf where the callee should write the message
626  * @return number of bytes written to buf
627  */
628 static size_t
629 core_transmit_notify (void *cls, size_t size, void *buf)
630 {
631   struct PeerInfo *peer = cls;
632   char *cbuf = buf;
633   struct P2PPendingMessage *pending;
634   size_t off;
635   size_t msize;
636
637   peer->th = NULL;
638   if (buf == NULL)
639   {
640     /* client disconnected */
641     return 0;
642   }
643   if (peer->head == NULL)
644   {
645     /* no messages pending */
646     return 0;
647   }
648   off = 0;
649   while ( (NULL != (pending = peer->head)) &&
650           (size - off >= (msize = ntohs (pending->msg->size))) )
651   {
652     memcpy (&cbuf[off], pending->msg, msize);
653     off += msize;
654     peer->pending_count--;
655     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
656     GNUNET_free (pending);
657   }
658   if (peer->head != NULL)
659     peer->th 
660       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
661                                            pending->importance,
662                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
663                                            &peer->id, msize,
664                                            &core_transmit_notify, peer);
665
666   return off;
667 }
668
669
670 /**
671  * Transmit all messages in the peer's message queue.
672  *
673  * @param peer message queue to process
674  */
675 static void
676 process_peer_queue (struct PeerInfo *peer)
677 {
678   struct P2PPendingMessage *pending;
679
680   if (NULL != (pending = peer->head))
681     return;
682   if (NULL != peer->th)
683     return;
684   peer->th 
685     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
686                                          pending->importance,
687                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
688                                          &peer->id,
689                                          ntohs (pending->msg->size),
690                                          &core_transmit_notify, peer);
691 }
692
693
694 /**
695  * To how many peers should we (on average) forward the request to
696  * obtain the desired target_replication count (on average).
697  *
698  * @param hop_count number of hops the message has traversed
699  * @param target_replication the number of total paths desired
700  * @return Some number of peers to forward the message to
701  */
702 static unsigned int
703 get_forward_count (uint32_t hop_count, 
704                    uint32_t target_replication)
705 {
706   uint32_t random_value;
707   uint32_t forward_count;
708   float target_value;
709
710   if (hop_count > GDS_NSE_get () * 4.0)
711   {
712     /* forcefully terminate */
713     return 0;
714   }
715   if (hop_count > GDS_NSE_get () * 2.0)
716   {
717     /* Once we have reached our ideal number of hops, only forward to 1 peer */
718     return 1;
719   }
720   /* bound by system-wide maximum */
721   target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
722                                    target_replication);
723   target_value =
724     1 + (target_replication - 1.0) / (GDS_NSE_get () +
725                                       ((float) (target_replication - 1.0) *
726                                        hop_count));
727   /* Set forward count to floor of target_value */
728   forward_count = (uint32_t) target_value;
729   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
730   target_value = target_value - forward_count;
731   random_value =
732     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
733   if (random_value < (target_value * UINT32_MAX))
734     forward_count++;
735   return forward_count;
736 }
737
738
739 /**
740  * Compute the distance between have and target as a 32-bit value.
741  * Differences in the lower bits must count stronger than differences
742  * in the higher bits.
743  *
744  * @return 0 if have==target, otherwise a number
745  *           that is larger as the distance between
746  *           the two hash codes increases
747  */
748 static unsigned int
749 distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
750 {
751   unsigned int bucket;
752   unsigned int msb;
753   unsigned int lsb;
754   unsigned int i;
755
756   /* We have to represent the distance between two 2^9 (=512)-bit
757    * numbers as a 2^5 (=32)-bit number with "0" being used for the
758    * two numbers being identical; furthermore, we need to
759    * guarantee that a difference in the number of matching
760    * bits is always represented in the result.
761    *
762    * We use 2^32/2^9 numerical values to distinguish between
763    * hash codes that have the same LSB bit distance and
764    * use the highest 2^9 bits of the result to signify the
765    * number of (mis)matching LSB bits; if we have 0 matching
766    * and hence 512 mismatching LSB bits we return -1 (since
767    * 512 itself cannot be represented with 9 bits) */
768
769   /* first, calculate the most significant 9 bits of our
770    * result, aka the number of LSBs */
771   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
772   /* bucket is now a value between 0 and 512 */
773   if (bucket == 512)
774     return 0;                   /* perfect match */
775   if (bucket == 0)
776     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
777                                  * below, we'd end up with max+1 (overflow)) */
778
779   /* calculate the most significant bits of the final result */
780   msb = (512 - bucket) << (32 - 9);
781   /* calculate the 32-9 least significant bits of the final result by
782    * looking at the differences in the 32-9 bits following the
783    * mismatching bit at 'bucket' */
784   lsb = 0;
785   for (i = bucket + 1;
786        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
787   {
788     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
789         GNUNET_CRYPTO_hash_get_bit (have, i))
790       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
791                                                  * last bit set will be 31 -- if
792                                                  * i does not reach 512 first... */
793   }
794   return msb | lsb;
795 }
796
797
798 /**
799  * Return a number that is larger the closer the
800  * "have" GNUNET_hash code is to the "target".
801  *
802  * @return inverse distance metric, non-zero.
803  *         Must fudge the value if NO bits match.
804  */
805 static unsigned int
806 inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
807 {
808   if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
809     return 1;                   /* Never return 0! */
810   return ((unsigned int) -1) - distance (target, have);
811 }
812
813
814 /**
815  * Check whether my identity is closer than any known peers.  If a
816  * non-null bloomfilter is given, check if this is the closest peer
817  * that hasn't already been routed to.
818  * FIXME: needed?
819  *
820  * @param key hash code to check closeness to
821  * @param bloom bloomfilter, exclude these entries from the decision
822  * @return GNUNET_YES if node location is closest,
823  *         GNUNET_NO otherwise.
824  */
825 /* static */
826 int
827 am_closest_peer (const GNUNET_HashCode *key,
828                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
829 {
830   int bits;
831   int other_bits;
832   int bucket_num;
833   int count;
834   struct PeerInfo *pos;
835   unsigned int my_distance;
836
837   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
838     return GNUNET_YES;
839   bucket_num = find_bucket (key);
840   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
841   my_distance = distance (&my_identity.hashPubKey, key);
842   pos = k_buckets[bucket_num].head;
843   count = 0;
844   while ((pos != NULL) && (count < bucket_size))
845   {
846     if ((bloom != NULL) &&
847         (GNUNET_YES ==
848          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
849     {
850       pos = pos->next;
851       continue;                 /* Skip already checked entries */
852     }
853     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
854     if (other_bits > bits)
855       return GNUNET_NO;
856     if (other_bits == bits)        /* We match the same number of bits */
857       return GNUNET_YES;
858     pos = pos->next;
859   }
860   /* No peers closer, we are the closest! */
861   return GNUNET_YES;
862 }
863
864
865 /**
866  * Select a peer from the routing table that would be a good routing
867  * destination for sending a message for "key".  The resulting peer
868  * must not be in the set of blocked peers.<p>
869  *
870  * Note that we should not ALWAYS select the closest peer to the
871  * target, peers further away from the target should be chosen with
872  * exponentially declining probability.
873  *
874  * FIXME: double-check that this is fine
875  * 
876  *
877  * @param key the key we are selecting a peer to route to
878  * @param bloom a bloomfilter containing entries this request has seen already
879  * @param hops how many hops has this message traversed thus far
880  * @return Peer to route to, or NULL on error
881  */
882 static struct PeerInfo *
883 select_peer (const GNUNET_HashCode *key,
884              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
885              uint32_t hops)
886 {
887   unsigned int bc;
888   unsigned int count;
889   unsigned int selected;
890   struct PeerInfo *pos;
891   unsigned int distance;
892   unsigned int largest_distance;
893   struct PeerInfo *chosen;
894
895   if (hops >= GDS_NSE_get ())
896   {
897     /* greedy selection (closest peer that is not in bloomfilter) */
898     largest_distance = 0;
899     chosen = NULL;
900     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
901     {
902       pos = k_buckets[bc].head;
903       count = 0;
904       while ((pos != NULL) && (count < bucket_size))
905       {
906         /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
907         if (GNUNET_NO ==
908             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
909         {
910           distance = inverse_distance (key, &pos->id.hashPubKey);
911           if (distance > largest_distance)
912           {
913             chosen = pos;
914             largest_distance = distance;
915           }
916         }
917         count++;
918         pos = pos->next;
919       }
920     }
921     return chosen;
922   }
923
924   /* select "random" peer */
925   /* count number of peers that are available and not filtered */
926   count = 0;
927   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
928   {
929     pos = k_buckets[bc].head;
930     while ((pos != NULL) && (count < bucket_size))
931     {
932       if (GNUNET_YES ==
933           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
934       {
935         pos = pos->next;
936         continue;               /* Ignore bloomfiltered peers */
937       }
938       count++;
939       pos = pos->next;
940     }
941   }
942   if (count == 0)               /* No peers to select from! */
943   {
944     return NULL;
945   }
946   /* Now actually choose a peer */
947   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
948   count = 0;
949   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
950   {
951     pos = k_buckets[bc].head;
952     while ((pos != NULL) && (count < bucket_size))
953     {
954       if (GNUNET_YES ==
955           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
956       {
957         pos = pos->next;
958         continue;               /* Ignore bloomfiltered peers */
959       }
960       if (0 == selected--)
961         return pos;
962       pos = pos->next;
963     }
964   }
965   GNUNET_break (0);
966   return NULL;
967 }
968
969
970 /**
971  * Compute the set of peers that the given request should be
972  * forwarded to.
973  *
974  * @param key routing key
975  * @param bloom bloom filter excluding peers as targets, all selected
976  *        peers will be added to the bloom filter
977  * @param hop_count number of hops the request has traversed so far
978  * @param target_replication desired number of replicas
979  * @param targets where to store an array of target peers (to be
980  *         free'd by the caller)
981  * @return number of peers returned in 'targets'.
982  */
983 static unsigned int
984 get_target_peers (const GNUNET_HashCode *key,
985                   struct GNUNET_CONTAINER_BloomFilter *bloom,
986                   uint32_t hop_count,
987                   uint32_t target_replication,
988                   struct PeerInfo ***targets)
989 {
990   unsigned int ret;
991   unsigned int off;
992   struct PeerInfo **rtargets;
993   struct PeerInfo *nxt;
994
995   ret = get_forward_count (hop_count, target_replication);
996   if (ret == 0)
997   {
998     *targets = NULL;
999     return 0;
1000   }
1001   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1002   off = 0;
1003   while (ret-- > 0)
1004   {
1005     nxt = select_peer (key, bloom, hop_count);
1006     if (nxt == NULL)
1007       break;
1008     rtargets[off++] = nxt;
1009     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1010   }
1011   if (0 == off)
1012   {
1013     GNUNET_free (rtargets);
1014     *targets = NULL;
1015     return 0;
1016   }
1017   *targets = rtargets;
1018   return off;
1019 }
1020
1021
1022 /**
1023  * Perform a PUT operation.   Forwards the given request to other
1024  * peers.   Does not store the data locally.  Does not give the
1025  * data to local clients.  May do nothing if this is the only
1026  * peer in the network (or if we are the closest peer in the
1027  * network).
1028  *
1029  * @param type type of the block
1030  * @param options routing options
1031  * @param desired_replication_level desired replication count
1032  * @param expiration_time when does the content expire
1033  * @param hop_count how many hops has this message traversed so far
1034  * @param bf Bloom filter of peers this PUT has already traversed
1035  * @param key key for the content
1036  * @param put_path_length number of entries in put_path
1037  * @param put_path peers this request has traversed so far (if tracked)
1038  * @param data payload to store
1039  * @param data_size number of bytes in data
1040  */
1041 void
1042 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1043                            enum GNUNET_DHT_RouteOption options,
1044                            uint32_t desired_replication_level,
1045                            struct GNUNET_TIME_Absolute expiration_time,
1046                            uint32_t hop_count,
1047                            struct GNUNET_CONTAINER_BloomFilter *bf,
1048                            const GNUNET_HashCode *key,
1049                            unsigned int put_path_length,
1050                            struct GNUNET_PeerIdentity *put_path,
1051                            const void *data,
1052                            size_t data_size)
1053 {
1054   unsigned int target_count;
1055   unsigned int i;
1056   struct PeerInfo **targets;
1057   struct PeerInfo *target;
1058   struct P2PPendingMessage *pending;
1059   size_t msize;
1060   struct PeerPutMessage *ppm;
1061   struct GNUNET_PeerIdentity *pp;
1062   
1063   target_count = get_target_peers (key, bf, hop_count,
1064                                    desired_replication_level,
1065                                    &targets);
1066   if (0 == target_count)
1067     return;
1068   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1069   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1070   {
1071     put_path_length = 0;
1072     msize = data_size + sizeof (struct PeerPutMessage);
1073   }
1074   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1075   {
1076     GNUNET_break (0);
1077     return;
1078   }
1079   for (i=0;i<target_count;i++)
1080   {
1081     target = targets[i];
1082     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1083     pending->importance = 0; /* FIXME */
1084     pending->timeout = expiration_time;   
1085     ppm = (struct PeerPutMessage*) &pending[1];
1086     pending->msg = &ppm->header;
1087     ppm->header.size = htons (msize);
1088     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1089     ppm->options = htonl (options);
1090     ppm->type = htonl (type);
1091     ppm->hop_count = htonl (hop_count + 1);
1092     ppm->desired_replication_level = htonl (desired_replication_level);
1093     ppm->put_path_length = htonl (put_path_length);
1094     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1095     GNUNET_assert (GNUNET_OK ==
1096                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1097                                                               ppm->bloomfilter,
1098                                                               DHT_BLOOM_SIZE));
1099     ppm->key = *key;
1100     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1101     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1102     memcpy (&pp[put_path_length], data, data_size);
1103     GNUNET_CONTAINER_DLL_insert (target->head,
1104                                  target->tail,
1105                                  pending);
1106     target->pending_count++;
1107     process_peer_queue (target);
1108   }
1109   GNUNET_free (targets);
1110 }
1111
1112
1113 /**
1114  * Perform a GET operation.  Forwards the given request to other
1115  * peers.  Does not lookup the key locally.  May do nothing if this is
1116  * the only peer in the network (or if we are the closest peer in the
1117  * network).
1118  *
1119  * @param type type of the block
1120  * @param options routing options
1121  * @param desired_replication_level desired replication count
1122  * @param hop_count how many hops did this request traverse so far?
1123  * @param key key for the content
1124  * @param xquery extended query
1125  * @param xquery_size number of bytes in xquery
1126  * @param reply_bf bloomfilter to filter duplicates
1127  * @param reply_bf_mutator mutator for reply_bf
1128  * @param peer_bf filter for peers not to select (again)
1129  */
1130 void
1131 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1132                            enum GNUNET_DHT_RouteOption options,
1133                            uint32_t desired_replication_level,
1134                            uint32_t hop_count,
1135                            const GNUNET_HashCode *key,
1136                            const void *xquery,
1137                            size_t xquery_size,
1138                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1139                            uint32_t reply_bf_mutator,
1140                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1141 {
1142   unsigned int target_count;
1143   unsigned int i;
1144   struct PeerInfo **targets;
1145   struct PeerInfo *target;
1146   struct P2PPendingMessage *pending;
1147   size_t msize;
1148   struct PeerGetMessage *pgm;
1149   char *xq;
1150   size_t reply_bf_size;
1151   
1152   target_count = get_target_peers (key, peer_bf, hop_count,
1153                                    desired_replication_level,
1154                                    &targets);
1155   if (0 == target_count)
1156     return;
1157   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1158   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1159   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1160   {
1161     GNUNET_break (0);
1162     return;
1163   }
1164   /* forward request */
1165   for (i=0;i<target_count;i++)
1166   {
1167     target = targets[i];
1168     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1169     pending->importance = 0; /* FIXME */
1170     pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
1171     pgm = (struct PeerGetMessage*) &pending[1];
1172     pending->msg = &pgm->header;
1173     pgm->header.size = htons (msize);
1174     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1175     pgm->options = htonl (options);
1176     pgm->type = htonl (type);
1177     pgm->hop_count = htonl (hop_count + 1);
1178     pgm->desired_replication_level = htonl (desired_replication_level);
1179     pgm->xquery_size = htonl (xquery_size);
1180     pgm->bf_mutator = reply_bf_mutator; 
1181     GNUNET_assert (GNUNET_OK ==
1182                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1183                                                               pgm->bloomfilter,
1184                                                               DHT_BLOOM_SIZE));
1185     pgm->key = *key;
1186     xq = (char *) &pgm[1];
1187     memcpy (xq, xquery, xquery_size);
1188     GNUNET_assert (GNUNET_OK ==
1189                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1190                                                               &xq[xquery_size],
1191                                                               reply_bf_size));
1192     GNUNET_CONTAINER_DLL_insert (target->head,
1193                                  target->tail,
1194                                  pending);
1195     target->pending_count++;
1196     process_peer_queue (target);
1197   }
1198   GNUNET_free (targets);
1199 }
1200
1201
1202 /**
1203  * Handle a reply (route to origin).  Only forwards the reply back to
1204  * the given peer.  Does not do local caching or forwarding to local
1205  * clients.
1206  *
1207  * @param target neighbour that should receive the block (if still connected)
1208  * @param type type of the block
1209  * @param expiration_time when does the content expire
1210  * @param key key for the content
1211  * @param put_path_length number of entries in put_path
1212  * @param put_path peers the original PUT traversed (if tracked)
1213  * @param get_path_length number of entries in put_path
1214  * @param get_path peers this reply has traversed so far (if tracked)
1215  * @param data payload of the reply
1216  * @param data_size number of bytes in data
1217  */
1218 void
1219 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1220                              enum GNUNET_BLOCK_Type type,
1221                              struct GNUNET_TIME_Absolute expiration_time,
1222                              const GNUNET_HashCode *key,
1223                              unsigned int put_path_length,
1224                              struct GNUNET_PeerIdentity *put_path,
1225                              unsigned int get_path_length,
1226                              struct GNUNET_PeerIdentity *get_path,
1227                              const void *data,
1228                              size_t data_size)
1229 {
1230   struct PeerInfo *pi;
1231   struct P2PPendingMessage *pending;
1232   size_t msize;
1233   struct PeerResultMessage *prm;
1234   struct GNUNET_PeerIdentity *paths;
1235   
1236   msize = data_size + sizeof (struct PeerResultMessage) + 
1237     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1238   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1239        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1240        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1241        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1242   {
1243     GNUNET_break (0);
1244     return;
1245   }
1246   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1247                                           &target->hashPubKey);
1248   if (NULL == pi)
1249   {
1250     /* peer disconnected in the meantime, drop reply */
1251     return;
1252   }
1253   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1254   pending->importance = 0; /* FIXME */
1255   pending->timeout = expiration_time;
1256   prm = (struct PeerResultMessage*) &pending[1];
1257   pending->msg = &prm->header;
1258   prm->header.size = htons (msize);
1259   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1260   prm->type = htonl (type);
1261   prm->put_path_length = htonl (put_path_length);
1262   prm->get_path_length = htonl (get_path_length);
1263   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1264   prm->key = *key;
1265   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1266   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1267   memcpy (&paths[put_path_length],
1268           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1269   memcpy (&paths[put_path_length + get_path_length],
1270           data, data_size);
1271   GNUNET_CONTAINER_DLL_insert (pi->head,
1272                                pi->tail,
1273                                pending);
1274   pi->pending_count++;
1275   process_peer_queue (pi);
1276 }
1277
1278
1279 /**
1280  * Closure for 'add_known_to_bloom'.
1281  */
1282 struct BloomConstructorContext
1283 {
1284   /**
1285    * Bloom filter under construction.
1286    */
1287   struct GNUNET_CONTAINER_BloomFilter *bloom;
1288
1289   /**
1290    * Mutator to use.
1291    */
1292   uint32_t bf_mutator;
1293 };
1294
1295
1296 /**
1297  * Add each of the peers we already know to the bloom filter of
1298  * the request so that we don't get duplicate HELLOs.
1299  *
1300  * @param cls the 'struct BloomConstructorContext'.
1301  * @param key peer identity to add to the bloom filter
1302  * @param value value the peer information (unused)
1303  * @return GNUNET_YES (we should continue to iterate)
1304  */
1305 static int
1306 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1307 {
1308   struct BloomConstructorContext *ctx = cls;
1309   GNUNET_HashCode mh;
1310
1311   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1312   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1313   return GNUNET_YES;
1314 }
1315
1316
1317 /**
1318  * Task to send a find peer message for our own peer identifier
1319  * so that we can find the closest peers in the network to ourselves
1320  * and attempt to connect to them.
1321  *
1322  * @param cls closure for this task
1323  * @param tc the context under which the task is running
1324  */
1325 static void
1326 send_find_peer_message (void *cls,
1327                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1328 {
1329   struct GNUNET_TIME_Relative next_send_time;
1330   struct BloomConstructorContext bcc;
1331
1332   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1333   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1334     return;
1335   if (newly_found_peers > bucket_size) 
1336   {
1337     /* If we are finding many peers already, no need to send out our request right now! */
1338     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1339                                                    &send_find_peer_message, NULL);
1340     newly_found_peers = 0;
1341     return;
1342   }
1343   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1344   bcc.bloom =
1345     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1346   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1347                                          &add_known_to_bloom,
1348                                          &bcc);
1349   // FIXME: pass priority!?
1350   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1351                              GNUNET_DHT_RO_FIND_PEER,
1352                              16 /* FIXME: replication level? */,
1353                              0,
1354                              &my_identity.hashPubKey,
1355                              NULL, 0,
1356                              bcc.bloom, bcc.bf_mutator, NULL);
1357   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1358   /* schedule next round */
1359   newly_found_peers = 0;
1360   next_send_time.rel_value =
1361     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1362     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1363                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1364   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1365                                                  &send_find_peer_message,
1366                                                  NULL);  
1367 }
1368
1369
1370 /**
1371  * To be called on core init/fail.
1372  *
1373  * @param cls service closure
1374  * @param server handle to the server for this service
1375  * @param identity the public identity of this peer
1376  * @param publicKey the public key of this peer
1377  */
1378 static void
1379 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1380            const struct GNUNET_PeerIdentity *identity,
1381            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1382 {
1383   struct GNUNET_TIME_Relative next_send_time;
1384
1385   GNUNET_assert (server != NULL);
1386   my_identity = *identity;
1387   /* FIXME: do upon 1st connect instead! */
1388   next_send_time.rel_value =
1389     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1390     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1391                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1392                                2) -
1393                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1394   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1395                                                  &send_find_peer_message,
1396                                                  NULL);
1397 }
1398
1399
1400 /**
1401  * Core handler for p2p put requests.
1402  *
1403  * @param cls closure
1404  * @param peer sender of the request
1405  * @param message message
1406  * @param peer peer identity this notification is about
1407  * @param atsi performance data
1408  * @return GNUNET_OK to keep the connection open,
1409  *         GNUNET_SYSERR to close it (signal serious error)
1410  */
1411 static int
1412 handle_dht_p2p_put (void *cls,
1413                     const struct GNUNET_PeerIdentity *peer,
1414                     const struct GNUNET_MessageHeader *message,
1415                     const struct GNUNET_TRANSPORT_ATS_Information
1416                     *atsi)
1417 {
1418   const struct PeerPutMessage *put;
1419   const struct GNUNET_PeerIdentity *put_path;
1420   const void *payload;
1421   uint32_t putlen;
1422   uint16_t msize;
1423   size_t payload_size;
1424   struct GNUNET_CONTAINER_BloomFilter *bf;
1425   GNUNET_HashCode test_key;
1426   
1427   msize = ntohs (message->size);
1428   if (msize < sizeof (struct PeerPutMessage))
1429   {
1430     GNUNET_break_op (0);
1431     return GNUNET_YES;
1432   }
1433   put = (const struct PeerPutMessage*) message;
1434   putlen = ntohl (put->put_path_length);
1435   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1436        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1437     {
1438       GNUNET_break_op (0);
1439       return GNUNET_YES;
1440     }
1441   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1442   payload = &put_path[putlen];
1443   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1444                           putlen * sizeof (struct GNUNET_PeerIdentity));
1445   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1446                                 ntohl (put->type),
1447                                 payload, payload_size,
1448                                 &test_key))
1449   {
1450   case GNUNET_YES:
1451     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1452     {
1453       GNUNET_break_op (0);
1454       return GNUNET_YES;
1455     }
1456     break;
1457   case GNUNET_NO:
1458     GNUNET_break_op (0);
1459     return GNUNET_YES;
1460   case GNUNET_SYSERR:
1461     /* cannot verify, good luck */
1462     break;
1463   }
1464   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1465                                           DHT_BLOOM_SIZE,
1466                                           DHT_BLOOM_K);
1467   {
1468     struct GNUNET_PeerIdentity pp[putlen+1];
1469   
1470     /* extend 'put path' by sender */
1471     memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1472     pp[putlen] = *peer;
1473
1474     /* give to local clients */
1475     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1476                              &put->key,
1477                              0, NULL,
1478                              putlen + 1,
1479                              pp,
1480                              ntohl (put->type),
1481                              payload_size,
1482                              payload);
1483     /* store locally */
1484     GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1485                               &put->key,
1486                               putlen + 1, pp,
1487                               ntohl (put->type),
1488                               payload_size,
1489                               payload);
1490     /* route to other peers */
1491     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1492                                ntohl (put->options),
1493                                ntohl (put->desired_replication_level),
1494                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1495                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1496                                bf,
1497                                &put->key,
1498                                putlen + 1, pp,
1499                                payload,
1500                                payload_size);
1501   }
1502   GNUNET_CONTAINER_bloomfilter_free (bf);
1503   return GNUNET_YES;
1504 }
1505
1506
1507 /**
1508  * Core handler for p2p get requests.
1509  *
1510  * @param cls closure
1511  * @param peer sender of the request
1512  * @param message message
1513  * @param peer peer identity this notification is about
1514  * @param atsi performance data
1515  * @return GNUNET_OK to keep the connection open,
1516  *         GNUNET_SYSERR to close it (signal serious error)
1517  */
1518 static int
1519 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1520                     const struct GNUNET_MessageHeader *message,
1521                     const struct GNUNET_TRANSPORT_ATS_Information
1522                     *atsi)
1523 {
1524   struct PeerGetMessage *get;
1525   uint32_t xquery_size;
1526   size_t reply_bf_size;
1527   uint16_t msize;
1528   enum GNUNET_BLOCK_Type type;
1529   enum GNUNET_DHT_RouteOption options;
1530   enum GNUNET_BLOCK_EvaluationResult eval;
1531   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1532   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1533   const char *xquery;
1534                       
1535   /* parse and validate message */
1536   msize = ntohs (message->size);
1537   if (msize < sizeof (struct PeerGetMessage))
1538   {
1539     GNUNET_break_op (0);
1540     return GNUNET_YES;
1541   }
1542   get = (struct PeerGetMessage *) message;
1543   xquery_size = ntohl (get->xquery_size);
1544   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1545   {
1546     GNUNET_break_op (0);
1547     return GNUNET_YES;
1548   }
1549   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1550   type = ntohl (get->type);
1551   options = ntohl (get->options);
1552   xquery = (const char*) &get[1];
1553   reply_bf = NULL;
1554   if (reply_bf_size > 0)
1555     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1556                                                   reply_bf_size,
1557                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1558   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1559                                 type,
1560                                 &get->key,
1561                                 &reply_bf,
1562                                 get->bf_mutator,
1563                                 xquery, xquery_size,
1564                                 NULL, 0);
1565   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1566   {
1567     /* request invalid or block type not supported */
1568     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1569     if (NULL != reply_bf)
1570       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1571     return GNUNET_YES;
1572   }
1573   peer_bf =
1574     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1575                                        DHT_BLOOM_SIZE,
1576                                        DHT_BLOOM_K);
1577
1578   /* remember request for routing replies */
1579   GDS_ROUTING_add (peer,
1580                    type,
1581                    &get->key,
1582                    xquery, xquery_size,
1583                    reply_bf, get->bf_mutator);
1584   /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1585
1586   /* local lookup (this may update the reply_bf) */
1587   GDS_DATACACHE_handle_get (&get->key,
1588                             type,
1589                             xquery, xquery_size,
1590                             &reply_bf, 
1591                             get->bf_mutator);
1592   /* FIXME: should track if the local lookup resulted in a
1593      definitive result and then NOT do P2P forwarding */
1594     
1595   /* P2P forwarding */
1596   GDS_NEIGHBOURS_handle_get (type,
1597                              options,
1598                              ntohl (get->desired_replication_level),
1599                              ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1600                              &get->key,
1601                              xquery, xquery_size,
1602                              reply_bf,
1603                              get->bf_mutator,
1604                              peer_bf);
1605   /* clean up */
1606   if (NULL != reply_bf)
1607     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1608   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1609   return GNUNET_YES;
1610 }
1611
1612
1613 /**
1614  * Core handler for p2p result messages.
1615  *
1616  * @param cls closure
1617  * @param message message
1618  * @param peer peer identity this notification is about
1619  * @param atsi performance data
1620  * @return GNUNET_YES (do not cut p2p connection)
1621  */
1622 static int
1623 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1624                        const struct GNUNET_MessageHeader *message,
1625                        const struct GNUNET_TRANSPORT_ATS_Information
1626                        *atsi)
1627 {
1628   const struct PeerResultMessage *prm;
1629   const struct GNUNET_PeerIdentity *put_path;
1630   const struct GNUNET_PeerIdentity *get_path;
1631   const void *data;
1632   uint32_t get_path_length;
1633   uint32_t put_path_length;
1634   uint16_t msize;
1635   size_t data_size;
1636   enum GNUNET_BLOCK_Type type;
1637                        
1638   /* parse and validate message */
1639   msize = ntohs (message->size);
1640   if (msize < sizeof (struct PeerResultMessage))
1641   {
1642     GNUNET_break_op (0);
1643     return GNUNET_YES;
1644   }
1645   prm = (struct PeerResultMessage *) message;
1646   put_path_length = ntohl (prm->put_path_length);
1647   get_path_length = ntohl (prm->get_path_length);
1648   if ( (msize < sizeof (struct PeerResultMessage) + 
1649         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1650        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1651        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1652   {
1653     GNUNET_break_op (0);
1654     return GNUNET_YES;
1655   } 
1656   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1657   get_path = &put_path[put_path_length];
1658   type = ntohl (prm->type);
1659   data = (const void*) &get_path[get_path_length];
1660   data_size = msize - (sizeof (struct PeerResultMessage) + 
1661                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1662   /* append 'peer' to 'get_path' */
1663   {    
1664     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1665     
1666     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1667     xget_path[get_path_length] = *peer;
1668
1669     /* forward to local clients */   
1670     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1671                              &prm->key,
1672                              get_path_length + 1,
1673                              xget_path,
1674                              put_path_length,
1675                              put_path,
1676                              type,
1677                              data_size, 
1678                              data);
1679
1680     /* forward to other peers */
1681     GDS_ROUTING_process (type,
1682                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1683                          &prm->key,
1684                          put_path_length,
1685                          put_path,
1686                          get_path_length + 1,
1687                          xget_path,
1688                          data,
1689                          data_size);                     
1690   }
1691   return GNUNET_YES;
1692 }
1693
1694
1695 /**
1696  * Initialize neighbours subsystem.
1697  *
1698  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1699  */
1700 int
1701 GDS_NEIGHBOURS_init ()
1702 {
1703   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1704     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1705     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1706     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1707     {NULL, 0, 0}
1708   };
1709   unsigned long long temp_config_num;
1710  
1711   if (GNUNET_OK ==
1712       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1713                                              &temp_config_num))
1714     bucket_size = (unsigned int) temp_config_num;  
1715   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1716                                  1,
1717                                  NULL,
1718                                  &core_init,
1719                                  &handle_core_connect,
1720                                  &handle_core_disconnect, 
1721                                  NULL,  /* Do we care about "status" updates? */
1722                                  NULL, GNUNET_NO,
1723                                  NULL, GNUNET_NO,
1724                                  core_handlers);
1725   if (coreAPI == NULL)
1726     return GNUNET_SYSERR;
1727   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1728 #if 0
1729   struct GNUNET_TIME_Relative next_send_time;
1730
1731   // FIXME!
1732   next_send_time.rel_value =
1733     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1734     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1735                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1736                                2) -
1737                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1738   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1739                                                  &send_find_peer_message,
1740                                                  &find_peer_context);  
1741 #endif
1742   return GNUNET_OK;
1743 }
1744
1745
1746 /**
1747  * Shutdown neighbours subsystem.
1748  */
1749 void
1750 GDS_NEIGHBOURS_done ()
1751 {
1752   if (coreAPI == NULL)
1753     return;
1754   GNUNET_CORE_disconnect (coreAPI);
1755   coreAPI = NULL;    
1756   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1757   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1758   all_known_peers = NULL;
1759   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1760   {
1761     GNUNET_SCHEDULER_cancel (find_peer_task);
1762     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1763   }
1764 }
1765
1766
1767 /* end of gnunet-service-dht_neighbours.c */
1768
1769
1770 #if 0
1771
1772
1773
1774 #endif