fix
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "dht.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50
51 /**
52  * How many buckets will we allow total.
53  */
54 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
55
56 /**
57  * What is the maximum number of peers in a given bucket.
58  */
59 #define DEFAULT_BUCKET_SIZE 4
60
61 /**
62  * Size of the bloom filter the DHT uses to filter peers.
63  */
64 #define DHT_BLOOM_SIZE 128
65
66 /**
67  * Desired replication level for FIND PEER requests
68  */
69 #define FIND_PEER_REPLICATION_LEVEL 4
70
71 /**
72  * Maximum allowed replication level for all requests.
73  */
74 #define MAXIMUM_REPLICATION_LEVEL 16
75
76 /**
77  * How often to update our preference levels for peers in our routing tables.
78  */
79 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
80
81 /**
82  * How long at least to wait before sending another find peer request.
83  */
84 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
85
86 /**
87  * How long at most to wait before sending another find peer request.
88  */
89 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
90
91 /**
92  * How long at most to wait for transmission of a GET request to another peer?
93  */
94 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
95
96
97 /**
98  * P2P PUT message
99  */
100 struct PeerPutMessage
101 {
102   /**
103    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
104    */
105   struct GNUNET_MessageHeader header;
106
107   /**
108    * Processing options
109    */
110   uint32_t options GNUNET_PACKED;
111
112   /**
113    * Content type.
114    */
115   uint32_t type GNUNET_PACKED;
116
117   /**
118    * Hop count
119    */
120   uint32_t hop_count GNUNET_PACKED;
121
122   /**
123    * Replication level for this message
124    */
125   uint32_t desired_replication_level GNUNET_PACKED;
126
127   /**
128    * Length of the PUT path that follows (if tracked).
129    */
130   uint32_t put_path_length GNUNET_PACKED;
131
132   /**
133    * When does the content expire?
134    */
135   struct GNUNET_TIME_AbsoluteNBO expiration_time;
136
137   /**
138    * Bloomfilter (for peer identities) to stop circular routes
139    */
140   char bloomfilter[DHT_BLOOM_SIZE];
141
142   /**
143    * The key we are storing under.
144    */
145   GNUNET_HashCode key;
146
147   /* put path (if tracked) */
148
149   /* Payload */
150
151 };
152
153
154 /**
155  * P2P Result message
156  */
157 struct PeerResultMessage
158 {
159   /**
160    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
161    */
162   struct GNUNET_MessageHeader header;
163
164   /**
165    * Content type.
166    */
167   uint32_t type GNUNET_PACKED;
168
169   /**
170    * Length of the PUT path that follows (if tracked).
171    */
172   uint32_t put_path_length GNUNET_PACKED;
173
174   /**
175    * Length of the GET path that follows (if tracked).
176    */
177   uint32_t get_path_length GNUNET_PACKED;
178
179   /**
180    * When does the content expire?
181    */
182   struct GNUNET_TIME_AbsoluteNBO expiration_time;
183
184   /**
185    * The key of the corresponding GET request.
186    */
187   GNUNET_HashCode key;
188
189   /* put path (if tracked) */
190
191   /* get path (if tracked) */
192
193   /* Payload */
194
195 };
196
197
198 /**
199  * P2P GET message
200  */
201 struct PeerGetMessage
202 {
203   /**
204    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
205    */
206   struct GNUNET_MessageHeader header;
207
208   /**
209    * Processing options
210    */
211   uint32_t options GNUNET_PACKED;
212
213   /**
214    * Desired content type.
215    */
216   uint32_t type GNUNET_PACKED;
217
218   /**
219    * Hop count
220    */
221   uint32_t hop_count GNUNET_PACKED;
222
223   /**
224    * Desired replication level for this request.
225    */
226   uint32_t desired_replication_level GNUNET_PACKED;
227
228   /**
229    * Size of the extended query.
230    */
231   uint32_t xquery_size;
232
233   /**
234    * Bloomfilter mutator.
235    */
236   uint32_t bf_mutator;
237
238   /**
239    * Bloomfilter (for peer identities) to stop circular routes
240    */
241   char bloomfilter[DHT_BLOOM_SIZE];
242
243   /**
244    * The key we are looking for.
245    */
246   GNUNET_HashCode key;
247
248   /* xquery */
249
250   /* result bloomfilter */
251
252 };
253
254
255 /**
256  * Linked list of messages to send to a particular other peer.
257  */
258 struct P2PPendingMessage
259 {
260   /**
261    * Pointer to next item in the list
262    */
263   struct P2PPendingMessage *next;
264
265   /**
266    * Pointer to previous item in the list
267    */
268   struct P2PPendingMessage *prev;
269
270   /**
271    * Message importance level.  FIXME: used? useful?
272    */
273   unsigned int importance;
274
275   /**
276    * When does this message time out?
277    */
278   struct GNUNET_TIME_Absolute timeout;
279
280   /**
281    * Actual message to be sent, allocated at the end of the struct:
282    * // msg = (cast) &pm[1]; 
283    * // memcpy (&pm[1], data, len);
284    */
285   const struct GNUNET_MessageHeader *msg;
286
287 };
288
289
290 /**
291  * Entry for a peer in a bucket.
292  */
293 struct PeerInfo
294 {
295   /**
296    * Next peer entry (DLL)
297    */
298   struct PeerInfo *next;
299
300   /**
301    *  Prev peer entry (DLL)
302    */
303   struct PeerInfo *prev;
304
305   /**
306    * Count of outstanding messages for peer.  FIXME: NEEDED?
307    * FIXME: bound queue size!?
308    */
309   unsigned int pending_count;
310
311   /**
312    * Head of pending messages to be sent to this peer.
313    */
314   struct P2PPendingMessage *head;
315
316   /**
317    * Tail of pending messages to be sent to this peer.
318    */
319   struct P2PPendingMessage *tail;
320
321   /**
322    * Core handle for sending messages to this peer.
323    */
324   struct GNUNET_CORE_TransmitHandle *th;
325
326   /**
327    * Preference update context
328    */
329   struct GNUNET_CORE_InformationRequestContext *info_ctx;
330
331   /**
332    * Task for scheduling preference updates
333    */
334   GNUNET_SCHEDULER_TaskIdentifier preference_task;
335
336   /**
337    * What is the identity of the peer?
338    */
339   struct GNUNET_PeerIdentity id;
340
341 #if 0
342   /**
343    * What is the average latency for replies received?
344    */
345   struct GNUNET_TIME_Relative latency;
346
347   /**
348    * Transport level distance to peer.
349    */
350   unsigned int distance;
351 #endif
352
353 };
354
355
356 /**
357  * Peers are grouped into buckets.
358  */
359 struct PeerBucket
360 {
361   /**
362    * Head of DLL
363    */
364   struct PeerInfo *head;
365
366   /**
367    * Tail of DLL
368    */
369   struct PeerInfo *tail;
370
371   /**
372    * Number of peers in the bucket.
373    */
374   unsigned int peers_size;
375 };
376
377
378 /**
379  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
380  */
381 static unsigned int closest_bucket;
382
383 /**
384  * How many peers have we added since we sent out our last
385  * find peer request?
386  */
387 static unsigned int newly_found_peers;
388
389 /**
390  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
391  */
392 static struct PeerBucket k_buckets[MAX_BUCKETS];
393
394 /**
395  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
396  */
397 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
398
399 /**
400  * Maximum size for each bucket.
401  */
402 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
403
404 /**
405  * Task that sends FIND PEER requests.
406  */
407 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
408
409 /**
410  * Identity of this peer.
411  */ 
412 static struct GNUNET_PeerIdentity my_identity;
413
414 /**
415  * Handle to GNUnet core.
416  */
417 static struct GNUNET_CORE_Handle *coreAPI;
418
419
420 /**
421  * Find the optimal bucket for this key.
422  *
423  * @param hc the hashcode to compare our identity to
424  * @return the proper bucket index, or GNUNET_SYSERR
425  *         on error (same hashcode)
426  */
427 static int
428 find_bucket (const GNUNET_HashCode * hc)
429 {
430   unsigned int bits;
431
432   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
433   if (bits == MAX_BUCKETS)
434     {
435       /* How can all bits match? Got my own ID? */
436       GNUNET_break (0);
437       return GNUNET_SYSERR; 
438     }
439   return MAX_BUCKETS - bits - 1;
440 }
441
442
443 /**
444  * Let GNUnet core know that we like the given peer.
445  *
446  * @param cls the 'struct PeerInfo' of the peer
447  * @param tc scheduler context.
448  */ 
449 static void
450 update_core_preference (void *cls,
451                         const struct GNUNET_SCHEDULER_TaskContext *tc);
452
453
454 /**
455  * Function called with statistics about the given peer.
456  *
457  * @param cls closure
458  * @param peer identifies the peer
459  * @param bpm_out set to the current bandwidth limit (sending) for this peer
460  * @param amount set to the amount that was actually reserved or unreserved;
461  *               either the full requested amount or zero (no partial reservations)
462  * @param res_delay if the reservation could not be satisfied (amount was 0), how
463  *        long should the client wait until re-trying?
464  * @param preference current traffic preference for the given peer
465  */
466 static void
467 update_core_preference_finish (void *cls,
468                                const struct GNUNET_PeerIdentity *peer,
469                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
470                                int32_t amount,
471                                struct GNUNET_TIME_Relative res_delay,
472                                uint64_t preference)
473 {
474   struct PeerInfo *peer_info = cls;
475
476   peer_info->info_ctx = NULL;
477   peer_info->preference_task
478     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
479                                     &update_core_preference, peer_info);
480 }
481
482
483 /**
484  * Let GNUnet core know that we like the given peer.
485  *
486  * @param cls the 'struct PeerInfo' of the peer
487  * @param tc scheduler context.
488  */ 
489 static void
490 update_core_preference (void *cls,
491                         const struct GNUNET_SCHEDULER_TaskContext *tc)
492 {
493   struct PeerInfo *peer = cls;
494   uint64_t preference;
495   unsigned int matching;
496   int bucket;
497
498   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
499   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
500     return;  
501   matching =
502     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
503                                       &peer->id.hashPubKey);
504   if (matching >= 64)
505     matching = 63;
506   bucket = find_bucket(&peer->id.hashPubKey);
507   if (bucket == GNUNET_SYSERR)
508     preference = 0;
509   else
510     preference = (1LL << matching) / k_buckets[bucket].peers_size;
511   if (preference == 0)
512     {
513       peer->preference_task
514         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
515                                         &update_core_preference, peer);
516       return;
517     }
518   peer->info_ctx =
519     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
520                                         GNUNET_TIME_UNIT_FOREVER_REL,
521                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
522                                         preference,
523                                         &update_core_preference_finish, peer);
524 }
525
526
527 /**
528  * Closure for 'add_known_to_bloom'.
529  */
530 struct BloomConstructorContext
531 {
532   /**
533    * Bloom filter under construction.
534    */
535   struct GNUNET_CONTAINER_BloomFilter *bloom;
536
537   /**
538    * Mutator to use.
539    */
540   uint32_t bf_mutator;
541 };
542
543
544 /**
545  * Add each of the peers we already know to the bloom filter of
546  * the request so that we don't get duplicate HELLOs.
547  *
548  * @param cls the 'struct BloomConstructorContext'.
549  * @param key peer identity to add to the bloom filter
550  * @param value value the peer information (unused)
551  * @return GNUNET_YES (we should continue to iterate)
552  */
553 static int
554 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
555 {
556   struct BloomConstructorContext *ctx = cls;
557   GNUNET_HashCode mh;
558
559   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
560   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
561   return GNUNET_YES;
562 }
563
564
565 /**
566  * Task to send a find peer message for our own peer identifier
567  * so that we can find the closest peers in the network to ourselves
568  * and attempt to connect to them.
569  *
570  * @param cls closure for this task
571  * @param tc the context under which the task is running
572  */
573 static void
574 send_find_peer_message (void *cls,
575                         const struct GNUNET_SCHEDULER_TaskContext *tc)
576 {
577   struct GNUNET_TIME_Relative next_send_time;
578   struct BloomConstructorContext bcc;
579
580   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
581   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
582     return;
583   if (newly_found_peers > bucket_size) 
584   {
585     /* If we are finding many peers already, no need to send out our request right now! */
586     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
587                                                    &send_find_peer_message, NULL);
588     newly_found_peers = 0;
589     return;
590   }
591   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
592   bcc.bloom =
593     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
594   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
595                                          &add_known_to_bloom,
596                                          &bcc);
597   // FIXME: pass priority!?
598   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
599                              GNUNET_DHT_RO_FIND_PEER,
600                              FIND_PEER_REPLICATION_LEVEL,
601                              0,
602                              &my_identity.hashPubKey,
603                              NULL, 0,
604                              bcc.bloom, bcc.bf_mutator, NULL);
605   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
606   /* schedule next round */
607   newly_found_peers = 0;
608   next_send_time.rel_value =
609     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
610     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
611                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
612   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
613                                                  &send_find_peer_message,
614                                                  NULL);  
615 }
616
617
618 /**
619  * Method called whenever a peer connects.
620  *
621  * @param cls closure
622  * @param peer peer identity this notification is about
623  * @param atsi performance data
624  */
625 static void
626 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
627                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
628 {
629   struct PeerInfo *ret;
630   int peer_bucket;
631
632   /* Check for connect to self message */
633   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
634     return;
635   if (GNUNET_YES ==
636       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
637                                               &peer->hashPubKey))
638   {
639     GNUNET_break (0);
640     return;
641   }
642   peer_bucket = find_bucket (&peer->hashPubKey);
643   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
644   ret = GNUNET_malloc (sizeof (struct PeerInfo));
645 #if 0
646   ret->latency = latency;
647   ret->distance = distance;
648 #endif
649   ret->id = *peer;
650   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
651                                     k_buckets[peer_bucket].tail, ret);
652   k_buckets[peer_bucket].peers_size++;
653   closest_bucket = GNUNET_MAX (closest_bucket,
654                                peer_bucket);
655   if ( (peer_bucket > 0) &&
656        (k_buckets[peer_bucket].peers_size <= bucket_size) )
657     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
658   newly_found_peers++;
659   GNUNET_assert (GNUNET_OK ==
660                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
661                                                     &peer->hashPubKey, ret,
662                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
663   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
664   {
665     /* got a first connection, good time to start with FIND PEER requests... */
666     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
667                                                NULL);    
668   }
669 }
670
671
672 /**
673  * Method called whenever a peer disconnects.
674  *
675  * @param cls closure
676  * @param peer peer identity this notification is about
677  */
678 static void
679 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
680 {
681   struct PeerInfo *to_remove;
682   int current_bucket;
683   struct P2PPendingMessage *pos;
684
685   /* Check for disconnect from self message */
686   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
687     return;
688   to_remove =
689       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
690   if (NULL == to_remove)
691     {
692       GNUNET_break (0);
693       return;
694     }
695   GNUNET_assert (GNUNET_YES ==
696                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
697                                                        &peer->hashPubKey,
698                                                        to_remove));
699   if (NULL != to_remove->info_ctx)
700   {
701     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
702     to_remove->info_ctx = NULL;
703   }
704   current_bucket = find_bucket (&to_remove->id.hashPubKey);
705   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
706                                k_buckets[current_bucket].tail,
707                                to_remove);
708   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
709   k_buckets[current_bucket].peers_size--;
710   while ( (closest_bucket > 0) &&
711           (k_buckets[closest_bucket].peers_size == 0) )
712     closest_bucket--;
713
714   if (to_remove->th != NULL) 
715   {
716     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
717     to_remove->th = NULL;
718   }
719   while (NULL != (pos = to_remove->head))
720   {
721     GNUNET_CONTAINER_DLL_remove (to_remove->head,
722                                  to_remove->tail,
723                                  pos);
724     GNUNET_free (pos);
725   }
726 }
727
728
729 /**
730  * Called when core is ready to send a message we asked for
731  * out to the destination.
732  *
733  * @param cls the 'struct PeerInfo' of the target peer
734  * @param size number of bytes available in buf
735  * @param buf where the callee should write the message
736  * @return number of bytes written to buf
737  */
738 static size_t
739 core_transmit_notify (void *cls, size_t size, void *buf)
740 {
741   struct PeerInfo *peer = cls;
742   char *cbuf = buf;
743   struct P2PPendingMessage *pending;
744   size_t off;
745   size_t msize;
746
747   peer->th = NULL;
748   if (buf == NULL)
749   {
750     /* client disconnected */
751     return 0;
752   }
753   if (peer->head == NULL)
754   {
755     /* no messages pending */
756     return 0;
757   }
758   off = 0;
759   while ( (NULL != (pending = peer->head)) &&
760           (size - off >= (msize = ntohs (pending->msg->size))) )
761   {
762     memcpy (&cbuf[off], pending->msg, msize);
763     off += msize;
764     peer->pending_count--;
765     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
766     GNUNET_free (pending);
767   }
768   if (peer->head != NULL)
769     peer->th 
770       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
771                                            pending->importance,
772                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
773                                            &peer->id, msize,
774                                            &core_transmit_notify, peer);
775   return off;
776 }
777
778
779 /**
780  * Transmit all messages in the peer's message queue.
781  *
782  * @param peer message queue to process
783  */
784 static void
785 process_peer_queue (struct PeerInfo *peer)
786 {
787   struct P2PPendingMessage *pending;
788
789   if (NULL != (pending = peer->head))
790     return;
791   if (NULL != peer->th)
792     return;
793   peer->th 
794     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
795                                          pending->importance,
796                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
797                                          &peer->id,
798                                          ntohs (pending->msg->size),
799                                          &core_transmit_notify, peer);
800 }
801
802
803 /**
804  * To how many peers should we (on average) forward the request to
805  * obtain the desired target_replication count (on average).
806  *
807  * @param hop_count number of hops the message has traversed
808  * @param target_replication the number of total paths desired
809  * @return Some number of peers to forward the message to
810  */
811 static unsigned int
812 get_forward_count (uint32_t hop_count, 
813                    uint32_t target_replication)
814 {
815   uint32_t random_value;
816   uint32_t forward_count;
817   float target_value;
818
819   if (hop_count > GDS_NSE_get () * 4.0)
820   {
821     /* forcefully terminate */
822     return 0;
823   }
824   if (hop_count > GDS_NSE_get () * 2.0)
825   {
826     /* Once we have reached our ideal number of hops, only forward to 1 peer */
827     return 1;
828   }
829   /* bound by system-wide maximum */
830   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
831                                    target_replication);
832   target_value =
833     1 + (target_replication - 1.0) / (GDS_NSE_get () +
834                                       ((float) (target_replication - 1.0) *
835                                        hop_count));
836   /* Set forward count to floor of target_value */
837   forward_count = (uint32_t) target_value;
838   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
839   target_value = target_value - forward_count;
840   random_value =
841     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
842   if (random_value < (target_value * UINT32_MAX))
843     forward_count++;
844   return forward_count;
845 }
846
847
848 /**
849  * Compute the distance between have and target as a 32-bit value.
850  * Differences in the lower bits must count stronger than differences
851  * in the higher bits.
852  *
853  * @return 0 if have==target, otherwise a number
854  *           that is larger as the distance between
855  *           the two hash codes increases
856  */
857 static unsigned int
858 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
859 {
860   unsigned int bucket;
861   unsigned int msb;
862   unsigned int lsb;
863   unsigned int i;
864
865   /* We have to represent the distance between two 2^9 (=512)-bit
866    * numbers as a 2^5 (=32)-bit number with "0" being used for the
867    * two numbers being identical; furthermore, we need to
868    * guarantee that a difference in the number of matching
869    * bits is always represented in the result.
870    *
871    * We use 2^32/2^9 numerical values to distinguish between
872    * hash codes that have the same LSB bit distance and
873    * use the highest 2^9 bits of the result to signify the
874    * number of (mis)matching LSB bits; if we have 0 matching
875    * and hence 512 mismatching LSB bits we return -1 (since
876    * 512 itself cannot be represented with 9 bits) */
877
878   /* first, calculate the most significant 9 bits of our
879    * result, aka the number of LSBs */
880   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
881   /* bucket is now a value between 0 and 512 */
882   if (bucket == 512)
883     return 0;                   /* perfect match */
884   if (bucket == 0)
885     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
886                                  * below, we'd end up with max+1 (overflow)) */
887
888   /* calculate the most significant bits of the final result */
889   msb = (512 - bucket) << (32 - 9);
890   /* calculate the 32-9 least significant bits of the final result by
891    * looking at the differences in the 32-9 bits following the
892    * mismatching bit at 'bucket' */
893   lsb = 0;
894   for (i = bucket + 1;
895        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
896   {
897     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
898         GNUNET_CRYPTO_hash_get_bit (have, i))
899       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
900                                                  * last bit set will be 31 -- if
901                                                  * i does not reach 512 first... */
902   }
903   return msb | lsb;
904 }
905
906
907 /**
908  * Check whether my identity is closer than any known peers.  If a
909  * non-null bloomfilter is given, check if this is the closest peer
910  * that hasn't already been routed to.
911  *
912  * @param key hash code to check closeness to
913  * @param bloom bloomfilter, exclude these entries from the decision
914  * @return GNUNET_YES if node location is closest,
915  *         GNUNET_NO otherwise.
916  */
917 static int
918 am_closest_peer (const GNUNET_HashCode *key,
919                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
920 {
921   int bits;
922   int other_bits;
923   int bucket_num;
924   int count;
925   struct PeerInfo *pos;
926
927   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
928     return GNUNET_YES;
929   bucket_num = find_bucket (key);
930   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
931   pos = k_buckets[bucket_num].head;
932   count = 0;
933   while ((pos != NULL) && (count < bucket_size))
934   {
935     if ((bloom != NULL) &&
936         (GNUNET_YES ==
937          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
938     {
939       pos = pos->next;
940       continue;                 /* Skip already checked entries */
941     }
942     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
943     if (other_bits > bits)
944       return GNUNET_NO;
945     if (other_bits == bits)        /* We match the same number of bits */
946       return GNUNET_YES;
947     pos = pos->next;
948   }
949   /* No peers closer, we are the closest! */
950   return GNUNET_YES;
951 }
952
953
954 /**
955  * Select a peer from the routing table that would be a good routing
956  * destination for sending a message for "key".  The resulting peer
957  * must not be in the set of blocked peers.<p>
958  *
959  * Note that we should not ALWAYS select the closest peer to the
960  * target, peers further away from the target should be chosen with
961  * exponentially declining probability.
962  *
963  * FIXME: double-check that this is fine
964  * 
965  *
966  * @param key the key we are selecting a peer to route to
967  * @param bloom a bloomfilter containing entries this request has seen already
968  * @param hops how many hops has this message traversed thus far
969  * @return Peer to route to, or NULL on error
970  */
971 static struct PeerInfo *
972 select_peer (const GNUNET_HashCode *key,
973              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
974              uint32_t hops)
975 {
976   unsigned int bc;
977   unsigned int count;
978   unsigned int selected;
979   struct PeerInfo *pos;
980   unsigned int dist;
981   unsigned int smallest_distance;
982   struct PeerInfo *chosen;
983
984   if (hops >= GDS_NSE_get ())
985   {
986     /* greedy selection (closest peer that is not in bloomfilter) */
987     smallest_distance = UINT_MAX;
988     chosen = NULL;
989     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
990     {
991       pos = k_buckets[bc].head;
992       count = 0;
993       while ((pos != NULL) && (count < bucket_size))
994       {
995         if (GNUNET_NO ==
996             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
997         {
998           dist = get_distance (key, &pos->id.hashPubKey);
999           if (dist < smallest_distance)
1000           {
1001             chosen = pos;
1002             smallest_distance = dist;
1003           }
1004         }
1005         count++;
1006         pos = pos->next;
1007       }
1008     }
1009     return chosen;
1010   }
1011
1012   /* select "random" peer */
1013   /* count number of peers that are available and not filtered */
1014   count = 0;
1015   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1016   {
1017     pos = k_buckets[bc].head;
1018     while ((pos != NULL) && (count < bucket_size))
1019     {
1020       if (GNUNET_YES ==
1021           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1022       {
1023         pos = pos->next;
1024         continue;               /* Ignore bloomfiltered peers */
1025       }
1026       count++;
1027       pos = pos->next;
1028     }
1029   }
1030   if (count == 0)               /* No peers to select from! */
1031   {
1032     return NULL;
1033   }
1034   /* Now actually choose a peer */
1035   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1036   count = 0;
1037   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1038   {
1039     pos = k_buckets[bc].head;
1040     while ((pos != NULL) && (count < bucket_size))
1041     {
1042       if (GNUNET_YES ==
1043           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1044       {
1045         pos = pos->next;
1046         continue;               /* Ignore bloomfiltered peers */
1047       }
1048       if (0 == selected--)
1049         return pos;
1050       pos = pos->next;
1051     }
1052   }
1053   GNUNET_break (0);
1054   return NULL;
1055 }
1056
1057
1058 /**
1059  * Compute the set of peers that the given request should be
1060  * forwarded to.
1061  *
1062  * @param key routing key
1063  * @param bloom bloom filter excluding peers as targets, all selected
1064  *        peers will be added to the bloom filter
1065  * @param hop_count number of hops the request has traversed so far
1066  * @param target_replication desired number of replicas
1067  * @param targets where to store an array of target peers (to be
1068  *         free'd by the caller)
1069  * @return number of peers returned in 'targets'.
1070  */
1071 static unsigned int
1072 get_target_peers (const GNUNET_HashCode *key,
1073                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1074                   uint32_t hop_count,
1075                   uint32_t target_replication,
1076                   struct PeerInfo ***targets)
1077 {
1078   unsigned int ret;
1079   unsigned int off;
1080   struct PeerInfo **rtargets;
1081   struct PeerInfo *nxt;
1082
1083   ret = get_forward_count (hop_count, target_replication);
1084   if (ret == 0)
1085   {
1086     *targets = NULL;
1087     return 0;
1088   }
1089   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1090   off = 0;
1091   while (ret-- > 0)
1092   {
1093     nxt = select_peer (key, bloom, hop_count);
1094     if (nxt == NULL)
1095       break;
1096     rtargets[off++] = nxt;
1097     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1098   }
1099   if (0 == off)
1100   {
1101     GNUNET_free (rtargets);
1102     *targets = NULL;
1103     return 0;
1104   }
1105   *targets = rtargets;
1106   return off;
1107 }
1108
1109
1110 /**
1111  * Perform a PUT operation.   Forwards the given request to other
1112  * peers.   Does not store the data locally.  Does not give the
1113  * data to local clients.  May do nothing if this is the only
1114  * peer in the network (or if we are the closest peer in the
1115  * network).
1116  *
1117  * @param type type of the block
1118  * @param options routing options
1119  * @param desired_replication_level desired replication count
1120  * @param expiration_time when does the content expire
1121  * @param hop_count how many hops has this message traversed so far
1122  * @param bf Bloom filter of peers this PUT has already traversed
1123  * @param key key for the content
1124  * @param put_path_length number of entries in put_path
1125  * @param put_path peers this request has traversed so far (if tracked)
1126  * @param data payload to store
1127  * @param data_size number of bytes in data
1128  */
1129 void
1130 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1131                            enum GNUNET_DHT_RouteOption options,
1132                            uint32_t desired_replication_level,
1133                            struct GNUNET_TIME_Absolute expiration_time,
1134                            uint32_t hop_count,
1135                            struct GNUNET_CONTAINER_BloomFilter *bf,
1136                            const GNUNET_HashCode *key,
1137                            unsigned int put_path_length,
1138                            struct GNUNET_PeerIdentity *put_path,
1139                            const void *data,
1140                            size_t data_size)
1141 {
1142   unsigned int target_count;
1143   unsigned int i;
1144   struct PeerInfo **targets;
1145   struct PeerInfo *target;
1146   struct P2PPendingMessage *pending;
1147   size_t msize;
1148   struct PeerPutMessage *ppm;
1149   struct GNUNET_PeerIdentity *pp;
1150   
1151   target_count = get_target_peers (key, bf, hop_count,
1152                                    desired_replication_level,
1153                                    &targets);
1154   if (0 == target_count)
1155     return;
1156   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1157   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1158   {
1159     put_path_length = 0;
1160     msize = data_size + sizeof (struct PeerPutMessage);
1161   }
1162   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1163   {
1164     GNUNET_break (0);
1165     return;
1166   }
1167   for (i=0;i<target_count;i++)
1168   {
1169     target = targets[i];
1170     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1171     pending->importance = 0; /* FIXME */
1172     pending->timeout = expiration_time;   
1173     ppm = (struct PeerPutMessage*) &pending[1];
1174     pending->msg = &ppm->header;
1175     ppm->header.size = htons (msize);
1176     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1177     ppm->options = htonl (options);
1178     ppm->type = htonl (type);
1179     ppm->hop_count = htonl (hop_count + 1);
1180     ppm->desired_replication_level = htonl (desired_replication_level);
1181     ppm->put_path_length = htonl (put_path_length);
1182     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1183     GNUNET_assert (GNUNET_OK ==
1184                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1185                                                               ppm->bloomfilter,
1186                                                               DHT_BLOOM_SIZE));
1187     ppm->key = *key;
1188     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1189     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1190     memcpy (&pp[put_path_length], data, data_size);
1191     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1192                                       target->tail,
1193                                       pending);
1194     target->pending_count++;
1195     process_peer_queue (target);
1196   }
1197   GNUNET_free (targets);
1198 }
1199
1200
1201 /**
1202  * Perform a GET operation.  Forwards the given request to other
1203  * peers.  Does not lookup the key locally.  May do nothing if this is
1204  * the only peer in the network (or if we are the closest peer in the
1205  * network).
1206  *
1207  * @param type type of the block
1208  * @param options routing options
1209  * @param desired_replication_level desired replication count
1210  * @param hop_count how many hops did this request traverse so far?
1211  * @param key key for the content
1212  * @param xquery extended query
1213  * @param xquery_size number of bytes in xquery
1214  * @param reply_bf bloomfilter to filter duplicates
1215  * @param reply_bf_mutator mutator for reply_bf
1216  * @param peer_bf filter for peers not to select (again)
1217  */
1218 void
1219 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1220                            enum GNUNET_DHT_RouteOption options,
1221                            uint32_t desired_replication_level,
1222                            uint32_t hop_count,
1223                            const GNUNET_HashCode *key,
1224                            const void *xquery,
1225                            size_t xquery_size,
1226                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1227                            uint32_t reply_bf_mutator,
1228                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1229 {
1230   unsigned int target_count;
1231   unsigned int i;
1232   struct PeerInfo **targets;
1233   struct PeerInfo *target;
1234   struct P2PPendingMessage *pending;
1235   size_t msize;
1236   struct PeerGetMessage *pgm;
1237   char *xq;
1238   size_t reply_bf_size;
1239   
1240   target_count = get_target_peers (key, peer_bf, hop_count,
1241                                    desired_replication_level,
1242                                    &targets);
1243   if (0 == target_count)
1244     return;
1245   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1246   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1247   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1248   {
1249     GNUNET_break (0);
1250     return;
1251   }
1252   /* forward request */
1253   for (i=0;i<target_count;i++)
1254   {
1255     target = targets[i];
1256     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1257     pending->importance = 0; /* FIXME */
1258     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1259     pgm = (struct PeerGetMessage*) &pending[1];
1260     pending->msg = &pgm->header;
1261     pgm->header.size = htons (msize);
1262     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1263     pgm->options = htonl (options);
1264     pgm->type = htonl (type);
1265     pgm->hop_count = htonl (hop_count + 1);
1266     pgm->desired_replication_level = htonl (desired_replication_level);
1267     pgm->xquery_size = htonl (xquery_size);
1268     pgm->bf_mutator = reply_bf_mutator; 
1269     GNUNET_assert (GNUNET_OK ==
1270                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1271                                                               pgm->bloomfilter,
1272                                                               DHT_BLOOM_SIZE));
1273     pgm->key = *key;
1274     xq = (char *) &pgm[1];
1275     memcpy (xq, xquery, xquery_size);
1276     GNUNET_assert (GNUNET_OK ==
1277                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1278                                                               &xq[xquery_size],
1279                                                               reply_bf_size));
1280     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1281                                       target->tail,
1282                                       pending);
1283     target->pending_count++;
1284     process_peer_queue (target);
1285   }
1286   GNUNET_free (targets);
1287 }
1288
1289
1290 /**
1291  * Handle a reply (route to origin).  Only forwards the reply back to
1292  * the given peer.  Does not do local caching or forwarding to local
1293  * clients.
1294  *
1295  * @param target neighbour that should receive the block (if still connected)
1296  * @param type type of the block
1297  * @param expiration_time when does the content expire
1298  * @param key key for the content
1299  * @param put_path_length number of entries in put_path
1300  * @param put_path peers the original PUT traversed (if tracked)
1301  * @param get_path_length number of entries in put_path
1302  * @param get_path peers this reply has traversed so far (if tracked)
1303  * @param data payload of the reply
1304  * @param data_size number of bytes in data
1305  */
1306 void
1307 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1308                              enum GNUNET_BLOCK_Type type,
1309                              struct GNUNET_TIME_Absolute expiration_time,
1310                              const GNUNET_HashCode *key,
1311                              unsigned int put_path_length,
1312                              const struct GNUNET_PeerIdentity *put_path,
1313                              unsigned int get_path_length,
1314                              const struct GNUNET_PeerIdentity *get_path,
1315                              const void *data,
1316                              size_t data_size)
1317 {
1318   struct PeerInfo *pi;
1319   struct P2PPendingMessage *pending;
1320   size_t msize;
1321   struct PeerResultMessage *prm;
1322   struct GNUNET_PeerIdentity *paths;
1323   
1324   msize = data_size + sizeof (struct PeerResultMessage) + 
1325     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1326   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1327        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1328        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1329        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1330   {
1331     GNUNET_break (0);
1332     return;
1333   }
1334   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1335                                           &target->hashPubKey);
1336   if (NULL == pi)
1337   {
1338     /* peer disconnected in the meantime, drop reply */
1339     return;
1340   }
1341   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1342   pending->importance = 0; /* FIXME */
1343   pending->timeout = expiration_time;
1344   prm = (struct PeerResultMessage*) &pending[1];
1345   pending->msg = &prm->header;
1346   prm->header.size = htons (msize);
1347   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1348   prm->type = htonl (type);
1349   prm->put_path_length = htonl (put_path_length);
1350   prm->get_path_length = htonl (get_path_length);
1351   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1352   prm->key = *key;
1353   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1354   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1355   memcpy (&paths[put_path_length],
1356           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1357   memcpy (&paths[put_path_length + get_path_length],
1358           data, data_size);
1359   GNUNET_CONTAINER_DLL_insert (pi->head,
1360                                pi->tail,
1361                                pending);
1362   pi->pending_count++;
1363   process_peer_queue (pi);
1364 }
1365
1366
1367 /**
1368  * To be called on core init/fail.
1369  *
1370  * @param cls service closure
1371  * @param server handle to the server for this service
1372  * @param identity the public identity of this peer
1373  * @param publicKey the public key of this peer
1374  */
1375 static void
1376 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1377            const struct GNUNET_PeerIdentity *identity,
1378            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1379 {
1380   GNUNET_assert (server != NULL);
1381   my_identity = *identity;
1382 }
1383
1384
1385 /**
1386  * Core handler for p2p put requests.
1387  *
1388  * @param cls closure
1389  * @param peer sender of the request
1390  * @param message message
1391  * @param peer peer identity this notification is about
1392  * @param atsi performance data
1393  * @return GNUNET_OK to keep the connection open,
1394  *         GNUNET_SYSERR to close it (signal serious error)
1395  */
1396 static int
1397 handle_dht_p2p_put (void *cls,
1398                     const struct GNUNET_PeerIdentity *peer,
1399                     const struct GNUNET_MessageHeader *message,
1400                     const struct GNUNET_TRANSPORT_ATS_Information
1401                     *atsi)
1402 {
1403   const struct PeerPutMessage *put;
1404   const struct GNUNET_PeerIdentity *put_path;
1405   const void *payload;
1406   uint32_t putlen;
1407   uint16_t msize;
1408   size_t payload_size;
1409   enum GNUNET_DHT_RouteOption options;
1410   struct GNUNET_CONTAINER_BloomFilter *bf;
1411   GNUNET_HashCode test_key;
1412   
1413   msize = ntohs (message->size);
1414   if (msize < sizeof (struct PeerPutMessage))
1415   {
1416     GNUNET_break_op (0);
1417     return GNUNET_YES;
1418   }
1419   put = (const struct PeerPutMessage*) message;
1420   putlen = ntohl (put->put_path_length);
1421   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1422        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1423     {
1424       GNUNET_break_op (0);
1425       return GNUNET_YES;
1426     }
1427   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1428   payload = &put_path[putlen];
1429   options = ntohl (put->options);
1430   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1431                           putlen * sizeof (struct GNUNET_PeerIdentity));
1432   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1433                                 ntohl (put->type),
1434                                 payload, payload_size,
1435                                 &test_key))
1436   {
1437   case GNUNET_YES:
1438     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1439     {
1440       GNUNET_break_op (0);
1441       return GNUNET_YES;
1442     }
1443     break;
1444   case GNUNET_NO:
1445     GNUNET_break_op (0);
1446     return GNUNET_YES;
1447   case GNUNET_SYSERR:
1448     /* cannot verify, good luck */
1449     break;
1450   }
1451   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1452                                           DHT_BLOOM_SIZE,
1453                                           DHT_BLOOM_K);
1454   {
1455     struct GNUNET_PeerIdentity pp[putlen+1];
1456   
1457     /* extend 'put path' by sender */
1458     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1459     {
1460       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1461       pp[putlen] = *peer;
1462       putlen++;
1463     }
1464     else
1465       putlen = 0;
1466     
1467     /* give to local clients */
1468     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1469                              &put->key,
1470                              0, NULL,
1471                              putlen,
1472                              pp,
1473                              ntohl (put->type),
1474                              payload_size,
1475                              payload);
1476     /* store locally */
1477     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1478          (am_closest_peer (&put->key,
1479                            bf) ) )
1480       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1481                                 &put->key,
1482                                 putlen, pp,
1483                                 ntohl (put->type),
1484                                 payload_size,
1485                                 payload);
1486     /* route to other peers */
1487     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1488                                options,
1489                                ntohl (put->desired_replication_level),
1490                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1491                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1492                                bf,
1493                                &put->key,
1494                                putlen, pp,
1495                                payload,
1496                                payload_size);
1497   }
1498   GNUNET_CONTAINER_bloomfilter_free (bf);
1499   return GNUNET_YES;
1500 }
1501
1502
1503 /**
1504  * We have received a FIND PEER request.  Send matching
1505  * HELLOs back.
1506  *
1507  * @param sender sender of the FIND PEER request
1508  * @param key peers close to this key are desired
1509  * @param bf peers matching this bf are excluded
1510  * @param bf_mutator mutator for bf
1511  */
1512 static void
1513 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1514                   const GNUNET_HashCode *key,
1515                   struct GNUNET_CONTAINER_BloomFilter *bf,
1516                   uint32_t bf_mutator)
1517 {
1518   int bucket_idx;
1519   struct PeerBucket *bucket;
1520   struct PeerInfo *peer;
1521   unsigned int choice;
1522   GNUNET_HashCode mhash;
1523   const struct GNUNET_HELLO_Message *hello;
1524
1525   /* first, check about our own HELLO */
1526   if (NULL != GDS_my_hello)
1527   {
1528     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1529     if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))
1530       
1531       GDS_NEIGHBOURS_handle_reply (sender,
1532                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1533                                    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1534                                    key,
1535                                    0, NULL,
1536                                    0, NULL,
1537                                    GDS_my_hello,
1538                                    GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message*) GDS_my_hello));
1539   }
1540
1541   /* then, also consider sending a random HELLO from the closest bucket */
1542   bucket_idx = find_bucket (key);
1543   if (bucket_idx == GNUNET_SYSERR)
1544     return;
1545   bucket = &k_buckets[bucket_idx];
1546   if (bucket->peers_size == 0)
1547     return;
1548   choice = GNUNET_CRYPTO_random_u32 (bucket->peers_size,
1549                                      GNUNET_CRYPTO_QUALITY_WEAK);
1550   peer = bucket->head;
1551   while (choice > 0)
1552   {
1553     GNUNET_assert (peer != NULL);
1554     peer = peer->next;
1555   }
1556   choice = bucket->peers_size;
1557   do
1558     {
1559       peer = peer->next;
1560       if (choice-- == 0)
1561         return; /* no non-masked peer available */
1562       if (peer == NULL)
1563         peer = bucket->head;
1564       GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1565       hello = GDS_HELLO_get (&peer->id);
1566     }
1567   while ( (hello == NULL) ||
1568           (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) );
1569   GDS_NEIGHBOURS_handle_reply (sender,
1570                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1571                                GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1572                                key,
1573                                0, NULL,
1574                                0, NULL,
1575                                hello,
1576                                GNUNET_HELLO_size (hello));    
1577 }
1578
1579
1580 /**
1581  * Core handler for p2p get requests.
1582  *
1583  * @param cls closure
1584  * @param peer sender of the request
1585  * @param message message
1586  * @param peer peer identity this notification is about
1587  * @param atsi performance data
1588  * @return GNUNET_OK to keep the connection open,
1589  *         GNUNET_SYSERR to close it (signal serious error)
1590  */
1591 static int
1592 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1593                     const struct GNUNET_MessageHeader *message,
1594                     const struct GNUNET_TRANSPORT_ATS_Information
1595                     *atsi)
1596 {
1597   struct PeerGetMessage *get;
1598   uint32_t xquery_size;
1599   size_t reply_bf_size;
1600   uint16_t msize;
1601   enum GNUNET_BLOCK_Type type;
1602   enum GNUNET_DHT_RouteOption options;
1603   enum GNUNET_BLOCK_EvaluationResult eval;
1604   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1605   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1606   const char *xquery;
1607                       
1608   /* parse and validate message */
1609   msize = ntohs (message->size);
1610   if (msize < sizeof (struct PeerGetMessage))
1611   {
1612     GNUNET_break_op (0);
1613     return GNUNET_YES;
1614   }
1615   get = (struct PeerGetMessage *) message;
1616   xquery_size = ntohl (get->xquery_size);
1617   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1618   {
1619     GNUNET_break_op (0);
1620     return GNUNET_YES;
1621   }
1622   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1623   type = ntohl (get->type);
1624   options = ntohl (get->options);
1625   xquery = (const char*) &get[1];
1626   reply_bf = NULL;
1627   if (reply_bf_size > 0)
1628     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1629                                                   reply_bf_size,
1630                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1631   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1632                                 type,
1633                                 &get->key,
1634                                 &reply_bf,
1635                                 get->bf_mutator,
1636                                 xquery, xquery_size,
1637                                 NULL, 0);
1638   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1639   {
1640     /* request invalid or block type not supported */
1641     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1642     if (NULL != reply_bf)
1643       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1644     return GNUNET_YES;
1645   }
1646   peer_bf =
1647     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1648                                        DHT_BLOOM_SIZE,
1649                                        DHT_BLOOM_K);
1650
1651   /* remember request for routing replies */
1652   GDS_ROUTING_add (peer,
1653                    type,
1654                    options,
1655                    &get->key,
1656                    xquery, xquery_size,
1657                    reply_bf, get->bf_mutator);
1658
1659   /* local lookup (this may update the reply_bf) */
1660   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1661        (am_closest_peer (&get->key,
1662                          peer_bf) ) )
1663   {
1664     if ( (0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1665     {
1666       handle_find_peer (peer,
1667                         &get->key,
1668                         reply_bf,
1669                         get->bf_mutator);
1670     }
1671     else
1672     {
1673       eval = GDS_DATACACHE_handle_get (&get->key,
1674                                        type,
1675                                        xquery, xquery_size,
1676                                        &reply_bf, 
1677                                        get->bf_mutator);
1678     }
1679   }
1680   
1681   /* P2P forwarding */
1682   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1683     GDS_NEIGHBOURS_handle_get (type,
1684                                options,
1685                                ntohl (get->desired_replication_level),
1686                                ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1687                                &get->key,
1688                                xquery, xquery_size,
1689                                reply_bf,
1690                                get->bf_mutator,
1691                                peer_bf);
1692   /* clean up */
1693   if (NULL != reply_bf)
1694     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1695   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1696   return GNUNET_YES;
1697 }
1698
1699
1700 /**
1701  * Core handler for p2p result messages.
1702  *
1703  * @param cls closure
1704  * @param message message
1705  * @param peer peer identity this notification is about
1706  * @param atsi performance data
1707  * @return GNUNET_YES (do not cut p2p connection)
1708  */
1709 static int
1710 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1711                        const struct GNUNET_MessageHeader *message,
1712                        const struct GNUNET_TRANSPORT_ATS_Information
1713                        *atsi)
1714 {
1715   const struct PeerResultMessage *prm;
1716   const struct GNUNET_PeerIdentity *put_path;
1717   const struct GNUNET_PeerIdentity *get_path;
1718   const void *data;
1719   uint32_t get_path_length;
1720   uint32_t put_path_length;
1721   uint16_t msize;
1722   size_t data_size;
1723   enum GNUNET_BLOCK_Type type;
1724                        
1725   /* parse and validate message */
1726   msize = ntohs (message->size);
1727   if (msize < sizeof (struct PeerResultMessage))
1728   {
1729     GNUNET_break_op (0);
1730     return GNUNET_YES;
1731   }
1732   prm = (struct PeerResultMessage *) message;
1733   put_path_length = ntohl (prm->put_path_length);
1734   get_path_length = ntohl (prm->get_path_length);
1735   if ( (msize < sizeof (struct PeerResultMessage) + 
1736         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1737        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1738        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1739   {
1740     GNUNET_break_op (0);
1741     return GNUNET_YES;
1742   } 
1743   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1744   get_path = &put_path[put_path_length];
1745   type = ntohl (prm->type);
1746   data = (const void*) &get_path[get_path_length];
1747   data_size = msize - (sizeof (struct PeerResultMessage) + 
1748                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1749
1750   /* if we got a HELLO, consider it for our own routing table */
1751   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1752   {
1753     const struct GNUNET_MessageHeader *h;
1754     struct GNUNET_PeerIdentity pid;
1755     int bucket;
1756
1757     /* Should be a HELLO, validate and consider using it! */
1758     if (data_size < sizeof (struct GNUNET_MessageHeader))
1759     {
1760       GNUNET_break_op (0);
1761       return GNUNET_YES;
1762     }
1763     h = data;
1764     if (data_size != ntohs (h->size))
1765     {
1766       GNUNET_break_op (0);
1767       return GNUNET_YES;
1768     }
1769     if (GNUNET_OK !=
1770         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) h,
1771                              &pid))
1772     {
1773       GNUNET_break_op (0);
1774       return GNUNET_YES;
1775     }
1776     bucket = find_bucket (&pid.hashPubKey);
1777     if ( (bucket >= 0) &&
1778          (k_buckets[bucket].peers_size < bucket_size) )
1779     {    
1780       if (NULL != GDS_transport_handle)
1781         GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
1782                                       h, NULL, NULL);
1783       (void) GNUNET_CORE_peer_request_connect (coreAPI,
1784                                                &pid, 
1785                                                NULL, NULL);
1786     }   
1787   }
1788
1789   /* append 'peer' to 'get_path' */
1790   {    
1791     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1792
1793     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1794     xget_path[get_path_length] = *peer;
1795     get_path_length++;
1796
1797     /* forward to local clients */   
1798     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1799                              &prm->key,
1800                              get_path_length,
1801                              xget_path,
1802                              put_path_length,
1803                              put_path,
1804                              type,
1805                              data_size, 
1806                              data);
1807
1808     /* forward to other peers */
1809     GDS_ROUTING_process (type,
1810                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1811                          &prm->key,
1812                          put_path_length,
1813                          put_path,
1814                          get_path_length,
1815                          xget_path,
1816                          data,
1817                          data_size);                     
1818   }
1819   return GNUNET_YES;
1820 }
1821
1822
1823 /**
1824  * Initialize neighbours subsystem.
1825  *
1826  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1827  */
1828 int
1829 GDS_NEIGHBOURS_init ()
1830 {
1831   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1832     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1833     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1834     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1835     {NULL, 0, 0}
1836   };
1837   unsigned long long temp_config_num;
1838  
1839   if (GNUNET_OK ==
1840       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1841                                              &temp_config_num))
1842     bucket_size = (unsigned int) temp_config_num;  
1843   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1844                                  1,
1845                                  NULL,
1846                                  &core_init,
1847                                  &handle_core_connect,
1848                                  &handle_core_disconnect, 
1849                                  NULL,  /* Do we care about "status" updates? */
1850                                  NULL, GNUNET_NO,
1851                                  NULL, GNUNET_NO,
1852                                  core_handlers);
1853   if (coreAPI == NULL)
1854     return GNUNET_SYSERR;
1855   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1856   return GNUNET_OK;
1857 }
1858
1859
1860 /**
1861  * Shutdown neighbours subsystem.
1862  */
1863 void
1864 GDS_NEIGHBOURS_done ()
1865 {
1866   if (coreAPI == NULL)
1867     return;
1868   GNUNET_CORE_disconnect (coreAPI);
1869   coreAPI = NULL;    
1870   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1871   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1872   all_known_peers = NULL;
1873   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1874   {
1875     GNUNET_SCHEDULER_cancel (find_peer_task);
1876     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1877   }
1878 }
1879
1880
1881 /* end of gnunet-service-dht_neighbours.c */