respect path tracking and demultiplex everywhere options
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_datacache_lib.h"
36 #include "gnunet_transport_service.h"
37 #include "gnunet_hello_lib.h"
38 #include "gnunet_dht_service.h"
39 #include "gnunet_statistics_service.h"
40 #include "dht.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_nse.h"
45 #include "gnunet-service-dht_routing.h"
46 #include <fenv.h>
47
48 /**
49  * How many buckets will we allow total.
50  */
51 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
52
53 /**
54  * What is the maximum number of peers in a given bucket.
55  */
56 #define DEFAULT_BUCKET_SIZE 4
57
58 /**
59  * Size of the bloom filter the DHT uses to filter peers.
60  */
61 #define DHT_BLOOM_SIZE 128
62
63 /**
64  * Desired replication level for FIND PEER requests
65  */
66 #define FIND_PEER_REPLICATION_LEVEL 4
67
68 /**
69  * How often to update our preference levels for peers in our routing tables.
70  */
71 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
72
73 /**
74  * How long at least to wait before sending another find peer request.
75  */
76 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
77
78 /**
79  * How long at most to wait before sending another find peer request.
80  */
81 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
82
83
84 /**
85  * P2P PUT message
86  */
87 struct PeerPutMessage
88 {
89   /**
90    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
91    */
92   struct GNUNET_MessageHeader header;
93
94   /**
95    * Processing options
96    */
97   uint32_t options GNUNET_PACKED;
98
99   /**
100    * Content type.
101    */
102   uint32_t type GNUNET_PACKED;
103
104   /**
105    * Hop count
106    */
107   uint32_t hop_count GNUNET_PACKED;
108
109   /**
110    * Replication level for this message
111    */
112   uint32_t desired_replication_level GNUNET_PACKED;
113
114   /**
115    * Length of the PUT path that follows (if tracked).
116    */
117   uint32_t put_path_length GNUNET_PACKED;
118
119   /**
120    * When does the content expire?
121    */
122   struct GNUNET_TIME_AbsoluteNBO expiration_time;
123
124   /**
125    * Bloomfilter (for peer identities) to stop circular routes
126    */
127   char bloomfilter[DHT_BLOOM_SIZE];
128
129   /**
130    * The key we are storing under.
131    */
132   GNUNET_HashCode key;
133
134   /* put path (if tracked) */
135
136   /* Payload */
137
138 };
139
140
141 /**
142  * P2P Result message
143  */
144 struct PeerResultMessage
145 {
146   /**
147    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
148    */
149   struct GNUNET_MessageHeader header;
150
151   /**
152    * Content type.
153    */
154   uint32_t type GNUNET_PACKED;
155
156   /**
157    * Length of the PUT path that follows (if tracked).
158    */
159   uint32_t put_path_length GNUNET_PACKED;
160
161   /**
162    * Length of the GET path that follows (if tracked).
163    */
164   uint32_t get_path_length GNUNET_PACKED;
165
166   /**
167    * When does the content expire?
168    */
169   struct GNUNET_TIME_AbsoluteNBO expiration_time;
170
171   /**
172    * The key of the corresponding GET request.
173    */
174   GNUNET_HashCode key;
175
176   /* put path (if tracked) */
177
178   /* get path (if tracked) */
179
180   /* Payload */
181
182 };
183
184
185 /**
186  * P2P GET message
187  */
188 struct PeerGetMessage
189 {
190   /**
191    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
192    */
193   struct GNUNET_MessageHeader header;
194
195   /**
196    * Processing options
197    */
198   uint32_t options GNUNET_PACKED;
199
200   /**
201    * Desired content type.
202    */
203   uint32_t type GNUNET_PACKED;
204
205   /**
206    * Hop count
207    */
208   uint32_t hop_count GNUNET_PACKED;
209
210   /**
211    * Desired replication level for this request.
212    */
213   uint32_t desired_replication_level GNUNET_PACKED;
214
215   /**
216    * Size of the extended query.
217    */
218   uint32_t xquery_size;
219
220   /**
221    * Bloomfilter mutator.
222    */
223   uint32_t bf_mutator;
224
225   /**
226    * Bloomfilter (for peer identities) to stop circular routes
227    */
228   char bloomfilter[DHT_BLOOM_SIZE];
229
230   /**
231    * The key we are looking for.
232    */
233   GNUNET_HashCode key;
234
235   /* xquery */
236
237   /* result bloomfilter */
238
239 };
240
241
242 /**
243  * Linked list of messages to send to a particular other peer.
244  */
245 struct P2PPendingMessage
246 {
247   /**
248    * Pointer to next item in the list
249    */
250   struct P2PPendingMessage *next;
251
252   /**
253    * Pointer to previous item in the list
254    */
255   struct P2PPendingMessage *prev;
256
257   /**
258    * Message importance level.  FIXME: used? useful?
259    */
260   unsigned int importance;
261
262   /**
263    * When does this message time out?
264    */
265   struct GNUNET_TIME_Absolute timeout;
266
267   /**
268    * Actual message to be sent, allocated at the end of the struct:
269    * // msg = (cast) &pm[1]; 
270    * // memcpy (&pm[1], data, len);
271    */
272   const struct GNUNET_MessageHeader *msg;
273
274 };
275
276
277 /**
278  * Entry for a peer in a bucket.
279  */
280 struct PeerInfo
281 {
282   /**
283    * Next peer entry (DLL)
284    */
285   struct PeerInfo *next;
286
287   /**
288    *  Prev peer entry (DLL)
289    */
290   struct PeerInfo *prev;
291
292   /**
293    * Count of outstanding messages for peer.  FIXME: NEEDED?
294    * FIXME: bound queue size!?
295    */
296   unsigned int pending_count;
297
298   /**
299    * Head of pending messages to be sent to this peer.
300    */
301   struct P2PPendingMessage *head;
302
303   /**
304    * Tail of pending messages to be sent to this peer.
305    */
306   struct P2PPendingMessage *tail;
307
308   /**
309    * Core handle for sending messages to this peer.
310    */
311   struct GNUNET_CORE_TransmitHandle *th;
312
313   /**
314    * Preference update context
315    */
316   struct GNUNET_CORE_InformationRequestContext *info_ctx;
317
318   /**
319    * Task for scheduling message sends.
320    */
321   GNUNET_SCHEDULER_TaskIdentifier send_task;
322
323   /**
324    * Task for scheduling preference updates
325    */
326   GNUNET_SCHEDULER_TaskIdentifier preference_task;
327
328   /**
329    * What is the identity of the peer?
330    */
331   struct GNUNET_PeerIdentity id;
332
333 #if 0
334   /**
335    * What is the average latency for replies received?
336    */
337   struct GNUNET_TIME_Relative latency;
338
339   /**
340    * Transport level distance to peer.
341    */
342   unsigned int distance;
343 #endif
344
345 };
346
347
348 /**
349  * Peers are grouped into buckets.
350  */
351 struct PeerBucket
352 {
353   /**
354    * Head of DLL
355    */
356   struct PeerInfo *head;
357
358   /**
359    * Tail of DLL
360    */
361   struct PeerInfo *tail;
362
363   /**
364    * Number of peers in the bucket.
365    */
366   unsigned int peers_size;
367 };
368
369
370 /**
371  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
372  */
373 static unsigned int closest_bucket;
374
375 /**
376  * How many peers have we added since we sent out our last
377  * find peer request?
378  */
379 static unsigned int newly_found_peers;
380
381 /**
382  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
383  */
384 static struct PeerBucket k_buckets[MAX_BUCKETS];
385
386 /**
387  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
388  */
389 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
390
391 /**
392  * Maximum size for each bucket.
393  */
394 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
395
396 /**
397  * Task that sends FIND PEER requests.
398  */
399 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
400
401 /**
402  * Identity of this peer.
403  */ 
404 static struct GNUNET_PeerIdentity my_identity;
405
406 /**
407  * Handle to GNUnet core.
408  */
409 static struct GNUNET_CORE_Handle *coreAPI;
410
411
412 /**
413  * Find the optimal bucket for this key.
414  *
415  * @param hc the hashcode to compare our identity to
416  * @return the proper bucket index, or GNUNET_SYSERR
417  *         on error (same hashcode)
418  */
419 static int
420 find_bucket (const GNUNET_HashCode * hc)
421 {
422   unsigned int bits;
423
424   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
425   if (bits == MAX_BUCKETS)
426     {
427       /* How can all bits match? Got my own ID? */
428       GNUNET_break (0);
429       return GNUNET_SYSERR; 
430     }
431   return MAX_BUCKETS - bits - 1;
432 }
433
434
435 /**
436  * Let GNUnet core know that we like the given peer.
437  *
438  * @param cls the 'struct PeerInfo' of the peer
439  * @param tc scheduler context.
440  */ 
441 static void
442 update_core_preference (void *cls,
443                         const struct GNUNET_SCHEDULER_TaskContext *tc);
444
445
446 /**
447  * Function called with statistics about the given peer.
448  *
449  * @param cls closure
450  * @param peer identifies the peer
451  * @param bpm_out set to the current bandwidth limit (sending) for this peer
452  * @param amount set to the amount that was actually reserved or unreserved;
453  *               either the full requested amount or zero (no partial reservations)
454  * @param res_delay if the reservation could not be satisfied (amount was 0), how
455  *        long should the client wait until re-trying?
456  * @param preference current traffic preference for the given peer
457  */
458 static void
459 update_core_preference_finish (void *cls,
460                                const struct GNUNET_PeerIdentity *peer,
461                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
462                                int32_t amount,
463                                struct GNUNET_TIME_Relative res_delay,
464                                uint64_t preference)
465 {
466   struct PeerInfo *peer_info = cls;
467
468   peer_info->info_ctx = NULL;
469   peer_info->preference_task
470     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
471                                     &update_core_preference, peer_info);
472 }
473
474
475 /**
476  * Let GNUnet core know that we like the given peer.
477  *
478  * @param cls the 'struct PeerInfo' of the peer
479  * @param tc scheduler context.
480  */ 
481 static void
482 update_core_preference (void *cls,
483                         const struct GNUNET_SCHEDULER_TaskContext *tc)
484 {
485   struct PeerInfo *peer = cls;
486   uint64_t preference;
487   unsigned int matching;
488   int bucket;
489
490   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
491   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
492     return;  
493   matching =
494     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
495                                       &peer->id.hashPubKey);
496   if (matching >= 64)
497     matching = 63;
498   bucket = find_bucket(&peer->id.hashPubKey);
499   if (bucket == GNUNET_SYSERR)
500     preference = 0;
501   else
502     preference = (1LL << matching) / k_buckets[bucket].peers_size;
503   if (preference == 0)
504     {
505       peer->preference_task
506         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
507                                         &update_core_preference, peer);
508       return;
509     }
510   peer->info_ctx =
511     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
512                                         GNUNET_TIME_UNIT_FOREVER_REL,
513                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
514                                         preference,
515                                         &update_core_preference_finish, peer);
516 }
517
518
519 /**
520  * Method called whenever a peer connects.
521  *
522  * @param cls closure
523  * @param peer peer identity this notification is about
524  * @param atsi performance data
525  */
526 static void
527 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
528                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
529 {
530   struct PeerInfo *ret;
531   int peer_bucket;
532
533   /* Check for connect to self message */
534   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
535     return;
536   if (GNUNET_YES ==
537       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
538                                               &peer->hashPubKey))
539   {
540     GNUNET_break (0);
541     return;
542   }
543   peer_bucket = find_bucket (&peer->hashPubKey);
544   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
545   ret = GNUNET_malloc (sizeof (struct PeerInfo));
546 #if 0
547   ret->latency = latency;
548   ret->distance = distance;
549 #endif
550   ret->id = *peer;
551   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
552                                      k_buckets[peer_bucket].tail,
553                                      k_buckets[peer_bucket].tail, ret);
554   k_buckets[peer_bucket].peers_size++;
555   closest_bucket = GNUNET_MAX (closest_bucket,
556                                peer_bucket);
557   if ( (peer_bucket > 0) &&
558        (k_buckets[peer_bucket].peers_size <= bucket_size) )
559     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
560   newly_found_peers++;
561   GNUNET_assert (GNUNET_OK ==
562                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
563                                                     &peer->hashPubKey, ret,
564                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
565 }
566
567
568 /**
569  * Method called whenever a peer disconnects.
570  *
571  * @param cls closure
572  * @param peer peer identity this notification is about
573  */
574 static void
575 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
576 {
577   struct PeerInfo *to_remove;
578   int current_bucket;
579   struct P2PPendingMessage *pos;
580
581   /* Check for disconnect from self message */
582   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
583     return;
584   to_remove =
585       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
586   if (NULL == to_remove)
587     {
588       GNUNET_break (0);
589       return;
590     }
591   GNUNET_assert (GNUNET_YES ==
592                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
593                                                        &peer->hashPubKey,
594                                                        to_remove));
595   if (NULL != to_remove->info_ctx)
596   {
597     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
598     to_remove->info_ctx = NULL;
599   }
600   current_bucket = find_bucket (&to_remove->id.hashPubKey);
601   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
602                                k_buckets[current_bucket].tail,
603                                to_remove);
604   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
605   k_buckets[current_bucket].peers_size--;
606   while ( (closest_bucket > 0) &&
607           (k_buckets[closest_bucket].peers_size == 0) )
608     closest_bucket--;
609
610   if (to_remove->th != NULL) 
611   {
612     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
613     to_remove->th = NULL;
614   }
615   while (NULL != (pos = to_remove->head))
616   {
617     GNUNET_CONTAINER_DLL_remove (to_remove->head,
618                                  to_remove->tail,
619                                  pos);
620     GNUNET_free (pos);
621   }
622 }
623
624
625 /**
626  * Called when core is ready to send a message we asked for
627  * out to the destination.
628  *
629  * @param cls the 'struct PeerInfo' of the target peer
630  * @param size number of bytes available in buf
631  * @param buf where the callee should write the message
632  * @return number of bytes written to buf
633  */
634 static size_t
635 core_transmit_notify (void *cls, size_t size, void *buf)
636 {
637   struct PeerInfo *peer = cls;
638   char *cbuf = buf;
639   struct P2PPendingMessage *pending;
640   size_t off;
641   size_t msize;
642
643   peer->th = NULL;
644   if (buf == NULL)
645   {
646     /* client disconnected */
647     return 0;
648   }
649   if (peer->head == NULL)
650   {
651     /* no messages pending */
652     return 0;
653   }
654   off = 0;
655   while ( (NULL != (pending = peer->head)) &&
656           (size - off >= (msize = ntohs (pending->msg->size))) )
657   {
658     memcpy (&cbuf[off], pending->msg, msize);
659     off += msize;
660     peer->pending_count--;
661     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
662     GNUNET_free (pending);
663   }
664   if (peer->head != NULL)
665     peer->th 
666       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
667                                            pending->importance,
668                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
669                                            &peer->id, msize,
670                                            &core_transmit_notify, peer);
671   return off;
672 }
673
674
675 /**
676  * Transmit all messages in the peer's message queue.
677  *
678  * @param peer message queue to process
679  */
680 static void
681 process_peer_queue (struct PeerInfo *peer)
682 {
683   struct P2PPendingMessage *pending;
684
685   if (NULL != (pending = peer->head))
686     return;
687   if (NULL != peer->th)
688     return;
689   peer->th 
690     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
691                                          pending->importance,
692                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
693                                          &peer->id,
694                                          ntohs (pending->msg->size),
695                                          &core_transmit_notify, peer);
696 }
697
698
699 /**
700  * To how many peers should we (on average) forward the request to
701  * obtain the desired target_replication count (on average).
702  *
703  * @param hop_count number of hops the message has traversed
704  * @param target_replication the number of total paths desired
705  * @return Some number of peers to forward the message to
706  */
707 static unsigned int
708 get_forward_count (uint32_t hop_count, 
709                    uint32_t target_replication)
710 {
711   uint32_t random_value;
712   uint32_t forward_count;
713   float target_value;
714
715   if (hop_count > GDS_NSE_get () * 4.0)
716   {
717     /* forcefully terminate */
718     return 0;
719   }
720   if (hop_count > GDS_NSE_get () * 2.0)
721   {
722     /* Once we have reached our ideal number of hops, only forward to 1 peer */
723     return 1;
724   }
725   /* bound by system-wide maximum */
726   target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
727                                    target_replication);
728   target_value =
729     1 + (target_replication - 1.0) / (GDS_NSE_get () +
730                                       ((float) (target_replication - 1.0) *
731                                        hop_count));
732   /* Set forward count to floor of target_value */
733   forward_count = (uint32_t) target_value;
734   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
735   target_value = target_value - forward_count;
736   random_value =
737     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
738   if (random_value < (target_value * UINT32_MAX))
739     forward_count++;
740   return forward_count;
741 }
742
743
744 /**
745  * Compute the distance between have and target as a 32-bit value.
746  * Differences in the lower bits must count stronger than differences
747  * in the higher bits.
748  *
749  * @return 0 if have==target, otherwise a number
750  *           that is larger as the distance between
751  *           the two hash codes increases
752  */
753 static unsigned int
754 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
755 {
756   unsigned int bucket;
757   unsigned int msb;
758   unsigned int lsb;
759   unsigned int i;
760
761   /* We have to represent the distance between two 2^9 (=512)-bit
762    * numbers as a 2^5 (=32)-bit number with "0" being used for the
763    * two numbers being identical; furthermore, we need to
764    * guarantee that a difference in the number of matching
765    * bits is always represented in the result.
766    *
767    * We use 2^32/2^9 numerical values to distinguish between
768    * hash codes that have the same LSB bit distance and
769    * use the highest 2^9 bits of the result to signify the
770    * number of (mis)matching LSB bits; if we have 0 matching
771    * and hence 512 mismatching LSB bits we return -1 (since
772    * 512 itself cannot be represented with 9 bits) */
773
774   /* first, calculate the most significant 9 bits of our
775    * result, aka the number of LSBs */
776   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
777   /* bucket is now a value between 0 and 512 */
778   if (bucket == 512)
779     return 0;                   /* perfect match */
780   if (bucket == 0)
781     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
782                                  * below, we'd end up with max+1 (overflow)) */
783
784   /* calculate the most significant bits of the final result */
785   msb = (512 - bucket) << (32 - 9);
786   /* calculate the 32-9 least significant bits of the final result by
787    * looking at the differences in the 32-9 bits following the
788    * mismatching bit at 'bucket' */
789   lsb = 0;
790   for (i = bucket + 1;
791        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
792   {
793     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
794         GNUNET_CRYPTO_hash_get_bit (have, i))
795       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
796                                                  * last bit set will be 31 -- if
797                                                  * i does not reach 512 first... */
798   }
799   return msb | lsb;
800 }
801
802
803 /**
804  * Check whether my identity is closer than any known peers.  If a
805  * non-null bloomfilter is given, check if this is the closest peer
806  * that hasn't already been routed to.
807  *
808  * @param key hash code to check closeness to
809  * @param bloom bloomfilter, exclude these entries from the decision
810  * @return GNUNET_YES if node location is closest,
811  *         GNUNET_NO otherwise.
812  */
813 static int
814 am_closest_peer (const GNUNET_HashCode *key,
815                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
816 {
817   int bits;
818   int other_bits;
819   int bucket_num;
820   int count;
821   struct PeerInfo *pos;
822   unsigned int my_distance;
823
824   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
825     return GNUNET_YES;
826   bucket_num = find_bucket (key);
827   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
828   my_distance = get_distance (&my_identity.hashPubKey, key);
829   pos = k_buckets[bucket_num].head;
830   count = 0;
831   while ((pos != NULL) && (count < bucket_size))
832   {
833     if ((bloom != NULL) &&
834         (GNUNET_YES ==
835          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
836     {
837       pos = pos->next;
838       continue;                 /* Skip already checked entries */
839     }
840     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
841     if (other_bits > bits)
842       return GNUNET_NO;
843     if (other_bits == bits)        /* We match the same number of bits */
844       return GNUNET_YES;
845     pos = pos->next;
846   }
847   /* No peers closer, we are the closest! */
848   return GNUNET_YES;
849 }
850
851
852 /**
853  * Select a peer from the routing table that would be a good routing
854  * destination for sending a message for "key".  The resulting peer
855  * must not be in the set of blocked peers.<p>
856  *
857  * Note that we should not ALWAYS select the closest peer to the
858  * target, peers further away from the target should be chosen with
859  * exponentially declining probability.
860  *
861  * FIXME: double-check that this is fine
862  * 
863  *
864  * @param key the key we are selecting a peer to route to
865  * @param bloom a bloomfilter containing entries this request has seen already
866  * @param hops how many hops has this message traversed thus far
867  * @return Peer to route to, or NULL on error
868  */
869 static struct PeerInfo *
870 select_peer (const GNUNET_HashCode *key,
871              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
872              uint32_t hops)
873 {
874   unsigned int bc;
875   unsigned int count;
876   unsigned int selected;
877   struct PeerInfo *pos;
878   unsigned int dist;
879   unsigned int smallest_distance;
880   struct PeerInfo *chosen;
881
882   if (hops >= GDS_NSE_get ())
883   {
884     /* greedy selection (closest peer that is not in bloomfilter) */
885     smallest_distance = UINT_MAX;
886     chosen = NULL;
887     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
888     {
889       pos = k_buckets[bc].head;
890       count = 0;
891       while ((pos != NULL) && (count < bucket_size))
892       {
893         if (GNUNET_NO ==
894             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
895         {
896           dist = get_distance (key, &pos->id.hashPubKey);
897           if (dist < smallest_distance)
898           {
899             chosen = pos;
900             smallest_distance = dist;
901           }
902         }
903         count++;
904         pos = pos->next;
905       }
906     }
907     return chosen;
908   }
909
910   /* select "random" peer */
911   /* count number of peers that are available and not filtered */
912   count = 0;
913   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
914   {
915     pos = k_buckets[bc].head;
916     while ((pos != NULL) && (count < bucket_size))
917     {
918       if (GNUNET_YES ==
919           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
920       {
921         pos = pos->next;
922         continue;               /* Ignore bloomfiltered peers */
923       }
924       count++;
925       pos = pos->next;
926     }
927   }
928   if (count == 0)               /* No peers to select from! */
929   {
930     return NULL;
931   }
932   /* Now actually choose a peer */
933   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
934   count = 0;
935   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
936   {
937     pos = k_buckets[bc].head;
938     while ((pos != NULL) && (count < bucket_size))
939     {
940       if (GNUNET_YES ==
941           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
942       {
943         pos = pos->next;
944         continue;               /* Ignore bloomfiltered peers */
945       }
946       if (0 == selected--)
947         return pos;
948       pos = pos->next;
949     }
950   }
951   GNUNET_break (0);
952   return NULL;
953 }
954
955
956 /**
957  * Compute the set of peers that the given request should be
958  * forwarded to.
959  *
960  * @param key routing key
961  * @param bloom bloom filter excluding peers as targets, all selected
962  *        peers will be added to the bloom filter
963  * @param hop_count number of hops the request has traversed so far
964  * @param target_replication desired number of replicas
965  * @param targets where to store an array of target peers (to be
966  *         free'd by the caller)
967  * @return number of peers returned in 'targets'.
968  */
969 static unsigned int
970 get_target_peers (const GNUNET_HashCode *key,
971                   struct GNUNET_CONTAINER_BloomFilter *bloom,
972                   uint32_t hop_count,
973                   uint32_t target_replication,
974                   struct PeerInfo ***targets)
975 {
976   unsigned int ret;
977   unsigned int off;
978   struct PeerInfo **rtargets;
979   struct PeerInfo *nxt;
980
981   ret = get_forward_count (hop_count, target_replication);
982   if (ret == 0)
983   {
984     *targets = NULL;
985     return 0;
986   }
987   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
988   off = 0;
989   while (ret-- > 0)
990   {
991     nxt = select_peer (key, bloom, hop_count);
992     if (nxt == NULL)
993       break;
994     rtargets[off++] = nxt;
995     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
996   }
997   if (0 == off)
998   {
999     GNUNET_free (rtargets);
1000     *targets = NULL;
1001     return 0;
1002   }
1003   *targets = rtargets;
1004   return off;
1005 }
1006
1007
1008 /**
1009  * Perform a PUT operation.   Forwards the given request to other
1010  * peers.   Does not store the data locally.  Does not give the
1011  * data to local clients.  May do nothing if this is the only
1012  * peer in the network (or if we are the closest peer in the
1013  * network).
1014  *
1015  * @param type type of the block
1016  * @param options routing options
1017  * @param desired_replication_level desired replication count
1018  * @param expiration_time when does the content expire
1019  * @param hop_count how many hops has this message traversed so far
1020  * @param bf Bloom filter of peers this PUT has already traversed
1021  * @param key key for the content
1022  * @param put_path_length number of entries in put_path
1023  * @param put_path peers this request has traversed so far (if tracked)
1024  * @param data payload to store
1025  * @param data_size number of bytes in data
1026  */
1027 void
1028 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1029                            enum GNUNET_DHT_RouteOption options,
1030                            uint32_t desired_replication_level,
1031                            struct GNUNET_TIME_Absolute expiration_time,
1032                            uint32_t hop_count,
1033                            struct GNUNET_CONTAINER_BloomFilter *bf,
1034                            const GNUNET_HashCode *key,
1035                            unsigned int put_path_length,
1036                            struct GNUNET_PeerIdentity *put_path,
1037                            const void *data,
1038                            size_t data_size)
1039 {
1040   unsigned int target_count;
1041   unsigned int i;
1042   struct PeerInfo **targets;
1043   struct PeerInfo *target;
1044   struct P2PPendingMessage *pending;
1045   size_t msize;
1046   struct PeerPutMessage *ppm;
1047   struct GNUNET_PeerIdentity *pp;
1048   
1049   target_count = get_target_peers (key, bf, hop_count,
1050                                    desired_replication_level,
1051                                    &targets);
1052   if (0 == target_count)
1053     return;
1054   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1055   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1056   {
1057     put_path_length = 0;
1058     msize = data_size + sizeof (struct PeerPutMessage);
1059   }
1060   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1061   {
1062     GNUNET_break (0);
1063     return;
1064   }
1065   for (i=0;i<target_count;i++)
1066   {
1067     target = targets[i];
1068     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1069     pending->importance = 0; /* FIXME */
1070     pending->timeout = expiration_time;   
1071     ppm = (struct PeerPutMessage*) &pending[1];
1072     pending->msg = &ppm->header;
1073     ppm->header.size = htons (msize);
1074     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1075     ppm->options = htonl (options);
1076     ppm->type = htonl (type);
1077     ppm->hop_count = htonl (hop_count + 1);
1078     ppm->desired_replication_level = htonl (desired_replication_level);
1079     ppm->put_path_length = htonl (put_path_length);
1080     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1081     GNUNET_assert (GNUNET_OK ==
1082                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1083                                                               ppm->bloomfilter,
1084                                                               DHT_BLOOM_SIZE));
1085     ppm->key = *key;
1086     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1087     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1088     memcpy (&pp[put_path_length], data, data_size);
1089     GNUNET_CONTAINER_DLL_insert (target->head,
1090                                  target->tail,
1091                                  pending);
1092     target->pending_count++;
1093     process_peer_queue (target);
1094   }
1095   GNUNET_free (targets);
1096 }
1097
1098
1099 /**
1100  * Perform a GET operation.  Forwards the given request to other
1101  * peers.  Does not lookup the key locally.  May do nothing if this is
1102  * the only peer in the network (or if we are the closest peer in the
1103  * network).
1104  *
1105  * @param type type of the block
1106  * @param options routing options
1107  * @param desired_replication_level desired replication count
1108  * @param hop_count how many hops did this request traverse so far?
1109  * @param key key for the content
1110  * @param xquery extended query
1111  * @param xquery_size number of bytes in xquery
1112  * @param reply_bf bloomfilter to filter duplicates
1113  * @param reply_bf_mutator mutator for reply_bf
1114  * @param peer_bf filter for peers not to select (again)
1115  */
1116 void
1117 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1118                            enum GNUNET_DHT_RouteOption options,
1119                            uint32_t desired_replication_level,
1120                            uint32_t hop_count,
1121                            const GNUNET_HashCode *key,
1122                            const void *xquery,
1123                            size_t xquery_size,
1124                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1125                            uint32_t reply_bf_mutator,
1126                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1127 {
1128   unsigned int target_count;
1129   unsigned int i;
1130   struct PeerInfo **targets;
1131   struct PeerInfo *target;
1132   struct P2PPendingMessage *pending;
1133   size_t msize;
1134   struct PeerGetMessage *pgm;
1135   char *xq;
1136   size_t reply_bf_size;
1137   
1138   target_count = get_target_peers (key, peer_bf, hop_count,
1139                                    desired_replication_level,
1140                                    &targets);
1141   if (0 == target_count)
1142     return;
1143   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1144   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1145   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1146   {
1147     GNUNET_break (0);
1148     return;
1149   }
1150   /* forward request */
1151   for (i=0;i<target_count;i++)
1152   {
1153     target = targets[i];
1154     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1155     pending->importance = 0; /* FIXME */
1156     pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
1157     pgm = (struct PeerGetMessage*) &pending[1];
1158     pending->msg = &pgm->header;
1159     pgm->header.size = htons (msize);
1160     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1161     pgm->options = htonl (options);
1162     pgm->type = htonl (type);
1163     pgm->hop_count = htonl (hop_count + 1);
1164     pgm->desired_replication_level = htonl (desired_replication_level);
1165     pgm->xquery_size = htonl (xquery_size);
1166     pgm->bf_mutator = reply_bf_mutator; 
1167     GNUNET_assert (GNUNET_OK ==
1168                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1169                                                               pgm->bloomfilter,
1170                                                               DHT_BLOOM_SIZE));
1171     pgm->key = *key;
1172     xq = (char *) &pgm[1];
1173     memcpy (xq, xquery, xquery_size);
1174     GNUNET_assert (GNUNET_OK ==
1175                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1176                                                               &xq[xquery_size],
1177                                                               reply_bf_size));
1178     GNUNET_CONTAINER_DLL_insert (target->head,
1179                                  target->tail,
1180                                  pending);
1181     target->pending_count++;
1182     process_peer_queue (target);
1183   }
1184   GNUNET_free (targets);
1185 }
1186
1187
1188 /**
1189  * Handle a reply (route to origin).  Only forwards the reply back to
1190  * the given peer.  Does not do local caching or forwarding to local
1191  * clients.
1192  *
1193  * @param target neighbour that should receive the block (if still connected)
1194  * @param type type of the block
1195  * @param expiration_time when does the content expire
1196  * @param key key for the content
1197  * @param put_path_length number of entries in put_path
1198  * @param put_path peers the original PUT traversed (if tracked)
1199  * @param get_path_length number of entries in put_path
1200  * @param get_path peers this reply has traversed so far (if tracked)
1201  * @param data payload of the reply
1202  * @param data_size number of bytes in data
1203  */
1204 void
1205 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1206                              enum GNUNET_BLOCK_Type type,
1207                              struct GNUNET_TIME_Absolute expiration_time,
1208                              const GNUNET_HashCode *key,
1209                              unsigned int put_path_length,
1210                              struct GNUNET_PeerIdentity *put_path,
1211                              unsigned int get_path_length,
1212                              struct GNUNET_PeerIdentity *get_path,
1213                              const void *data,
1214                              size_t data_size)
1215 {
1216   struct PeerInfo *pi;
1217   struct P2PPendingMessage *pending;
1218   size_t msize;
1219   struct PeerResultMessage *prm;
1220   struct GNUNET_PeerIdentity *paths;
1221   
1222   msize = data_size + sizeof (struct PeerResultMessage) + 
1223     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1224   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1225        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1226        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1227        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1228   {
1229     GNUNET_break (0);
1230     return;
1231   }
1232   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1233                                           &target->hashPubKey);
1234   if (NULL == pi)
1235   {
1236     /* peer disconnected in the meantime, drop reply */
1237     return;
1238   }
1239   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1240   pending->importance = 0; /* FIXME */
1241   pending->timeout = expiration_time;
1242   prm = (struct PeerResultMessage*) &pending[1];
1243   pending->msg = &prm->header;
1244   prm->header.size = htons (msize);
1245   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1246   prm->type = htonl (type);
1247   prm->put_path_length = htonl (put_path_length);
1248   prm->get_path_length = htonl (get_path_length);
1249   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1250   prm->key = *key;
1251   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1252   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1253   memcpy (&paths[put_path_length],
1254           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1255   memcpy (&paths[put_path_length + get_path_length],
1256           data, data_size);
1257   GNUNET_CONTAINER_DLL_insert (pi->head,
1258                                pi->tail,
1259                                pending);
1260   pi->pending_count++;
1261   process_peer_queue (pi);
1262 }
1263
1264
1265 /**
1266  * Closure for 'add_known_to_bloom'.
1267  */
1268 struct BloomConstructorContext
1269 {
1270   /**
1271    * Bloom filter under construction.
1272    */
1273   struct GNUNET_CONTAINER_BloomFilter *bloom;
1274
1275   /**
1276    * Mutator to use.
1277    */
1278   uint32_t bf_mutator;
1279 };
1280
1281
1282 /**
1283  * Add each of the peers we already know to the bloom filter of
1284  * the request so that we don't get duplicate HELLOs.
1285  *
1286  * @param cls the 'struct BloomConstructorContext'.
1287  * @param key peer identity to add to the bloom filter
1288  * @param value value the peer information (unused)
1289  * @return GNUNET_YES (we should continue to iterate)
1290  */
1291 static int
1292 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1293 {
1294   struct BloomConstructorContext *ctx = cls;
1295   GNUNET_HashCode mh;
1296
1297   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1298   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1299   return GNUNET_YES;
1300 }
1301
1302
1303 /**
1304  * Task to send a find peer message for our own peer identifier
1305  * so that we can find the closest peers in the network to ourselves
1306  * and attempt to connect to them.
1307  *
1308  * @param cls closure for this task
1309  * @param tc the context under which the task is running
1310  */
1311 static void
1312 send_find_peer_message (void *cls,
1313                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1314 {
1315   struct GNUNET_TIME_Relative next_send_time;
1316   struct BloomConstructorContext bcc;
1317
1318   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1319   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1320     return;
1321   if (newly_found_peers > bucket_size) 
1322   {
1323     /* If we are finding many peers already, no need to send out our request right now! */
1324     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1325                                                    &send_find_peer_message, NULL);
1326     newly_found_peers = 0;
1327     return;
1328   }
1329   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1330   bcc.bloom =
1331     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1332   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1333                                          &add_known_to_bloom,
1334                                          &bcc);
1335   // FIXME: pass priority!?
1336   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1337                              GNUNET_DHT_RO_FIND_PEER,
1338                              FIND_PEER_REPLICATION_LEVEL,
1339                              0,
1340                              &my_identity.hashPubKey,
1341                              NULL, 0,
1342                              bcc.bloom, bcc.bf_mutator, NULL);
1343   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1344   /* schedule next round */
1345   newly_found_peers = 0;
1346   next_send_time.rel_value =
1347     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1348     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1349                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1350   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1351                                                  &send_find_peer_message,
1352                                                  NULL);  
1353 }
1354
1355
1356 /**
1357  * To be called on core init/fail.
1358  *
1359  * @param cls service closure
1360  * @param server handle to the server for this service
1361  * @param identity the public identity of this peer
1362  * @param publicKey the public key of this peer
1363  */
1364 static void
1365 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1366            const struct GNUNET_PeerIdentity *identity,
1367            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1368 {
1369   struct GNUNET_TIME_Relative next_send_time;
1370
1371   GNUNET_assert (server != NULL);
1372   my_identity = *identity;
1373   /* FIXME: do upon 1st connect instead! */
1374   next_send_time.rel_value =
1375     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1376     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1377                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1378                                2) -
1379                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1380   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1381                                                  &send_find_peer_message,
1382                                                  NULL);
1383 }
1384
1385
1386 /**
1387  * Core handler for p2p put requests.
1388  *
1389  * @param cls closure
1390  * @param peer sender of the request
1391  * @param message message
1392  * @param peer peer identity this notification is about
1393  * @param atsi performance data
1394  * @return GNUNET_OK to keep the connection open,
1395  *         GNUNET_SYSERR to close it (signal serious error)
1396  */
1397 static int
1398 handle_dht_p2p_put (void *cls,
1399                     const struct GNUNET_PeerIdentity *peer,
1400                     const struct GNUNET_MessageHeader *message,
1401                     const struct GNUNET_TRANSPORT_ATS_Information
1402                     *atsi)
1403 {
1404   const struct PeerPutMessage *put;
1405   const struct GNUNET_PeerIdentity *put_path;
1406   const void *payload;
1407   uint32_t putlen;
1408   uint16_t msize;
1409   size_t payload_size;
1410   enum GNUNET_DHT_RouteOption options;
1411   struct GNUNET_CONTAINER_BloomFilter *bf;
1412   GNUNET_HashCode test_key;
1413   
1414   msize = ntohs (message->size);
1415   if (msize < sizeof (struct PeerPutMessage))
1416   {
1417     GNUNET_break_op (0);
1418     return GNUNET_YES;
1419   }
1420   put = (const struct PeerPutMessage*) message;
1421   putlen = ntohl (put->put_path_length);
1422   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1423        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1424     {
1425       GNUNET_break_op (0);
1426       return GNUNET_YES;
1427     }
1428   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1429   payload = &put_path[putlen];
1430   options = ntohl (put->options);
1431   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1432                           putlen * sizeof (struct GNUNET_PeerIdentity));
1433   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1434                                 ntohl (put->type),
1435                                 payload, payload_size,
1436                                 &test_key))
1437   {
1438   case GNUNET_YES:
1439     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1440     {
1441       GNUNET_break_op (0);
1442       return GNUNET_YES;
1443     }
1444     break;
1445   case GNUNET_NO:
1446     GNUNET_break_op (0);
1447     return GNUNET_YES;
1448   case GNUNET_SYSERR:
1449     /* cannot verify, good luck */
1450     break;
1451   }
1452   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1453                                           DHT_BLOOM_SIZE,
1454                                           DHT_BLOOM_K);
1455   {
1456     struct GNUNET_PeerIdentity pp[putlen+1];
1457   
1458     /* extend 'put path' by sender */
1459     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1460     {
1461       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1462       pp[putlen] = *peer;
1463       putlen++;
1464     }
1465     else
1466       putlen = 0;
1467     
1468     /* give to local clients */
1469     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1470                              &put->key,
1471                              0, NULL,
1472                              putlen,
1473                              pp,
1474                              ntohl (put->type),
1475                              payload_size,
1476                              payload);
1477     /* store locally */
1478     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1479          (am_closest_peer (&put->key,
1480                            bf) ) )
1481       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1482                                 &put->key,
1483                                 putlen, pp,
1484                                 ntohl (put->type),
1485                                 payload_size,
1486                                 payload);
1487     /* route to other peers */
1488     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1489                                options,
1490                                ntohl (put->desired_replication_level),
1491                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1492                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1493                                bf,
1494                                &put->key,
1495                                putlen, pp,
1496                                payload,
1497                                payload_size);
1498   }
1499   GNUNET_CONTAINER_bloomfilter_free (bf);
1500   return GNUNET_YES;
1501 }
1502
1503
1504 /**
1505  * Core handler for p2p get requests.
1506  *
1507  * @param cls closure
1508  * @param peer sender of the request
1509  * @param message message
1510  * @param peer peer identity this notification is about
1511  * @param atsi performance data
1512  * @return GNUNET_OK to keep the connection open,
1513  *         GNUNET_SYSERR to close it (signal serious error)
1514  */
1515 static int
1516 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1517                     const struct GNUNET_MessageHeader *message,
1518                     const struct GNUNET_TRANSPORT_ATS_Information
1519                     *atsi)
1520 {
1521   struct PeerGetMessage *get;
1522   uint32_t xquery_size;
1523   size_t reply_bf_size;
1524   uint16_t msize;
1525   enum GNUNET_BLOCK_Type type;
1526   enum GNUNET_DHT_RouteOption options;
1527   enum GNUNET_BLOCK_EvaluationResult eval;
1528   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1529   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1530   const char *xquery;
1531                       
1532   /* parse and validate message */
1533   msize = ntohs (message->size);
1534   if (msize < sizeof (struct PeerGetMessage))
1535   {
1536     GNUNET_break_op (0);
1537     return GNUNET_YES;
1538   }
1539   get = (struct PeerGetMessage *) message;
1540   xquery_size = ntohl (get->xquery_size);
1541   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1542   {
1543     GNUNET_break_op (0);
1544     return GNUNET_YES;
1545   }
1546   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1547   type = ntohl (get->type);
1548   options = ntohl (get->options);
1549   xquery = (const char*) &get[1];
1550   reply_bf = NULL;
1551   if (reply_bf_size > 0)
1552     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1553                                                   reply_bf_size,
1554                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1555   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1556                                 type,
1557                                 &get->key,
1558                                 &reply_bf,
1559                                 get->bf_mutator,
1560                                 xquery, xquery_size,
1561                                 NULL, 0);
1562   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1563   {
1564     /* request invalid or block type not supported */
1565     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1566     if (NULL != reply_bf)
1567       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1568     return GNUNET_YES;
1569   }
1570   peer_bf =
1571     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1572                                        DHT_BLOOM_SIZE,
1573                                        DHT_BLOOM_K);
1574
1575   /* remember request for routing replies */
1576   GDS_ROUTING_add (peer,
1577                    type,
1578                    options,
1579                    &get->key,
1580                    xquery, xquery_size,
1581                    reply_bf, get->bf_mutator);
1582   /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1583
1584   /* local lookup (this may update the reply_bf) */
1585   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1586        (am_closest_peer (&get->key,
1587                          peer_bf) ) )
1588     GDS_DATACACHE_handle_get (&get->key,
1589                               type,
1590                               xquery, xquery_size,
1591                               &reply_bf, 
1592                               get->bf_mutator);
1593   /* FIXME: should track if the local lookup resulted in a
1594      definitive result and then NOT do P2P forwarding */
1595   
1596     /* P2P forwarding */
1597   GDS_NEIGHBOURS_handle_get (type,
1598                              options,
1599                              ntohl (get->desired_replication_level),
1600                              ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1601                              &get->key,
1602                              xquery, xquery_size,
1603                              reply_bf,
1604                              get->bf_mutator,
1605                              peer_bf);
1606   /* clean up */
1607   if (NULL != reply_bf)
1608     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1609   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1610   return GNUNET_YES;
1611 }
1612
1613
1614 /**
1615  * Core handler for p2p result messages.
1616  *
1617  * @param cls closure
1618  * @param message message
1619  * @param peer peer identity this notification is about
1620  * @param atsi performance data
1621  * @return GNUNET_YES (do not cut p2p connection)
1622  */
1623 static int
1624 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1625                        const struct GNUNET_MessageHeader *message,
1626                        const struct GNUNET_TRANSPORT_ATS_Information
1627                        *atsi)
1628 {
1629   const struct PeerResultMessage *prm;
1630   const struct GNUNET_PeerIdentity *put_path;
1631   const struct GNUNET_PeerIdentity *get_path;
1632   const void *data;
1633   uint32_t get_path_length;
1634   uint32_t put_path_length;
1635   uint16_t msize;
1636   size_t data_size;
1637   enum GNUNET_BLOCK_Type type;
1638                        
1639   /* parse and validate message */
1640   msize = ntohs (message->size);
1641   if (msize < sizeof (struct PeerResultMessage))
1642   {
1643     GNUNET_break_op (0);
1644     return GNUNET_YES;
1645   }
1646   prm = (struct PeerResultMessage *) message;
1647   put_path_length = ntohl (prm->put_path_length);
1648   get_path_length = ntohl (prm->get_path_length);
1649   if ( (msize < sizeof (struct PeerResultMessage) + 
1650         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1651        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1652        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1653   {
1654     GNUNET_break_op (0);
1655     return GNUNET_YES;
1656   } 
1657   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1658   get_path = &put_path[put_path_length];
1659   type = ntohl (prm->type);
1660   data = (const void*) &get_path[get_path_length];
1661   data_size = msize - (sizeof (struct PeerResultMessage) + 
1662                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1663   /* append 'peer' to 'get_path' */
1664   {    
1665     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1666
1667     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1668     xget_path[get_path_length] = *peer;
1669     get_path_length++;
1670
1671     /* forward to local clients */   
1672     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1673                              &prm->key,
1674                              get_path_length,
1675                              xget_path,
1676                              put_path_length,
1677                              put_path,
1678                              type,
1679                              data_size, 
1680                              data);
1681
1682     /* forward to other peers */
1683     GDS_ROUTING_process (type,
1684                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1685                          &prm->key,
1686                          put_path_length,
1687                          put_path,
1688                          get_path_length,
1689                          xget_path,
1690                          data,
1691                          data_size);                     
1692   }
1693   return GNUNET_YES;
1694 }
1695
1696
1697 /**
1698  * Initialize neighbours subsystem.
1699  *
1700  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1701  */
1702 int
1703 GDS_NEIGHBOURS_init ()
1704 {
1705   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1706     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1707     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1708     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1709     {NULL, 0, 0}
1710   };
1711   unsigned long long temp_config_num;
1712  
1713   if (GNUNET_OK ==
1714       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1715                                              &temp_config_num))
1716     bucket_size = (unsigned int) temp_config_num;  
1717   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1718                                  1,
1719                                  NULL,
1720                                  &core_init,
1721                                  &handle_core_connect,
1722                                  &handle_core_disconnect, 
1723                                  NULL,  /* Do we care about "status" updates? */
1724                                  NULL, GNUNET_NO,
1725                                  NULL, GNUNET_NO,
1726                                  core_handlers);
1727   if (coreAPI == NULL)
1728     return GNUNET_SYSERR;
1729   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1730 #if 0
1731   struct GNUNET_TIME_Relative next_send_time;
1732
1733   // FIXME!
1734   next_send_time.rel_value =
1735     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1736     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1737                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1738                                2) -
1739                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1740   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1741                                                  &send_find_peer_message,
1742                                                  &find_peer_context);  
1743 #endif
1744   return GNUNET_OK;
1745 }
1746
1747
1748 /**
1749  * Shutdown neighbours subsystem.
1750  */
1751 void
1752 GDS_NEIGHBOURS_done ()
1753 {
1754   if (coreAPI == NULL)
1755     return;
1756   GNUNET_CORE_disconnect (coreAPI);
1757   coreAPI = NULL;    
1758   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1759   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1760   all_known_peers = NULL;
1761   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1762   {
1763     GNUNET_SCHEDULER_cancel (find_peer_task);
1764     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1765   }
1766 }
1767
1768
1769 /* end of gnunet-service-dht_neighbours.c */
1770
1771
1772 #if 0
1773
1774
1775
1776 #endif