give priority to results
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_nse_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_datacache_lib.h"
36 #include "gnunet_transport_service.h"
37 #include "gnunet_hello_lib.h"
38 #include "gnunet_dht_service.h"
39 #include "gnunet_statistics_service.h"
40 #include "dht.h"
41 #include "gnunet-service-dht.h"
42 #include "gnunet-service-dht_clients.h"
43 #include "gnunet-service-dht_datacache.h"
44 #include "gnunet-service-dht_nse.h"
45 #include "gnunet-service-dht_routing.h"
46 #include <fenv.h>
47
48 /**
49  * How many buckets will we allow total.
50  */
51 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
52
53 /**
54  * What is the maximum number of peers in a given bucket.
55  */
56 #define DEFAULT_BUCKET_SIZE 4
57
58 /**
59  * Size of the bloom filter the DHT uses to filter peers.
60  */
61 #define DHT_BLOOM_SIZE 128
62
63 /**
64  * Desired replication level for FIND PEER requests
65  */
66 #define FIND_PEER_REPLICATION_LEVEL 4
67
68 /**
69  * Maximum allowed replication level for all requests.
70  */
71 #define MAXIMUM_REPLICATION_LEVEL 16
72
73 /**
74  * How often to update our preference levels for peers in our routing tables.
75  */
76 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
77
78 /**
79  * How long at least to wait before sending another find peer request.
80  */
81 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
82
83 /**
84  * How long at most to wait before sending another find peer request.
85  */
86 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
87
88 /**
89  * How long at most to wait for transmission of a GET request to another peer?
90  */
91 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
92
93
94 /**
95  * P2P PUT message
96  */
97 struct PeerPutMessage
98 {
99   /**
100    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Processing options
106    */
107   uint32_t options GNUNET_PACKED;
108
109   /**
110    * Content type.
111    */
112   uint32_t type GNUNET_PACKED;
113
114   /**
115    * Hop count
116    */
117   uint32_t hop_count GNUNET_PACKED;
118
119   /**
120    * Replication level for this message
121    */
122   uint32_t desired_replication_level GNUNET_PACKED;
123
124   /**
125    * Length of the PUT path that follows (if tracked).
126    */
127   uint32_t put_path_length GNUNET_PACKED;
128
129   /**
130    * When does the content expire?
131    */
132   struct GNUNET_TIME_AbsoluteNBO expiration_time;
133
134   /**
135    * Bloomfilter (for peer identities) to stop circular routes
136    */
137   char bloomfilter[DHT_BLOOM_SIZE];
138
139   /**
140    * The key we are storing under.
141    */
142   GNUNET_HashCode key;
143
144   /* put path (if tracked) */
145
146   /* Payload */
147
148 };
149
150
151 /**
152  * P2P Result message
153  */
154 struct PeerResultMessage
155 {
156   /**
157    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
158    */
159   struct GNUNET_MessageHeader header;
160
161   /**
162    * Content type.
163    */
164   uint32_t type GNUNET_PACKED;
165
166   /**
167    * Length of the PUT path that follows (if tracked).
168    */
169   uint32_t put_path_length GNUNET_PACKED;
170
171   /**
172    * Length of the GET path that follows (if tracked).
173    */
174   uint32_t get_path_length GNUNET_PACKED;
175
176   /**
177    * When does the content expire?
178    */
179   struct GNUNET_TIME_AbsoluteNBO expiration_time;
180
181   /**
182    * The key of the corresponding GET request.
183    */
184   GNUNET_HashCode key;
185
186   /* put path (if tracked) */
187
188   /* get path (if tracked) */
189
190   /* Payload */
191
192 };
193
194
195 /**
196  * P2P GET message
197  */
198 struct PeerGetMessage
199 {
200   /**
201    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
202    */
203   struct GNUNET_MessageHeader header;
204
205   /**
206    * Processing options
207    */
208   uint32_t options GNUNET_PACKED;
209
210   /**
211    * Desired content type.
212    */
213   uint32_t type GNUNET_PACKED;
214
215   /**
216    * Hop count
217    */
218   uint32_t hop_count GNUNET_PACKED;
219
220   /**
221    * Desired replication level for this request.
222    */
223   uint32_t desired_replication_level GNUNET_PACKED;
224
225   /**
226    * Size of the extended query.
227    */
228   uint32_t xquery_size;
229
230   /**
231    * Bloomfilter mutator.
232    */
233   uint32_t bf_mutator;
234
235   /**
236    * Bloomfilter (for peer identities) to stop circular routes
237    */
238   char bloomfilter[DHT_BLOOM_SIZE];
239
240   /**
241    * The key we are looking for.
242    */
243   GNUNET_HashCode key;
244
245   /* xquery */
246
247   /* result bloomfilter */
248
249 };
250
251
252 /**
253  * Linked list of messages to send to a particular other peer.
254  */
255 struct P2PPendingMessage
256 {
257   /**
258    * Pointer to next item in the list
259    */
260   struct P2PPendingMessage *next;
261
262   /**
263    * Pointer to previous item in the list
264    */
265   struct P2PPendingMessage *prev;
266
267   /**
268    * Message importance level.  FIXME: used? useful?
269    */
270   unsigned int importance;
271
272   /**
273    * When does this message time out?
274    */
275   struct GNUNET_TIME_Absolute timeout;
276
277   /**
278    * Actual message to be sent, allocated at the end of the struct:
279    * // msg = (cast) &pm[1]; 
280    * // memcpy (&pm[1], data, len);
281    */
282   const struct GNUNET_MessageHeader *msg;
283
284 };
285
286
287 /**
288  * Entry for a peer in a bucket.
289  */
290 struct PeerInfo
291 {
292   /**
293    * Next peer entry (DLL)
294    */
295   struct PeerInfo *next;
296
297   /**
298    *  Prev peer entry (DLL)
299    */
300   struct PeerInfo *prev;
301
302   /**
303    * Count of outstanding messages for peer.  FIXME: NEEDED?
304    * FIXME: bound queue size!?
305    */
306   unsigned int pending_count;
307
308   /**
309    * Head of pending messages to be sent to this peer.
310    */
311   struct P2PPendingMessage *head;
312
313   /**
314    * Tail of pending messages to be sent to this peer.
315    */
316   struct P2PPendingMessage *tail;
317
318   /**
319    * Core handle for sending messages to this peer.
320    */
321   struct GNUNET_CORE_TransmitHandle *th;
322
323   /**
324    * Preference update context
325    */
326   struct GNUNET_CORE_InformationRequestContext *info_ctx;
327
328   /**
329    * Task for scheduling message sends.
330    */
331   GNUNET_SCHEDULER_TaskIdentifier send_task;
332
333   /**
334    * Task for scheduling preference updates
335    */
336   GNUNET_SCHEDULER_TaskIdentifier preference_task;
337
338   /**
339    * What is the identity of the peer?
340    */
341   struct GNUNET_PeerIdentity id;
342
343 #if 0
344   /**
345    * What is the average latency for replies received?
346    */
347   struct GNUNET_TIME_Relative latency;
348
349   /**
350    * Transport level distance to peer.
351    */
352   unsigned int distance;
353 #endif
354
355 };
356
357
358 /**
359  * Peers are grouped into buckets.
360  */
361 struct PeerBucket
362 {
363   /**
364    * Head of DLL
365    */
366   struct PeerInfo *head;
367
368   /**
369    * Tail of DLL
370    */
371   struct PeerInfo *tail;
372
373   /**
374    * Number of peers in the bucket.
375    */
376   unsigned int peers_size;
377 };
378
379
380 /**
381  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
382  */
383 static unsigned int closest_bucket;
384
385 /**
386  * How many peers have we added since we sent out our last
387  * find peer request?
388  */
389 static unsigned int newly_found_peers;
390
391 /**
392  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
393  */
394 static struct PeerBucket k_buckets[MAX_BUCKETS];
395
396 /**
397  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
398  */
399 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
400
401 /**
402  * Maximum size for each bucket.
403  */
404 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
405
406 /**
407  * Task that sends FIND PEER requests.
408  */
409 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
410
411 /**
412  * Identity of this peer.
413  */ 
414 static struct GNUNET_PeerIdentity my_identity;
415
416 /**
417  * Handle to GNUnet core.
418  */
419 static struct GNUNET_CORE_Handle *coreAPI;
420
421
422 /**
423  * Find the optimal bucket for this key.
424  *
425  * @param hc the hashcode to compare our identity to
426  * @return the proper bucket index, or GNUNET_SYSERR
427  *         on error (same hashcode)
428  */
429 static int
430 find_bucket (const GNUNET_HashCode * hc)
431 {
432   unsigned int bits;
433
434   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
435   if (bits == MAX_BUCKETS)
436     {
437       /* How can all bits match? Got my own ID? */
438       GNUNET_break (0);
439       return GNUNET_SYSERR; 
440     }
441   return MAX_BUCKETS - bits - 1;
442 }
443
444
445 /**
446  * Let GNUnet core know that we like the given peer.
447  *
448  * @param cls the 'struct PeerInfo' of the peer
449  * @param tc scheduler context.
450  */ 
451 static void
452 update_core_preference (void *cls,
453                         const struct GNUNET_SCHEDULER_TaskContext *tc);
454
455
456 /**
457  * Function called with statistics about the given peer.
458  *
459  * @param cls closure
460  * @param peer identifies the peer
461  * @param bpm_out set to the current bandwidth limit (sending) for this peer
462  * @param amount set to the amount that was actually reserved or unreserved;
463  *               either the full requested amount or zero (no partial reservations)
464  * @param res_delay if the reservation could not be satisfied (amount was 0), how
465  *        long should the client wait until re-trying?
466  * @param preference current traffic preference for the given peer
467  */
468 static void
469 update_core_preference_finish (void *cls,
470                                const struct GNUNET_PeerIdentity *peer,
471                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
472                                int32_t amount,
473                                struct GNUNET_TIME_Relative res_delay,
474                                uint64_t preference)
475 {
476   struct PeerInfo *peer_info = cls;
477
478   peer_info->info_ctx = NULL;
479   peer_info->preference_task
480     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
481                                     &update_core_preference, peer_info);
482 }
483
484
485 /**
486  * Let GNUnet core know that we like the given peer.
487  *
488  * @param cls the 'struct PeerInfo' of the peer
489  * @param tc scheduler context.
490  */ 
491 static void
492 update_core_preference (void *cls,
493                         const struct GNUNET_SCHEDULER_TaskContext *tc)
494 {
495   struct PeerInfo *peer = cls;
496   uint64_t preference;
497   unsigned int matching;
498   int bucket;
499
500   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
501   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
502     return;  
503   matching =
504     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
505                                       &peer->id.hashPubKey);
506   if (matching >= 64)
507     matching = 63;
508   bucket = find_bucket(&peer->id.hashPubKey);
509   if (bucket == GNUNET_SYSERR)
510     preference = 0;
511   else
512     preference = (1LL << matching) / k_buckets[bucket].peers_size;
513   if (preference == 0)
514     {
515       peer->preference_task
516         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
517                                         &update_core_preference, peer);
518       return;
519     }
520   peer->info_ctx =
521     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
522                                         GNUNET_TIME_UNIT_FOREVER_REL,
523                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
524                                         preference,
525                                         &update_core_preference_finish, peer);
526 }
527
528
529 /**
530  * Method called whenever a peer connects.
531  *
532  * @param cls closure
533  * @param peer peer identity this notification is about
534  * @param atsi performance data
535  */
536 static void
537 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
538                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
539 {
540   struct PeerInfo *ret;
541   int peer_bucket;
542
543   /* Check for connect to self message */
544   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
545     return;
546   if (GNUNET_YES ==
547       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
548                                               &peer->hashPubKey))
549   {
550     GNUNET_break (0);
551     return;
552   }
553   peer_bucket = find_bucket (&peer->hashPubKey);
554   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
555   ret = GNUNET_malloc (sizeof (struct PeerInfo));
556 #if 0
557   ret->latency = latency;
558   ret->distance = distance;
559 #endif
560   ret->id = *peer;
561   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
562                                     k_buckets[peer_bucket].tail, ret);
563   k_buckets[peer_bucket].peers_size++;
564   closest_bucket = GNUNET_MAX (closest_bucket,
565                                peer_bucket);
566   if ( (peer_bucket > 0) &&
567        (k_buckets[peer_bucket].peers_size <= bucket_size) )
568     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
569   newly_found_peers++;
570   GNUNET_assert (GNUNET_OK ==
571                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
572                                                     &peer->hashPubKey, ret,
573                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
574 }
575
576
577 /**
578  * Method called whenever a peer disconnects.
579  *
580  * @param cls closure
581  * @param peer peer identity this notification is about
582  */
583 static void
584 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
585 {
586   struct PeerInfo *to_remove;
587   int current_bucket;
588   struct P2PPendingMessage *pos;
589
590   /* Check for disconnect from self message */
591   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
592     return;
593   to_remove =
594       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
595   if (NULL == to_remove)
596     {
597       GNUNET_break (0);
598       return;
599     }
600   GNUNET_assert (GNUNET_YES ==
601                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
602                                                        &peer->hashPubKey,
603                                                        to_remove));
604   if (NULL != to_remove->info_ctx)
605   {
606     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
607     to_remove->info_ctx = NULL;
608   }
609   current_bucket = find_bucket (&to_remove->id.hashPubKey);
610   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
611                                k_buckets[current_bucket].tail,
612                                to_remove);
613   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
614   k_buckets[current_bucket].peers_size--;
615   while ( (closest_bucket > 0) &&
616           (k_buckets[closest_bucket].peers_size == 0) )
617     closest_bucket--;
618
619   if (to_remove->th != NULL) 
620   {
621     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
622     to_remove->th = NULL;
623   }
624   while (NULL != (pos = to_remove->head))
625   {
626     GNUNET_CONTAINER_DLL_remove (to_remove->head,
627                                  to_remove->tail,
628                                  pos);
629     GNUNET_free (pos);
630   }
631 }
632
633
634 /**
635  * Called when core is ready to send a message we asked for
636  * out to the destination.
637  *
638  * @param cls the 'struct PeerInfo' of the target peer
639  * @param size number of bytes available in buf
640  * @param buf where the callee should write the message
641  * @return number of bytes written to buf
642  */
643 static size_t
644 core_transmit_notify (void *cls, size_t size, void *buf)
645 {
646   struct PeerInfo *peer = cls;
647   char *cbuf = buf;
648   struct P2PPendingMessage *pending;
649   size_t off;
650   size_t msize;
651
652   peer->th = NULL;
653   if (buf == NULL)
654   {
655     /* client disconnected */
656     return 0;
657   }
658   if (peer->head == NULL)
659   {
660     /* no messages pending */
661     return 0;
662   }
663   off = 0;
664   while ( (NULL != (pending = peer->head)) &&
665           (size - off >= (msize = ntohs (pending->msg->size))) )
666   {
667     memcpy (&cbuf[off], pending->msg, msize);
668     off += msize;
669     peer->pending_count--;
670     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
671     GNUNET_free (pending);
672   }
673   if (peer->head != NULL)
674     peer->th 
675       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
676                                            pending->importance,
677                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
678                                            &peer->id, msize,
679                                            &core_transmit_notify, peer);
680   return off;
681 }
682
683
684 /**
685  * Transmit all messages in the peer's message queue.
686  *
687  * @param peer message queue to process
688  */
689 static void
690 process_peer_queue (struct PeerInfo *peer)
691 {
692   struct P2PPendingMessage *pending;
693
694   if (NULL != (pending = peer->head))
695     return;
696   if (NULL != peer->th)
697     return;
698   peer->th 
699     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
700                                          pending->importance,
701                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
702                                          &peer->id,
703                                          ntohs (pending->msg->size),
704                                          &core_transmit_notify, peer);
705 }
706
707
708 /**
709  * To how many peers should we (on average) forward the request to
710  * obtain the desired target_replication count (on average).
711  *
712  * @param hop_count number of hops the message has traversed
713  * @param target_replication the number of total paths desired
714  * @return Some number of peers to forward the message to
715  */
716 static unsigned int
717 get_forward_count (uint32_t hop_count, 
718                    uint32_t target_replication)
719 {
720   uint32_t random_value;
721   uint32_t forward_count;
722   float target_value;
723
724   if (hop_count > GDS_NSE_get () * 4.0)
725   {
726     /* forcefully terminate */
727     return 0;
728   }
729   if (hop_count > GDS_NSE_get () * 2.0)
730   {
731     /* Once we have reached our ideal number of hops, only forward to 1 peer */
732     return 1;
733   }
734   /* bound by system-wide maximum */
735   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
736                                    target_replication);
737   target_value =
738     1 + (target_replication - 1.0) / (GDS_NSE_get () +
739                                       ((float) (target_replication - 1.0) *
740                                        hop_count));
741   /* Set forward count to floor of target_value */
742   forward_count = (uint32_t) target_value;
743   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
744   target_value = target_value - forward_count;
745   random_value =
746     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
747   if (random_value < (target_value * UINT32_MAX))
748     forward_count++;
749   return forward_count;
750 }
751
752
753 /**
754  * Compute the distance between have and target as a 32-bit value.
755  * Differences in the lower bits must count stronger than differences
756  * in the higher bits.
757  *
758  * @return 0 if have==target, otherwise a number
759  *           that is larger as the distance between
760  *           the two hash codes increases
761  */
762 static unsigned int
763 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
764 {
765   unsigned int bucket;
766   unsigned int msb;
767   unsigned int lsb;
768   unsigned int i;
769
770   /* We have to represent the distance between two 2^9 (=512)-bit
771    * numbers as a 2^5 (=32)-bit number with "0" being used for the
772    * two numbers being identical; furthermore, we need to
773    * guarantee that a difference in the number of matching
774    * bits is always represented in the result.
775    *
776    * We use 2^32/2^9 numerical values to distinguish between
777    * hash codes that have the same LSB bit distance and
778    * use the highest 2^9 bits of the result to signify the
779    * number of (mis)matching LSB bits; if we have 0 matching
780    * and hence 512 mismatching LSB bits we return -1 (since
781    * 512 itself cannot be represented with 9 bits) */
782
783   /* first, calculate the most significant 9 bits of our
784    * result, aka the number of LSBs */
785   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
786   /* bucket is now a value between 0 and 512 */
787   if (bucket == 512)
788     return 0;                   /* perfect match */
789   if (bucket == 0)
790     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
791                                  * below, we'd end up with max+1 (overflow)) */
792
793   /* calculate the most significant bits of the final result */
794   msb = (512 - bucket) << (32 - 9);
795   /* calculate the 32-9 least significant bits of the final result by
796    * looking at the differences in the 32-9 bits following the
797    * mismatching bit at 'bucket' */
798   lsb = 0;
799   for (i = bucket + 1;
800        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
801   {
802     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
803         GNUNET_CRYPTO_hash_get_bit (have, i))
804       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
805                                                  * last bit set will be 31 -- if
806                                                  * i does not reach 512 first... */
807   }
808   return msb | lsb;
809 }
810
811
812 /**
813  * Check whether my identity is closer than any known peers.  If a
814  * non-null bloomfilter is given, check if this is the closest peer
815  * that hasn't already been routed to.
816  *
817  * @param key hash code to check closeness to
818  * @param bloom bloomfilter, exclude these entries from the decision
819  * @return GNUNET_YES if node location is closest,
820  *         GNUNET_NO otherwise.
821  */
822 static int
823 am_closest_peer (const GNUNET_HashCode *key,
824                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
825 {
826   int bits;
827   int other_bits;
828   int bucket_num;
829   int count;
830   struct PeerInfo *pos;
831   unsigned int my_distance;
832
833   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
834     return GNUNET_YES;
835   bucket_num = find_bucket (key);
836   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
837   my_distance = get_distance (&my_identity.hashPubKey, key);
838   pos = k_buckets[bucket_num].head;
839   count = 0;
840   while ((pos != NULL) && (count < bucket_size))
841   {
842     if ((bloom != NULL) &&
843         (GNUNET_YES ==
844          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
845     {
846       pos = pos->next;
847       continue;                 /* Skip already checked entries */
848     }
849     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
850     if (other_bits > bits)
851       return GNUNET_NO;
852     if (other_bits == bits)        /* We match the same number of bits */
853       return GNUNET_YES;
854     pos = pos->next;
855   }
856   /* No peers closer, we are the closest! */
857   return GNUNET_YES;
858 }
859
860
861 /**
862  * Select a peer from the routing table that would be a good routing
863  * destination for sending a message for "key".  The resulting peer
864  * must not be in the set of blocked peers.<p>
865  *
866  * Note that we should not ALWAYS select the closest peer to the
867  * target, peers further away from the target should be chosen with
868  * exponentially declining probability.
869  *
870  * FIXME: double-check that this is fine
871  * 
872  *
873  * @param key the key we are selecting a peer to route to
874  * @param bloom a bloomfilter containing entries this request has seen already
875  * @param hops how many hops has this message traversed thus far
876  * @return Peer to route to, or NULL on error
877  */
878 static struct PeerInfo *
879 select_peer (const GNUNET_HashCode *key,
880              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
881              uint32_t hops)
882 {
883   unsigned int bc;
884   unsigned int count;
885   unsigned int selected;
886   struct PeerInfo *pos;
887   unsigned int dist;
888   unsigned int smallest_distance;
889   struct PeerInfo *chosen;
890
891   if (hops >= GDS_NSE_get ())
892   {
893     /* greedy selection (closest peer that is not in bloomfilter) */
894     smallest_distance = UINT_MAX;
895     chosen = NULL;
896     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
897     {
898       pos = k_buckets[bc].head;
899       count = 0;
900       while ((pos != NULL) && (count < bucket_size))
901       {
902         if (GNUNET_NO ==
903             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
904         {
905           dist = get_distance (key, &pos->id.hashPubKey);
906           if (dist < smallest_distance)
907           {
908             chosen = pos;
909             smallest_distance = dist;
910           }
911         }
912         count++;
913         pos = pos->next;
914       }
915     }
916     return chosen;
917   }
918
919   /* select "random" peer */
920   /* count number of peers that are available and not filtered */
921   count = 0;
922   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
923   {
924     pos = k_buckets[bc].head;
925     while ((pos != NULL) && (count < bucket_size))
926     {
927       if (GNUNET_YES ==
928           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
929       {
930         pos = pos->next;
931         continue;               /* Ignore bloomfiltered peers */
932       }
933       count++;
934       pos = pos->next;
935     }
936   }
937   if (count == 0)               /* No peers to select from! */
938   {
939     return NULL;
940   }
941   /* Now actually choose a peer */
942   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
943   count = 0;
944   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
945   {
946     pos = k_buckets[bc].head;
947     while ((pos != NULL) && (count < bucket_size))
948     {
949       if (GNUNET_YES ==
950           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
951       {
952         pos = pos->next;
953         continue;               /* Ignore bloomfiltered peers */
954       }
955       if (0 == selected--)
956         return pos;
957       pos = pos->next;
958     }
959   }
960   GNUNET_break (0);
961   return NULL;
962 }
963
964
965 /**
966  * Compute the set of peers that the given request should be
967  * forwarded to.
968  *
969  * @param key routing key
970  * @param bloom bloom filter excluding peers as targets, all selected
971  *        peers will be added to the bloom filter
972  * @param hop_count number of hops the request has traversed so far
973  * @param target_replication desired number of replicas
974  * @param targets where to store an array of target peers (to be
975  *         free'd by the caller)
976  * @return number of peers returned in 'targets'.
977  */
978 static unsigned int
979 get_target_peers (const GNUNET_HashCode *key,
980                   struct GNUNET_CONTAINER_BloomFilter *bloom,
981                   uint32_t hop_count,
982                   uint32_t target_replication,
983                   struct PeerInfo ***targets)
984 {
985   unsigned int ret;
986   unsigned int off;
987   struct PeerInfo **rtargets;
988   struct PeerInfo *nxt;
989
990   ret = get_forward_count (hop_count, target_replication);
991   if (ret == 0)
992   {
993     *targets = NULL;
994     return 0;
995   }
996   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
997   off = 0;
998   while (ret-- > 0)
999   {
1000     nxt = select_peer (key, bloom, hop_count);
1001     if (nxt == NULL)
1002       break;
1003     rtargets[off++] = nxt;
1004     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1005   }
1006   if (0 == off)
1007   {
1008     GNUNET_free (rtargets);
1009     *targets = NULL;
1010     return 0;
1011   }
1012   *targets = rtargets;
1013   return off;
1014 }
1015
1016
1017 /**
1018  * Perform a PUT operation.   Forwards the given request to other
1019  * peers.   Does not store the data locally.  Does not give the
1020  * data to local clients.  May do nothing if this is the only
1021  * peer in the network (or if we are the closest peer in the
1022  * network).
1023  *
1024  * @param type type of the block
1025  * @param options routing options
1026  * @param desired_replication_level desired replication count
1027  * @param expiration_time when does the content expire
1028  * @param hop_count how many hops has this message traversed so far
1029  * @param bf Bloom filter of peers this PUT has already traversed
1030  * @param key key for the content
1031  * @param put_path_length number of entries in put_path
1032  * @param put_path peers this request has traversed so far (if tracked)
1033  * @param data payload to store
1034  * @param data_size number of bytes in data
1035  */
1036 void
1037 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1038                            enum GNUNET_DHT_RouteOption options,
1039                            uint32_t desired_replication_level,
1040                            struct GNUNET_TIME_Absolute expiration_time,
1041                            uint32_t hop_count,
1042                            struct GNUNET_CONTAINER_BloomFilter *bf,
1043                            const GNUNET_HashCode *key,
1044                            unsigned int put_path_length,
1045                            struct GNUNET_PeerIdentity *put_path,
1046                            const void *data,
1047                            size_t data_size)
1048 {
1049   unsigned int target_count;
1050   unsigned int i;
1051   struct PeerInfo **targets;
1052   struct PeerInfo *target;
1053   struct P2PPendingMessage *pending;
1054   size_t msize;
1055   struct PeerPutMessage *ppm;
1056   struct GNUNET_PeerIdentity *pp;
1057   
1058   target_count = get_target_peers (key, bf, hop_count,
1059                                    desired_replication_level,
1060                                    &targets);
1061   if (0 == target_count)
1062     return;
1063   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1064   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1065   {
1066     put_path_length = 0;
1067     msize = data_size + sizeof (struct PeerPutMessage);
1068   }
1069   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1070   {
1071     GNUNET_break (0);
1072     return;
1073   }
1074   for (i=0;i<target_count;i++)
1075   {
1076     target = targets[i];
1077     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1078     pending->importance = 0; /* FIXME */
1079     pending->timeout = expiration_time;   
1080     ppm = (struct PeerPutMessage*) &pending[1];
1081     pending->msg = &ppm->header;
1082     ppm->header.size = htons (msize);
1083     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1084     ppm->options = htonl (options);
1085     ppm->type = htonl (type);
1086     ppm->hop_count = htonl (hop_count + 1);
1087     ppm->desired_replication_level = htonl (desired_replication_level);
1088     ppm->put_path_length = htonl (put_path_length);
1089     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1090     GNUNET_assert (GNUNET_OK ==
1091                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1092                                                               ppm->bloomfilter,
1093                                                               DHT_BLOOM_SIZE));
1094     ppm->key = *key;
1095     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1096     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1097     memcpy (&pp[put_path_length], data, data_size);
1098     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1099                                       target->tail,
1100                                       pending);
1101     target->pending_count++;
1102     process_peer_queue (target);
1103   }
1104   GNUNET_free (targets);
1105 }
1106
1107
1108 /**
1109  * Perform a GET operation.  Forwards the given request to other
1110  * peers.  Does not lookup the key locally.  May do nothing if this is
1111  * the only peer in the network (or if we are the closest peer in the
1112  * network).
1113  *
1114  * @param type type of the block
1115  * @param options routing options
1116  * @param desired_replication_level desired replication count
1117  * @param hop_count how many hops did this request traverse so far?
1118  * @param key key for the content
1119  * @param xquery extended query
1120  * @param xquery_size number of bytes in xquery
1121  * @param reply_bf bloomfilter to filter duplicates
1122  * @param reply_bf_mutator mutator for reply_bf
1123  * @param peer_bf filter for peers not to select (again)
1124  */
1125 void
1126 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1127                            enum GNUNET_DHT_RouteOption options,
1128                            uint32_t desired_replication_level,
1129                            uint32_t hop_count,
1130                            const GNUNET_HashCode *key,
1131                            const void *xquery,
1132                            size_t xquery_size,
1133                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1134                            uint32_t reply_bf_mutator,
1135                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1136 {
1137   unsigned int target_count;
1138   unsigned int i;
1139   struct PeerInfo **targets;
1140   struct PeerInfo *target;
1141   struct P2PPendingMessage *pending;
1142   size_t msize;
1143   struct PeerGetMessage *pgm;
1144   char *xq;
1145   size_t reply_bf_size;
1146   
1147   target_count = get_target_peers (key, peer_bf, hop_count,
1148                                    desired_replication_level,
1149                                    &targets);
1150   if (0 == target_count)
1151     return;
1152   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1153   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1154   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1155   {
1156     GNUNET_break (0);
1157     return;
1158   }
1159   /* forward request */
1160   for (i=0;i<target_count;i++)
1161   {
1162     target = targets[i];
1163     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1164     pending->importance = 0; /* FIXME */
1165     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1166     pgm = (struct PeerGetMessage*) &pending[1];
1167     pending->msg = &pgm->header;
1168     pgm->header.size = htons (msize);
1169     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1170     pgm->options = htonl (options);
1171     pgm->type = htonl (type);
1172     pgm->hop_count = htonl (hop_count + 1);
1173     pgm->desired_replication_level = htonl (desired_replication_level);
1174     pgm->xquery_size = htonl (xquery_size);
1175     pgm->bf_mutator = reply_bf_mutator; 
1176     GNUNET_assert (GNUNET_OK ==
1177                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1178                                                               pgm->bloomfilter,
1179                                                               DHT_BLOOM_SIZE));
1180     pgm->key = *key;
1181     xq = (char *) &pgm[1];
1182     memcpy (xq, xquery, xquery_size);
1183     GNUNET_assert (GNUNET_OK ==
1184                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1185                                                               &xq[xquery_size],
1186                                                               reply_bf_size));
1187     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1188                                       target->tail,
1189                                       pending);
1190     target->pending_count++;
1191     process_peer_queue (target);
1192   }
1193   GNUNET_free (targets);
1194 }
1195
1196
1197 /**
1198  * Handle a reply (route to origin).  Only forwards the reply back to
1199  * the given peer.  Does not do local caching or forwarding to local
1200  * clients.
1201  *
1202  * @param target neighbour that should receive the block (if still connected)
1203  * @param type type of the block
1204  * @param expiration_time when does the content expire
1205  * @param key key for the content
1206  * @param put_path_length number of entries in put_path
1207  * @param put_path peers the original PUT traversed (if tracked)
1208  * @param get_path_length number of entries in put_path
1209  * @param get_path peers this reply has traversed so far (if tracked)
1210  * @param data payload of the reply
1211  * @param data_size number of bytes in data
1212  */
1213 void
1214 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1215                              enum GNUNET_BLOCK_Type type,
1216                              struct GNUNET_TIME_Absolute expiration_time,
1217                              const GNUNET_HashCode *key,
1218                              unsigned int put_path_length,
1219                              struct GNUNET_PeerIdentity *put_path,
1220                              unsigned int get_path_length,
1221                              struct GNUNET_PeerIdentity *get_path,
1222                              const void *data,
1223                              size_t data_size)
1224 {
1225   struct PeerInfo *pi;
1226   struct P2PPendingMessage *pending;
1227   size_t msize;
1228   struct PeerResultMessage *prm;
1229   struct GNUNET_PeerIdentity *paths;
1230   
1231   msize = data_size + sizeof (struct PeerResultMessage) + 
1232     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1233   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1234        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1235        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1236        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1237   {
1238     GNUNET_break (0);
1239     return;
1240   }
1241   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1242                                           &target->hashPubKey);
1243   if (NULL == pi)
1244   {
1245     /* peer disconnected in the meantime, drop reply */
1246     return;
1247   }
1248   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1249   pending->importance = 0; /* FIXME */
1250   pending->timeout = expiration_time;
1251   prm = (struct PeerResultMessage*) &pending[1];
1252   pending->msg = &prm->header;
1253   prm->header.size = htons (msize);
1254   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1255   prm->type = htonl (type);
1256   prm->put_path_length = htonl (put_path_length);
1257   prm->get_path_length = htonl (get_path_length);
1258   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1259   prm->key = *key;
1260   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1261   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1262   memcpy (&paths[put_path_length],
1263           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1264   memcpy (&paths[put_path_length + get_path_length],
1265           data, data_size);
1266   GNUNET_CONTAINER_DLL_insert (pi->head,
1267                                pi->tail,
1268                                pending);
1269   pi->pending_count++;
1270   process_peer_queue (pi);
1271 }
1272
1273
1274 /**
1275  * Closure for 'add_known_to_bloom'.
1276  */
1277 struct BloomConstructorContext
1278 {
1279   /**
1280    * Bloom filter under construction.
1281    */
1282   struct GNUNET_CONTAINER_BloomFilter *bloom;
1283
1284   /**
1285    * Mutator to use.
1286    */
1287   uint32_t bf_mutator;
1288 };
1289
1290
1291 /**
1292  * Add each of the peers we already know to the bloom filter of
1293  * the request so that we don't get duplicate HELLOs.
1294  *
1295  * @param cls the 'struct BloomConstructorContext'.
1296  * @param key peer identity to add to the bloom filter
1297  * @param value value the peer information (unused)
1298  * @return GNUNET_YES (we should continue to iterate)
1299  */
1300 static int
1301 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
1302 {
1303   struct BloomConstructorContext *ctx = cls;
1304   GNUNET_HashCode mh;
1305
1306   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1307   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
1308   return GNUNET_YES;
1309 }
1310
1311
1312 /**
1313  * Task to send a find peer message for our own peer identifier
1314  * so that we can find the closest peers in the network to ourselves
1315  * and attempt to connect to them.
1316  *
1317  * @param cls closure for this task
1318  * @param tc the context under which the task is running
1319  */
1320 static void
1321 send_find_peer_message (void *cls,
1322                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1323 {
1324   struct GNUNET_TIME_Relative next_send_time;
1325   struct BloomConstructorContext bcc;
1326
1327   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1328   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1329     return;
1330   if (newly_found_peers > bucket_size) 
1331   {
1332     /* If we are finding many peers already, no need to send out our request right now! */
1333     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1334                                                    &send_find_peer_message, NULL);
1335     newly_found_peers = 0;
1336     return;
1337   }
1338   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
1339   bcc.bloom =
1340     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1341   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
1342                                          &add_known_to_bloom,
1343                                          &bcc);
1344   // FIXME: pass priority!?
1345   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
1346                              GNUNET_DHT_RO_FIND_PEER,
1347                              FIND_PEER_REPLICATION_LEVEL,
1348                              0,
1349                              &my_identity.hashPubKey,
1350                              NULL, 0,
1351                              bcc.bloom, bcc.bf_mutator, NULL);
1352   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
1353   /* schedule next round */
1354   newly_found_peers = 0;
1355   next_send_time.rel_value =
1356     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
1357     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1358                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
1359   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1360                                                  &send_find_peer_message,
1361                                                  NULL);  
1362 }
1363
1364
1365 /**
1366  * To be called on core init/fail.
1367  *
1368  * @param cls service closure
1369  * @param server handle to the server for this service
1370  * @param identity the public identity of this peer
1371  * @param publicKey the public key of this peer
1372  */
1373 static void
1374 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1375            const struct GNUNET_PeerIdentity *identity,
1376            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1377 {
1378   struct GNUNET_TIME_Relative next_send_time;
1379
1380   GNUNET_assert (server != NULL);
1381   my_identity = *identity;
1382   /* FIXME: do upon 1st connect instead! */
1383   next_send_time.rel_value =
1384     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1385     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1386                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1387                                2) -
1388                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1389   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
1390                                                  &send_find_peer_message,
1391                                                  NULL);
1392 }
1393
1394
1395 /**
1396  * Core handler for p2p put requests.
1397  *
1398  * @param cls closure
1399  * @param peer sender of the request
1400  * @param message message
1401  * @param peer peer identity this notification is about
1402  * @param atsi performance data
1403  * @return GNUNET_OK to keep the connection open,
1404  *         GNUNET_SYSERR to close it (signal serious error)
1405  */
1406 static int
1407 handle_dht_p2p_put (void *cls,
1408                     const struct GNUNET_PeerIdentity *peer,
1409                     const struct GNUNET_MessageHeader *message,
1410                     const struct GNUNET_TRANSPORT_ATS_Information
1411                     *atsi)
1412 {
1413   const struct PeerPutMessage *put;
1414   const struct GNUNET_PeerIdentity *put_path;
1415   const void *payload;
1416   uint32_t putlen;
1417   uint16_t msize;
1418   size_t payload_size;
1419   enum GNUNET_DHT_RouteOption options;
1420   struct GNUNET_CONTAINER_BloomFilter *bf;
1421   GNUNET_HashCode test_key;
1422   
1423   msize = ntohs (message->size);
1424   if (msize < sizeof (struct PeerPutMessage))
1425   {
1426     GNUNET_break_op (0);
1427     return GNUNET_YES;
1428   }
1429   put = (const struct PeerPutMessage*) message;
1430   putlen = ntohl (put->put_path_length);
1431   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1432        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1433     {
1434       GNUNET_break_op (0);
1435       return GNUNET_YES;
1436     }
1437   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1438   payload = &put_path[putlen];
1439   options = ntohl (put->options);
1440   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1441                           putlen * sizeof (struct GNUNET_PeerIdentity));
1442   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1443                                 ntohl (put->type),
1444                                 payload, payload_size,
1445                                 &test_key))
1446   {
1447   case GNUNET_YES:
1448     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1449     {
1450       GNUNET_break_op (0);
1451       return GNUNET_YES;
1452     }
1453     break;
1454   case GNUNET_NO:
1455     GNUNET_break_op (0);
1456     return GNUNET_YES;
1457   case GNUNET_SYSERR:
1458     /* cannot verify, good luck */
1459     break;
1460   }
1461   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1462                                           DHT_BLOOM_SIZE,
1463                                           DHT_BLOOM_K);
1464   {
1465     struct GNUNET_PeerIdentity pp[putlen+1];
1466   
1467     /* extend 'put path' by sender */
1468     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1469     {
1470       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1471       pp[putlen] = *peer;
1472       putlen++;
1473     }
1474     else
1475       putlen = 0;
1476     
1477     /* give to local clients */
1478     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1479                              &put->key,
1480                              0, NULL,
1481                              putlen,
1482                              pp,
1483                              ntohl (put->type),
1484                              payload_size,
1485                              payload);
1486     /* store locally */
1487     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1488          (am_closest_peer (&put->key,
1489                            bf) ) )
1490       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1491                                 &put->key,
1492                                 putlen, pp,
1493                                 ntohl (put->type),
1494                                 payload_size,
1495                                 payload);
1496     /* route to other peers */
1497     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1498                                options,
1499                                ntohl (put->desired_replication_level),
1500                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1501                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1502                                bf,
1503                                &put->key,
1504                                putlen, pp,
1505                                payload,
1506                                payload_size);
1507   }
1508   GNUNET_CONTAINER_bloomfilter_free (bf);
1509   return GNUNET_YES;
1510 }
1511
1512
1513 /**
1514  * Core handler for p2p get requests.
1515  *
1516  * @param cls closure
1517  * @param peer sender of the request
1518  * @param message message
1519  * @param peer peer identity this notification is about
1520  * @param atsi performance data
1521  * @return GNUNET_OK to keep the connection open,
1522  *         GNUNET_SYSERR to close it (signal serious error)
1523  */
1524 static int
1525 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1526                     const struct GNUNET_MessageHeader *message,
1527                     const struct GNUNET_TRANSPORT_ATS_Information
1528                     *atsi)
1529 {
1530   struct PeerGetMessage *get;
1531   uint32_t xquery_size;
1532   size_t reply_bf_size;
1533   uint16_t msize;
1534   enum GNUNET_BLOCK_Type type;
1535   enum GNUNET_DHT_RouteOption options;
1536   enum GNUNET_BLOCK_EvaluationResult eval;
1537   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1538   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1539   const char *xquery;
1540                       
1541   /* parse and validate message */
1542   msize = ntohs (message->size);
1543   if (msize < sizeof (struct PeerGetMessage))
1544   {
1545     GNUNET_break_op (0);
1546     return GNUNET_YES;
1547   }
1548   get = (struct PeerGetMessage *) message;
1549   xquery_size = ntohl (get->xquery_size);
1550   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1551   {
1552     GNUNET_break_op (0);
1553     return GNUNET_YES;
1554   }
1555   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1556   type = ntohl (get->type);
1557   options = ntohl (get->options);
1558   xquery = (const char*) &get[1];
1559   reply_bf = NULL;
1560   if (reply_bf_size > 0)
1561     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1562                                                   reply_bf_size,
1563                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1564   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1565                                 type,
1566                                 &get->key,
1567                                 &reply_bf,
1568                                 get->bf_mutator,
1569                                 xquery, xquery_size,
1570                                 NULL, 0);
1571   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1572   {
1573     /* request invalid or block type not supported */
1574     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1575     if (NULL != reply_bf)
1576       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1577     return GNUNET_YES;
1578   }
1579   peer_bf =
1580     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1581                                        DHT_BLOOM_SIZE,
1582                                        DHT_BLOOM_K);
1583
1584   /* remember request for routing replies */
1585   GDS_ROUTING_add (peer,
1586                    type,
1587                    options,
1588                    &get->key,
1589                    xquery, xquery_size,
1590                    reply_bf, get->bf_mutator);
1591   /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1592
1593   /* local lookup (this may update the reply_bf) */
1594   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1595        (am_closest_peer (&get->key,
1596                          peer_bf) ) )
1597     GDS_DATACACHE_handle_get (&get->key,
1598                               type,
1599                               xquery, xquery_size,
1600                               &reply_bf, 
1601                               get->bf_mutator);
1602   /* FIXME: should track if the local lookup resulted in a
1603      definitive result and then NOT do P2P forwarding */
1604   
1605     /* P2P forwarding */
1606   GDS_NEIGHBOURS_handle_get (type,
1607                              options,
1608                              ntohl (get->desired_replication_level),
1609                              ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1610                              &get->key,
1611                              xquery, xquery_size,
1612                              reply_bf,
1613                              get->bf_mutator,
1614                              peer_bf);
1615   /* clean up */
1616   if (NULL != reply_bf)
1617     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1618   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1619   return GNUNET_YES;
1620 }
1621
1622
1623 /**
1624  * Core handler for p2p result messages.
1625  *
1626  * @param cls closure
1627  * @param message message
1628  * @param peer peer identity this notification is about
1629  * @param atsi performance data
1630  * @return GNUNET_YES (do not cut p2p connection)
1631  */
1632 static int
1633 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1634                        const struct GNUNET_MessageHeader *message,
1635                        const struct GNUNET_TRANSPORT_ATS_Information
1636                        *atsi)
1637 {
1638   const struct PeerResultMessage *prm;
1639   const struct GNUNET_PeerIdentity *put_path;
1640   const struct GNUNET_PeerIdentity *get_path;
1641   const void *data;
1642   uint32_t get_path_length;
1643   uint32_t put_path_length;
1644   uint16_t msize;
1645   size_t data_size;
1646   enum GNUNET_BLOCK_Type type;
1647                        
1648   /* parse and validate message */
1649   msize = ntohs (message->size);
1650   if (msize < sizeof (struct PeerResultMessage))
1651   {
1652     GNUNET_break_op (0);
1653     return GNUNET_YES;
1654   }
1655   prm = (struct PeerResultMessage *) message;
1656   put_path_length = ntohl (prm->put_path_length);
1657   get_path_length = ntohl (prm->get_path_length);
1658   if ( (msize < sizeof (struct PeerResultMessage) + 
1659         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1660        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1661        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1662   {
1663     GNUNET_break_op (0);
1664     return GNUNET_YES;
1665   } 
1666   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1667   get_path = &put_path[put_path_length];
1668   type = ntohl (prm->type);
1669   data = (const void*) &get_path[get_path_length];
1670   data_size = msize - (sizeof (struct PeerResultMessage) + 
1671                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1672   /* append 'peer' to 'get_path' */
1673   {    
1674     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1675
1676     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1677     xget_path[get_path_length] = *peer;
1678     get_path_length++;
1679
1680     /* forward to local clients */   
1681     GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1682                              &prm->key,
1683                              get_path_length,
1684                              xget_path,
1685                              put_path_length,
1686                              put_path,
1687                              type,
1688                              data_size, 
1689                              data);
1690
1691     /* forward to other peers */
1692     GDS_ROUTING_process (type,
1693                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1694                          &prm->key,
1695                          put_path_length,
1696                          put_path,
1697                          get_path_length,
1698                          xget_path,
1699                          data,
1700                          data_size);                     
1701   }
1702   return GNUNET_YES;
1703 }
1704
1705
1706 /**
1707  * Initialize neighbours subsystem.
1708  *
1709  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1710  */
1711 int
1712 GDS_NEIGHBOURS_init ()
1713 {
1714   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1715     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1716     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1717     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1718     {NULL, 0, 0}
1719   };
1720   unsigned long long temp_config_num;
1721  
1722   if (GNUNET_OK ==
1723       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1724                                              &temp_config_num))
1725     bucket_size = (unsigned int) temp_config_num;  
1726   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1727                                  1,
1728                                  NULL,
1729                                  &core_init,
1730                                  &handle_core_connect,
1731                                  &handle_core_disconnect, 
1732                                  NULL,  /* Do we care about "status" updates? */
1733                                  NULL, GNUNET_NO,
1734                                  NULL, GNUNET_NO,
1735                                  core_handlers);
1736   if (coreAPI == NULL)
1737     return GNUNET_SYSERR;
1738   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1739 #if 0
1740   struct GNUNET_TIME_Relative next_send_time;
1741
1742   // FIXME!
1743   next_send_time.rel_value =
1744     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
1745     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1746                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
1747                                2) -
1748                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
1749   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
1750                                                  &send_find_peer_message,
1751                                                  &find_peer_context);  
1752 #endif
1753   return GNUNET_OK;
1754 }
1755
1756
1757 /**
1758  * Shutdown neighbours subsystem.
1759  */
1760 void
1761 GDS_NEIGHBOURS_done ()
1762 {
1763   if (coreAPI == NULL)
1764     return;
1765   GNUNET_CORE_disconnect (coreAPI);
1766   coreAPI = NULL;    
1767   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1768   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1769   all_known_peers = NULL;
1770   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1771   {
1772     GNUNET_SCHEDULER_cancel (find_peer_task);
1773     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1774   }
1775 }
1776
1777
1778 /* end of gnunet-service-dht_neighbours.c */
1779
1780
1781 #if 0
1782
1783
1784
1785 #endif