getting client API to compile
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_core_service.h"
36 #include "gnunet_datacache_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_hello_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_peerinfo_service.h"
42 #include "dht.h"
43 #include "gnunet-service-dht.h"
44 #include "gnunet-service-dht_clients.h"
45 #include "gnunet-service-dht_datacache.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50
51 /**
52  * How many buckets will we allow total.
53  */
54 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
55
56 /**
57  * What is the maximum number of peers in a given bucket.
58  */
59 #define DEFAULT_BUCKET_SIZE 4
60
61 /**
62  * Size of the bloom filter the DHT uses to filter peers.
63  */
64 #define DHT_BLOOM_SIZE 128
65
66 /**
67  * Desired replication level for FIND PEER requests
68  */
69 #define FIND_PEER_REPLICATION_LEVEL 4
70
71 /**
72  * Maximum allowed replication level for all requests.
73  */
74 #define MAXIMUM_REPLICATION_LEVEL 16
75
76 /**
77  * How often to update our preference levels for peers in our routing tables.
78  */
79 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
80
81 /**
82  * How long at least to wait before sending another find peer request.
83  */
84 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
85
86 /**
87  * How long at most to wait before sending another find peer request.
88  */
89 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
90
91 /**
92  * How long at most to wait for transmission of a GET request to another peer?
93  */
94 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
95
96
97 /**
98  * P2P PUT message
99  */
100 struct PeerPutMessage
101 {
102   /**
103    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
104    */
105   struct GNUNET_MessageHeader header;
106
107   /**
108    * Processing options
109    */
110   uint32_t options GNUNET_PACKED;
111
112   /**
113    * Content type.
114    */
115   uint32_t type GNUNET_PACKED;
116
117   /**
118    * Hop count
119    */
120   uint32_t hop_count GNUNET_PACKED;
121
122   /**
123    * Replication level for this message
124    */
125   uint32_t desired_replication_level GNUNET_PACKED;
126
127   /**
128    * Length of the PUT path that follows (if tracked).
129    */
130   uint32_t put_path_length GNUNET_PACKED;
131
132   /**
133    * When does the content expire?
134    */
135   struct GNUNET_TIME_AbsoluteNBO expiration_time;
136
137   /**
138    * Bloomfilter (for peer identities) to stop circular routes
139    */
140   char bloomfilter[DHT_BLOOM_SIZE];
141
142   /**
143    * The key we are storing under.
144    */
145   GNUNET_HashCode key;
146
147   /* put path (if tracked) */
148
149   /* Payload */
150
151 };
152
153
154 /**
155  * P2P Result message
156  */
157 struct PeerResultMessage
158 {
159   /**
160    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
161    */
162   struct GNUNET_MessageHeader header;
163
164   /**
165    * Content type.
166    */
167   uint32_t type GNUNET_PACKED;
168
169   /**
170    * Length of the PUT path that follows (if tracked).
171    */
172   uint32_t put_path_length GNUNET_PACKED;
173
174   /**
175    * Length of the GET path that follows (if tracked).
176    */
177   uint32_t get_path_length GNUNET_PACKED;
178
179   /**
180    * When does the content expire?
181    */
182   struct GNUNET_TIME_AbsoluteNBO expiration_time;
183
184   /**
185    * The key of the corresponding GET request.
186    */
187   GNUNET_HashCode key;
188
189   /* put path (if tracked) */
190
191   /* get path (if tracked) */
192
193   /* Payload */
194
195 };
196
197
198 /**
199  * P2P GET message
200  */
201 struct PeerGetMessage
202 {
203   /**
204    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
205    */
206   struct GNUNET_MessageHeader header;
207
208   /**
209    * Processing options
210    */
211   uint32_t options GNUNET_PACKED;
212
213   /**
214    * Desired content type.
215    */
216   uint32_t type GNUNET_PACKED;
217
218   /**
219    * Hop count
220    */
221   uint32_t hop_count GNUNET_PACKED;
222
223   /**
224    * Desired replication level for this request.
225    */
226   uint32_t desired_replication_level GNUNET_PACKED;
227
228   /**
229    * Size of the extended query.
230    */
231   uint32_t xquery_size;
232
233   /**
234    * Bloomfilter mutator.
235    */
236   uint32_t bf_mutator;
237
238   /**
239    * Bloomfilter (for peer identities) to stop circular routes
240    */
241   char bloomfilter[DHT_BLOOM_SIZE];
242
243   /**
244    * The key we are looking for.
245    */
246   GNUNET_HashCode key;
247
248   /* xquery */
249
250   /* result bloomfilter */
251
252 };
253
254
255 /**
256  * Linked list of messages to send to a particular other peer.
257  */
258 struct P2PPendingMessage
259 {
260   /**
261    * Pointer to next item in the list
262    */
263   struct P2PPendingMessage *next;
264
265   /**
266    * Pointer to previous item in the list
267    */
268   struct P2PPendingMessage *prev;
269
270   /**
271    * Message importance level.  FIXME: used? useful?
272    */
273   unsigned int importance;
274
275   /**
276    * When does this message time out?
277    */
278   struct GNUNET_TIME_Absolute timeout;
279
280   /**
281    * Actual message to be sent, allocated at the end of the struct:
282    * // msg = (cast) &pm[1]; 
283    * // memcpy (&pm[1], data, len);
284    */
285   const struct GNUNET_MessageHeader *msg;
286
287 };
288
289
290 /**
291  * Entry for a peer in a bucket.
292  */
293 struct PeerInfo
294 {
295   /**
296    * Next peer entry (DLL)
297    */
298   struct PeerInfo *next;
299
300   /**
301    *  Prev peer entry (DLL)
302    */
303   struct PeerInfo *prev;
304
305   /**
306    * Count of outstanding messages for peer.  FIXME: NEEDED?
307    * FIXME: bound queue size!?
308    */
309   unsigned int pending_count;
310
311   /**
312    * Head of pending messages to be sent to this peer.
313    */
314   struct P2PPendingMessage *head;
315
316   /**
317    * Tail of pending messages to be sent to this peer.
318    */
319   struct P2PPendingMessage *tail;
320
321   /**
322    * Core handle for sending messages to this peer.
323    */
324   struct GNUNET_CORE_TransmitHandle *th;
325
326   /**
327    * Preference update context
328    */
329   struct GNUNET_CORE_InformationRequestContext *info_ctx;
330
331   /**
332    * HELLO message for the peer, NULL if not known.  FIXME: track
333    * separately? FIXME: free!?
334    */
335   struct GNUNET_HELLO_Message *hello;
336
337   /**
338    * Task for scheduling preference updates
339    */
340   GNUNET_SCHEDULER_TaskIdentifier preference_task;
341
342   /**
343    * What is the identity of the peer?
344    */
345   struct GNUNET_PeerIdentity id;
346
347 #if 0
348   /**
349    * What is the average latency for replies received?
350    */
351   struct GNUNET_TIME_Relative latency;
352
353   /**
354    * Transport level distance to peer.
355    */
356   unsigned int distance;
357 #endif
358
359 };
360
361
362 /**
363  * Peers are grouped into buckets.
364  */
365 struct PeerBucket
366 {
367   /**
368    * Head of DLL
369    */
370   struct PeerInfo *head;
371
372   /**
373    * Tail of DLL
374    */
375   struct PeerInfo *tail;
376
377   /**
378    * Number of peers in the bucket.
379    */
380   unsigned int peers_size;
381 };
382
383
384 /**
385  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
386  */
387 static unsigned int closest_bucket;
388
389 /**
390  * How many peers have we added since we sent out our last
391  * find peer request?
392  */
393 static unsigned int newly_found_peers;
394
395 /**
396  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
397  */
398 static struct PeerBucket k_buckets[MAX_BUCKETS];
399
400 /**
401  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
402  */
403 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
404
405 /**
406  * Maximum size for each bucket.
407  */
408 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
409
410 /**
411  * Task that sends FIND PEER requests.
412  */
413 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
414
415 /**
416  * Identity of this peer.
417  */ 
418 static struct GNUNET_PeerIdentity my_identity;
419
420 /**
421  * Handle to GNUnet core.
422  */
423 static struct GNUNET_CORE_Handle *coreAPI;
424
425 /**
426  * Handle for peerinfo notifications.
427  */
428 static struct GNUNET_PEERINFO_NotifyContext *pnc;
429
430
431 /**
432  * Find the optimal bucket for this key.
433  *
434  * @param hc the hashcode to compare our identity to
435  * @return the proper bucket index, or GNUNET_SYSERR
436  *         on error (same hashcode)
437  */
438 static int
439 find_bucket (const GNUNET_HashCode * hc)
440 {
441   unsigned int bits;
442
443   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
444   if (bits == MAX_BUCKETS)
445     {
446       /* How can all bits match? Got my own ID? */
447       GNUNET_break (0);
448       return GNUNET_SYSERR; 
449     }
450   return MAX_BUCKETS - bits - 1;
451 }
452
453
454 /**
455  * Let GNUnet core know that we like the given peer.
456  *
457  * @param cls the 'struct PeerInfo' of the peer
458  * @param tc scheduler context.
459  */ 
460 static void
461 update_core_preference (void *cls,
462                         const struct GNUNET_SCHEDULER_TaskContext *tc);
463
464
465 /**
466  * Function called with statistics about the given peer.
467  *
468  * @param cls closure
469  * @param peer identifies the peer
470  * @param bpm_out set to the current bandwidth limit (sending) for this peer
471  * @param amount set to the amount that was actually reserved or unreserved;
472  *               either the full requested amount or zero (no partial reservations)
473  * @param res_delay if the reservation could not be satisfied (amount was 0), how
474  *        long should the client wait until re-trying?
475  * @param preference current traffic preference for the given peer
476  */
477 static void
478 update_core_preference_finish (void *cls,
479                                const struct GNUNET_PeerIdentity *peer,
480                                struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
481                                int32_t amount,
482                                struct GNUNET_TIME_Relative res_delay,
483                                uint64_t preference)
484 {
485   struct PeerInfo *peer_info = cls;
486
487   peer_info->info_ctx = NULL;
488   peer_info->preference_task
489     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
490                                     &update_core_preference, peer_info);
491 }
492
493
494 /**
495  * Let GNUnet core know that we like the given peer.
496  *
497  * @param cls the 'struct PeerInfo' of the peer
498  * @param tc scheduler context.
499  */ 
500 static void
501 update_core_preference (void *cls,
502                         const struct GNUNET_SCHEDULER_TaskContext *tc)
503 {
504   struct PeerInfo *peer = cls;
505   uint64_t preference;
506   unsigned int matching;
507   int bucket;
508
509   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
510   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
511     return;  
512   matching =
513     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
514                                       &peer->id.hashPubKey);
515   if (matching >= 64)
516     matching = 63;
517   bucket = find_bucket(&peer->id.hashPubKey);
518   if (bucket == GNUNET_SYSERR)
519     preference = 0;
520   else
521     preference = (1LL << matching) / k_buckets[bucket].peers_size;
522   if (preference == 0)
523     {
524       peer->preference_task
525         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
526                                         &update_core_preference, peer);
527       return;
528     }
529   peer->info_ctx =
530     GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
531                                         GNUNET_TIME_UNIT_FOREVER_REL,
532                                         GNUNET_BANDWIDTH_VALUE_MAX, 0,
533                                         preference,
534                                         &update_core_preference_finish, peer);
535 }
536
537
538 /**
539  * Closure for 'add_known_to_bloom'.
540  */
541 struct BloomConstructorContext
542 {
543   /**
544    * Bloom filter under construction.
545    */
546   struct GNUNET_CONTAINER_BloomFilter *bloom;
547
548   /**
549    * Mutator to use.
550    */
551   uint32_t bf_mutator;
552 };
553
554
555 /**
556  * Add each of the peers we already know to the bloom filter of
557  * the request so that we don't get duplicate HELLOs.
558  *
559  * @param cls the 'struct BloomConstructorContext'.
560  * @param key peer identity to add to the bloom filter
561  * @param value value the peer information (unused)
562  * @return GNUNET_YES (we should continue to iterate)
563  */
564 static int
565 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
566 {
567   struct BloomConstructorContext *ctx = cls;
568   GNUNET_HashCode mh;
569
570   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
571   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
572   return GNUNET_YES;
573 }
574
575
576 /**
577  * Task to send a find peer message for our own peer identifier
578  * so that we can find the closest peers in the network to ourselves
579  * and attempt to connect to them.
580  *
581  * @param cls closure for this task
582  * @param tc the context under which the task is running
583  */
584 static void
585 send_find_peer_message (void *cls,
586                         const struct GNUNET_SCHEDULER_TaskContext *tc)
587 {
588   struct GNUNET_TIME_Relative next_send_time;
589   struct BloomConstructorContext bcc;
590
591   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
592   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
593     return;
594   if (newly_found_peers > bucket_size) 
595   {
596     /* If we are finding many peers already, no need to send out our request right now! */
597     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
598                                                    &send_find_peer_message, NULL);
599     newly_found_peers = 0;
600     return;
601   }
602   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
603   bcc.bloom =
604     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
605   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
606                                          &add_known_to_bloom,
607                                          &bcc);
608   // FIXME: pass priority!?
609   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
610                              GNUNET_DHT_RO_FIND_PEER,
611                              FIND_PEER_REPLICATION_LEVEL,
612                              0,
613                              &my_identity.hashPubKey,
614                              NULL, 0,
615                              bcc.bloom, bcc.bf_mutator, NULL);
616   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
617   /* schedule next round */
618   newly_found_peers = 0;
619   next_send_time.rel_value =
620     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
621     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
622                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
623   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
624                                                  &send_find_peer_message,
625                                                  NULL);  
626 }
627
628
629 /**
630  * Method called whenever a peer connects.
631  *
632  * @param cls closure
633  * @param peer peer identity this notification is about
634  * @param atsi performance data
635  */
636 static void
637 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
638                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
639 {
640   struct PeerInfo *ret;
641   int peer_bucket;
642
643   /* Check for connect to self message */
644   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
645     return;
646   if (GNUNET_YES ==
647       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
648                                               &peer->hashPubKey))
649   {
650     GNUNET_break (0);
651     return;
652   }
653   peer_bucket = find_bucket (&peer->hashPubKey);
654   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
655   ret = GNUNET_malloc (sizeof (struct PeerInfo));
656 #if 0
657   ret->latency = latency;
658   ret->distance = distance;
659 #endif
660   ret->id = *peer;
661   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
662                                     k_buckets[peer_bucket].tail, ret);
663   k_buckets[peer_bucket].peers_size++;
664   closest_bucket = GNUNET_MAX (closest_bucket,
665                                peer_bucket);
666   if ( (peer_bucket > 0) &&
667        (k_buckets[peer_bucket].peers_size <= bucket_size) )
668     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
669   newly_found_peers++;
670   GNUNET_assert (GNUNET_OK ==
671                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
672                                                     &peer->hashPubKey, ret,
673                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
674   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
675   {
676     /* got a first connection, good time to start with FIND PEER requests... */
677     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
678                                                NULL);    
679   }
680 }
681
682
683 /**
684  * Method called whenever a peer disconnects.
685  *
686  * @param cls closure
687  * @param peer peer identity this notification is about
688  */
689 static void
690 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
691 {
692   struct PeerInfo *to_remove;
693   int current_bucket;
694   struct P2PPendingMessage *pos;
695
696   /* Check for disconnect from self message */
697   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
698     return;
699   to_remove =
700       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
701   if (NULL == to_remove)
702     {
703       GNUNET_break (0);
704       return;
705     }
706   GNUNET_assert (GNUNET_YES ==
707                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
708                                                        &peer->hashPubKey,
709                                                        to_remove));
710   if (NULL != to_remove->info_ctx)
711   {
712     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
713     to_remove->info_ctx = NULL;
714   }
715   current_bucket = find_bucket (&to_remove->id.hashPubKey);
716   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
717                                k_buckets[current_bucket].tail,
718                                to_remove);
719   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
720   k_buckets[current_bucket].peers_size--;
721   while ( (closest_bucket > 0) &&
722           (k_buckets[closest_bucket].peers_size == 0) )
723     closest_bucket--;
724
725   if (to_remove->th != NULL) 
726   {
727     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
728     to_remove->th = NULL;
729   }
730   while (NULL != (pos = to_remove->head))
731   {
732     GNUNET_CONTAINER_DLL_remove (to_remove->head,
733                                  to_remove->tail,
734                                  pos);
735     GNUNET_free (pos);
736   }
737 }
738
739
740 /**
741  * Called when core is ready to send a message we asked for
742  * out to the destination.
743  *
744  * @param cls the 'struct PeerInfo' of the target peer
745  * @param size number of bytes available in buf
746  * @param buf where the callee should write the message
747  * @return number of bytes written to buf
748  */
749 static size_t
750 core_transmit_notify (void *cls, size_t size, void *buf)
751 {
752   struct PeerInfo *peer = cls;
753   char *cbuf = buf;
754   struct P2PPendingMessage *pending;
755   size_t off;
756   size_t msize;
757
758   peer->th = NULL;
759   if (buf == NULL)
760   {
761     /* client disconnected */
762     return 0;
763   }
764   if (peer->head == NULL)
765   {
766     /* no messages pending */
767     return 0;
768   }
769   off = 0;
770   while ( (NULL != (pending = peer->head)) &&
771           (size - off >= (msize = ntohs (pending->msg->size))) )
772   {
773     memcpy (&cbuf[off], pending->msg, msize);
774     off += msize;
775     peer->pending_count--;
776     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
777     GNUNET_free (pending);
778   }
779   if (peer->head != NULL)
780     peer->th 
781       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
782                                            pending->importance,
783                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
784                                            &peer->id, msize,
785                                            &core_transmit_notify, peer);
786   return off;
787 }
788
789
790 /**
791  * Transmit all messages in the peer's message queue.
792  *
793  * @param peer message queue to process
794  */
795 static void
796 process_peer_queue (struct PeerInfo *peer)
797 {
798   struct P2PPendingMessage *pending;
799
800   if (NULL != (pending = peer->head))
801     return;
802   if (NULL != peer->th)
803     return;
804   peer->th 
805     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
806                                          pending->importance,
807                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
808                                          &peer->id,
809                                          ntohs (pending->msg->size),
810                                          &core_transmit_notify, peer);
811 }
812
813
814 /**
815  * To how many peers should we (on average) forward the request to
816  * obtain the desired target_replication count (on average).
817  *
818  * @param hop_count number of hops the message has traversed
819  * @param target_replication the number of total paths desired
820  * @return Some number of peers to forward the message to
821  */
822 static unsigned int
823 get_forward_count (uint32_t hop_count, 
824                    uint32_t target_replication)
825 {
826   uint32_t random_value;
827   uint32_t forward_count;
828   float target_value;
829
830   if (hop_count > GDS_NSE_get () * 4.0)
831   {
832     /* forcefully terminate */
833     return 0;
834   }
835   if (hop_count > GDS_NSE_get () * 2.0)
836   {
837     /* Once we have reached our ideal number of hops, only forward to 1 peer */
838     return 1;
839   }
840   /* bound by system-wide maximum */
841   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
842                                    target_replication);
843   target_value =
844     1 + (target_replication - 1.0) / (GDS_NSE_get () +
845                                       ((float) (target_replication - 1.0) *
846                                        hop_count));
847   /* Set forward count to floor of target_value */
848   forward_count = (uint32_t) target_value;
849   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
850   target_value = target_value - forward_count;
851   random_value =
852     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); 
853   if (random_value < (target_value * UINT32_MAX))
854     forward_count++;
855   return forward_count;
856 }
857
858
859 /**
860  * Compute the distance between have and target as a 32-bit value.
861  * Differences in the lower bits must count stronger than differences
862  * in the higher bits.
863  *
864  * @return 0 if have==target, otherwise a number
865  *           that is larger as the distance between
866  *           the two hash codes increases
867  */
868 static unsigned int
869 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
870 {
871   unsigned int bucket;
872   unsigned int msb;
873   unsigned int lsb;
874   unsigned int i;
875
876   /* We have to represent the distance between two 2^9 (=512)-bit
877    * numbers as a 2^5 (=32)-bit number with "0" being used for the
878    * two numbers being identical; furthermore, we need to
879    * guarantee that a difference in the number of matching
880    * bits is always represented in the result.
881    *
882    * We use 2^32/2^9 numerical values to distinguish between
883    * hash codes that have the same LSB bit distance and
884    * use the highest 2^9 bits of the result to signify the
885    * number of (mis)matching LSB bits; if we have 0 matching
886    * and hence 512 mismatching LSB bits we return -1 (since
887    * 512 itself cannot be represented with 9 bits) */
888
889   /* first, calculate the most significant 9 bits of our
890    * result, aka the number of LSBs */
891   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
892   /* bucket is now a value between 0 and 512 */
893   if (bucket == 512)
894     return 0;                   /* perfect match */
895   if (bucket == 0)
896     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
897                                  * below, we'd end up with max+1 (overflow)) */
898
899   /* calculate the most significant bits of the final result */
900   msb = (512 - bucket) << (32 - 9);
901   /* calculate the 32-9 least significant bits of the final result by
902    * looking at the differences in the 32-9 bits following the
903    * mismatching bit at 'bucket' */
904   lsb = 0;
905   for (i = bucket + 1;
906        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
907   {
908     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
909         GNUNET_CRYPTO_hash_get_bit (have, i))
910       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
911                                                  * last bit set will be 31 -- if
912                                                  * i does not reach 512 first... */
913   }
914   return msb | lsb;
915 }
916
917
918 /**
919  * Check whether my identity is closer than any known peers.  If a
920  * non-null bloomfilter is given, check if this is the closest peer
921  * that hasn't already been routed to.
922  *
923  * @param key hash code to check closeness to
924  * @param bloom bloomfilter, exclude these entries from the decision
925  * @return GNUNET_YES if node location is closest,
926  *         GNUNET_NO otherwise.
927  */
928 static int
929 am_closest_peer (const GNUNET_HashCode *key,
930                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
931 {
932   int bits;
933   int other_bits;
934   int bucket_num;
935   int count;
936   struct PeerInfo *pos;
937   unsigned int my_distance;
938
939   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
940     return GNUNET_YES;
941   bucket_num = find_bucket (key);
942   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
943   my_distance = get_distance (&my_identity.hashPubKey, key);
944   pos = k_buckets[bucket_num].head;
945   count = 0;
946   while ((pos != NULL) && (count < bucket_size))
947   {
948     if ((bloom != NULL) &&
949         (GNUNET_YES ==
950          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
951     {
952       pos = pos->next;
953       continue;                 /* Skip already checked entries */
954     }
955     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
956     if (other_bits > bits)
957       return GNUNET_NO;
958     if (other_bits == bits)        /* We match the same number of bits */
959       return GNUNET_YES;
960     pos = pos->next;
961   }
962   /* No peers closer, we are the closest! */
963   return GNUNET_YES;
964 }
965
966
967 /**
968  * Select a peer from the routing table that would be a good routing
969  * destination for sending a message for "key".  The resulting peer
970  * must not be in the set of blocked peers.<p>
971  *
972  * Note that we should not ALWAYS select the closest peer to the
973  * target, peers further away from the target should be chosen with
974  * exponentially declining probability.
975  *
976  * FIXME: double-check that this is fine
977  * 
978  *
979  * @param key the key we are selecting a peer to route to
980  * @param bloom a bloomfilter containing entries this request has seen already
981  * @param hops how many hops has this message traversed thus far
982  * @return Peer to route to, or NULL on error
983  */
984 static struct PeerInfo *
985 select_peer (const GNUNET_HashCode *key,
986              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
987              uint32_t hops)
988 {
989   unsigned int bc;
990   unsigned int count;
991   unsigned int selected;
992   struct PeerInfo *pos;
993   unsigned int dist;
994   unsigned int smallest_distance;
995   struct PeerInfo *chosen;
996
997   if (hops >= GDS_NSE_get ())
998   {
999     /* greedy selection (closest peer that is not in bloomfilter) */
1000     smallest_distance = UINT_MAX;
1001     chosen = NULL;
1002     for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1003     {
1004       pos = k_buckets[bc].head;
1005       count = 0;
1006       while ((pos != NULL) && (count < bucket_size))
1007       {
1008         if (GNUNET_NO ==
1009             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1010         {
1011           dist = get_distance (key, &pos->id.hashPubKey);
1012           if (dist < smallest_distance)
1013           {
1014             chosen = pos;
1015             smallest_distance = dist;
1016           }
1017         }
1018         count++;
1019         pos = pos->next;
1020       }
1021     }
1022     return chosen;
1023   }
1024
1025   /* select "random" peer */
1026   /* count number of peers that are available and not filtered */
1027   count = 0;
1028   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1029   {
1030     pos = k_buckets[bc].head;
1031     while ((pos != NULL) && (count < bucket_size))
1032     {
1033       if (GNUNET_YES ==
1034           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1035       {
1036         pos = pos->next;
1037         continue;               /* Ignore bloomfiltered peers */
1038       }
1039       count++;
1040       pos = pos->next;
1041     }
1042   }
1043   if (count == 0)               /* No peers to select from! */
1044   {
1045     return NULL;
1046   }
1047   /* Now actually choose a peer */
1048   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1049   count = 0;
1050   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1051   {
1052     pos = k_buckets[bc].head;
1053     while ((pos != NULL) && (count < bucket_size))
1054     {
1055       if (GNUNET_YES ==
1056           GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1057       {
1058         pos = pos->next;
1059         continue;               /* Ignore bloomfiltered peers */
1060       }
1061       if (0 == selected--)
1062         return pos;
1063       pos = pos->next;
1064     }
1065   }
1066   GNUNET_break (0);
1067   return NULL;
1068 }
1069
1070
1071 /**
1072  * Compute the set of peers that the given request should be
1073  * forwarded to.
1074  *
1075  * @param key routing key
1076  * @param bloom bloom filter excluding peers as targets, all selected
1077  *        peers will be added to the bloom filter
1078  * @param hop_count number of hops the request has traversed so far
1079  * @param target_replication desired number of replicas
1080  * @param targets where to store an array of target peers (to be
1081  *         free'd by the caller)
1082  * @return number of peers returned in 'targets'.
1083  */
1084 static unsigned int
1085 get_target_peers (const GNUNET_HashCode *key,
1086                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1087                   uint32_t hop_count,
1088                   uint32_t target_replication,
1089                   struct PeerInfo ***targets)
1090 {
1091   unsigned int ret;
1092   unsigned int off;
1093   struct PeerInfo **rtargets;
1094   struct PeerInfo *nxt;
1095
1096   ret = get_forward_count (hop_count, target_replication);
1097   if (ret == 0)
1098   {
1099     *targets = NULL;
1100     return 0;
1101   }
1102   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1103   off = 0;
1104   while (ret-- > 0)
1105   {
1106     nxt = select_peer (key, bloom, hop_count);
1107     if (nxt == NULL)
1108       break;
1109     rtargets[off++] = nxt;
1110     GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
1111   }
1112   if (0 == off)
1113   {
1114     GNUNET_free (rtargets);
1115     *targets = NULL;
1116     return 0;
1117   }
1118   *targets = rtargets;
1119   return off;
1120 }
1121
1122
1123 /**
1124  * Perform a PUT operation.   Forwards the given request to other
1125  * peers.   Does not store the data locally.  Does not give the
1126  * data to local clients.  May do nothing if this is the only
1127  * peer in the network (or if we are the closest peer in the
1128  * network).
1129  *
1130  * @param type type of the block
1131  * @param options routing options
1132  * @param desired_replication_level desired replication count
1133  * @param expiration_time when does the content expire
1134  * @param hop_count how many hops has this message traversed so far
1135  * @param bf Bloom filter of peers this PUT has already traversed
1136  * @param key key for the content
1137  * @param put_path_length number of entries in put_path
1138  * @param put_path peers this request has traversed so far (if tracked)
1139  * @param data payload to store
1140  * @param data_size number of bytes in data
1141  */
1142 void
1143 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1144                            enum GNUNET_DHT_RouteOption options,
1145                            uint32_t desired_replication_level,
1146                            struct GNUNET_TIME_Absolute expiration_time,
1147                            uint32_t hop_count,
1148                            struct GNUNET_CONTAINER_BloomFilter *bf,
1149                            const GNUNET_HashCode *key,
1150                            unsigned int put_path_length,
1151                            struct GNUNET_PeerIdentity *put_path,
1152                            const void *data,
1153                            size_t data_size)
1154 {
1155   unsigned int target_count;
1156   unsigned int i;
1157   struct PeerInfo **targets;
1158   struct PeerInfo *target;
1159   struct P2PPendingMessage *pending;
1160   size_t msize;
1161   struct PeerPutMessage *ppm;
1162   struct GNUNET_PeerIdentity *pp;
1163   
1164   target_count = get_target_peers (key, bf, hop_count,
1165                                    desired_replication_level,
1166                                    &targets);
1167   if (0 == target_count)
1168     return;
1169   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1170   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1171   {
1172     put_path_length = 0;
1173     msize = data_size + sizeof (struct PeerPutMessage);
1174   }
1175   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1176   {
1177     GNUNET_break (0);
1178     return;
1179   }
1180   for (i=0;i<target_count;i++)
1181   {
1182     target = targets[i];
1183     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1184     pending->importance = 0; /* FIXME */
1185     pending->timeout = expiration_time;   
1186     ppm = (struct PeerPutMessage*) &pending[1];
1187     pending->msg = &ppm->header;
1188     ppm->header.size = htons (msize);
1189     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1190     ppm->options = htonl (options);
1191     ppm->type = htonl (type);
1192     ppm->hop_count = htonl (hop_count + 1);
1193     ppm->desired_replication_level = htonl (desired_replication_level);
1194     ppm->put_path_length = htonl (put_path_length);
1195     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1196     GNUNET_assert (GNUNET_OK ==
1197                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1198                                                               ppm->bloomfilter,
1199                                                               DHT_BLOOM_SIZE));
1200     ppm->key = *key;
1201     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1202     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1203     memcpy (&pp[put_path_length], data, data_size);
1204     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1205                                       target->tail,
1206                                       pending);
1207     target->pending_count++;
1208     process_peer_queue (target);
1209   }
1210   GNUNET_free (targets);
1211 }
1212
1213
1214 /**
1215  * Perform a GET operation.  Forwards the given request to other
1216  * peers.  Does not lookup the key locally.  May do nothing if this is
1217  * the only peer in the network (or if we are the closest peer in the
1218  * network).
1219  *
1220  * @param type type of the block
1221  * @param options routing options
1222  * @param desired_replication_level desired replication count
1223  * @param hop_count how many hops did this request traverse so far?
1224  * @param key key for the content
1225  * @param xquery extended query
1226  * @param xquery_size number of bytes in xquery
1227  * @param reply_bf bloomfilter to filter duplicates
1228  * @param reply_bf_mutator mutator for reply_bf
1229  * @param peer_bf filter for peers not to select (again)
1230  */
1231 void
1232 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1233                            enum GNUNET_DHT_RouteOption options,
1234                            uint32_t desired_replication_level,
1235                            uint32_t hop_count,
1236                            const GNUNET_HashCode *key,
1237                            const void *xquery,
1238                            size_t xquery_size,
1239                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1240                            uint32_t reply_bf_mutator,
1241                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1242 {
1243   unsigned int target_count;
1244   unsigned int i;
1245   struct PeerInfo **targets;
1246   struct PeerInfo *target;
1247   struct P2PPendingMessage *pending;
1248   size_t msize;
1249   struct PeerGetMessage *pgm;
1250   char *xq;
1251   size_t reply_bf_size;
1252   
1253   target_count = get_target_peers (key, peer_bf, hop_count,
1254                                    desired_replication_level,
1255                                    &targets);
1256   if (0 == target_count)
1257     return;
1258   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1259   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1260   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1261   {
1262     GNUNET_break (0);
1263     return;
1264   }
1265   /* forward request */
1266   for (i=0;i<target_count;i++)
1267   {
1268     target = targets[i];
1269     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1270     pending->importance = 0; /* FIXME */
1271     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1272     pgm = (struct PeerGetMessage*) &pending[1];
1273     pending->msg = &pgm->header;
1274     pgm->header.size = htons (msize);
1275     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1276     pgm->options = htonl (options);
1277     pgm->type = htonl (type);
1278     pgm->hop_count = htonl (hop_count + 1);
1279     pgm->desired_replication_level = htonl (desired_replication_level);
1280     pgm->xquery_size = htonl (xquery_size);
1281     pgm->bf_mutator = reply_bf_mutator; 
1282     GNUNET_assert (GNUNET_OK ==
1283                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1284                                                               pgm->bloomfilter,
1285                                                               DHT_BLOOM_SIZE));
1286     pgm->key = *key;
1287     xq = (char *) &pgm[1];
1288     memcpy (xq, xquery, xquery_size);
1289     GNUNET_assert (GNUNET_OK ==
1290                    GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1291                                                               &xq[xquery_size],
1292                                                               reply_bf_size));
1293     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1294                                       target->tail,
1295                                       pending);
1296     target->pending_count++;
1297     process_peer_queue (target);
1298   }
1299   GNUNET_free (targets);
1300 }
1301
1302
1303 /**
1304  * Handle a reply (route to origin).  Only forwards the reply back to
1305  * the given peer.  Does not do local caching or forwarding to local
1306  * clients.
1307  *
1308  * @param target neighbour that should receive the block (if still connected)
1309  * @param type type of the block
1310  * @param expiration_time when does the content expire
1311  * @param key key for the content
1312  * @param put_path_length number of entries in put_path
1313  * @param put_path peers the original PUT traversed (if tracked)
1314  * @param get_path_length number of entries in put_path
1315  * @param get_path peers this reply has traversed so far (if tracked)
1316  * @param data payload of the reply
1317  * @param data_size number of bytes in data
1318  */
1319 void
1320 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1321                              enum GNUNET_BLOCK_Type type,
1322                              struct GNUNET_TIME_Absolute expiration_time,
1323                              const GNUNET_HashCode *key,
1324                              unsigned int put_path_length,
1325                              const struct GNUNET_PeerIdentity *put_path,
1326                              unsigned int get_path_length,
1327                              const struct GNUNET_PeerIdentity *get_path,
1328                              const void *data,
1329                              size_t data_size)
1330 {
1331   struct PeerInfo *pi;
1332   struct P2PPendingMessage *pending;
1333   size_t msize;
1334   struct PeerResultMessage *prm;
1335   struct GNUNET_PeerIdentity *paths;
1336   
1337   msize = data_size + sizeof (struct PeerResultMessage) + 
1338     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1339   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1340        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1341        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1342        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1343   {
1344     GNUNET_break (0);
1345     return;
1346   }
1347   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1348                                           &target->hashPubKey);
1349   if (NULL == pi)
1350   {
1351     /* peer disconnected in the meantime, drop reply */
1352     return;
1353   }
1354   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1355   pending->importance = 0; /* FIXME */
1356   pending->timeout = expiration_time;
1357   prm = (struct PeerResultMessage*) &pending[1];
1358   pending->msg = &prm->header;
1359   prm->header.size = htons (msize);
1360   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1361   prm->type = htonl (type);
1362   prm->put_path_length = htonl (put_path_length);
1363   prm->get_path_length = htonl (get_path_length);
1364   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1365   prm->key = *key;
1366   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1367   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1368   memcpy (&paths[put_path_length],
1369           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1370   memcpy (&paths[put_path_length + get_path_length],
1371           data, data_size);
1372   GNUNET_CONTAINER_DLL_insert (pi->head,
1373                                pi->tail,
1374                                pending);
1375   pi->pending_count++;
1376   process_peer_queue (pi);
1377 }
1378
1379
1380 /**
1381  * To be called on core init/fail.
1382  *
1383  * @param cls service closure
1384  * @param server handle to the server for this service
1385  * @param identity the public identity of this peer
1386  * @param publicKey the public key of this peer
1387  */
1388 static void
1389 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1390            const struct GNUNET_PeerIdentity *identity,
1391            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1392 {
1393   GNUNET_assert (server != NULL);
1394   my_identity = *identity;
1395 }
1396
1397
1398 /**
1399  * Core handler for p2p put requests.
1400  *
1401  * @param cls closure
1402  * @param peer sender of the request
1403  * @param message message
1404  * @param peer peer identity this notification is about
1405  * @param atsi performance data
1406  * @return GNUNET_OK to keep the connection open,
1407  *         GNUNET_SYSERR to close it (signal serious error)
1408  */
1409 static int
1410 handle_dht_p2p_put (void *cls,
1411                     const struct GNUNET_PeerIdentity *peer,
1412                     const struct GNUNET_MessageHeader *message,
1413                     const struct GNUNET_TRANSPORT_ATS_Information
1414                     *atsi)
1415 {
1416   const struct PeerPutMessage *put;
1417   const struct GNUNET_PeerIdentity *put_path;
1418   const void *payload;
1419   uint32_t putlen;
1420   uint16_t msize;
1421   size_t payload_size;
1422   enum GNUNET_DHT_RouteOption options;
1423   struct GNUNET_CONTAINER_BloomFilter *bf;
1424   GNUNET_HashCode test_key;
1425   
1426   msize = ntohs (message->size);
1427   if (msize < sizeof (struct PeerPutMessage))
1428   {
1429     GNUNET_break_op (0);
1430     return GNUNET_YES;
1431   }
1432   put = (const struct PeerPutMessage*) message;
1433   putlen = ntohl (put->put_path_length);
1434   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1435        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1436     {
1437       GNUNET_break_op (0);
1438       return GNUNET_YES;
1439     }
1440   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1441   payload = &put_path[putlen];
1442   options = ntohl (put->options);
1443   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1444                           putlen * sizeof (struct GNUNET_PeerIdentity));
1445   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1446                                 ntohl (put->type),
1447                                 payload, payload_size,
1448                                 &test_key))
1449   {
1450   case GNUNET_YES:
1451     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1452     {
1453       GNUNET_break_op (0);
1454       return GNUNET_YES;
1455     }
1456     break;
1457   case GNUNET_NO:
1458     GNUNET_break_op (0);
1459     return GNUNET_YES;
1460   case GNUNET_SYSERR:
1461     /* cannot verify, good luck */
1462     break;
1463   }
1464   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1465                                           DHT_BLOOM_SIZE,
1466                                           DHT_BLOOM_K);
1467   {
1468     struct GNUNET_PeerIdentity pp[putlen+1];
1469   
1470     /* extend 'put path' by sender */
1471     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1472     {
1473       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1474       pp[putlen] = *peer;
1475       putlen++;
1476     }
1477     else
1478       putlen = 0;
1479     
1480     /* give to local clients */
1481     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1482                              &put->key,
1483                              0, NULL,
1484                              putlen,
1485                              pp,
1486                              ntohl (put->type),
1487                              payload_size,
1488                              payload);
1489     /* store locally */
1490     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1491          (am_closest_peer (&put->key,
1492                            bf) ) )
1493       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1494                                 &put->key,
1495                                 putlen, pp,
1496                                 ntohl (put->type),
1497                                 payload_size,
1498                                 payload);
1499     /* route to other peers */
1500     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1501                                options,
1502                                ntohl (put->desired_replication_level),
1503                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1504                                ntohl (put->hop_count) + 1 /* who adds +1? */,
1505                                bf,
1506                                &put->key,
1507                                putlen, pp,
1508                                payload,
1509                                payload_size);
1510   }
1511   GNUNET_CONTAINER_bloomfilter_free (bf);
1512   return GNUNET_YES;
1513 }
1514
1515
1516 /**
1517  * We have received a FIND PEER request.  Send matching
1518  * HELLOs back.
1519  *
1520  * @param sender sender of the FIND PEER request
1521  * @param key peers close to this key are desired
1522  * @param bf peers matching this bf are excluded
1523  * @param bf_mutator mutator for bf
1524  */
1525 static void
1526 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1527                   const GNUNET_HashCode *key,
1528                   struct GNUNET_CONTAINER_BloomFilter *bf,
1529                   uint32_t bf_mutator)
1530 {
1531   int bucket_idx;
1532   struct PeerBucket *bucket;
1533   struct PeerInfo *peer;
1534   unsigned int choice;
1535   GNUNET_HashCode mhash;
1536
1537   /* first, check about our own HELLO */
1538   if (NULL != GDS_my_hello)
1539   {
1540     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1541     if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))
1542       
1543       GDS_NEIGHBOURS_handle_reply (sender,
1544                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1545                                    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1546                                    key,
1547                                    0, NULL,
1548                                    0, NULL,
1549                                    GDS_my_hello,
1550                                    GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message*) GDS_my_hello));
1551   }
1552
1553   /* then, also consider sending a random HELLO from the closest bucket */
1554   bucket_idx = find_bucket (key);
1555   if (bucket_idx == GNUNET_SYSERR)
1556     return;
1557   bucket = &k_buckets[bucket_idx];
1558   if (bucket->peers_size == 0)
1559     return;
1560   choice = GNUNET_CRYPTO_random_u32 (bucket->peers_size,
1561                                      GNUNET_CRYPTO_QUALITY_WEAK);
1562   peer = bucket->head;
1563   while (choice > 0)
1564   {
1565     GNUNET_assert (peer != NULL);
1566     peer = peer->next;
1567   }
1568   choice = bucket->peers_size;
1569   do
1570     {
1571       peer = peer->next;
1572       if (choice-- == 0)
1573         return; /* no non-masked peer available */
1574       if (peer == NULL)
1575         peer = bucket->head;
1576       GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1577     }
1578   while ( (peer->hello == NULL) ||
1579           (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) );
1580   GDS_NEIGHBOURS_handle_reply (sender,
1581                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1582                                GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1583                                key,
1584                                0, NULL,
1585                                0, NULL,
1586                                peer->hello,
1587                                GNUNET_HELLO_size (peer->hello));    
1588 }
1589
1590
1591 /**
1592  * Core handler for p2p get requests.
1593  *
1594  * @param cls closure
1595  * @param peer sender of the request
1596  * @param message message
1597  * @param peer peer identity this notification is about
1598  * @param atsi performance data
1599  * @return GNUNET_OK to keep the connection open,
1600  *         GNUNET_SYSERR to close it (signal serious error)
1601  */
1602 static int
1603 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1604                     const struct GNUNET_MessageHeader *message,
1605                     const struct GNUNET_TRANSPORT_ATS_Information
1606                     *atsi)
1607 {
1608   struct PeerGetMessage *get;
1609   uint32_t xquery_size;
1610   size_t reply_bf_size;
1611   uint16_t msize;
1612   enum GNUNET_BLOCK_Type type;
1613   enum GNUNET_DHT_RouteOption options;
1614   enum GNUNET_BLOCK_EvaluationResult eval;
1615   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1616   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1617   const char *xquery;
1618                       
1619   /* parse and validate message */
1620   msize = ntohs (message->size);
1621   if (msize < sizeof (struct PeerGetMessage))
1622   {
1623     GNUNET_break_op (0);
1624     return GNUNET_YES;
1625   }
1626   get = (struct PeerGetMessage *) message;
1627   xquery_size = ntohl (get->xquery_size);
1628   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1629   {
1630     GNUNET_break_op (0);
1631     return GNUNET_YES;
1632   }
1633   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1634   type = ntohl (get->type);
1635   options = ntohl (get->options);
1636   xquery = (const char*) &get[1];
1637   reply_bf = NULL;
1638   if (reply_bf_size > 0)
1639     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1640                                                   reply_bf_size,
1641                                                   GNUNET_DHT_GET_BLOOMFILTER_K);
1642   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1643                                 type,
1644                                 &get->key,
1645                                 &reply_bf,
1646                                 get->bf_mutator,
1647                                 xquery, xquery_size,
1648                                 NULL, 0);
1649   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1650   {
1651     /* request invalid or block type not supported */
1652     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1653     if (NULL != reply_bf)
1654       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1655     return GNUNET_YES;
1656   }
1657   peer_bf =
1658     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1659                                        DHT_BLOOM_SIZE,
1660                                        DHT_BLOOM_K);
1661
1662   /* remember request for routing replies */
1663   GDS_ROUTING_add (peer,
1664                    type,
1665                    options,
1666                    &get->key,
1667                    xquery, xquery_size,
1668                    reply_bf, get->bf_mutator);
1669
1670   /* local lookup (this may update the reply_bf) */
1671   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1672        (am_closest_peer (&get->key,
1673                          peer_bf) ) )
1674   {
1675     if ( (0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1676     {
1677       handle_find_peer (peer,
1678                         &get->key,
1679                         reply_bf,
1680                         get->bf_mutator);
1681     }
1682     else
1683     {
1684       eval = GDS_DATACACHE_handle_get (&get->key,
1685                                        type,
1686                                        xquery, xquery_size,
1687                                        &reply_bf, 
1688                                        get->bf_mutator);
1689     }
1690   }
1691   
1692   /* P2P forwarding */
1693   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1694     GDS_NEIGHBOURS_handle_get (type,
1695                                options,
1696                                ntohl (get->desired_replication_level),
1697                                ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1698                                &get->key,
1699                                xquery, xquery_size,
1700                                reply_bf,
1701                                get->bf_mutator,
1702                                peer_bf);
1703   /* clean up */
1704   if (NULL != reply_bf)
1705     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1706   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1707   return GNUNET_YES;
1708 }
1709
1710
1711 /**
1712  * Core handler for p2p result messages.
1713  *
1714  * @param cls closure
1715  * @param message message
1716  * @param peer peer identity this notification is about
1717  * @param atsi performance data
1718  * @return GNUNET_YES (do not cut p2p connection)
1719  */
1720 static int
1721 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1722                        const struct GNUNET_MessageHeader *message,
1723                        const struct GNUNET_TRANSPORT_ATS_Information
1724                        *atsi)
1725 {
1726   const struct PeerResultMessage *prm;
1727   const struct GNUNET_PeerIdentity *put_path;
1728   const struct GNUNET_PeerIdentity *get_path;
1729   const void *data;
1730   uint32_t get_path_length;
1731   uint32_t put_path_length;
1732   uint16_t msize;
1733   size_t data_size;
1734   enum GNUNET_BLOCK_Type type;
1735                        
1736   /* parse and validate message */
1737   msize = ntohs (message->size);
1738   if (msize < sizeof (struct PeerResultMessage))
1739   {
1740     GNUNET_break_op (0);
1741     return GNUNET_YES;
1742   }
1743   prm = (struct PeerResultMessage *) message;
1744   put_path_length = ntohl (prm->put_path_length);
1745   get_path_length = ntohl (prm->get_path_length);
1746   if ( (msize < sizeof (struct PeerResultMessage) + 
1747         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1748        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1749        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1750   {
1751     GNUNET_break_op (0);
1752     return GNUNET_YES;
1753   } 
1754   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1755   get_path = &put_path[put_path_length];
1756   type = ntohl (prm->type);
1757   data = (const void*) &get_path[get_path_length];
1758   data_size = msize - (sizeof (struct PeerResultMessage) + 
1759                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1760
1761   /* if we got a HELLO, consider it for our own routing table */
1762   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1763   {
1764     const struct GNUNET_MessageHeader *h;
1765     struct GNUNET_PeerIdentity pid;
1766     int bucket;
1767
1768     /* Should be a HELLO, validate and consider using it! */
1769     if (data_size < sizeof (struct GNUNET_MessageHeader))
1770     {
1771       GNUNET_break_op (0);
1772       return GNUNET_YES;
1773     }
1774     h = data;
1775     if (data_size != ntohs (h->size))
1776     {
1777       GNUNET_break_op (0);
1778       return GNUNET_YES;
1779     }
1780     if (GNUNET_OK !=
1781         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) h,
1782                              &pid))
1783     {
1784       GNUNET_break_op (0);
1785       return GNUNET_YES;
1786     }
1787     bucket = find_bucket (&pid.hashPubKey);
1788     if ( (bucket >= 0) &&
1789          (k_buckets[bucket].peers_size < bucket_size) )
1790     {    
1791       if (NULL != GDS_transport_handle)
1792         GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
1793                                       h, NULL, NULL);
1794       (void) GNUNET_CORE_peer_request_connect (coreAPI,
1795                                                &pid, 
1796                                                NULL, NULL);
1797     }   
1798   }
1799
1800   /* append 'peer' to 'get_path' */
1801   {    
1802     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
1803
1804     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1805     xget_path[get_path_length] = *peer;
1806     get_path_length++;
1807
1808     /* forward to local clients */   
1809     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1810                              &prm->key,
1811                              get_path_length,
1812                              xget_path,
1813                              put_path_length,
1814                              put_path,
1815                              type,
1816                              data_size, 
1817                              data);
1818
1819     /* forward to other peers */
1820     GDS_ROUTING_process (type,
1821                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
1822                          &prm->key,
1823                          put_path_length,
1824                          put_path,
1825                          get_path_length,
1826                          xget_path,
1827                          data,
1828                          data_size);                     
1829   }
1830   return GNUNET_YES;
1831 }
1832
1833
1834 /**
1835  * Function called for each HELLO known to PEERINFO.
1836  *
1837  * @param cls closure
1838  * @param peer id of the peer, NULL for last call
1839  * @param hello hello message for the peer (can be NULL)
1840  * @param error message
1841  */
1842 static void
1843 process_hello (void *cls,
1844                const struct GNUNET_PeerIdentity *
1845                peer,
1846                const struct GNUNET_HELLO_Message *
1847                hello, const char *err_msg)
1848 {
1849   // FIXME: consider moving HELLO processing to another file!
1850   // FIXME: first, filter HELLOs without addresses (!)
1851   // FIXME: track HELLOs (for responding to FIND PEER requests)
1852   // FIXME: add code to possibly ask core to establish connections
1853   //        (using our own peerinfo is better than using FIND PEER!)
1854 }
1855
1856
1857 /**
1858  * Initialize neighbours subsystem.
1859  *
1860  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1861  */
1862 int
1863 GDS_NEIGHBOURS_init ()
1864 {
1865   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1866     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
1867     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
1868     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
1869     {NULL, 0, 0}
1870   };
1871   unsigned long long temp_config_num;
1872  
1873   if (GNUNET_OK ==
1874       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
1875                                              &temp_config_num))
1876     bucket_size = (unsigned int) temp_config_num;  
1877   coreAPI = GNUNET_CORE_connect (GDS_cfg,
1878                                  1,
1879                                  NULL,
1880                                  &core_init,
1881                                  &handle_core_connect,
1882                                  &handle_core_disconnect, 
1883                                  NULL,  /* Do we care about "status" updates? */
1884                                  NULL, GNUNET_NO,
1885                                  NULL, GNUNET_NO,
1886                                  core_handlers);
1887   if (coreAPI == NULL)
1888     return GNUNET_SYSERR;
1889   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
1890   pnc = GNUNET_PEERINFO_notify (GDS_cfg,
1891                                 &process_hello,
1892                                 NULL);
1893   return GNUNET_OK;
1894 }
1895
1896
1897 /**
1898  * Shutdown neighbours subsystem.
1899  */
1900 void
1901 GDS_NEIGHBOURS_done ()
1902 {
1903   if (coreAPI == NULL)
1904     return;
1905   if (NULL != pnc)
1906   {
1907     GNUNET_PEERINFO_notify_cancel (pnc);
1908     pnc = NULL;
1909   }
1910   GNUNET_CORE_disconnect (coreAPI);
1911   coreAPI = NULL;    
1912   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
1913   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
1914   all_known_peers = NULL;
1915   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
1916   {
1917     GNUNET_SCHEDULER_cancel (find_peer_task);
1918     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
1919   }
1920 }
1921
1922
1923 /* end of gnunet-service-dht_neighbours.c */