update
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_peerinfo_service.h"
42 #include "dht.h"
43 #include "gnunet-service-dht.h"
44 #include "gnunet-service-dht_clients.h"
45 #include "gnunet-service-dht_datacache.h"
46 #include "gnunet-service-dht_nse.h"
47 #include "gnunet-service-dht_routing.h"
48 #include <fenv.h>
49
50 /**
51  * How many buckets will we allow total.
52  */
53 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
54
55 /**
56  * What is the maximum number of peers in a given bucket.
57  */
58 #define DEFAULT_BUCKET_SIZE 4
59
60 /**
61  * Size of the bloom filter the DHT uses to filter peers.
62  */
63 #define DHT_BLOOM_SIZE 128
64
65 /**
66  * Desired replication level for FIND PEER requests
67  */
68 #define FIND_PEER_REPLICATION_LEVEL 4
69
70 /**
71  * Maximum allowed replication level for all requests.
72  */
73 #define MAXIMUM_REPLICATION_LEVEL 16
74
75 /**
76  * How often to update our preference levels for peers in our routing tables.
77  */
78 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
79
80 /**
81  * How long at least to wait before sending another find peer request.
82  */
83 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
84
85 /**
86  * How long at most to wait before sending another find peer request.
87  */
88 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
89
90 /**
91  * How long at most to wait for transmission of a GET request to another peer?
92  */
93 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
94
95
96 /**
97  * P2P PUT message
98  */
99 struct PeerPutMessage
100 {
101   /**
102    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Processing options
108    */
109   uint32_t options GNUNET_PACKED;
110
111   /**
112    * Content type.
113    */
114   uint32_t type GNUNET_PACKED;
115
116   /**
117    * Hop count
118    */
119   uint32_t hop_count GNUNET_PACKED;
120
121   /**
122    * Replication level for this message
123    */
124   uint32_t desired_replication_level GNUNET_PACKED;
125
126   /**
127    * Length of the PUT path that follows (if tracked).
128    */
129   uint32_t put_path_length GNUNET_PACKED;
130
131   /**
132    * When does the content expire?
133    */
134   struct GNUNET_TIME_AbsoluteNBO expiration_time;
135
136   /**
137    * Bloomfilter (for peer identities) to stop circular routes
138    */
139   char bloomfilter[DHT_BLOOM_SIZE];
140
141   /**
142    * The key we are storing under.
143    */
144   GNUNET_HashCode key;
145
146   /* put path (if tracked) */
147
148   /* Payload */
149
150 };
151
152
153 /**
154  * P2P Result message
155  */
156 struct PeerResultMessage
157 {
158   /**
159    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
160    */
161   struct GNUNET_MessageHeader header;
162
163   /**
164    * Content type.
165    */
166   uint32_t type GNUNET_PACKED;
167
168   /**
169    * Length of the PUT path that follows (if tracked).
170    */
171   uint32_t put_path_length GNUNET_PACKED;
172
173   /**
174    * Length of the GET path that follows (if tracked).
175    */
176   uint32_t get_path_length GNUNET_PACKED;
177
178   /**
179    * When does the content expire?
180    */
181   struct GNUNET_TIME_AbsoluteNBO expiration_time;
182
183   /**
184    * The key of the corresponding GET request.
185    */
186   GNUNET_HashCode key;
187
188   /* put path (if tracked) */
189
190   /* get path (if tracked) */
191
192   /* Payload */
193
194 };
195
196
197 /**
198  * P2P GET message
199  */
200 struct PeerGetMessage
201 {
202   /**
203    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
204    */
205   struct GNUNET_MessageHeader header;
206
207   /**
208    * Processing options
209    */
210   uint32_t options GNUNET_PACKED;
211
212   /**
213    * Desired content type.
214    */
215   uint32_t type GNUNET_PACKED;
216
217   /**
218    * Hop count
219    */
220   uint32_t hop_count GNUNET_PACKED;
221
222   /**
223    * Desired replication level for this request.
224    */
225   uint32_t desired_replication_level GNUNET_PACKED;
226
227   /**
228    * Size of the extended query.
229    */
230   uint32_t xquery_size;
231
232   /**
233    * Bloomfilter mutator.
234    */
235   uint32_t bf_mutator;
236
237   /**
238    * Bloomfilter (for peer identities) to stop circular routes
239    */
240   char bloomfilter[DHT_BLOOM_SIZE];
241
242   /**
243    * The key we are looking for.
244    */
245   GNUNET_HashCode key;
246
247   /* xquery */
248
249   /* result bloomfilter */
250
251 };
252
253
254 /**
255  * Linked list of messages to send to a particular other peer.
256  */
257 struct P2PPendingMessage
258 {
259   /**
260    * Pointer to next item in the list
261    */
262   struct P2PPendingMessage *next;
263
264   /**
265    * Pointer to previous item in the list
266    */
267   struct P2PPendingMessage *prev;
268
269   /**
270    * Message importance level.  FIXME: used? useful?
271    */
272   unsigned int importance;
273
274   /**
275    * When does this message time out?
276    */
277   struct GNUNET_TIME_Absolute timeout;
278
279   /**
280    * Actual message to be sent, allocated at the end of the struct:
281    * // msg = (cast) &pm[1]; 
282    * // memcpy (&pm[1], data, len);
283    */
284   const struct GNUNET_MessageHeader *msg;
285
286 };
287
288
289 /**
290  * Entry for a peer in a bucket.
291  */
292 struct PeerInfo
293 {
294   /**
295    * Next peer entry (DLL)
296    */
297   struct PeerInfo *next;
298
299   /**
300    *  Prev peer entry (DLL)
301    */
302   struct PeerInfo *prev;
303
304   /**
305    * Count of outstanding messages for peer.  FIXME: NEEDED?
306    * FIXME: bound queue size!?
307    */
308   unsigned int pending_count;
309
310   /**
311    * Head of pending messages to be sent to this peer.
312    */
313   struct P2PPendingMessage *head;
314
315   /**
316    * Tail of pending messages to be sent to this peer.
317    */
318   struct P2PPendingMessage *tail;
319
320   /**
321    * Core handle for sending messages to this peer.
322    */
323   struct GNUNET_CORE_TransmitHandle *th;
324
325   /**
326    * Preference update context
327    */
328   struct GNUNET_CORE_InformationRequestContext *info_ctx;
329
330   /**
331    * HELLO message for the peer, NULL if not known.  FIXME: track
332    * separately? FIXME: free!?
333    */
334   struct GNUNET_HELLO_Message *hello;
335
336   /**
337    * Task for scheduling preference updates
338    */
339   GNUNET_SCHEDULER_TaskIdentifier preference_task;
340
341   /**
342    * What is the identity of the peer?
343    */
344   struct GNUNET_PeerIdentity id;
345
346 #if 0
347   /**
348    * What is the average latency for replies received?
349    */
350   struct GNUNET_TIME_Relative latency;
351
352   /**
353    * Transport level distance to peer.
354    */
355   unsigned int distance;
356 #endif
357
358 };
359
360
361 /**
362  * Peers are grouped into buckets.
363  */
364 struct PeerBucket
365 {
366   /**
367    * Head of DLL
368    */
369   struct PeerInfo *head;
370
371   /**
372    * Tail of DLL
373    */
374   struct PeerInfo *tail;
375
376   /**
377    * Number of peers in the bucket.
378    */
379   unsigned int peers_size;
380 };
381
382
383 /**
384  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
385  */
386 static unsigned int closest_bucket;
387
388 /**
389  * How many peers have we added since we sent out our last
390  * find peer request?
391  */
392 static unsigned int newly_found_peers;
393
394 /**
395  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
396  */
397 static struct PeerBucket k_buckets[MAX_BUCKETS];
398
399 /**
400  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
401  */
402 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
403
404 /**
405  * Maximum size for each bucket.
406  */
407 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
408
409 /**
410  * Task that sends FIND PEER requests.
411  */
412 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
413
414 /**
415  * Identity of this peer.
416  */ 
417 static struct GNUNET_PeerIdentity my_identity;
418
419 /**
420  * Handle to GNUnet core.
421  */
422 static struct GNUNET_CORE_Handle *coreAPI;
423
424 /**
425  * Handle for peerinfo notifications.
426  */
427 static struct GNUNET_PEERINFO_NotifyContext *pnc;
428
429
430 /**
431  * Find the optimal bucket for this key.
432  *
433  * @param hc the hashcode to compare our identity to
434  * @return the proper bucket index, or GNUNET_SYSERR
435  *         on error (same hashcode)
436  */
437 static int
438 find_bucket (const GNUNET_HashCode * hc)
439 {
440   unsigned int bits;
441
442   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
443   if (bits == MAX_BUCKETS)
444     {
445       /* How can all bits match? Got my own ID? */
446       GNUNET_break (0);
447       return GNUNET_SYSERR; 
448     }
449   return MAX_BUCKETS - bits - 1;
450 }
451
452
453 /**
454  * Let GNUnet core know that we like the given peer.
455  *
456  * @param cls the 'struct PeerInfo' of the peer
457  * @param tc scheduler context.
458  */ 
459 static void
460 update_core_preference (void *cls,
461                         const struct GNUNET_SCHEDULER_TaskContext *tc);
462
463
464 /**
465  * Function called with statistics about the given peer.
466  *
467  * @param cls closure
468  * @param peer identifies the peer
469  * @param bpm_out set to the current bandwidth limit (sending) for this peer
470  * @param amount set to the amount that was actually reserved or unreserved;
471  *               either the full requested amount or zero (no partial reservations)
472  * @param res_delay if the reservation could not be satisfied (amount was 0), how
473  *        long should the client wait until re-trying?
474  * @param preference current traffic preference for the given peer
475  */
476 static void
477 update_core_preference_finish (void *cls,
478                                const struct GNUNET_PeerIdentity *peer,
479                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
480                                int32_t amount,
481                                struct GNUNET_TIME_Relative res_delay,
482                                uint64_t preference)
483 {
484   struct PeerInfo *peer_info = cls;
485
486   peer_info->info_ctx = NULL;
487   peer_info->preference_task
488     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
489                                     &update_core_preference, peer_info);
490 }
491
492
493 /**
494  * Let GNUnet core know that we like the given peer.
495  *
496  * @param cls the 'struct PeerInfo' of the peer
497  * @param tc scheduler context.
498  */ 
499 static void
500 update_core_preference (void *cls,
501                         const struct GNUNET_SCHEDULER_TaskContext *tc)
502 {
503   struct PeerInfo *peer = cls;
504   uint64_t preference;
505   unsigned int matching;
506   int bucket;
507
508   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
509   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
510     return;  
511   matching =
512     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
513                                       &peer->id.hashPubKey);
514   if (matching >= 64)
515     matching = 63;
516   bucket = find_bucket(&peer->id.hashPubKey);
517   if (bucket == GNUNET_SYSERR)
518     preference = 0;
519   else
520     preference = (1LL << matching) / k_buckets[bucket].peers_size;
521   if (preference == 0)
522     {
523       peer->preference_task
524         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
525                                         &update_core_preference, peer);
526       return;
527     }
528   peer->info_ctx =
529     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
530                                         GNUNET_TIME_UNIT_FOREVER_REL,
531                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
532                                         preference,
533                                         &update_core_preference_finish, peer);
534 }
535
536
537 /**
538  * Method called whenever a peer connects.
539  *
540  * @param cls closure
541  * @param peer peer identity this notification is about
542  * @param atsi performance data
543  */
544 static void
545 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
546                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
547 {
548   struct PeerInfo *ret;
549   int peer_bucket;
550
551   /* Check for connect to self message */
552   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
553     return;
554   if (GNUNET_YES ==
555       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
556                                               &peer->hashPubKey))
557   {
558     GNUNET_break (0);
559     return;
560   }
561   peer_bucket = find_bucket (&peer->hashPubKey);
562   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
563   ret = GNUNET_malloc (sizeof (struct PeerInfo));
564 #if 0
565   ret->latency = latency;
566   ret->distance = distance;
567 #endif
568   ret->id = *peer;
569   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
570                                     k_buckets[peer_bucket].tail, ret);
571   k_buckets[peer_bucket].peers_size++;
572   closest_bucket = GNUNET_MAX (closest_bucket,
573                                peer_bucket);
574   if ( (peer_bucket > 0) &&
575        (k_buckets[peer_bucket].peers_size <= bucket_size) )
576     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
577   newly_found_peers++;
578   GNUNET_assert (GNUNET_OK ==
579                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
580                                                     &peer->hashPubKey, ret,
581                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
582 }
583
584
585 /**
586  * Method called whenever a peer disconnects.
587  *
588  * @param cls closure
589  * @param peer peer identity this notification is about
590  */
591 static void
592 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
593 {
594   struct PeerInfo *to_remove;
595   int current_bucket;
596   struct P2PPendingMessage *pos;
597
598   /* Check for disconnect from self message */
599   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
600     return;
601   to_remove =
602       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
603   if (NULL == to_remove)
604     {
605       GNUNET_break (0);
606       return;
607     }
608   GNUNET_assert (GNUNET_YES ==
609                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
610                                                        &peer->hashPubKey,
611                                                        to_remove));
612   if (NULL != to_remove->info_ctx)
613   {
614     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
615     to_remove->info_ctx = NULL;
616   }
617   current_bucket = find_bucket (&to_remove->id.hashPubKey);
618   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
619                                k_buckets[current_bucket].tail,
620                                to_remove);
621   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
622   k_buckets[current_bucket].peers_size--;
623   while ( (closest_bucket > 0) &&
624           (k_buckets[closest_bucket].peers_size == 0) )
625     closest_bucket--;
626
627   if (to_remove->th != NULL) 
628   {
629     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
630     to_remove->th = NULL;
631   }
632   while (NULL != (pos = to_remove->head))
633   {
634     GNUNET_CONTAINER_DLL_remove (to_remove->head,
635                                  to_remove->tail,
636                                  pos);
637     GNUNET_free (pos);
638   }
639 }
640
641
642 /**
643  * Called when core is ready to send a message we asked for
644  * out to the destination.
645  *
646  * @param cls the 'struct PeerInfo' of the target peer
647  * @param size number of bytes available in buf
648  * @param buf where the callee should write the message
649  * @return number of bytes written to buf
650  */
651 static size_t
652 core_transmit_notify (void *cls, size_t size, void *buf)
653 {
654   struct PeerInfo *peer = cls;
655   char *cbuf = buf;
656   struct P2PPendingMessage *pending;
657   size_t off;
658   size_t msize;
659
660   peer->th = NULL;
661   if (buf == NULL)
662   {
663     /* client disconnected */
664     return 0;
665   }
666   if (peer->head == NULL)
667   {
668     /* no messages pending */
669     return 0;
670   }
671   off = 0;
672   while ( (NULL != (pending = peer->head)) &&
673           (size - off >= (msize = ntohs (pending->msg->size))) )
674   {
675     memcpy (&cbuf[off], pending->msg, msize);
676     off += msize;
677     peer->pending_count--;
678     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
679     GNUNET_free (pending);
680   }
681   if (peer->head != NULL)
682     peer->th 
683       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
684                                            pending->importance,
685                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
686                                            &peer->id, msize,
687                                            &core_transmit_notify, peer);
688   return off;
689 }
690
691
692 /**
693  * Transmit all messages in the peer's message queue.
694  *
695  * @param peer message queue to process
696  */
697 static void
698 process_peer_queue (struct PeerInfo *peer)
699 {
700   struct P2PPendingMessage *pending;
701
702   if (NULL != (pending = peer->head))
703     return;
704   if (NULL != peer->th)
705     return;
706   peer->th 
707     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
708                                          pending->importance,
709                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
710                                          &peer->id,
711                                          ntohs (pending->msg->size),
712                                          &core_transmit_notify, peer);
713 }
714
715
716 /**
717  * To how many peers should we (on average) forward the request to
718  * obtain the desired target_replication count (on average).
719  *
720  * @param hop_count number of hops the message has traversed
721  * @param target_replication the number of total paths desired
722  * @return Some number of peers to forward the message to
723  */
724 static unsigned int
725 get_forward_count (uint32_t hop_count, 
726                    uint32_t target_replication)
727 {
728   uint32_t random_value;
729   uint32_t forward_count;
730   float target_value;
731
732   if (hop_count > GDS_NSE_get () * 4.0)
733   {
734     /* forcefully terminate */
735     return 0;
736   }
737   if (hop_count > GDS_NSE_get () * 2.0)
738   {
739     /* Once we have reached our ideal number of hops, only forward to 1 peer */
740     return 1;
741   }
742   /* bound by system-wide maximum */
743   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
744                                    target_replication);
745   target_value =
746     1 + (target_replication - 1.0) / (GDS_NSE_get () +
747                                       ((float) (target_replication - 1.0) *
748                                        hop_count));
749   /* Set forward count to floor of target_value */
750   forward_count = (uint32_t) target_value;
751   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
752   target_value = target_value - forward_count;
753   random_value =
754     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
755   if (random_value < (target_value * UINT32_MAX))
756     forward_count++;
757   return forward_count;
758 }
759
760
761 /**
762  * Compute the distance between have and target as a 32-bit value.
763  * Differences in the lower bits must count stronger than differences
764  * in the higher bits.
765  *
766  * @return 0 if have==target, otherwise a number
767  *           that is larger as the distance between
768  *           the two hash codes increases
769  */
770 static unsigned int
771 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
772 {
773   unsigned int bucket;
774   unsigned int msb;
775   unsigned int lsb;
776   unsigned int i;
777
778   /* We have to represent the distance between two 2^9 (=512)-bit
779    * numbers as a 2^5 (=32)-bit number with "0" being used for the
780    * two numbers being identical; furthermore, we need to
781    * guarantee that a difference in the number of matching
782    * bits is always represented in the result.
783    *
784    * We use 2^32/2^9 numerical values to distinguish between
785    * hash codes that have the same LSB bit distance and
786    * use the highest 2^9 bits of the result to signify the
787    * number of (mis)matching LSB bits; if we have 0 matching
788    * and hence 512 mismatching LSB bits we return -1 (since
789    * 512 itself cannot be represented with 9 bits) */
790
791   /* first, calculate the most significant 9 bits of our
792    * result, aka the number of LSBs */
793   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
794   /* bucket is now a value between 0 and 512 */
795   if (bucket == 512)
796     return 0;                   /* perfect match */
797   if (bucket == 0)
798     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
799                                  * below, we'd end up with max+1 (overflow)) */
800
801   /* calculate the most significant bits of the final result */
802   msb = (512 - bucket) << (32 - 9);
803   /* calculate the 32-9 least significant bits of the final result by
804    * looking at the differences in the 32-9 bits following the
805    * mismatching bit at 'bucket' */
806   lsb = 0;
807   for (i = bucket + 1;
808        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
809   {
810     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
811         GNUNET_CRYPTO_hash_get_bit (have, i))
812       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
813                                                  * last bit set will be 31 -- if
814                                                  * i does not reach 512 first... */
815   }
816   return msb | lsb;
817 }
818
819
820 /**
821  * Check whether my identity is closer than any known peers.  If a
822  * non-null bloomfilter is given, check if this is the closest peer
823  * that hasn't already been routed to.
824  *
825  * @param key hash code to check closeness to
826  * @param bloom bloomfilter, exclude these entries from the decision
827  * @return GNUNET_YES if node location is closest,
828  *         GNUNET_NO otherwise.
829  */
830 static int
831 am_closest_peer (const GNUNET_HashCode *key,
832                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
833 {
834   int bits;
835   int other_bits;
836   int bucket_num;
837   int count;
838   struct PeerInfo *pos;
839   unsigned int my_distance;
840
841   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
842     return GNUNET_YES;
843   bucket_num = find_bucket (key);
844   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
845   my_distance = get_distance (&my_identity.hashPubKey, key);
846   pos = k_buckets[bucket_num].head;
847   count = 0;
848   while ((pos != NULL) && (count < bucket_size))
849   {
850     if ((bloom != NULL) &&
851         (GNUNET_YES ==
852          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
853     {
854       pos = pos->next;
855       continue;                 /* Skip already checked entries */
856     }
857     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
858     if (other_bits > bits)
859       return GNUNET_NO;
860     if (other_bits == bits)        /* We match the same number of bits */
861       return GNUNET_YES;
862     pos = pos->next;
863   }
864   /* No peers closer, we are the closest! */
865   return GNUNET_YES;
866 }
867
868
869 /**
870  * Select a peer from the routing table that would be a good routing
871  * destination for sending a message for "key".  The resulting peer
872  * must not be in the set of blocked peers.<p>
873  *
874  * Note that we should not ALWAYS select the closest peer to the
875  * target, peers further away from the target should be chosen with
876  * exponentially declining probability.
877  *
878  * FIXME: double-check that this is fine
879  * 
880  *
881  * @param key the key we are selecting a peer to route to
882  * @param bloom a bloomfilter containing entries this request has seen already
883  * @param hops how many hops has this message traversed thus far
884  * @return Peer to route to, or NULL on error
885  */
886 static struct PeerInfo *
887 select_peer (const GNUNET_HashCode *key,
888              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
889              uint32_t hops)
890 {
891   unsigned int bc;
892   unsigned int count;
893   unsigned int selected;
894   struct PeerInfo *pos;
895   unsigned int dist;
896   unsigned int smallest_distance;
897   struct PeerInfo *chosen;
898
899   if (hops >= GDS_NSE_get ())
900   {
901     /* greedy selection (closest peer that is not in bloomfilter) */
902     smallest_distance = UINT_MAX;
903     chosen = NULL;
904     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
905     {
906       pos = k_buckets[bc].head;
907       count = 0;
908       while ((pos != NULL) && (count < bucket_size))
909       {
910         if (GNUNET_NO ==
911             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
912         {
913           dist = get_distance (key, &pos->id.hashPubKey);
914           if (dist < smallest_distance)
915           {
916             chosen = pos;
917             smallest_distance = dist;
918           }
919         }
920         count++;
921         pos = pos->next;
922       }
923     }
924     return chosen;
925   }
926
927   /* select "random" peer */
928   /* count number of peers that are available and not filtered */
929   count = 0;
930   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
931   {
932     pos = k_buckets[bc].head;
933     while ((pos != NULL) && (count < bucket_size))
934     {
935       if (GNUNET_YES ==
936           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
937       {
938         pos = pos->next;
939         continue;               /* Ignore bloomfiltered peers */
940       }
941       count++;
942       pos = pos->next;
943     }
944   }
945   if (count == 0)               /* No peers to select from! */
946   {
947     return NULL;
948   }
949   /* Now actually choose a peer */
950   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
951   count = 0;
952   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
953   {
954     pos = k_buckets[bc].head;
955     while ((pos != NULL) && (count < bucket_size))
956     {
957       if (GNUNET_YES ==
958           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
959       {
960         pos = pos->next;
961         continue;               /* Ignore bloomfiltered peers */
962       }
963       if (0 == selected--)
964         return pos;
965       pos = pos->next;
966     }
967   }
968   GNUNET_break (0);
969   return NULL;
970 }
971
972
973 /**
974  * Compute the set of peers that the given request should be
975  * forwarded to.
976  *
977  * @param key routing key
978  * @param bloom bloom filter excluding peers as targets, all selected
979  *        peers will be added to the bloom filter
980  * @param hop_count number of hops the request has traversed so far
981  * @param target_replication desired number of replicas
982  * @param targets where to store an array of target peers (to be
983  *         free'd by the caller)
984  * @return number of peers returned in 'targets'.
985  */
986 static unsigned int
987 get_target_peers (const GNUNET_HashCode *key,
988                   struct GNUNET_CONTAINER_BloomFilter *bloom,
989                   uint32_t hop_count,
990                   uint32_t target_replication,
991                   struct PeerInfo ***targets)
992 {
993   unsigned int ret;
994   unsigned int off;
995   struct PeerInfo **rtargets;
996   struct PeerInfo *nxt;
997
998   ret = get_forward_count (hop_count, target_replication);
999   if (ret == 0)
1000   {
1001     *targets = NULL;
1002     return 0;
1003   }
1004   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1005   off = 0;
1006   while (ret-- > 0)
1007   {
1008     nxt = select_peer (key, bloom, hop_count);
1009     if (nxt == NULL)
1010       break;
1011     rtargets[off++] = nxt;
1012     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1013   }
1014   if (0 == off)
1015   {
1016     GNUNET_free (rtargets);
1017     *targets = NULL;
1018     return 0;
1019   }
1020   *targets = rtargets;
1021   return off;
1022 }
1023
1024
1025 /**
1026  * Perform a PUT operation.   Forwards the given request to other
1027  * peers.   Does not store the data locally.  Does not give the
1028  * data to local clients.  May do nothing if this is the only
1029  * peer in the network (or if we are the closest peer in the
1030  * network).
1031  *
1032  * @param type type of the block
1033  * @param options routing options
1034  * @param desired_replication_level desired replication count
1035  * @param expiration_time when does the content expire
1036  * @param hop_count how many hops has this message traversed so far
1037  * @param bf Bloom filter of peers this PUT has already traversed
1038  * @param key key for the content
1039  * @param put_path_length number of entries in put_path
1040  * @param put_path peers this request has traversed so far (if tracked)
1041  * @param data payload to store
1042  * @param data_size number of bytes in data
1043  */
1044 void
1045 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1046                            enum GNUNET_DHT_RouteOption options,
1047                            uint32_t desired_replication_level,
1048                            struct GNUNET_TIME_Absolute expiration_time,
1049                            uint32_t hop_count,
1050                            struct GNUNET_CONTAINER_BloomFilter *bf,
1051                            const GNUNET_HashCode *key,
1052                            unsigned int put_path_length,
1053                            struct GNUNET_PeerIdentity *put_path,
1054                            const void *data,
1055                            size_t data_size)
1056 {
1057   unsigned int target_count;
1058   unsigned int i;
1059   struct PeerInfo **targets;
1060   struct PeerInfo *target;
1061   struct P2PPendingMessage *pending;
1062   size_t msize;
1063   struct PeerPutMessage *ppm;
1064   struct GNUNET_PeerIdentity *pp;
1065   
1066   target_count = get_target_peers (key, bf, hop_count,
1067                                    desired_replication_level,
1068                                    &targets);
1069   if (0 == target_count)
1070     return;
1071   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1072   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1073   {
1074     put_path_length = 0;
1075     msize = data_size + sizeof (struct PeerPutMessage);
1076   }
1077   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1078   {
1079     GNUNET_break (0);
1080     return;
1081   }
1082   for (i=0;i<target_count;i++)
1083   {
1084     target = targets[i];
1085     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1086     pending->importance = 0; /* FIXME */
1087     pending->timeout = expiration_time;   
1088     ppm = (struct PeerPutMessage*) &pending[1];
1089     pending->msg = &ppm->header;
1090     ppm->header.size = htons (msize);
1091     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1092     ppm->options = htonl (options);
1093     ppm->type = htonl (type);
1094     ppm->hop_count = htonl (hop_count + 1);
1095     ppm->desired_replication_level = htonl (desired_replication_level);
1096     ppm->put_path_length = htonl (put_path_length);
1097     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1098     GNUNET_assert (GNUNET_OK ==
1099                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1100                                                               ppm->bloomfilter,
1101                                                               DHT_BLOOM_SIZE));
1102     ppm->key = *key;
1103     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1104     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1105     memcpy (&pp[put_path_length], data, data_size);
1106     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1107                                       target->tail,
1108                                       pending);
1109     target->pending_count++;
1110     process_peer_queue (target);
1111   }
1112   GNUNET_free (targets);
1113 }
1114
1115
1116 /**
1117  * Perform a GET operation.  Forwards the given request to other
1118  * peers.  Does not lookup the key locally.  May do nothing if this is
1119  * the only peer in the network (or if we are the closest peer in the
1120  * network).
1121  *
1122  * @param type type of the block
1123  * @param options routing options
1124  * @param desired_replication_level desired replication count
1125  * @param hop_count how many hops did this request traverse so far?
1126  * @param key key for the content
1127  * @param xquery extended query
1128  * @param xquery_size number of bytes in xquery
1129  * @param reply_bf bloomfilter to filter duplicates
1130  * @param reply_bf_mutator mutator for reply_bf
1131  * @param peer_bf filter for peers not to select (again)
1132  */
1133 void
1134 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1135                            enum GNUNET_DHT_RouteOption options,
1136                            uint32_t desired_replication_level,
1137                            uint32_t hop_count,
1138                            const GNUNET_HashCode *key,
1139                            const void *xquery,
1140                            size_t xquery_size,
1141                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1142                            uint32_t reply_bf_mutator,
1143                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1144 {
1145   unsigned int target_count;
1146   unsigned int i;
1147   struct PeerInfo **targets;
1148   struct PeerInfo *target;
1149   struct P2PPendingMessage *pending;
1150   size_t msize;
1151   struct PeerGetMessage *pgm;
1152   char *xq;
1153   size_t reply_bf_size;
1154   
1155   target_count = get_target_peers (key, peer_bf, hop_count,
1156                                    desired_replication_level,
1157                                    &targets);
1158   if (0 == target_count)
1159     return;
1160   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1161   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1162   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1163   {
1164     GNUNET_break (0);
1165     return;
1166   }
1167   /* forward request */
1168   for (i=0;i<target_count;i++)
1169   {
1170     target = targets[i];
1171     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1172     pending->importance = 0; /* FIXME */
1173     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1174     pgm = (struct PeerGetMessage*) &pending[1];
1175     pending->msg = &pgm->header;
1176     pgm->header.size = htons (msize);
1177     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1178     pgm->options = htonl (options);
1179     pgm->type = htonl (type);
1180     pgm->hop_count = htonl (hop_count + 1);
1181     pgm->desired_replication_level = htonl (desired_replication_level);
1182     pgm->xquery_size = htonl (xquery_size);
1183     pgm->bf_mutator = reply_bf_mutator; 
1184     GNUNET_assert (GNUNET_OK ==
1185                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1186                                                               pgm->bloomfilter,
1187                                                               DHT_BLOOM_SIZE));
1188     pgm->key = *key;
1189     xq = (char *) &pgm[1];
1190     memcpy (xq, xquery, xquery_size);
1191     GNUNET_assert (GNUNET_OK ==
1192                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1193                                                               &xq[xquery_size],
1194                                                               reply_bf_size));
1195     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1196                                       target->tail,
1197                                       pending);
1198     target->pending_count++;
1199     process_peer_queue (target);
1200   }
1201   GNUNET_free (targets);
1202 }
1203
1204
1205 /**
1206  * Handle a reply (route to origin).  Only forwards the reply back to
1207  * the given peer.  Does not do local caching or forwarding to local
1208  * clients.
1209  *
1210  * @param target neighbour that should receive the block (if still connected)
1211  * @param type type of the block
1212  * @param expiration_time when does the content expire
1213  * @param key key for the content
1214  * @param put_path_length number of entries in put_path
1215  * @param put_path peers the original PUT traversed (if tracked)
1216  * @param get_path_length number of entries in put_path
1217  * @param get_path peers this reply has traversed so far (if tracked)
1218  * @param data payload of the reply
1219  * @param data_size number of bytes in data
1220  */
1221 void
1222 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1223                              enum GNUNET_BLOCK_Type type,
1224                              struct GNUNET_TIME_Absolute expiration_time,
1225                              const GNUNET_HashCode *key,
1226                              unsigned int put_path_length,
1227                              struct GNUNET_PeerIdentity *put_path,
1228                              unsigned int get_path_length,
1229                              struct GNUNET_PeerIdentity *get_path,
1230                              const void *data,
1231                              size_t data_size)
1232 {
1233   struct PeerInfo *pi;
1234   struct P2PPendingMessage *pending;
1235   size_t msize;
1236   struct PeerResultMessage *prm;
1237   struct GNUNET_PeerIdentity *paths;
1238   
1239   msize = data_size + sizeof (struct PeerResultMessage) + 
1240     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1241   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1242        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1243        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1244        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1245   {
1246     GNUNET_break (0);
1247     return;
1248   }
1249   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1250                                           &target->hashPubKey);
1251   if (NULL == pi)
1252   {
1253     /* peer disconnected in the meantime, drop reply */
1254     return;
1255   }
1256   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1257   pending->importance = 0; /* FIXME */
1258   pending->timeout = expiration_time;
1259   prm = (struct PeerResultMessage*) &pending[1];
1260   pending->msg = &prm->header;
1261   prm->header.size = htons (msize);
1262   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1263   prm->type = htonl (type);
1264   prm->put_path_length = htonl (put_path_length);
1265   prm->get_path_length = htonl (get_path_length);
1266   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1267   prm->key = *key;
1268   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1269   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1270   memcpy (&paths[put_path_length],
1271           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1272   memcpy (&paths[put_path_length + get_path_length],
1273           data, data_size);
1274   GNUNET_CONTAINER_DLL_insert (pi->head,
1275                                pi->tail,
1276                                pending);
1277   pi->pending_count++;
1278   process_peer_queue (pi);
1279 }
1280
1281
1282 /**
1283  * Closure for 'add_known_to_bloom'.
1284  */
1285 struct BloomConstructorContext
1286 {
1287   /**
1288    * Bloom filter under construction.
1289    */
1290   struct GNUNET_CONTAINER_BloomFilter *bloom;
1291
1292   /**
1293    * Mutator to use.
1294    */
1295   uint32_t bf_mutator;
1296 };
1297
1298
1299 /**
1300  * Add each of the peers we already know to the bloom filter of
1301  * the request so that we don't get duplicate HELLOs.
1302  *
1303  * @param cls the 'struct BloomConstructorContext'.
1304  * @param key peer identity to add to the bloom filter
1305  * @param value value the peer information (unused)
1306  * @return GNUNET_YES (we should continue to iterate)
1307  */
1308 static int
1309 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1310 {
1311   struct BloomConstructorContext *ctx = cls;
1312   GNUNET_HashCode mh;
1313
1314   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1315   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1316   return GNUNET_YES;
1317 }
1318
1319
1320 /**
1321  * Task to send a find peer message for our own peer identifier
1322  * so that we can find the closest peers in the network to ourselves
1323  * and attempt to connect to them.
1324  *
1325  * @param cls closure for this task
1326  * @param tc the context under which the task is running
1327  */
1328 static void
1329 send_find_peer_message (void *cls,
1330                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1331 {
1332   struct GNUNET_TIME_Relative next_send_time;
1333   struct BloomConstructorContext bcc;
1334
1335   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1336   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1337     return;
1338   if (newly_found_peers > bucket_size) 
1339   {
1340     /* If we are finding many peers already, no need to send out our request right now! */
1341     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1342                                                    &send_find_peer_message, NULL);
1343     newly_found_peers = 0;
1344     return;
1345   }
1346   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1347   bcc.bloom =
1348     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1349   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1350                                          &add_known_to_bloom,
1351                                          &bcc);
1352   // FIXME: pass priority!?
1353   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1354                              GNUNET_DHT_RO_FIND_PEER,
1355                              FIND_PEER_REPLICATION_LEVEL,
1356                              0,
1357                              &my_identity.hashPubKey,
1358                              NULL, 0,
1359                              bcc.bloom, bcc.bf_mutator, NULL);
1360   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1361   /* schedule next round */
1362   newly_found_peers = 0;
1363   next_send_time.rel_value =
1364     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1365     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1366                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1367   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1368                                                  &send_find_peer_message,
1369                                                  NULL);  
1370 }
1371
1372
1373 /**
1374  * To be called on core init/fail.
1375  *
1376  * @param cls service closure
1377  * @param server handle to the server for this service
1378  * @param identity the public identity of this peer
1379  * @param publicKey the public key of this peer
1380  */
1381 static void
1382 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1383            const struct GNUNET_PeerIdentity *identity,
1384            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1385 {
1386   struct GNUNET_TIME_Relative next_send_time;
1387
1388   GNUNET_assert (server != NULL);
1389   my_identity = *identity;
1390   /* FIXME: do upon 1st connect instead! */
1391   next_send_time.rel_value =
1392     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1393     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1394                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1395                                2) -
1396                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1397   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1398                                                  &send_find_peer_message,
1399                                                  NULL);
1400 }
1401
1402
1403 /**
1404  * Core handler for p2p put requests.
1405  *
1406  * @param cls closure
1407  * @param peer sender of the request
1408  * @param message message
1409  * @param peer peer identity this notification is about
1410  * @param atsi performance data
1411  * @return GNUNET_OK to keep the connection open,
1412  *         GNUNET_SYSERR to close it (signal serious error)
1413  */
1414 static int
1415 handle_dht_p2p_put (void *cls,
1416                     const struct GNUNET_PeerIdentity *peer,
1417                     const struct GNUNET_MessageHeader *message,
1418                     const struct GNUNET_TRANSPORT_ATS_Information
1419                     *atsi)
1420 {
1421   const struct PeerPutMessage *put;
1422   const struct GNUNET_PeerIdentity *put_path;
1423   const void *payload;
1424   uint32_t putlen;
1425   uint16_t msize;
1426   size_t payload_size;
1427   enum GNUNET_DHT_RouteOption options;
1428   struct GNUNET_CONTAINER_BloomFilter *bf;
1429   GNUNET_HashCode test_key;
1430   
1431   msize = ntohs (message->size);
1432   if (msize < sizeof (struct PeerPutMessage))
1433   {
1434     GNUNET_break_op (0);
1435     return GNUNET_YES;
1436   }
1437   put = (const struct PeerPutMessage*) message;
1438   putlen = ntohl (put->put_path_length);
1439   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1440        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1441     {
1442       GNUNET_break_op (0);
1443       return GNUNET_YES;
1444     }
1445   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1446   payload = &put_path[putlen];
1447   options = ntohl (put->options);
1448   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1449                           putlen * sizeof (struct GNUNET_PeerIdentity));
1450   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1451                                 ntohl (put->type),
1452                                 payload, payload_size,
1453                                 &test_key))
1454   {
1455   case GNUNET_YES:
1456     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1457     {
1458       GNUNET_break_op (0);
1459       return GNUNET_YES;
1460     }
1461     break;
1462   case GNUNET_NO:
1463     GNUNET_break_op (0);
1464     return GNUNET_YES;
1465   case GNUNET_SYSERR:
1466     /* cannot verify, good luck */
1467     break;
1468   }
1469   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1470                                           DHT_BLOOM_SIZE,
1471                                           DHT_BLOOM_K);
1472   {
1473     struct GNUNET_PeerIdentity pp[putlen+1];
1474   
1475     /* extend 'put path' by sender */
1476     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1477     {
1478       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1479       pp[putlen] = *peer;
1480       putlen++;
1481     }
1482     else
1483       putlen = 0;
1484     
1485     /* give to local clients */
1486     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1487                              &put->key,
1488                              0, NULL,
1489                              putlen,
1490                              pp,
1491                              ntohl (put->type),
1492                              payload_size,
1493                              payload);
1494     /* store locally */
1495     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1496          (am_closest_peer (&put->key,
1497                            bf) ) )
1498       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1499                                 &put->key,
1500                                 putlen, pp,
1501                                 ntohl (put->type),
1502                                 payload_size,
1503                                 payload);
1504     /* route to other peers */
1505     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1506                                options,
1507                                ntohl (put->desired_replication_level),
1508                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1509                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1510                                bf,
1511                                &put->key,
1512                                putlen, pp,
1513                                payload,
1514                                payload_size);
1515   }
1516   GNUNET_CONTAINER_bloomfilter_free (bf);
1517   return GNUNET_YES;
1518 }
1519
1520
1521 /**
1522  * We have received a FIND PEER request.  Send matching
1523  * HELLOs back.
1524  *
1525  * @param sender sender of the FIND PEER request
1526  * @param key peers close to this key are desired
1527  * @param bf peers matching this bf are excluded
1528  * @param bf_mutator mutator for bf
1529  */
1530 static void
1531 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1532                   const GNUNET_HashCode *key,
1533                   struct GNUNET_CONTAINER_BloomFilter *bf,
1534                   uint32_t bf_mutator)
1535 {
1536   int bucket_idx;
1537   struct PeerBucket *bucket;
1538   struct PeerInfo *peer;
1539   unsigned int choice;
1540   GNUNET_HashCode mhash;
1541
1542   /* first, check about our own HELLO */
1543   if (NULL != GDS_my_hello)
1544   {
1545     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1546     if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))
1547       
1548       GDS_NEIGHBOURS_handle_reply (sender,
1549                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1550                                    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1551                                    key,
1552                                    0, NULL,
1553                                    0, NULL,
1554                                    GDS_my_hello,
1555                                    GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message*) GDS_my_hello));
1556   }
1557
1558   /* then, also consider sending a random HELLO from the closest bucket */
1559   bucket_idx = find_bucket (key);
1560   if (bucket_idx == GNUNET_SYSERR)
1561     return;
1562   bucket = &k_buckets[bucket_idx];
1563   if (bucket->peers_size == 0)
1564     return;
1565   choice = GNUNET_CRYPTO_random_u32 (bucket->peers_size,
1566                                      GNUNET_CRYPTO_QUALITY_WEAK);
1567   peer = bucket->head;
1568   while (choice > 0)
1569   {
1570     GNUNET_assert (peer != NULL);
1571     peer = peer->next;
1572   }
1573   choice = bucket->peers_size;
1574   do
1575     {
1576       peer = peer->next;
1577       if (choice-- == 0)
1578         return; /* no non-masked peer available */
1579       if (peer == NULL)
1580         peer = bucket->head;
1581       GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1582     }
1583   while ( (peer->hello == NULL) ||
1584           (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) );
1585   GDS_NEIGHBOURS_handle_reply (sender,
1586                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1587                                GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1588                                key,
1589                                0, NULL,
1590                                0, NULL,
1591                                peer->hello,
1592                                GNUNET_HELLO_size (peer->hello));    
1593 }
1594
1595
1596 /**
1597  * Core handler for p2p get requests.
1598  *
1599  * @param cls closure
1600  * @param peer sender of the request
1601  * @param message message
1602  * @param peer peer identity this notification is about
1603  * @param atsi performance data
1604  * @return GNUNET_OK to keep the connection open,
1605  *         GNUNET_SYSERR to close it (signal serious error)
1606  */
1607 static int
1608 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1609                     const struct GNUNET_MessageHeader *message,
1610                     const struct GNUNET_TRANSPORT_ATS_Information
1611                     *atsi)
1612 {
1613   struct PeerGetMessage *get;
1614   uint32_t xquery_size;
1615   size_t reply_bf_size;
1616   uint16_t msize;
1617   enum GNUNET_BLOCK_Type type;
1618   enum GNUNET_DHT_RouteOption options;
1619   enum GNUNET_BLOCK_EvaluationResult eval;
1620   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1621   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1622   const char *xquery;
1623                       
1624   /* parse and validate message */
1625   msize = ntohs (message->size);
1626   if (msize < sizeof (struct PeerGetMessage))
1627   {
1628     GNUNET_break_op (0);
1629     return GNUNET_YES;
1630   }
1631   get = (struct PeerGetMessage *) message;
1632   xquery_size = ntohl (get->xquery_size);
1633   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1634   {
1635     GNUNET_break_op (0);
1636     return GNUNET_YES;
1637   }
1638   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1639   type = ntohl (get->type);
1640   options = ntohl (get->options);
1641   xquery = (const char*) &get[1];
1642   reply_bf = NULL;
1643   if (reply_bf_size > 0)
1644     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1645                                                   reply_bf_size,
1646                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1647   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1648                                 type,
1649                                 &get->key,
1650                                 &reply_bf,
1651                                 get->bf_mutator,
1652                                 xquery, xquery_size,
1653                                 NULL, 0);
1654   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1655   {
1656     /* request invalid or block type not supported */
1657     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1658     if (NULL != reply_bf)
1659       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1660     return GNUNET_YES;
1661   }
1662   peer_bf =
1663     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1664                                        DHT_BLOOM_SIZE,
1665                                        DHT_BLOOM_K);
1666
1667   /* remember request for routing replies */
1668   GDS_ROUTING_add (peer,
1669                    type,
1670                    options,
1671                    &get->key,
1672                    xquery, xquery_size,
1673                    reply_bf, get->bf_mutator);
1674
1675   /* local lookup (this may update the reply_bf) */
1676   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1677        (am_closest_peer (&get->key,
1678                          peer_bf) ) )
1679   {
1680     if ( (0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1681     {
1682       handle_find_peer (peer,
1683                         &get->key,
1684                         reply_bf,
1685                         get->bf_mutator);
1686     }
1687     else
1688     {
1689       GDS_DATACACHE_handle_get (&get->key,
1690                                 type,
1691                                 xquery, xquery_size,
1692                                 &reply_bf, 
1693                                 get->bf_mutator);
1694       /* FIXME: should track if the local lookup resulted in a
1695          definitive result and then NOT do P2P forwarding */
1696     }
1697   }
1698   
1699     /* P2P forwarding */
1700   GDS_NEIGHBOURS_handle_get (type,
1701                              options,
1702                              ntohl (get->desired_replication_level),
1703                              ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1704                              &get->key,
1705                              xquery, xquery_size,
1706                              reply_bf,
1707                              get->bf_mutator,
1708                              peer_bf);
1709   /* clean up */
1710   if (NULL != reply_bf)
1711     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1712   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1713   return GNUNET_YES;
1714 }
1715
1716
1717 /**
1718  * Core handler for p2p result messages.
1719  *
1720  * @param cls closure
1721  * @param message message
1722  * @param peer peer identity this notification is about
1723  * @param atsi performance data
1724  * @return GNUNET_YES (do not cut p2p connection)
1725  */
1726 static int
1727 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1728                        const struct GNUNET_MessageHeader *message,
1729                        const struct GNUNET_TRANSPORT_ATS_Information
1730                        *atsi)
1731 {
1732   const struct PeerResultMessage *prm;
1733   const struct GNUNET_PeerIdentity *put_path;
1734   const struct GNUNET_PeerIdentity *get_path;
1735   const void *data;
1736   uint32_t get_path_length;
1737   uint32_t put_path_length;
1738   uint16_t msize;
1739   size_t data_size;
1740   enum GNUNET_BLOCK_Type type;
1741                        
1742   /* parse and validate message */
1743   msize = ntohs (message->size);
1744   if (msize < sizeof (struct PeerResultMessage))
1745   {
1746     GNUNET_break_op (0);
1747     return GNUNET_YES;
1748   }
1749   prm = (struct PeerResultMessage *) message;
1750   put_path_length = ntohl (prm->put_path_length);
1751   get_path_length = ntohl (prm->get_path_length);
1752   if ( (msize < sizeof (struct PeerResultMessage) + 
1753         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1754        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1755        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1756   {
1757     GNUNET_break_op (0);
1758     return GNUNET_YES;
1759   } 
1760   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1761   get_path = &put_path[put_path_length];
1762   type = ntohl (prm->type);
1763   data = (const void*) &get_path[get_path_length];
1764   data_size = msize - (sizeof (struct PeerResultMessage) + 
1765                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1766
1767   /* if we got a HELLO, consider it for our own routing table */
1768   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1769   {
1770     const struct GNUNET_MessageHeader *h;
1771     struct GNUNET_PeerIdentity pid;
1772     int bucket;
1773
1774     /* Should be a HELLO, validate and consider using it! */
1775     if (data_size < sizeof (struct GNUNET_MessageHeader))
1776     {
1777       GNUNET_break_op (0);
1778       return GNUNET_YES;
1779     }
1780     h = data;
1781     if (data_size != ntohs (h->size))
1782     {
1783       GNUNET_break_op (0);
1784       return GNUNET_YES;
1785     }
1786     if (GNUNET_OK !=
1787         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) h,
1788                              &pid))
1789     {
1790       GNUNET_break_op (0);
1791       return GNUNET_YES;
1792     }
1793     bucket = find_bucket (&pid.hashPubKey);
1794     if ( (bucket >= 0) &&
1795          (k_buckets[bucket].peers_size < bucket_size) )
1796     {    
1797       if (NULL != GDS_transport_handle)
1798         GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
1799                                       h, NULL, NULL);
1800       (void) GNUNET_CORE_peer_request_connect (coreAPI,
1801                                                &pid, 
1802                                                NULL, NULL);
1803     }   
1804   }
1805
1806   /* append 'peer' to 'get_path' */
1807   {    
1808     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1809
1810     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1811     xget_path[get_path_length] = *peer;
1812     get_path_length++;
1813
1814     /* forward to local clients */   
1815     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1816                              &prm->key,
1817                              get_path_length,
1818                              xget_path,
1819                              put_path_length,
1820                              put_path,
1821                              type,
1822                              data_size, 
1823                              data);
1824
1825     /* forward to other peers */
1826     GDS_ROUTING_process (type,
1827                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1828                          &prm->key,
1829                          put_path_length,
1830                          put_path,
1831                          get_path_length,
1832                          xget_path,
1833                          data,
1834                          data_size);                     
1835   }
1836   return GNUNET_YES;
1837 }
1838
1839
1840 /**
1841  * Function called for each HELLO known to PEERINFO.
1842  *
1843  * @param cls closure
1844  * @param peer id of the peer, NULL for last call
1845  * @param hello hello message for the peer (can be NULL)
1846  * @param error message
1847  */
1848 static void
1849 process_hello (void *cls,
1850                const struct GNUNET_PeerIdentity *
1851                peer,
1852                const struct GNUNET_HELLO_Message *
1853                hello, const char *err_msg)
1854 {
1855   // FIXME: consider moving HELLO processing to another file!
1856   // FIXME: first, filter HELLOs without addresses (!)
1857   // FIXME: track HELLOs (for responding to FIND PEER requests)
1858   // FIXME: add code to possibly ask core to establish connections
1859   //        (using our own peerinfo is better than using FIND PEER!)
1860 }
1861
1862
1863 /**
1864  * Initialize neighbours subsystem.
1865  *
1866  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1867  */
1868 int
1869 GDS_NEIGHBOURS_init ()
1870 {
1871   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1872     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1873     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1874     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1875     {NULL, 0, 0}
1876   };
1877   unsigned long long temp_config_num;
1878  
1879   if (GNUNET_OK ==
1880       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1881                                              &temp_config_num))
1882     bucket_size = (unsigned int) temp_config_num;  
1883   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1884                                  1,
1885                                  NULL,
1886                                  &core_init,
1887                                  &handle_core_connect,
1888                                  &handle_core_disconnect, 
1889                                  NULL,  /* Do we care about "status" updates? */
1890                                  NULL, GNUNET_NO,
1891                                  NULL, GNUNET_NO,
1892                                  core_handlers);
1893   if (coreAPI == NULL)
1894     return GNUNET_SYSERR;
1895   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1896   pnc = GNUNET_PEERINFO_notify (GDS_cfg,
1897                                 &process_hello,
1898                                 NULL);
1899   return GNUNET_OK;
1900 }
1901
1902
1903 /**
1904  * Shutdown neighbours subsystem.
1905  */
1906 void
1907 GDS_NEIGHBOURS_done ()
1908 {
1909   if (coreAPI == NULL)
1910     return;
1911   if (NULL != pnc)
1912   {
1913     GNUNET_PEERINFO_notify_cancel (pnc);
1914     pnc = NULL;
1915   }
1916   GNUNET_CORE_disconnect (coreAPI);
1917   coreAPI = NULL;    
1918   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1919   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1920   all_known_peers = NULL;
1921   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1922   {
1923     GNUNET_SCHEDULER_cancel (find_peer_task);
1924     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1925   }
1926 }
1927
1928
1929 /* end of gnunet-service-dht_neighbours.c */