track HELLOs from peerinfo
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "dht.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50
51 /**
52  * How many buckets will we allow total.
53  */
54 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
55
56 /**
57  * What is the maximum number of peers in a given bucket.
58  */
59 #define DEFAULT_BUCKET_SIZE 4
60
61 /**
62  * Size of the bloom filter the DHT uses to filter peers.
63  */
64 #define DHT_BLOOM_SIZE 128
65
66 /**
67  * Desired replication level for FIND PEER requests
68  */
69 #define FIND_PEER_REPLICATION_LEVEL 4
70
71 /**
72  * Maximum allowed replication level for all requests.
73  */
74 #define MAXIMUM_REPLICATION_LEVEL 16
75
76 /**
77  * How often to update our preference levels for peers in our routing tables.
78  */
79 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
80
81 /**
82  * How long at least to wait before sending another find peer request.
83  */
84 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
85
86 /**
87  * How long at most to wait before sending another find peer request.
88  */
89 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
90
91 /**
92  * How long at most to wait for transmission of a GET request to another peer?
93  */
94 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
95
96
97 /**
98  * P2P PUT message
99  */
100 struct PeerPutMessage
101 {
102   /**
103    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
104    */
105   struct GNUNET_MessageHeader header;
106
107   /**
108    * Processing options
109    */
110   uint32_t options GNUNET_PACKED;
111
112   /**
113    * Content type.
114    */
115   uint32_t type GNUNET_PACKED;
116
117   /**
118    * Hop count
119    */
120   uint32_t hop_count GNUNET_PACKED;
121
122   /**
123    * Replication level for this message
124    */
125   uint32_t desired_replication_level GNUNET_PACKED;
126
127   /**
128    * Length of the PUT path that follows (if tracked).
129    */
130   uint32_t put_path_length GNUNET_PACKED;
131
132   /**
133    * When does the content expire?
134    */
135   struct GNUNET_TIME_AbsoluteNBO expiration_time;
136
137   /**
138    * Bloomfilter (for peer identities) to stop circular routes
139    */
140   char bloomfilter[DHT_BLOOM_SIZE];
141
142   /**
143    * The key we are storing under.
144    */
145   GNUNET_HashCode key;
146
147   /* put path (if tracked) */
148
149   /* Payload */
150
151 };
152
153
154 /**
155  * P2P Result message
156  */
157 struct PeerResultMessage
158 {
159   /**
160    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
161    */
162   struct GNUNET_MessageHeader header;
163
164   /**
165    * Content type.
166    */
167   uint32_t type GNUNET_PACKED;
168
169   /**
170    * Length of the PUT path that follows (if tracked).
171    */
172   uint32_t put_path_length GNUNET_PACKED;
173
174   /**
175    * Length of the GET path that follows (if tracked).
176    */
177   uint32_t get_path_length GNUNET_PACKED;
178
179   /**
180    * When does the content expire?
181    */
182   struct GNUNET_TIME_AbsoluteNBO expiration_time;
183
184   /**
185    * The key of the corresponding GET request.
186    */
187   GNUNET_HashCode key;
188
189   /* put path (if tracked) */
190
191   /* get path (if tracked) */
192
193   /* Payload */
194
195 };
196
197
198 /**
199  * P2P GET message
200  */
201 struct PeerGetMessage
202 {
203   /**
204    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
205    */
206   struct GNUNET_MessageHeader header;
207
208   /**
209    * Processing options
210    */
211   uint32_t options GNUNET_PACKED;
212
213   /**
214    * Desired content type.
215    */
216   uint32_t type GNUNET_PACKED;
217
218   /**
219    * Hop count
220    */
221   uint32_t hop_count GNUNET_PACKED;
222
223   /**
224    * Desired replication level for this request.
225    */
226   uint32_t desired_replication_level GNUNET_PACKED;
227
228   /**
229    * Size of the extended query.
230    */
231   uint32_t xquery_size;
232
233   /**
234    * Bloomfilter mutator.
235    */
236   uint32_t bf_mutator;
237
238   /**
239    * Bloomfilter (for peer identities) to stop circular routes
240    */
241   char bloomfilter[DHT_BLOOM_SIZE];
242
243   /**
244    * The key we are looking for.
245    */
246   GNUNET_HashCode key;
247
248   /* xquery */
249
250   /* result bloomfilter */
251
252 };
253
254
255 /**
256  * Linked list of messages to send to a particular other peer.
257  */
258 struct P2PPendingMessage
259 {
260   /**
261    * Pointer to next item in the list
262    */
263   struct P2PPendingMessage *next;
264
265   /**
266    * Pointer to previous item in the list
267    */
268   struct P2PPendingMessage *prev;
269
270   /**
271    * Message importance level.  FIXME: used? useful?
272    */
273   unsigned int importance;
274
275   /**
276    * When does this message time out?
277    */
278   struct GNUNET_TIME_Absolute timeout;
279
280   /**
281    * Actual message to be sent, allocated at the end of the struct:
282    * // msg = (cast) &pm[1]; 
283    * // memcpy (&pm[1], data, len);
284    */
285   const struct GNUNET_MessageHeader *msg;
286
287 };
288
289
290 /**
291  * Entry for a peer in a bucket.
292  */
293 struct PeerInfo
294 {
295   /**
296    * Next peer entry (DLL)
297    */
298   struct PeerInfo *next;
299
300   /**
301    *  Prev peer entry (DLL)
302    */
303   struct PeerInfo *prev;
304
305   /**
306    * Count of outstanding messages for peer.  FIXME: NEEDED?
307    * FIXME: bound queue size!?
308    */
309   unsigned int pending_count;
310
311   /**
312    * Head of pending messages to be sent to this peer.
313    */
314   struct P2PPendingMessage *head;
315
316   /**
317    * Tail of pending messages to be sent to this peer.
318    */
319   struct P2PPendingMessage *tail;
320
321   /**
322    * Core handle for sending messages to this peer.
323    */
324   struct GNUNET_CORE_TransmitHandle *th;
325
326   /**
327    * Preference update context
328    */
329   struct GNUNET_CORE_InformationRequestContext *info_ctx;
330
331   /**
332    * Task for scheduling preference updates
333    */
334   GNUNET_SCHEDULER_TaskIdentifier preference_task;
335
336   /**
337    * What is the identity of the peer?
338    */
339   struct GNUNET_PeerIdentity id;
340
341 #if 0
342   /**
343    * What is the average latency for replies received?
344    */
345   struct GNUNET_TIME_Relative latency;
346
347   /**
348    * Transport level distance to peer.
349    */
350   unsigned int distance;
351 #endif
352
353 };
354
355
356 /**
357  * Peers are grouped into buckets.
358  */
359 struct PeerBucket
360 {
361   /**
362    * Head of DLL
363    */
364   struct PeerInfo *head;
365
366   /**
367    * Tail of DLL
368    */
369   struct PeerInfo *tail;
370
371   /**
372    * Number of peers in the bucket.
373    */
374   unsigned int peers_size;
375 };
376
377
378 /**
379  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
380  */
381 static unsigned int closest_bucket;
382
383 /**
384  * How many peers have we added since we sent out our last
385  * find peer request?
386  */
387 static unsigned int newly_found_peers;
388
389 /**
390  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
391  */
392 static struct PeerBucket k_buckets[MAX_BUCKETS];
393
394 /**
395  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
396  */
397 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
398
399 /**
400  * Maximum size for each bucket.
401  */
402 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
403
404 /**
405  * Task that sends FIND PEER requests.
406  */
407 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
408
409 /**
410  * Identity of this peer.
411  */ 
412 static struct GNUNET_PeerIdentity my_identity;
413
414 /**
415  * Handle to GNUnet core.
416  */
417 static struct GNUNET_CORE_Handle *coreAPI;
418
419
420 /**
421  * Find the optimal bucket for this key.
422  *
423  * @param hc the hashcode to compare our identity to
424  * @return the proper bucket index, or GNUNET_SYSERR
425  *         on error (same hashcode)
426  */
427 static int
428 find_bucket (const GNUNET_HashCode * hc)
429 {
430   unsigned int bits;
431
432   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
433   if (bits == MAX_BUCKETS)
434     {
435       /* How can all bits match? Got my own ID? */
436       GNUNET_break (0);
437       return GNUNET_SYSERR; 
438     }
439   return MAX_BUCKETS - bits - 1;
440 }
441
442
443 /**
444  * Let GNUnet core know that we like the given peer.
445  *
446  * @param cls the 'struct PeerInfo' of the peer
447  * @param tc scheduler context.
448  */ 
449 static void
450 update_core_preference (void *cls,
451                         const struct GNUNET_SCHEDULER_TaskContext *tc);
452
453
454 /**
455  * Function called with statistics about the given peer.
456  *
457  * @param cls closure
458  * @param peer identifies the peer
459  * @param bpm_out set to the current bandwidth limit (sending) for this peer
460  * @param amount set to the amount that was actually reserved or unreserved;
461  *               either the full requested amount or zero (no partial reservations)
462  * @param res_delay if the reservation could not be satisfied (amount was 0), how
463  *        long should the client wait until re-trying?
464  * @param preference current traffic preference for the given peer
465  */
466 static void
467 update_core_preference_finish (void *cls,
468                                const struct GNUNET_PeerIdentity *peer,
469                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
470                                int32_t amount,
471                                struct GNUNET_TIME_Relative res_delay,
472                                uint64_t preference)
473 {
474   struct PeerInfo *peer_info = cls;
475
476   peer_info->info_ctx = NULL;
477   peer_info->preference_task
478     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
479                                     &update_core_preference, peer_info);
480 }
481
482
483 /**
484  * Let GNUnet core know that we like the given peer.
485  *
486  * @param cls the 'struct PeerInfo' of the peer
487  * @param tc scheduler context.
488  */ 
489 static void
490 update_core_preference (void *cls,
491                         const struct GNUNET_SCHEDULER_TaskContext *tc)
492 {
493   struct PeerInfo *peer = cls;
494   uint64_t preference;
495   unsigned int matching;
496   int bucket;
497
498   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
499   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
500     return;  
501   matching =
502     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
503                                       &peer->id.hashPubKey);
504   if (matching >= 64)
505     matching = 63;
506   bucket = find_bucket(&peer->id.hashPubKey);
507   if (bucket == GNUNET_SYSERR)
508     preference = 0;
509   else
510     preference = (1LL << matching) / k_buckets[bucket].peers_size;
511   if (preference == 0)
512     {
513       peer->preference_task
514         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
515                                         &update_core_preference, peer);
516       return;
517     }
518   peer->info_ctx =
519     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
520                                         GNUNET_TIME_UNIT_FOREVER_REL,
521                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
522                                         preference,
523                                         &update_core_preference_finish, peer);
524 }
525
526
527 /**
528  * Closure for 'add_known_to_bloom'.
529  */
530 struct BloomConstructorContext
531 {
532   /**
533    * Bloom filter under construction.
534    */
535   struct GNUNET_CONTAINER_BloomFilter *bloom;
536
537   /**
538    * Mutator to use.
539    */
540   uint32_t bf_mutator;
541 };
542
543
544 /**
545  * Add each of the peers we already know to the bloom filter of
546  * the request so that we don't get duplicate HELLOs.
547  *
548  * @param cls the 'struct BloomConstructorContext'.
549  * @param key peer identity to add to the bloom filter
550  * @param value value the peer information (unused)
551  * @return GNUNET_YES (we should continue to iterate)
552  */
553 static int
554 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
555 {
556   struct BloomConstructorContext *ctx = cls;
557   GNUNET_HashCode mh;
558
559   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
560   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
561   return GNUNET_YES;
562 }
563
564
565 /**
566  * Task to send a find peer message for our own peer identifier
567  * so that we can find the closest peers in the network to ourselves
568  * and attempt to connect to them.
569  *
570  * @param cls closure for this task
571  * @param tc the context under which the task is running
572  */
573 static void
574 send_find_peer_message (void *cls,
575                         const struct GNUNET_SCHEDULER_TaskContext *tc)
576 {
577   struct GNUNET_TIME_Relative next_send_time;
578   struct BloomConstructorContext bcc;
579
580   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
581   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
582     return;
583   if (newly_found_peers > bucket_size) 
584   {
585     /* If we are finding many peers already, no need to send out our request right now! */
586     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
587                                                    &send_find_peer_message, NULL);
588     newly_found_peers = 0;
589     return;
590   }
591   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
592   bcc.bloom =
593     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
594   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
595                                          &add_known_to_bloom,
596                                          &bcc);
597   // FIXME: pass priority!?
598   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
599                              GNUNET_DHT_RO_FIND_PEER,
600                              FIND_PEER_REPLICATION_LEVEL,
601                              0,
602                              &my_identity.hashPubKey,
603                              NULL, 0,
604                              bcc.bloom, bcc.bf_mutator, NULL);
605   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
606   /* schedule next round */
607   newly_found_peers = 0;
608   next_send_time.rel_value =
609     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
610     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
611                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
612   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
613                                                  &send_find_peer_message,
614                                                  NULL);  
615 }
616
617
618 /**
619  * Method called whenever a peer connects.
620  *
621  * @param cls closure
622  * @param peer peer identity this notification is about
623  * @param atsi performance data
624  */
625 static void
626 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
627                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
628 {
629   struct PeerInfo *ret;
630   int peer_bucket;
631
632   /* Check for connect to self message */
633   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
634     return;
635   if (GNUNET_YES ==
636       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
637                                               &peer->hashPubKey))
638   {
639     GNUNET_break (0);
640     return;
641   }
642   peer_bucket = find_bucket (&peer->hashPubKey);
643   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
644   ret = GNUNET_malloc (sizeof (struct PeerInfo));
645 #if 0
646   ret->latency = latency;
647   ret->distance = distance;
648 #endif
649   ret->id = *peer;
650   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
651                                     k_buckets[peer_bucket].tail, ret);
652   k_buckets[peer_bucket].peers_size++;
653   closest_bucket = GNUNET_MAX (closest_bucket,
654                                peer_bucket);
655   if ( (peer_bucket > 0) &&
656        (k_buckets[peer_bucket].peers_size <= bucket_size) )
657     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
658   newly_found_peers++;
659   GNUNET_assert (GNUNET_OK ==
660                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
661                                                     &peer->hashPubKey, ret,
662                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
663   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
664   {
665     /* got a first connection, good time to start with FIND PEER requests... */
666     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
667                                                NULL);    
668   }
669 }
670
671
672 /**
673  * Method called whenever a peer disconnects.
674  *
675  * @param cls closure
676  * @param peer peer identity this notification is about
677  */
678 static void
679 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
680 {
681   struct PeerInfo *to_remove;
682   int current_bucket;
683   struct P2PPendingMessage *pos;
684
685   /* Check for disconnect from self message */
686   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
687     return;
688   to_remove =
689       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
690   if (NULL == to_remove)
691     {
692       GNUNET_break (0);
693       return;
694     }
695   GNUNET_assert (GNUNET_YES ==
696                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
697                                                        &peer->hashPubKey,
698                                                        to_remove));
699   if (NULL != to_remove->info_ctx)
700   {
701     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
702     to_remove->info_ctx = NULL;
703   }
704   current_bucket = find_bucket (&to_remove->id.hashPubKey);
705   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
706                                k_buckets[current_bucket].tail,
707                                to_remove);
708   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
709   k_buckets[current_bucket].peers_size--;
710   while ( (closest_bucket > 0) &&
711           (k_buckets[closest_bucket].peers_size == 0) )
712     closest_bucket--;
713
714   if (to_remove->th != NULL) 
715   {
716     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
717     to_remove->th = NULL;
718   }
719   while (NULL != (pos = to_remove->head))
720   {
721     GNUNET_CONTAINER_DLL_remove (to_remove->head,
722                                  to_remove->tail,
723                                  pos);
724     GNUNET_free (pos);
725   }
726 }
727
728
729 /**
730  * Called when core is ready to send a message we asked for
731  * out to the destination.
732  *
733  * @param cls the 'struct PeerInfo' of the target peer
734  * @param size number of bytes available in buf
735  * @param buf where the callee should write the message
736  * @return number of bytes written to buf
737  */
738 static size_t
739 core_transmit_notify (void *cls, size_t size, void *buf)
740 {
741   struct PeerInfo *peer = cls;
742   char *cbuf = buf;
743   struct P2PPendingMessage *pending;
744   size_t off;
745   size_t msize;
746
747   peer->th = NULL;
748   if (buf == NULL)
749   {
750     /* client disconnected */
751     return 0;
752   }
753   if (peer->head == NULL)
754   {
755     /* no messages pending */
756     return 0;
757   }
758   off = 0;
759   while ( (NULL != (pending = peer->head)) &&
760           (size - off >= (msize = ntohs (pending->msg->size))) )
761   {
762     memcpy (&cbuf[off], pending->msg, msize);
763     off += msize;
764     peer->pending_count--;
765     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
766     GNUNET_free (pending);
767   }
768   if (peer->head != NULL)
769     peer->th 
770       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
771                                            pending->importance,
772                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
773                                            &peer->id, msize,
774                                            &core_transmit_notify, peer);
775   return off;
776 }
777
778
779 /**
780  * Transmit all messages in the peer's message queue.
781  *
782  * @param peer message queue to process
783  */
784 static void
785 process_peer_queue (struct PeerInfo *peer)
786 {
787   struct P2PPendingMessage *pending;
788
789   if (NULL != (pending = peer->head))
790     return;
791   if (NULL != peer->th)
792     return;
793   peer->th 
794     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
795                                          pending->importance,
796                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
797                                          &peer->id,
798                                          ntohs (pending->msg->size),
799                                          &core_transmit_notify, peer);
800 }
801
802
803 /**
804  * To how many peers should we (on average) forward the request to
805  * obtain the desired target_replication count (on average).
806  *
807  * @param hop_count number of hops the message has traversed
808  * @param target_replication the number of total paths desired
809  * @return Some number of peers to forward the message to
810  */
811 static unsigned int
812 get_forward_count (uint32_t hop_count, 
813                    uint32_t target_replication)
814 {
815   uint32_t random_value;
816   uint32_t forward_count;
817   float target_value;
818
819   if (hop_count > GDS_NSE_get () * 4.0)
820   {
821     /* forcefully terminate */
822     return 0;
823   }
824   if (hop_count > GDS_NSE_get () * 2.0)
825   {
826     /* Once we have reached our ideal number of hops, only forward to 1 peer */
827     return 1;
828   }
829   /* bound by system-wide maximum */
830   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
831                                    target_replication);
832   target_value =
833     1 + (target_replication - 1.0) / (GDS_NSE_get () +
834                                       ((float) (target_replication - 1.0) *
835                                        hop_count));
836   /* Set forward count to floor of target_value */
837   forward_count = (uint32_t) target_value;
838   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
839   target_value = target_value - forward_count;
840   random_value =
841     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
842   if (random_value < (target_value * UINT32_MAX))
843     forward_count++;
844   return forward_count;
845 }
846
847
848 /**
849  * Compute the distance between have and target as a 32-bit value.
850  * Differences in the lower bits must count stronger than differences
851  * in the higher bits.
852  *
853  * @return 0 if have==target, otherwise a number
854  *           that is larger as the distance between
855  *           the two hash codes increases
856  */
857 static unsigned int
858 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
859 {
860   unsigned int bucket;
861   unsigned int msb;
862   unsigned int lsb;
863   unsigned int i;
864
865   /* We have to represent the distance between two 2^9 (=512)-bit
866    * numbers as a 2^5 (=32)-bit number with "0" being used for the
867    * two numbers being identical; furthermore, we need to
868    * guarantee that a difference in the number of matching
869    * bits is always represented in the result.
870    *
871    * We use 2^32/2^9 numerical values to distinguish between
872    * hash codes that have the same LSB bit distance and
873    * use the highest 2^9 bits of the result to signify the
874    * number of (mis)matching LSB bits; if we have 0 matching
875    * and hence 512 mismatching LSB bits we return -1 (since
876    * 512 itself cannot be represented with 9 bits) */
877
878   /* first, calculate the most significant 9 bits of our
879    * result, aka the number of LSBs */
880   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
881   /* bucket is now a value between 0 and 512 */
882   if (bucket == 512)
883     return 0;                   /* perfect match */
884   if (bucket == 0)
885     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
886                                  * below, we'd end up with max+1 (overflow)) */
887
888   /* calculate the most significant bits of the final result */
889   msb = (512 - bucket) << (32 - 9);
890   /* calculate the 32-9 least significant bits of the final result by
891    * looking at the differences in the 32-9 bits following the
892    * mismatching bit at 'bucket' */
893   lsb = 0;
894   for (i = bucket + 1;
895        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
896   {
897     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
898         GNUNET_CRYPTO_hash_get_bit (have, i))
899       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
900                                                  * last bit set will be 31 -- if
901                                                  * i does not reach 512 first... */
902   }
903   return msb | lsb;
904 }
905
906
907 /**
908  * Check whether my identity is closer than any known peers.  If a
909  * non-null bloomfilter is given, check if this is the closest peer
910  * that hasn't already been routed to.
911  *
912  * @param key hash code to check closeness to
913  * @param bloom bloomfilter, exclude these entries from the decision
914  * @return GNUNET_YES if node location is closest,
915  *         GNUNET_NO otherwise.
916  */
917 static int
918 am_closest_peer (const GNUNET_HashCode *key,
919                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
920 {
921   int bits;
922   int other_bits;
923   int bucket_num;
924   int count;
925   struct PeerInfo *pos;
926   unsigned int my_distance;
927
928   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
929     return GNUNET_YES;
930   bucket_num = find_bucket (key);
931   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
932   my_distance = get_distance (&my_identity.hashPubKey, key);
933   pos = k_buckets[bucket_num].head;
934   count = 0;
935   while ((pos != NULL) && (count < bucket_size))
936   {
937     if ((bloom != NULL) &&
938         (GNUNET_YES ==
939          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
940     {
941       pos = pos->next;
942       continue;                 /* Skip already checked entries */
943     }
944     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
945     if (other_bits > bits)
946       return GNUNET_NO;
947     if (other_bits == bits)        /* We match the same number of bits */
948       return GNUNET_YES;
949     pos = pos->next;
950   }
951   /* No peers closer, we are the closest! */
952   return GNUNET_YES;
953 }
954
955
956 /**
957  * Select a peer from the routing table that would be a good routing
958  * destination for sending a message for "key".  The resulting peer
959  * must not be in the set of blocked peers.<p>
960  *
961  * Note that we should not ALWAYS select the closest peer to the
962  * target, peers further away from the target should be chosen with
963  * exponentially declining probability.
964  *
965  * FIXME: double-check that this is fine
966  * 
967  *
968  * @param key the key we are selecting a peer to route to
969  * @param bloom a bloomfilter containing entries this request has seen already
970  * @param hops how many hops has this message traversed thus far
971  * @return Peer to route to, or NULL on error
972  */
973 static struct PeerInfo *
974 select_peer (const GNUNET_HashCode *key,
975              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
976              uint32_t hops)
977 {
978   unsigned int bc;
979   unsigned int count;
980   unsigned int selected;
981   struct PeerInfo *pos;
982   unsigned int dist;
983   unsigned int smallest_distance;
984   struct PeerInfo *chosen;
985
986   if (hops >= GDS_NSE_get ())
987   {
988     /* greedy selection (closest peer that is not in bloomfilter) */
989     smallest_distance = UINT_MAX;
990     chosen = NULL;
991     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
992     {
993       pos = k_buckets[bc].head;
994       count = 0;
995       while ((pos != NULL) && (count < bucket_size))
996       {
997         if (GNUNET_NO ==
998             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
999         {
1000           dist = get_distance (key, &pos->id.hashPubKey);
1001           if (dist < smallest_distance)
1002           {
1003             chosen = pos;
1004             smallest_distance = dist;
1005           }
1006         }
1007         count++;
1008         pos = pos->next;
1009       }
1010     }
1011     return chosen;
1012   }
1013
1014   /* select "random" peer */
1015   /* count number of peers that are available and not filtered */
1016   count = 0;
1017   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1018   {
1019     pos = k_buckets[bc].head;
1020     while ((pos != NULL) && (count < bucket_size))
1021     {
1022       if (GNUNET_YES ==
1023           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1024       {
1025         pos = pos->next;
1026         continue;               /* Ignore bloomfiltered peers */
1027       }
1028       count++;
1029       pos = pos->next;
1030     }
1031   }
1032   if (count == 0)               /* No peers to select from! */
1033   {
1034     return NULL;
1035   }
1036   /* Now actually choose a peer */
1037   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1038   count = 0;
1039   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1040   {
1041     pos = k_buckets[bc].head;
1042     while ((pos != NULL) && (count < bucket_size))
1043     {
1044       if (GNUNET_YES ==
1045           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1046       {
1047         pos = pos->next;
1048         continue;               /* Ignore bloomfiltered peers */
1049       }
1050       if (0 == selected--)
1051         return pos;
1052       pos = pos->next;
1053     }
1054   }
1055   GNUNET_break (0);
1056   return NULL;
1057 }
1058
1059
1060 /**
1061  * Compute the set of peers that the given request should be
1062  * forwarded to.
1063  *
1064  * @param key routing key
1065  * @param bloom bloom filter excluding peers as targets, all selected
1066  *        peers will be added to the bloom filter
1067  * @param hop_count number of hops the request has traversed so far
1068  * @param target_replication desired number of replicas
1069  * @param targets where to store an array of target peers (to be
1070  *         free'd by the caller)
1071  * @return number of peers returned in 'targets'.
1072  */
1073 static unsigned int
1074 get_target_peers (const GNUNET_HashCode *key,
1075                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1076                   uint32_t hop_count,
1077                   uint32_t target_replication,
1078                   struct PeerInfo ***targets)
1079 {
1080   unsigned int ret;
1081   unsigned int off;
1082   struct PeerInfo **rtargets;
1083   struct PeerInfo *nxt;
1084
1085   ret = get_forward_count (hop_count, target_replication);
1086   if (ret == 0)
1087   {
1088     *targets = NULL;
1089     return 0;
1090   }
1091   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1092   off = 0;
1093   while (ret-- > 0)
1094   {
1095     nxt = select_peer (key, bloom, hop_count);
1096     if (nxt == NULL)
1097       break;
1098     rtargets[off++] = nxt;
1099     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1100   }
1101   if (0 == off)
1102   {
1103     GNUNET_free (rtargets);
1104     *targets = NULL;
1105     return 0;
1106   }
1107   *targets = rtargets;
1108   return off;
1109 }
1110
1111
1112 /**
1113  * Perform a PUT operation.   Forwards the given request to other
1114  * peers.   Does not store the data locally.  Does not give the
1115  * data to local clients.  May do nothing if this is the only
1116  * peer in the network (or if we are the closest peer in the
1117  * network).
1118  *
1119  * @param type type of the block
1120  * @param options routing options
1121  * @param desired_replication_level desired replication count
1122  * @param expiration_time when does the content expire
1123  * @param hop_count how many hops has this message traversed so far
1124  * @param bf Bloom filter of peers this PUT has already traversed
1125  * @param key key for the content
1126  * @param put_path_length number of entries in put_path
1127  * @param put_path peers this request has traversed so far (if tracked)
1128  * @param data payload to store
1129  * @param data_size number of bytes in data
1130  */
1131 void
1132 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1133                            enum GNUNET_DHT_RouteOption options,
1134                            uint32_t desired_replication_level,
1135                            struct GNUNET_TIME_Absolute expiration_time,
1136                            uint32_t hop_count,
1137                            struct GNUNET_CONTAINER_BloomFilter *bf,
1138                            const GNUNET_HashCode *key,
1139                            unsigned int put_path_length,
1140                            struct GNUNET_PeerIdentity *put_path,
1141                            const void *data,
1142                            size_t data_size)
1143 {
1144   unsigned int target_count;
1145   unsigned int i;
1146   struct PeerInfo **targets;
1147   struct PeerInfo *target;
1148   struct P2PPendingMessage *pending;
1149   size_t msize;
1150   struct PeerPutMessage *ppm;
1151   struct GNUNET_PeerIdentity *pp;
1152   
1153   target_count = get_target_peers (key, bf, hop_count,
1154                                    desired_replication_level,
1155                                    &targets);
1156   if (0 == target_count)
1157     return;
1158   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1159   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1160   {
1161     put_path_length = 0;
1162     msize = data_size + sizeof (struct PeerPutMessage);
1163   }
1164   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1165   {
1166     GNUNET_break (0);
1167     return;
1168   }
1169   for (i=0;i<target_count;i++)
1170   {
1171     target = targets[i];
1172     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1173     pending->importance = 0; /* FIXME */
1174     pending->timeout = expiration_time;   
1175     ppm = (struct PeerPutMessage*) &pending[1];
1176     pending->msg = &ppm->header;
1177     ppm->header.size = htons (msize);
1178     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1179     ppm->options = htonl (options);
1180     ppm->type = htonl (type);
1181     ppm->hop_count = htonl (hop_count + 1);
1182     ppm->desired_replication_level = htonl (desired_replication_level);
1183     ppm->put_path_length = htonl (put_path_length);
1184     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1185     GNUNET_assert (GNUNET_OK ==
1186                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1187                                                               ppm->bloomfilter,
1188                                                               DHT_BLOOM_SIZE));
1189     ppm->key = *key;
1190     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1191     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1192     memcpy (&pp[put_path_length], data, data_size);
1193     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1194                                       target->tail,
1195                                       pending);
1196     target->pending_count++;
1197     process_peer_queue (target);
1198   }
1199   GNUNET_free (targets);
1200 }
1201
1202
1203 /**
1204  * Perform a GET operation.  Forwards the given request to other
1205  * peers.  Does not lookup the key locally.  May do nothing if this is
1206  * the only peer in the network (or if we are the closest peer in the
1207  * network).
1208  *
1209  * @param type type of the block
1210  * @param options routing options
1211  * @param desired_replication_level desired replication count
1212  * @param hop_count how many hops did this request traverse so far?
1213  * @param key key for the content
1214  * @param xquery extended query
1215  * @param xquery_size number of bytes in xquery
1216  * @param reply_bf bloomfilter to filter duplicates
1217  * @param reply_bf_mutator mutator for reply_bf
1218  * @param peer_bf filter for peers not to select (again)
1219  */
1220 void
1221 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1222                            enum GNUNET_DHT_RouteOption options,
1223                            uint32_t desired_replication_level,
1224                            uint32_t hop_count,
1225                            const GNUNET_HashCode *key,
1226                            const void *xquery,
1227                            size_t xquery_size,
1228                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1229                            uint32_t reply_bf_mutator,
1230                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1231 {
1232   unsigned int target_count;
1233   unsigned int i;
1234   struct PeerInfo **targets;
1235   struct PeerInfo *target;
1236   struct P2PPendingMessage *pending;
1237   size_t msize;
1238   struct PeerGetMessage *pgm;
1239   char *xq;
1240   size_t reply_bf_size;
1241   
1242   target_count = get_target_peers (key, peer_bf, hop_count,
1243                                    desired_replication_level,
1244                                    &targets);
1245   if (0 == target_count)
1246     return;
1247   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1248   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1249   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1250   {
1251     GNUNET_break (0);
1252     return;
1253   }
1254   /* forward request */
1255   for (i=0;i<target_count;i++)
1256   {
1257     target = targets[i];
1258     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1259     pending->importance = 0; /* FIXME */
1260     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1261     pgm = (struct PeerGetMessage*) &pending[1];
1262     pending->msg = &pgm->header;
1263     pgm->header.size = htons (msize);
1264     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1265     pgm->options = htonl (options);
1266     pgm->type = htonl (type);
1267     pgm->hop_count = htonl (hop_count + 1);
1268     pgm->desired_replication_level = htonl (desired_replication_level);
1269     pgm->xquery_size = htonl (xquery_size);
1270     pgm->bf_mutator = reply_bf_mutator; 
1271     GNUNET_assert (GNUNET_OK ==
1272                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1273                                                               pgm->bloomfilter,
1274                                                               DHT_BLOOM_SIZE));
1275     pgm->key = *key;
1276     xq = (char *) &pgm[1];
1277     memcpy (xq, xquery, xquery_size);
1278     GNUNET_assert (GNUNET_OK ==
1279                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1280                                                               &xq[xquery_size],
1281                                                               reply_bf_size));
1282     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1283                                       target->tail,
1284                                       pending);
1285     target->pending_count++;
1286     process_peer_queue (target);
1287   }
1288   GNUNET_free (targets);
1289 }
1290
1291
1292 /**
1293  * Handle a reply (route to origin).  Only forwards the reply back to
1294  * the given peer.  Does not do local caching or forwarding to local
1295  * clients.
1296  *
1297  * @param target neighbour that should receive the block (if still connected)
1298  * @param type type of the block
1299  * @param expiration_time when does the content expire
1300  * @param key key for the content
1301  * @param put_path_length number of entries in put_path
1302  * @param put_path peers the original PUT traversed (if tracked)
1303  * @param get_path_length number of entries in put_path
1304  * @param get_path peers this reply has traversed so far (if tracked)
1305  * @param data payload of the reply
1306  * @param data_size number of bytes in data
1307  */
1308 void
1309 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1310                              enum GNUNET_BLOCK_Type type,
1311                              struct GNUNET_TIME_Absolute expiration_time,
1312                              const GNUNET_HashCode *key,
1313                              unsigned int put_path_length,
1314                              const struct GNUNET_PeerIdentity *put_path,
1315                              unsigned int get_path_length,
1316                              const struct GNUNET_PeerIdentity *get_path,
1317                              const void *data,
1318                              size_t data_size)
1319 {
1320   struct PeerInfo *pi;
1321   struct P2PPendingMessage *pending;
1322   size_t msize;
1323   struct PeerResultMessage *prm;
1324   struct GNUNET_PeerIdentity *paths;
1325   
1326   msize = data_size + sizeof (struct PeerResultMessage) + 
1327     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1328   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1329        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1330        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1331        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1332   {
1333     GNUNET_break (0);
1334     return;
1335   }
1336   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1337                                           &target->hashPubKey);
1338   if (NULL == pi)
1339   {
1340     /* peer disconnected in the meantime, drop reply */
1341     return;
1342   }
1343   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1344   pending->importance = 0; /* FIXME */
1345   pending->timeout = expiration_time;
1346   prm = (struct PeerResultMessage*) &pending[1];
1347   pending->msg = &prm->header;
1348   prm->header.size = htons (msize);
1349   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1350   prm->type = htonl (type);
1351   prm->put_path_length = htonl (put_path_length);
1352   prm->get_path_length = htonl (get_path_length);
1353   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1354   prm->key = *key;
1355   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1356   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1357   memcpy (&paths[put_path_length],
1358           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1359   memcpy (&paths[put_path_length + get_path_length],
1360           data, data_size);
1361   GNUNET_CONTAINER_DLL_insert (pi->head,
1362                                pi->tail,
1363                                pending);
1364   pi->pending_count++;
1365   process_peer_queue (pi);
1366 }
1367
1368
1369 /**
1370  * To be called on core init/fail.
1371  *
1372  * @param cls service closure
1373  * @param server handle to the server for this service
1374  * @param identity the public identity of this peer
1375  * @param publicKey the public key of this peer
1376  */
1377 static void
1378 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1379            const struct GNUNET_PeerIdentity *identity,
1380            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1381 {
1382   GNUNET_assert (server != NULL);
1383   my_identity = *identity;
1384 }
1385
1386
1387 /**
1388  * Core handler for p2p put requests.
1389  *
1390  * @param cls closure
1391  * @param peer sender of the request
1392  * @param message message
1393  * @param peer peer identity this notification is about
1394  * @param atsi performance data
1395  * @return GNUNET_OK to keep the connection open,
1396  *         GNUNET_SYSERR to close it (signal serious error)
1397  */
1398 static int
1399 handle_dht_p2p_put (void *cls,
1400                     const struct GNUNET_PeerIdentity *peer,
1401                     const struct GNUNET_MessageHeader *message,
1402                     const struct GNUNET_TRANSPORT_ATS_Information
1403                     *atsi)
1404 {
1405   const struct PeerPutMessage *put;
1406   const struct GNUNET_PeerIdentity *put_path;
1407   const void *payload;
1408   uint32_t putlen;
1409   uint16_t msize;
1410   size_t payload_size;
1411   enum GNUNET_DHT_RouteOption options;
1412   struct GNUNET_CONTAINER_BloomFilter *bf;
1413   GNUNET_HashCode test_key;
1414   
1415   msize = ntohs (message->size);
1416   if (msize < sizeof (struct PeerPutMessage))
1417   {
1418     GNUNET_break_op (0);
1419     return GNUNET_YES;
1420   }
1421   put = (const struct PeerPutMessage*) message;
1422   putlen = ntohl (put->put_path_length);
1423   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1424        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1425     {
1426       GNUNET_break_op (0);
1427       return GNUNET_YES;
1428     }
1429   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1430   payload = &put_path[putlen];
1431   options = ntohl (put->options);
1432   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1433                           putlen * sizeof (struct GNUNET_PeerIdentity));
1434   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1435                                 ntohl (put->type),
1436                                 payload, payload_size,
1437                                 &test_key))
1438   {
1439   case GNUNET_YES:
1440     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1441     {
1442       GNUNET_break_op (0);
1443       return GNUNET_YES;
1444     }
1445     break;
1446   case GNUNET_NO:
1447     GNUNET_break_op (0);
1448     return GNUNET_YES;
1449   case GNUNET_SYSERR:
1450     /* cannot verify, good luck */
1451     break;
1452   }
1453   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1454                                           DHT_BLOOM_SIZE,
1455                                           DHT_BLOOM_K);
1456   {
1457     struct GNUNET_PeerIdentity pp[putlen+1];
1458   
1459     /* extend 'put path' by sender */
1460     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1461     {
1462       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1463       pp[putlen] = *peer;
1464       putlen++;
1465     }
1466     else
1467       putlen = 0;
1468     
1469     /* give to local clients */
1470     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1471                              &put->key,
1472                              0, NULL,
1473                              putlen,
1474                              pp,
1475                              ntohl (put->type),
1476                              payload_size,
1477                              payload);
1478     /* store locally */
1479     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1480          (am_closest_peer (&put->key,
1481                            bf) ) )
1482       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1483                                 &put->key,
1484                                 putlen, pp,
1485                                 ntohl (put->type),
1486                                 payload_size,
1487                                 payload);
1488     /* route to other peers */
1489     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1490                                options,
1491                                ntohl (put->desired_replication_level),
1492                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1493                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1494                                bf,
1495                                &put->key,
1496                                putlen, pp,
1497                                payload,
1498                                payload_size);
1499   }
1500   GNUNET_CONTAINER_bloomfilter_free (bf);
1501   return GNUNET_YES;
1502 }
1503
1504
1505 /**
1506  * We have received a FIND PEER request.  Send matching
1507  * HELLOs back.
1508  *
1509  * @param sender sender of the FIND PEER request
1510  * @param key peers close to this key are desired
1511  * @param bf peers matching this bf are excluded
1512  * @param bf_mutator mutator for bf
1513  */
1514 static void
1515 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1516                   const GNUNET_HashCode *key,
1517                   struct GNUNET_CONTAINER_BloomFilter *bf,
1518                   uint32_t bf_mutator)
1519 {
1520   int bucket_idx;
1521   struct PeerBucket *bucket;
1522   struct PeerInfo *peer;
1523   unsigned int choice;
1524   GNUNET_HashCode mhash;
1525   const struct GNUNET_HELLO_Message *hello;
1526
1527   /* first, check about our own HELLO */
1528   if (NULL != GDS_my_hello)
1529   {
1530     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1531     if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))
1532       
1533       GDS_NEIGHBOURS_handle_reply (sender,
1534                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1535                                    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1536                                    key,
1537                                    0, NULL,
1538                                    0, NULL,
1539                                    GDS_my_hello,
1540                                    GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message*) GDS_my_hello));
1541   }
1542
1543   /* then, also consider sending a random HELLO from the closest bucket */
1544   bucket_idx = find_bucket (key);
1545   if (bucket_idx == GNUNET_SYSERR)
1546     return;
1547   bucket = &k_buckets[bucket_idx];
1548   if (bucket->peers_size == 0)
1549     return;
1550   choice = GNUNET_CRYPTO_random_u32 (bucket->peers_size,
1551                                      GNUNET_CRYPTO_QUALITY_WEAK);
1552   peer = bucket->head;
1553   while (choice > 0)
1554   {
1555     GNUNET_assert (peer != NULL);
1556     peer = peer->next;
1557   }
1558   choice = bucket->peers_size;
1559   do
1560     {
1561       peer = peer->next;
1562       if (choice-- == 0)
1563         return; /* no non-masked peer available */
1564       if (peer == NULL)
1565         peer = bucket->head;
1566       GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1567       hello = GDS_HELLO_get (&peer->id);
1568     }
1569   while ( (hello == NULL) ||
1570           (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) );
1571   GDS_NEIGHBOURS_handle_reply (sender,
1572                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1573                                GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1574                                key,
1575                                0, NULL,
1576                                0, NULL,
1577                                hello,
1578                                GNUNET_HELLO_size (hello));    
1579 }
1580
1581
1582 /**
1583  * Core handler for p2p get requests.
1584  *
1585  * @param cls closure
1586  * @param peer sender of the request
1587  * @param message message
1588  * @param peer peer identity this notification is about
1589  * @param atsi performance data
1590  * @return GNUNET_OK to keep the connection open,
1591  *         GNUNET_SYSERR to close it (signal serious error)
1592  */
1593 static int
1594 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1595                     const struct GNUNET_MessageHeader *message,
1596                     const struct GNUNET_TRANSPORT_ATS_Information
1597                     *atsi)
1598 {
1599   struct PeerGetMessage *get;
1600   uint32_t xquery_size;
1601   size_t reply_bf_size;
1602   uint16_t msize;
1603   enum GNUNET_BLOCK_Type type;
1604   enum GNUNET_DHT_RouteOption options;
1605   enum GNUNET_BLOCK_EvaluationResult eval;
1606   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1607   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1608   const char *xquery;
1609                       
1610   /* parse and validate message */
1611   msize = ntohs (message->size);
1612   if (msize < sizeof (struct PeerGetMessage))
1613   {
1614     GNUNET_break_op (0);
1615     return GNUNET_YES;
1616   }
1617   get = (struct PeerGetMessage *) message;
1618   xquery_size = ntohl (get->xquery_size);
1619   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1620   {
1621     GNUNET_break_op (0);
1622     return GNUNET_YES;
1623   }
1624   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1625   type = ntohl (get->type);
1626   options = ntohl (get->options);
1627   xquery = (const char*) &get[1];
1628   reply_bf = NULL;
1629   if (reply_bf_size > 0)
1630     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1631                                                   reply_bf_size,
1632                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1633   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1634                                 type,
1635                                 &get->key,
1636                                 &reply_bf,
1637                                 get->bf_mutator,
1638                                 xquery, xquery_size,
1639                                 NULL, 0);
1640   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1641   {
1642     /* request invalid or block type not supported */
1643     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1644     if (NULL != reply_bf)
1645       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1646     return GNUNET_YES;
1647   }
1648   peer_bf =
1649     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1650                                        DHT_BLOOM_SIZE,
1651                                        DHT_BLOOM_K);
1652
1653   /* remember request for routing replies */
1654   GDS_ROUTING_add (peer,
1655                    type,
1656                    options,
1657                    &get->key,
1658                    xquery, xquery_size,
1659                    reply_bf, get->bf_mutator);
1660
1661   /* local lookup (this may update the reply_bf) */
1662   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1663        (am_closest_peer (&get->key,
1664                          peer_bf) ) )
1665   {
1666     if ( (0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1667     {
1668       handle_find_peer (peer,
1669                         &get->key,
1670                         reply_bf,
1671                         get->bf_mutator);
1672     }
1673     else
1674     {
1675       eval = GDS_DATACACHE_handle_get (&get->key,
1676                                        type,
1677                                        xquery, xquery_size,
1678                                        &reply_bf, 
1679                                        get->bf_mutator);
1680     }
1681   }
1682   
1683   /* P2P forwarding */
1684   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1685     GDS_NEIGHBOURS_handle_get (type,
1686                                options,
1687                                ntohl (get->desired_replication_level),
1688                                ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1689                                &get->key,
1690                                xquery, xquery_size,
1691                                reply_bf,
1692                                get->bf_mutator,
1693                                peer_bf);
1694   /* clean up */
1695   if (NULL != reply_bf)
1696     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1697   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1698   return GNUNET_YES;
1699 }
1700
1701
1702 /**
1703  * Core handler for p2p result messages.
1704  *
1705  * @param cls closure
1706  * @param message message
1707  * @param peer peer identity this notification is about
1708  * @param atsi performance data
1709  * @return GNUNET_YES (do not cut p2p connection)
1710  */
1711 static int
1712 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1713                        const struct GNUNET_MessageHeader *message,
1714                        const struct GNUNET_TRANSPORT_ATS_Information
1715                        *atsi)
1716 {
1717   const struct PeerResultMessage *prm;
1718   const struct GNUNET_PeerIdentity *put_path;
1719   const struct GNUNET_PeerIdentity *get_path;
1720   const void *data;
1721   uint32_t get_path_length;
1722   uint32_t put_path_length;
1723   uint16_t msize;
1724   size_t data_size;
1725   enum GNUNET_BLOCK_Type type;
1726                        
1727   /* parse and validate message */
1728   msize = ntohs (message->size);
1729   if (msize < sizeof (struct PeerResultMessage))
1730   {
1731     GNUNET_break_op (0);
1732     return GNUNET_YES;
1733   }
1734   prm = (struct PeerResultMessage *) message;
1735   put_path_length = ntohl (prm->put_path_length);
1736   get_path_length = ntohl (prm->get_path_length);
1737   if ( (msize < sizeof (struct PeerResultMessage) + 
1738         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1739        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1740        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1741   {
1742     GNUNET_break_op (0);
1743     return GNUNET_YES;
1744   } 
1745   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1746   get_path = &put_path[put_path_length];
1747   type = ntohl (prm->type);
1748   data = (const void*) &get_path[get_path_length];
1749   data_size = msize - (sizeof (struct PeerResultMessage) + 
1750                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1751
1752   /* if we got a HELLO, consider it for our own routing table */
1753   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1754   {
1755     const struct GNUNET_MessageHeader *h;
1756     struct GNUNET_PeerIdentity pid;
1757     int bucket;
1758
1759     /* Should be a HELLO, validate and consider using it! */
1760     if (data_size < sizeof (struct GNUNET_MessageHeader))
1761     {
1762       GNUNET_break_op (0);
1763       return GNUNET_YES;
1764     }
1765     h = data;
1766     if (data_size != ntohs (h->size))
1767     {
1768       GNUNET_break_op (0);
1769       return GNUNET_YES;
1770     }
1771     if (GNUNET_OK !=
1772         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) h,
1773                              &pid))
1774     {
1775       GNUNET_break_op (0);
1776       return GNUNET_YES;
1777     }
1778     bucket = find_bucket (&pid.hashPubKey);
1779     if ( (bucket >= 0) &&
1780          (k_buckets[bucket].peers_size < bucket_size) )
1781     {    
1782       if (NULL != GDS_transport_handle)
1783         GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
1784                                       h, NULL, NULL);
1785       (void) GNUNET_CORE_peer_request_connect (coreAPI,
1786                                                &pid, 
1787                                                NULL, NULL);
1788     }   
1789   }
1790
1791   /* append 'peer' to 'get_path' */
1792   {    
1793     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1794
1795     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1796     xget_path[get_path_length] = *peer;
1797     get_path_length++;
1798
1799     /* forward to local clients */   
1800     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1801                              &prm->key,
1802                              get_path_length,
1803                              xget_path,
1804                              put_path_length,
1805                              put_path,
1806                              type,
1807                              data_size, 
1808                              data);
1809
1810     /* forward to other peers */
1811     GDS_ROUTING_process (type,
1812                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1813                          &prm->key,
1814                          put_path_length,
1815                          put_path,
1816                          get_path_length,
1817                          xget_path,
1818                          data,
1819                          data_size);                     
1820   }
1821   return GNUNET_YES;
1822 }
1823
1824
1825 /**
1826  * Initialize neighbours subsystem.
1827  *
1828  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1829  */
1830 int
1831 GDS_NEIGHBOURS_init ()
1832 {
1833   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1834     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1835     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1836     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1837     {NULL, 0, 0}
1838   };
1839   unsigned long long temp_config_num;
1840  
1841   if (GNUNET_OK ==
1842       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1843                                              &temp_config_num))
1844     bucket_size = (unsigned int) temp_config_num;  
1845   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1846                                  1,
1847                                  NULL,
1848                                  &core_init,
1849                                  &handle_core_connect,
1850                                  &handle_core_disconnect, 
1851                                  NULL,  /* Do we care about "status" updates? */
1852                                  NULL, GNUNET_NO,
1853                                  NULL, GNUNET_NO,
1854                                  core_handlers);
1855   if (coreAPI == NULL)
1856     return GNUNET_SYSERR;
1857   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1858   return GNUNET_OK;
1859 }
1860
1861
1862 /**
1863  * Shutdown neighbours subsystem.
1864  */
1865 void
1866 GDS_NEIGHBOURS_done ()
1867 {
1868   if (coreAPI == NULL)
1869     return;
1870   GNUNET_CORE_disconnect (coreAPI);
1871   coreAPI = NULL;    
1872   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1873   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1874   all_known_peers = NULL;
1875   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1876   {
1877     GNUNET_SCHEDULER_cancel (find_peer_task);
1878     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1879   }
1880 }
1881
1882
1883 /* end of gnunet-service-dht_neighbours.c */