6de93c0eb788f4497ce9640aab59f319aa2216a8
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_ats_service.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50 #include "dht.h"
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * How often to update our preference levels for peers in our routing tables.
74  */
75 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92
93 /**
94  * P2P PUT message
95  */
96 struct PeerPutMessage
97 {
98   /**
99    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
100    */
101   struct GNUNET_MessageHeader header;
102
103   /**
104    * Processing options
105    */
106   uint32_t options GNUNET_PACKED;
107
108   /**
109    * Content type.
110    */
111   uint32_t type GNUNET_PACKED;
112
113   /**
114    * Hop count
115    */
116   uint32_t hop_count GNUNET_PACKED;
117
118   /**
119    * Replication level for this message
120    */
121   uint32_t desired_replication_level GNUNET_PACKED;
122
123   /**
124    * Length of the PUT path that follows (if tracked).
125    */
126   uint32_t put_path_length GNUNET_PACKED;
127
128   /**
129    * When does the content expire?
130    */
131   struct GNUNET_TIME_AbsoluteNBO expiration_time;
132
133   /**
134    * Bloomfilter (for peer identities) to stop circular routes
135    */
136   char bloomfilter[DHT_BLOOM_SIZE];
137
138   /**
139    * The key we are storing under.
140    */
141   GNUNET_HashCode key;
142
143   /* put path (if tracked) */
144
145   /* Payload */
146
147 };
148
149
150 /**
151  * P2P Result message
152  */
153 struct PeerResultMessage
154 {
155   /**
156    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
157    */
158   struct GNUNET_MessageHeader header;
159
160   /**
161    * Content type.
162    */
163   uint32_t type GNUNET_PACKED;
164
165   /**
166    * Length of the PUT path that follows (if tracked).
167    */
168   uint32_t put_path_length GNUNET_PACKED;
169
170   /**
171    * Length of the GET path that follows (if tracked).
172    */
173   uint32_t get_path_length GNUNET_PACKED;
174
175   /**
176    * When does the content expire?
177    */
178   struct GNUNET_TIME_AbsoluteNBO expiration_time;
179
180   /**
181    * The key of the corresponding GET request.
182    */
183   GNUNET_HashCode key;
184
185   /* put path (if tracked) */
186
187   /* get path (if tracked) */
188
189   /* Payload */
190
191 };
192
193
194 /**
195  * P2P GET message
196  */
197 struct PeerGetMessage
198 {
199   /**
200    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
201    */
202   struct GNUNET_MessageHeader header;
203
204   /**
205    * Processing options
206    */
207   uint32_t options GNUNET_PACKED;
208
209   /**
210    * Desired content type.
211    */
212   uint32_t type GNUNET_PACKED;
213
214   /**
215    * Hop count
216    */
217   uint32_t hop_count GNUNET_PACKED;
218
219   /**
220    * Desired replication level for this request.
221    */
222   uint32_t desired_replication_level GNUNET_PACKED;
223
224   /**
225    * Size of the extended query.
226    */
227   uint32_t xquery_size;
228
229   /**
230    * Bloomfilter mutator.
231    */
232   uint32_t bf_mutator;
233
234   /**
235    * Bloomfilter (for peer identities) to stop circular routes
236    */
237   char bloomfilter[DHT_BLOOM_SIZE];
238
239   /**
240    * The key we are looking for.
241    */
242   GNUNET_HashCode key;
243
244   /* xquery */
245
246   /* result bloomfilter */
247
248 };
249
250
251 /**
252  * Linked list of messages to send to a particular other peer.
253  */
254 struct P2PPendingMessage
255 {
256   /**
257    * Pointer to next item in the list
258    */
259   struct P2PPendingMessage *next;
260
261   /**
262    * Pointer to previous item in the list
263    */
264   struct P2PPendingMessage *prev;
265
266   /**
267    * Message importance level.  FIXME: used? useful?
268    */
269   unsigned int importance;
270
271   /**
272    * When does this message time out?
273    */
274   struct GNUNET_TIME_Absolute timeout;
275
276   /**
277    * Actual message to be sent, allocated at the end of the struct:
278    * // msg = (cast) &pm[1]; 
279    * // memcpy (&pm[1], data, len);
280    */
281   const struct GNUNET_MessageHeader *msg;
282
283 };
284
285
286 /**
287  * Entry for a peer in a bucket.
288  */
289 struct PeerInfo
290 {
291   /**
292    * Next peer entry (DLL)
293    */
294   struct PeerInfo *next;
295
296   /**
297    *  Prev peer entry (DLL)
298    */
299   struct PeerInfo *prev;
300
301   /**
302    * Count of outstanding messages for peer.  FIXME: NEEDED?
303    * FIXME: bound queue size!?
304    */
305   unsigned int pending_count;
306
307   /**
308    * Head of pending messages to be sent to this peer.
309    */
310   struct P2PPendingMessage *head;
311
312   /**
313    * Tail of pending messages to be sent to this peer.
314    */
315   struct P2PPendingMessage *tail;
316
317   /**
318    * Core handle for sending messages to this peer.
319    */
320   struct GNUNET_CORE_TransmitHandle *th;
321
322   /**
323    * Preference update context
324    */
325   struct GNUNET_ATS_InformationRequestContext *info_ctx;
326
327   /**
328    * Task for scheduling preference updates
329    */
330   GNUNET_SCHEDULER_TaskIdentifier preference_task;
331
332   /**
333    * What is the identity of the peer?
334    */
335   struct GNUNET_PeerIdentity id;
336
337 #if 0
338   /**
339    * What is the average latency for replies received?
340    */
341   struct GNUNET_TIME_Relative latency;
342
343   /**
344    * Transport level distance to peer.
345    */
346   unsigned int distance;
347 #endif
348
349 };
350
351
352 /**
353  * Peers are grouped into buckets.
354  */
355 struct PeerBucket
356 {
357   /**
358    * Head of DLL
359    */
360   struct PeerInfo *head;
361
362   /**
363    * Tail of DLL
364    */
365   struct PeerInfo *tail;
366
367   /**
368    * Number of peers in the bucket.
369    */
370   unsigned int peers_size;
371 };
372
373
374 /**
375  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
376  */
377 static unsigned int closest_bucket;
378
379 /**
380  * How many peers have we added since we sent out our last
381  * find peer request?
382  */
383 static unsigned int newly_found_peers;
384
385 /**
386  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
387  */
388 static struct PeerBucket k_buckets[MAX_BUCKETS];
389
390 /**
391  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
392  */
393 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
394
395 /**
396  * Maximum size for each bucket.
397  */
398 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
399
400 /**
401  * Task that sends FIND PEER requests.
402  */
403 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
404
405 /**
406  * Identity of this peer.
407  */ 
408 static struct GNUNET_PeerIdentity my_identity;
409
410 /**
411  * Handle to CORE.
412  */
413 static struct GNUNET_CORE_Handle *coreAPI;
414
415 /**
416  * Handle to ATS.
417  */
418 static struct GNUNET_ATS_Handle *atsAPI;
419
420
421
422 /**
423  * Find the optimal bucket for this key.
424  *
425  * @param hc the hashcode to compare our identity to
426  * @return the proper bucket index, or GNUNET_SYSERR
427  *         on error (same hashcode)
428  */
429 static int
430 find_bucket (const GNUNET_HashCode * hc)
431 {
432   unsigned int bits;
433
434   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
435   if (bits == MAX_BUCKETS)
436     {
437       /* How can all bits match? Got my own ID? */
438       GNUNET_break (0);
439       return GNUNET_SYSERR; 
440     }
441   return MAX_BUCKETS - bits - 1;
442 }
443
444
445 /**
446  * Let GNUnet core know that we like the given peer.
447  *
448  * @param cls the 'struct PeerInfo' of the peer
449  * @param tc scheduler context.
450  */ 
451 static void
452 update_core_preference (void *cls,
453                         const struct GNUNET_SCHEDULER_TaskContext *tc);
454
455
456 /**
457  * Function called with statistics about the given peer.
458  *
459  * @param cls closure
460  * @param peer identifies the peer
461  * @param amount set to the amount that was actually reserved or unreserved;
462  *               either the full requested amount or zero (no partial reservations)
463  * @param res_delay if the reservation could not be satisfied (amount was 0), how
464  *        long should the client wait until re-trying?
465  */
466 static void
467 update_core_preference_finish (void *cls,
468                                const struct GNUNET_PeerIdentity *peer,
469                                int32_t amount,
470                                struct GNUNET_TIME_Relative res_delay)
471 {
472   struct PeerInfo *peer_info = cls;
473
474   peer_info->info_ctx = NULL;
475   peer_info->preference_task
476     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
477                                     &update_core_preference, peer_info);
478 }
479
480
481 /**
482  * Let GNUnet core know that we like the given peer.
483  *
484  * @param cls the 'struct PeerInfo' of the peer
485  * @param tc scheduler context.
486  */ 
487 static void
488 update_core_preference (void *cls,
489                         const struct GNUNET_SCHEDULER_TaskContext *tc)
490 {
491   struct PeerInfo *peer = cls;
492   uint64_t preference;
493   unsigned int matching;
494   int bucket;
495
496   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
497   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
498     return;  
499   matching =
500     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
501                                       &peer->id.hashPubKey);
502   if (matching >= 64)
503     matching = 63;
504   bucket = find_bucket (&peer->id.hashPubKey);
505   if (bucket == GNUNET_SYSERR)
506     preference = 0;
507   else
508   {
509     GNUNET_assert (k_buckets[bucket].peers_size != 0);
510     preference = (1LL << matching) / k_buckets[bucket].peers_size;
511   }
512   if (preference == 0)
513     {
514       peer->preference_task
515         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
516                                         &update_core_preference, peer);
517       return;
518     }
519   GNUNET_STATISTICS_update (GDS_stats,
520                             gettext_noop ("# Preference updates given to core"), 1,
521                             GNUNET_NO);
522   peer->info_ctx =
523     GNUNET_ATS_peer_change_preference (atsAPI, &peer->id,
524                                        0,
525                                        preference,
526                                        &update_core_preference_finish, peer);
527 }
528
529
530 /**
531  * Closure for 'add_known_to_bloom'.
532  */
533 struct BloomConstructorContext
534 {
535   /**
536    * Bloom filter under construction.
537    */
538   struct GNUNET_CONTAINER_BloomFilter *bloom;
539
540   /**
541    * Mutator to use.
542    */
543   uint32_t bf_mutator;
544 };
545
546
547 /**
548  * Add each of the peers we already know to the bloom filter of
549  * the request so that we don't get duplicate HELLOs.
550  *
551  * @param cls the 'struct BloomConstructorContext'.
552  * @param key peer identity to add to the bloom filter
553  * @param value value the peer information (unused)
554  * @return GNUNET_YES (we should continue to iterate)
555  */
556 static int
557 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
558 {
559   struct BloomConstructorContext *ctx = cls;
560   GNUNET_HashCode mh;
561
562   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
565               GNUNET_h2s (key),
566               ctx->bf_mutator);
567   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
568   return GNUNET_YES;
569 }
570
571
572 /**
573  * Task to send a find peer message for our own peer identifier
574  * so that we can find the closest peers in the network to ourselves
575  * and attempt to connect to them.
576  *
577  * @param cls closure for this task
578  * @param tc the context under which the task is running
579  */
580 static void
581 send_find_peer_message (void *cls,
582                         const struct GNUNET_SCHEDULER_TaskContext *tc)
583 {
584   struct GNUNET_TIME_Relative next_send_time;
585   struct BloomConstructorContext bcc;
586   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
587
588   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
589   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
590     return;
591   if (newly_found_peers > bucket_size) 
592   {
593     /* If we are finding many peers already, no need to send out our request right now! */
594     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
595                                                    &send_find_peer_message, NULL);
596     newly_found_peers = 0;
597     return;
598   }
599   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
600   bcc.bloom =
601     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K);
602   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
603                                          &add_known_to_bloom,
604                                          &bcc);
605   GNUNET_STATISTICS_update (GDS_stats,
606                             gettext_noop ("# FIND PEER messages initiated"), 1,
607                             GNUNET_NO);
608   peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
609                                                DHT_BLOOM_SIZE,
610                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
611   // FIXME: pass priority!?
612   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
613                              GNUNET_DHT_RO_FIND_PEER,
614                              FIND_PEER_REPLICATION_LEVEL,
615                              0,
616                              &my_identity.hashPubKey,
617                              NULL, 0,
618                              bcc.bloom, bcc.bf_mutator, 
619                              peer_bf);
620   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
621   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
622   /* schedule next round */
623   next_send_time.rel_value =
624     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
625     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
626                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / (newly_found_peers+1));
627   newly_found_peers = 0;
628   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
629                                                  &send_find_peer_message,
630                                                  NULL);  
631 }
632
633
634 /**
635  * Method called whenever a peer connects.
636  *
637  * @param cls closure
638  * @param peer peer identity this notification is about
639  * @param atsi performance data
640  */
641 static void
642 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
643                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
644 {
645   struct PeerInfo *ret;
646   int peer_bucket;
647
648   /* Check for connect to self message */
649   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
650     return;
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652               "Connected %s to %s\n",
653               GNUNET_i2s (&my_identity),
654               GNUNET_h2s (&peer->hashPubKey));
655   if (GNUNET_YES ==
656       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
657                                               &peer->hashPubKey))
658   {
659     GNUNET_break (0);
660     return;
661   }
662   GNUNET_STATISTICS_update (GDS_stats,
663                             gettext_noop ("# Peers connected"), 1,
664                             GNUNET_NO);
665   peer_bucket = find_bucket (&peer->hashPubKey);
666   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
667   ret = GNUNET_malloc (sizeof (struct PeerInfo));
668 #if 0
669   ret->latency = latency;
670   ret->distance = distance;
671 #endif
672   ret->id = *peer;
673   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
674                                     k_buckets[peer_bucket].tail, ret);
675   k_buckets[peer_bucket].peers_size++;
676   closest_bucket = GNUNET_MAX (closest_bucket,
677                                peer_bucket);
678   if ( (peer_bucket > 0) &&
679        (k_buckets[peer_bucket].peers_size <= bucket_size) )
680   {
681     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
682     newly_found_peers++;
683   }
684   GNUNET_assert (GNUNET_OK ==
685                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
686                                                     &peer->hashPubKey, ret,
687                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
688   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
689   {
690     /* got a first connection, good time to start with FIND PEER requests... */
691     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
692                                                NULL);    
693   }
694 }
695
696
697 /**
698  * Method called whenever a peer disconnects.
699  *
700  * @param cls closure
701  * @param peer peer identity this notification is about
702  */
703 static void
704 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
705 {
706   struct PeerInfo *to_remove;
707   int current_bucket;
708   struct P2PPendingMessage *pos;
709   unsigned int discarded;
710
711   /* Check for disconnect from self message */
712   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
713     return;
714   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715               "Disconnected %s from %s\n",
716               GNUNET_i2s (&my_identity),
717               GNUNET_h2s (&peer->hashPubKey));
718   to_remove =
719       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
720   if (NULL == to_remove)
721     {
722       GNUNET_break (0);
723       return;
724     }
725   GNUNET_STATISTICS_update (GDS_stats,
726                             gettext_noop ("# Peers connected"), -1,
727                             GNUNET_NO);
728   GNUNET_assert (GNUNET_YES ==
729                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
730                                                        &peer->hashPubKey,
731                                                        to_remove));
732   if (NULL != to_remove->info_ctx)
733   {
734     GNUNET_ATS_peer_change_preference_cancel (to_remove->info_ctx);
735     to_remove->info_ctx = NULL;
736   }
737   if (GNUNET_SCHEDULER_NO_TASK != to_remove->preference_task)
738   {
739     GNUNET_SCHEDULER_cancel (to_remove->preference_task);
740     to_remove->preference_task = GNUNET_SCHEDULER_NO_TASK;
741   }
742   current_bucket = find_bucket (&to_remove->id.hashPubKey);
743   GNUNET_assert (current_bucket >= 0);
744   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
745                                k_buckets[current_bucket].tail,
746                                to_remove);
747   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
748   k_buckets[current_bucket].peers_size--;
749   while ( (closest_bucket > 0) &&
750           (k_buckets[closest_bucket].peers_size == 0) )
751     closest_bucket--;
752
753   if (to_remove->th != NULL) 
754   {
755     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
756     to_remove->th = NULL;
757   }
758   discarded = 0;
759   while (NULL != (pos = to_remove->head))
760   {
761     GNUNET_CONTAINER_DLL_remove (to_remove->head,
762                                  to_remove->tail,
763                                  pos);
764     discarded++;
765     GNUNET_free (pos);
766   }
767   GNUNET_STATISTICS_update (GDS_stats,
768                             gettext_noop ("# Queued messages discarded (peer disconnected)"), discarded,
769                             GNUNET_NO);
770   GNUNET_free (to_remove);
771 }
772
773
774 /**
775  * Called when core is ready to send a message we asked for
776  * out to the destination.
777  *
778  * @param cls the 'struct PeerInfo' of the target peer
779  * @param size number of bytes available in buf
780  * @param buf where the callee should write the message
781  * @return number of bytes written to buf
782  */
783 static size_t
784 core_transmit_notify (void *cls, size_t size, void *buf)
785 {
786   struct PeerInfo *peer = cls;
787   char *cbuf = buf;
788   struct P2PPendingMessage *pending;
789   size_t off;
790   size_t msize;
791
792   peer->th = NULL;
793   while ( (NULL != (pending = peer->head)) &&
794           (GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value == 0) )
795   {
796     peer->pending_count--;
797     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
798     GNUNET_free (pending);
799   }
800   if (pending == NULL)
801   {
802     /* no messages pending */
803     return 0;
804   }
805   if (buf == NULL)
806   {
807     peer->th 
808       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
809                                            pending->importance,
810                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
811                                            &peer->id, ntohs (pending->msg->size),
812                                            &core_transmit_notify, peer);
813     GNUNET_break (NULL != peer->th);
814     return 0;
815   }
816   off = 0;
817   while ( (NULL != (pending = peer->head)) &&
818           (size - off >= (msize = ntohs (pending->msg->size))) )
819   {
820     GNUNET_STATISTICS_update (GDS_stats,
821                               gettext_noop ("# Bytes transmitted to other peers"), msize,
822                               GNUNET_NO);
823     memcpy (&cbuf[off], pending->msg, msize);
824     off += msize;
825     peer->pending_count--;
826     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
827     GNUNET_free (pending);
828   }
829   if (peer->head != NULL)
830   {
831     peer->th 
832       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
833                                            pending->importance,
834                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
835                                            &peer->id, msize,
836                                            &core_transmit_notify, peer);
837     GNUNET_break (NULL != peer->th);
838   }
839   return off;
840 }
841
842
843 /**
844  * Transmit all messages in the peer's message queue.
845  *
846  * @param peer message queue to process
847  */
848 static void
849 process_peer_queue (struct PeerInfo *peer)
850 {
851   struct P2PPendingMessage *pending;
852
853   if (NULL == (pending = peer->head))
854     return;
855   if (NULL != peer->th)
856     return;
857   GNUNET_STATISTICS_update (GDS_stats,
858                             gettext_noop ("# Bytes of bandwdith requested from core"),
859                             ntohs (pending->msg->size),
860                             GNUNET_NO);
861   peer->th 
862     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
863                                          pending->importance,
864                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
865                                          &peer->id,
866                                          ntohs (pending->msg->size),
867                                          &core_transmit_notify, peer);
868   GNUNET_break (NULL != peer->th);
869 }
870
871
872 /**
873  * To how many peers should we (on average) forward the request to
874  * obtain the desired target_replication count (on average).
875  *
876  * @param hop_count number of hops the message has traversed
877  * @param target_replication the number of total paths desired
878  * @return Some number of peers to forward the message to
879  */
880 static unsigned int
881 get_forward_count (uint32_t hop_count, 
882                    uint32_t target_replication)
883 {
884   uint32_t random_value;
885   uint32_t forward_count;
886   float target_value;
887
888   if (hop_count > GDS_NSE_get () * 6.0)
889   {
890     /* forcefully terminate */
891     return 0;
892   }
893   if (hop_count > GDS_NSE_get () * 4.0)
894   {
895     /* Once we have reached our ideal number of hops, only forward to 1 peer */
896     return 1;
897   }
898   /* bound by system-wide maximum */
899   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
900                                    target_replication);
901   target_value =
902     1 + (target_replication - 1.0) / (GDS_NSE_get () +
903                                       ((float) (target_replication - 1.0) *
904                                        hop_count));
905   /* Set forward count to floor of target_value */
906   forward_count = (uint32_t) target_value;
907   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
908   target_value = target_value - forward_count;
909   random_value =
910     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); 
911   if (random_value < (target_value * UINT32_MAX))
912     forward_count++;
913   return forward_count;
914 }
915
916
917 /**
918  * Compute the distance between have and target as a 32-bit value.
919  * Differences in the lower bits must count stronger than differences
920  * in the higher bits.
921  *
922  * @return 0 if have==target, otherwise a number
923  *           that is larger as the distance between
924  *           the two hash codes increases
925  */
926 static unsigned int
927 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
928 {
929   unsigned int bucket;
930   unsigned int msb;
931   unsigned int lsb;
932   unsigned int i;
933
934   /* We have to represent the distance between two 2^9 (=512)-bit
935    * numbers as a 2^5 (=32)-bit number with "0" being used for the
936    * two numbers being identical; furthermore, we need to
937    * guarantee that a difference in the number of matching
938    * bits is always represented in the result.
939    *
940    * We use 2^32/2^9 numerical values to distinguish between
941    * hash codes that have the same LSB bit distance and
942    * use the highest 2^9 bits of the result to signify the
943    * number of (mis)matching LSB bits; if we have 0 matching
944    * and hence 512 mismatching LSB bits we return -1 (since
945    * 512 itself cannot be represented with 9 bits) */
946
947   /* first, calculate the most significant 9 bits of our
948    * result, aka the number of LSBs */
949   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
950   /* bucket is now a value between 0 and 512 */
951   if (bucket == 512)
952     return 0;                   /* perfect match */
953   if (bucket == 0)
954     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
955                                  * below, we'd end up with max+1 (overflow)) */
956
957   /* calculate the most significant bits of the final result */
958   msb = (512 - bucket) << (32 - 9);
959   /* calculate the 32-9 least significant bits of the final result by
960    * looking at the differences in the 32-9 bits following the
961    * mismatching bit at 'bucket' */
962   lsb = 0;
963   for (i = bucket + 1;
964        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
965   {
966     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
967         GNUNET_CRYPTO_hash_get_bit (have, i))
968       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
969                                                  * last bit set will be 31 -- if
970                                                  * i does not reach 512 first... */
971   }
972   return msb | lsb;
973 }
974
975
976 /**
977  * Check whether my identity is closer than any known peers.  If a
978  * non-null bloomfilter is given, check if this is the closest peer
979  * that hasn't already been routed to.
980  *
981  * @param key hash code to check closeness to
982  * @param bloom bloomfilter, exclude these entries from the decision
983  * @return GNUNET_YES if node location is closest,
984  *         GNUNET_NO otherwise.
985  */
986 static int
987 am_closest_peer (const GNUNET_HashCode *key,
988                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
989 {
990   int bits;
991   int other_bits;
992   int bucket_num;
993   int count;
994   struct PeerInfo *pos;
995
996   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
997     return GNUNET_YES;
998   bucket_num = find_bucket (key);
999   GNUNET_assert (bucket_num >= 0);
1000   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
1001   pos = k_buckets[bucket_num].head;
1002   count = 0;
1003   while ((pos != NULL) && (count < bucket_size))
1004   {
1005     if ((bloom != NULL) &&
1006         (GNUNET_YES ==
1007          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1008     {
1009       pos = pos->next;
1010       continue;                 /* Skip already checked entries */
1011     }
1012     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
1013     if (other_bits > bits)
1014       return GNUNET_NO;
1015     if (other_bits == bits)        /* We match the same number of bits */
1016       return GNUNET_YES;
1017     pos = pos->next;
1018   }
1019   /* No peers closer, we are the closest! */
1020   return GNUNET_YES;
1021 }
1022
1023
1024 /**
1025  * Select a peer from the routing table that would be a good routing
1026  * destination for sending a message for "key".  The resulting peer
1027  * must not be in the set of blocked peers.<p>
1028  *
1029  * Note that we should not ALWAYS select the closest peer to the
1030  * target, peers further away from the target should be chosen with
1031  * exponentially declining probability.
1032  *
1033  * FIXME: double-check that this is fine
1034  * 
1035  *
1036  * @param key the key we are selecting a peer to route to
1037  * @param bloom a bloomfilter containing entries this request has seen already
1038  * @param hops how many hops has this message traversed thus far
1039  * @return Peer to route to, or NULL on error
1040  */
1041 static struct PeerInfo *
1042 select_peer (const GNUNET_HashCode *key,
1043              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
1044              uint32_t hops)
1045 {
1046   unsigned int bc;
1047   unsigned int count;
1048   unsigned int selected;
1049   struct PeerInfo *pos;
1050   unsigned int dist;
1051   unsigned int smallest_distance;
1052   struct PeerInfo *chosen;
1053
1054   if (hops >= GDS_NSE_get ())
1055   {
1056     /* greedy selection (closest peer that is not in bloomfilter) */
1057     smallest_distance = UINT_MAX;
1058     chosen = NULL;
1059     for (bc = 0; bc < closest_bucket; bc++)
1060     {
1061       pos = k_buckets[bc].head;
1062       count = 0;
1063       while ((pos != NULL) && (count < bucket_size))
1064       {
1065         if ( (bloom == NULL) ||
1066              (GNUNET_NO ==
1067               GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) )
1068         {
1069           dist = get_distance (key, &pos->id.hashPubKey);
1070           if (dist < smallest_distance)
1071           {
1072             chosen = pos;
1073             smallest_distance = dist;
1074           }
1075         }
1076         else
1077         {
1078           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1080                       GNUNET_i2s (&pos->id),
1081                       GNUNET_h2s (key));
1082           GNUNET_STATISTICS_update (GDS_stats,
1083                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1,
1084                                     GNUNET_NO);
1085         }
1086         count++;
1087         pos = pos->next;
1088       }
1089     }
1090     if (NULL == chosen)
1091       GNUNET_STATISTICS_update (GDS_stats,
1092                                 gettext_noop ("# Peer selection failed"), 1,
1093                                 GNUNET_NO);
1094     return chosen;
1095   }
1096
1097   /* select "random" peer */
1098   /* count number of peers that are available and not filtered */
1099   count = 0;
1100   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1101   {
1102     pos = k_buckets[bc].head;
1103     while ((pos != NULL) && (count < bucket_size))
1104     {
1105       if ( (bloom != NULL) &&
1106            (GNUNET_YES ==
1107             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) )
1108       {
1109         GNUNET_STATISTICS_update (GDS_stats,
1110                                   gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1,
1111                                   GNUNET_NO);
1112         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1114                     GNUNET_i2s (&pos->id),
1115                     GNUNET_h2s (key));
1116         pos = pos->next;
1117         continue;               /* Ignore bloomfiltered peers */
1118       }
1119       count++;
1120       pos = pos->next;
1121     }
1122   }
1123   if (count == 0)               /* No peers to select from! */
1124   {
1125     GNUNET_STATISTICS_update (GDS_stats,
1126                               gettext_noop ("# Peer selection failed"), 1,
1127                               GNUNET_NO);
1128     return NULL;
1129   }
1130   /* Now actually choose a peer */
1131   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1132   count = 0;
1133   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1134   {
1135     pos = k_buckets[bc].head;
1136     while ((pos != NULL) && (count < bucket_size))
1137     {
1138       if ( (bloom != NULL) &&
1139            (GNUNET_YES ==
1140             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) )
1141       {
1142         pos = pos->next;
1143         continue;               /* Ignore bloomfiltered peers */
1144       }
1145       if (0 == selected--)
1146         return pos;
1147       pos = pos->next;
1148     }
1149   }
1150   GNUNET_break (0);
1151   return NULL;
1152 }
1153
1154
1155 /**
1156  * Compute the set of peers that the given request should be
1157  * forwarded to.
1158  *
1159  * @param key routing key
1160  * @param bloom bloom filter excluding peers as targets, all selected
1161  *        peers will be added to the bloom filter
1162  * @param hop_count number of hops the request has traversed so far
1163  * @param target_replication desired number of replicas
1164  * @param targets where to store an array of target peers (to be
1165  *         free'd by the caller)
1166  * @return number of peers returned in 'targets'.
1167  */
1168 static unsigned int
1169 get_target_peers (const GNUNET_HashCode *key,
1170                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1171                   uint32_t hop_count,
1172                   uint32_t target_replication,
1173                   struct PeerInfo ***targets)
1174 {
1175   unsigned int ret;
1176   unsigned int off;
1177   struct PeerInfo **rtargets;
1178   struct PeerInfo *nxt;
1179
1180   GNUNET_assert (NULL != bloom);
1181   ret = get_forward_count (hop_count, target_replication);
1182   if (ret == 0)
1183   {
1184     *targets = NULL;
1185     return 0;
1186   }
1187   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1188   for (off = 0; off < ret; off++)
1189   {
1190     nxt = select_peer (key, bloom, hop_count);
1191     if (nxt == NULL)
1192       break;      
1193     rtargets[off] = nxt;
1194     GNUNET_break (GNUNET_NO ==
1195                   GNUNET_CONTAINER_bloomfilter_test (bloom, &nxt->id.hashPubKey));
1196     GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
1197   }
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1200               off,
1201               GNUNET_CONTAINER_multihashmap_size (all_known_peers),
1202               (unsigned int) hop_count,
1203               GNUNET_h2s (key),
1204               ret);
1205   if (0 == off)
1206   {
1207     GNUNET_free (rtargets);
1208     *targets = NULL;
1209     return 0;
1210   }
1211   *targets = rtargets;
1212   return off;
1213 }
1214
1215
1216 /**
1217  * Perform a PUT operation.   Forwards the given request to other
1218  * peers.   Does not store the data locally.  Does not give the
1219  * data to local clients.  May do nothing if this is the only
1220  * peer in the network (or if we are the closest peer in the
1221  * network).
1222  *
1223  * @param type type of the block
1224  * @param options routing options
1225  * @param desired_replication_level desired replication count
1226  * @param expiration_time when does the content expire
1227  * @param hop_count how many hops has this message traversed so far
1228  * @param bf Bloom filter of peers this PUT has already traversed
1229  * @param key key for the content
1230  * @param put_path_length number of entries in put_path
1231  * @param put_path peers this request has traversed so far (if tracked)
1232  * @param data payload to store
1233  * @param data_size number of bytes in data
1234  */
1235 void
1236 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1237                            enum GNUNET_DHT_RouteOption options,
1238                            uint32_t desired_replication_level,
1239                            struct GNUNET_TIME_Absolute expiration_time,
1240                            uint32_t hop_count,
1241                            struct GNUNET_CONTAINER_BloomFilter *bf,
1242                            const GNUNET_HashCode *key,
1243                            unsigned int put_path_length,
1244                            struct GNUNET_PeerIdentity *put_path,
1245                            const void *data,
1246                            size_t data_size)
1247 {
1248   unsigned int target_count;
1249   unsigned int i;
1250   struct PeerInfo **targets;
1251   struct PeerInfo *target;
1252   struct P2PPendingMessage *pending;
1253   size_t msize;
1254   struct PeerPutMessage *ppm;
1255   struct GNUNET_PeerIdentity *pp;
1256   
1257   GNUNET_assert (NULL != bf);
1258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259               "Adding myself (%s) to PUT bloomfilter for %s\n",
1260               GNUNET_i2s (&my_identity),
1261               GNUNET_h2s (key));
1262   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
1263   GNUNET_STATISTICS_update (GDS_stats,
1264                             gettext_noop ("# PUT requests routed"), 1,
1265                             GNUNET_NO);
1266   target_count = get_target_peers (key, bf, hop_count,
1267                                    desired_replication_level,
1268                                    &targets);
1269   if (0 == target_count)
1270     { 
1271       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272                   "Routing PUT for %s terminates after %u hops at %s\n",
1273                   GNUNET_h2s (key),
1274                   (unsigned int) hop_count,
1275                   GNUNET_i2s (&my_identity));
1276       return;
1277     }
1278   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1279   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1280   {
1281     put_path_length = 0;
1282     msize = data_size + sizeof (struct PeerPutMessage);
1283   }
1284   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1285   {
1286     GNUNET_break (0);
1287     GNUNET_free (targets);
1288     return;
1289   }
1290   GNUNET_STATISTICS_update (GDS_stats,
1291                             gettext_noop ("# PUT messages queued for transmission"), target_count,
1292                             GNUNET_NO);
1293   for (i=0;i<target_count;i++)
1294   {
1295     target = targets[i];
1296     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1297                 "Routing PUT for %s after %u hops to %s\n",
1298                 GNUNET_h2s (key),
1299                 (unsigned int) hop_count,
1300                 GNUNET_i2s (&target->id));
1301     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1302     pending->importance = 0; /* FIXME */
1303     pending->timeout = expiration_time;   
1304     ppm = (struct PeerPutMessage*) &pending[1];
1305     pending->msg = &ppm->header;
1306     ppm->header.size = htons (msize);
1307     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1308     ppm->options = htonl (options);
1309     ppm->type = htonl (type);
1310     ppm->hop_count = htonl (hop_count + 1);
1311     ppm->desired_replication_level = htonl (desired_replication_level);
1312     ppm->put_path_length = htonl (put_path_length);
1313     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1314     GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &target->id.hashPubKey));
1315     GNUNET_assert (GNUNET_OK ==
1316                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1317                                                               ppm->bloomfilter,
1318                                                               DHT_BLOOM_SIZE));
1319     ppm->key = *key;
1320     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1321     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1322     memcpy (&pp[put_path_length], data, data_size);
1323     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1324                                       target->tail,
1325                                       pending);
1326     target->pending_count++;
1327     process_peer_queue (target);
1328   }
1329   GNUNET_free (targets);
1330 }
1331
1332
1333 /**
1334  * Perform a GET operation.  Forwards the given request to other
1335  * peers.  Does not lookup the key locally.  May do nothing if this is
1336  * the only peer in the network (or if we are the closest peer in the
1337  * network).
1338  *
1339  * @param type type of the block
1340  * @param options routing options
1341  * @param desired_replication_level desired replication count
1342  * @param hop_count how many hops did this request traverse so far?
1343  * @param key key for the content
1344  * @param xquery extended query
1345  * @param xquery_size number of bytes in xquery
1346  * @param reply_bf bloomfilter to filter duplicates
1347  * @param reply_bf_mutator mutator for reply_bf
1348  * @param peer_bf filter for peers not to select (again)
1349  */
1350 void
1351 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1352                            enum GNUNET_DHT_RouteOption options,
1353                            uint32_t desired_replication_level,
1354                            uint32_t hop_count,
1355                            const GNUNET_HashCode *key,
1356                            const void *xquery,
1357                            size_t xquery_size,
1358                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1359                            uint32_t reply_bf_mutator,
1360                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1361 {
1362   unsigned int target_count;
1363   unsigned int i;
1364   struct PeerInfo **targets;
1365   struct PeerInfo *target;
1366   struct P2PPendingMessage *pending;
1367   size_t msize;
1368   struct PeerGetMessage *pgm;
1369   char *xq;
1370   size_t reply_bf_size;
1371
1372   GNUNET_assert (NULL != peer_bf);  
1373   GNUNET_STATISTICS_update (GDS_stats,
1374                             gettext_noop ("# GET requests routed"), 1,
1375                             GNUNET_NO);
1376   target_count = get_target_peers (key, peer_bf, hop_count,
1377                                    desired_replication_level,
1378                                    &targets);
1379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1380               "Adding myself (%s) to GET bloomfilter for %s\n",
1381               GNUNET_i2s (&my_identity),
1382               GNUNET_h2s (key));
1383   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
1384   if (0 == target_count)
1385     {
1386       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387                   "Routing GET for %s terminates after %u hops at %s\n",
1388                   GNUNET_h2s (key),
1389                   (unsigned int) hop_count,
1390                   GNUNET_i2s (&my_identity));
1391       return;
1392     }
1393   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1394   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1395   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1396   {
1397     GNUNET_break (0);
1398     GNUNET_free (targets);
1399     return;
1400   }
1401   GNUNET_STATISTICS_update (GDS_stats,
1402                             gettext_noop ("# GET messages queued for transmission"), target_count,
1403                             GNUNET_NO);
1404   /* forward request */
1405   for (i=0;i<target_count;i++)
1406   {
1407     target = targets[i];
1408     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1409                 "Routing GET for %s after %u hops to %s\n",
1410                 GNUNET_h2s (key),
1411                 (unsigned int) hop_count,
1412                 GNUNET_i2s (&target->id));
1413     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1414     pending->importance = 0; /* FIXME */
1415     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1416     pgm = (struct PeerGetMessage*) &pending[1];
1417     pending->msg = &pgm->header;
1418     pgm->header.size = htons (msize);
1419     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1420     pgm->options = htonl (options);
1421     pgm->type = htonl (type);
1422     pgm->hop_count = htonl (hop_count + 1);
1423     pgm->desired_replication_level = htonl (desired_replication_level);
1424     pgm->xquery_size = htonl (xquery_size);
1425     pgm->bf_mutator = reply_bf_mutator; 
1426     GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &target->id.hashPubKey));
1427     GNUNET_assert (GNUNET_OK ==
1428                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1429                                                               pgm->bloomfilter,
1430                                                               DHT_BLOOM_SIZE));
1431     pgm->key = *key;
1432     xq = (char *) &pgm[1];
1433     memcpy (xq, xquery, xquery_size);
1434     if (NULL != reply_bf)
1435       GNUNET_assert (GNUNET_OK ==
1436                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1437                                                                 &xq[xquery_size],
1438                                                                 reply_bf_size));
1439     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1440                                       target->tail,
1441                                       pending);
1442     target->pending_count++;
1443     process_peer_queue (target);
1444   }
1445   GNUNET_free (targets);
1446 }
1447
1448
1449 /**
1450  * Handle a reply (route to origin).  Only forwards the reply back to
1451  * the given peer.  Does not do local caching or forwarding to local
1452  * clients.
1453  *
1454  * @param target neighbour that should receive the block (if still connected)
1455  * @param type type of the block
1456  * @param expiration_time when does the content expire
1457  * @param key key for the content
1458  * @param put_path_length number of entries in put_path
1459  * @param put_path peers the original PUT traversed (if tracked)
1460  * @param get_path_length number of entries in put_path
1461  * @param get_path peers this reply has traversed so far (if tracked)
1462  * @param data payload of the reply
1463  * @param data_size number of bytes in data
1464  */
1465 void
1466 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1467                              enum GNUNET_BLOCK_Type type,
1468                              struct GNUNET_TIME_Absolute expiration_time,
1469                              const GNUNET_HashCode *key,
1470                              unsigned int put_path_length,
1471                              const struct GNUNET_PeerIdentity *put_path,
1472                              unsigned int get_path_length,
1473                              const struct GNUNET_PeerIdentity *get_path,
1474                              const void *data,
1475                              size_t data_size)
1476 {
1477   struct PeerInfo *pi;
1478   struct P2PPendingMessage *pending;
1479   size_t msize;
1480   struct PeerResultMessage *prm;
1481   struct GNUNET_PeerIdentity *paths;
1482   
1483   msize = data_size + sizeof (struct PeerResultMessage) + 
1484     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1485   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1486        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1487        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1488        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1489   {
1490     GNUNET_break (0);
1491     return;
1492   }
1493   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1494                                           &target->hashPubKey);
1495   if (NULL == pi)
1496   {
1497     /* peer disconnected in the meantime, drop reply */
1498     return;
1499   }
1500   GNUNET_STATISTICS_update (GDS_stats,
1501                             gettext_noop ("# RESULT messages queued for transmission"), 1,
1502                             GNUNET_NO);
1503   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1504   pending->importance = 0; /* FIXME */
1505   pending->timeout = expiration_time;
1506   prm = (struct PeerResultMessage*) &pending[1];
1507   pending->msg = &prm->header;
1508   prm->header.size = htons (msize);
1509   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1510   prm->type = htonl (type);
1511   prm->put_path_length = htonl (put_path_length);
1512   prm->get_path_length = htonl (get_path_length);
1513   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1514   prm->key = *key;
1515   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1516   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1517   memcpy (&paths[put_path_length],
1518           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1519   memcpy (&paths[put_path_length + get_path_length],
1520           data, data_size);
1521   GNUNET_CONTAINER_DLL_insert (pi->head,
1522                                pi->tail,
1523                                pending);
1524   pi->pending_count++;
1525   process_peer_queue (pi);
1526 }
1527
1528
1529 /**
1530  * To be called on core init/fail.
1531  *
1532  * @param cls service closure
1533  * @param server handle to the server for this service
1534  * @param identity the public identity of this peer
1535  */
1536 static void
1537 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1538            const struct GNUNET_PeerIdentity *identity)
1539 {
1540   GNUNET_assert (server != NULL);
1541   my_identity = *identity;
1542 }
1543
1544
1545 /**
1546  * Core handler for p2p put requests.
1547  *
1548  * @param cls closure
1549  * @param peer sender of the request
1550  * @param message message
1551  * @param peer peer identity this notification is about
1552  * @param atsi performance data
1553  * @return GNUNET_OK to keep the connection open,
1554  *         GNUNET_SYSERR to close it (signal serious error)
1555  */
1556 static int
1557 handle_dht_p2p_put (void *cls,
1558                     const struct GNUNET_PeerIdentity *peer,
1559                     const struct GNUNET_MessageHeader *message,
1560                     const struct GNUNET_TRANSPORT_ATS_Information
1561                     *atsi)
1562 {
1563   const struct PeerPutMessage *put;
1564   const struct GNUNET_PeerIdentity *put_path;
1565   const void *payload;
1566   uint32_t putlen;
1567   uint16_t msize;
1568   size_t payload_size;
1569   enum GNUNET_DHT_RouteOption options;
1570   struct GNUNET_CONTAINER_BloomFilter *bf;
1571   GNUNET_HashCode test_key;
1572   
1573   msize = ntohs (message->size);
1574   if (msize < sizeof (struct PeerPutMessage))
1575   {
1576     GNUNET_break_op (0);
1577     return GNUNET_YES;
1578   }
1579   put = (const struct PeerPutMessage*) message;
1580   putlen = ntohl (put->put_path_length);
1581   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1582        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1583     {
1584       GNUNET_break_op (0);
1585       return GNUNET_YES;
1586     }
1587   GNUNET_STATISTICS_update (GDS_stats,
1588                             gettext_noop ("# P2P PUT requests received"), 1,
1589                             GNUNET_NO);
1590   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1591   payload = &put_path[putlen];
1592   options = ntohl (put->options);
1593   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1594                           putlen * sizeof (struct GNUNET_PeerIdentity));
1595   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1596                                 ntohl (put->type),
1597                                 payload, payload_size,
1598                                 &test_key))
1599   {
1600   case GNUNET_YES:
1601     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1602     {
1603       GNUNET_break_op (0);
1604       return GNUNET_YES;
1605     }
1606     break;
1607   case GNUNET_NO:
1608     GNUNET_break_op (0);
1609     return GNUNET_YES;
1610   case GNUNET_SYSERR:
1611     /* cannot verify, good luck */
1612     break;
1613   }
1614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1615               "PUT for %s at %s\n",
1616               GNUNET_h2s (&put->key),
1617               GNUNET_i2s (&my_identity));
1618   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1619                                           DHT_BLOOM_SIZE,
1620                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1621   GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &peer->hashPubKey));
1622   {
1623     struct GNUNET_PeerIdentity pp[putlen+1];
1624   
1625     /* extend 'put path' by sender */
1626     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1627     {
1628       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1629       pp[putlen] = *peer;
1630       putlen++;
1631     }
1632     else
1633       putlen = 0;
1634     
1635     /* give to local clients */
1636     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1637                              &put->key,
1638                              0, NULL,
1639                              putlen,
1640                              pp,
1641                              ntohl (put->type),
1642                              payload_size,
1643                              payload);
1644     /* store locally */
1645     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1646          (am_closest_peer (&put->key,
1647                            bf) ) )
1648       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1649                                 &put->key,
1650                                 putlen, pp,
1651                                 ntohl (put->type),
1652                                 payload_size,
1653                                 payload);
1654     /* route to other peers */
1655     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1656                                options,
1657                                ntohl (put->desired_replication_level),
1658                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1659                                ntohl (put->hop_count),
1660                                bf,
1661                                &put->key,
1662                                putlen, pp,
1663                                payload,
1664                                payload_size);
1665   }
1666   GNUNET_CONTAINER_bloomfilter_free (bf);
1667   return GNUNET_YES;
1668 }
1669
1670
1671 /**
1672  * We have received a FIND PEER request.  Send matching
1673  * HELLOs back.
1674  *
1675  * @param sender sender of the FIND PEER request
1676  * @param key peers close to this key are desired
1677  * @param bf peers matching this bf are excluded
1678  * @param bf_mutator mutator for bf
1679  */
1680 static void
1681 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1682                   const GNUNET_HashCode *key,
1683                   struct GNUNET_CONTAINER_BloomFilter *bf,
1684                   uint32_t bf_mutator)
1685 {
1686   int bucket_idx;
1687   struct PeerBucket *bucket;
1688   struct PeerInfo *peer;
1689   unsigned int choice;
1690   GNUNET_HashCode mhash;
1691   const struct GNUNET_HELLO_Message *hello;
1692
1693   /* first, check about our own HELLO */
1694   if (NULL != GDS_my_hello)
1695   {
1696     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1697     if ( (NULL == bf) ||
1698          (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) )
1699     {
1700       GDS_NEIGHBOURS_handle_reply (sender,
1701                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1702                                    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1703                                    key,
1704                                    0, NULL,
1705                                    0, NULL,
1706                                    GDS_my_hello,
1707                                    GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message*) GDS_my_hello));
1708     }
1709     else
1710     {
1711       GNUNET_STATISTICS_update (GDS_stats,
1712                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"), 1,
1713                                 GNUNET_NO);
1714     }
1715   }
1716   else
1717   {
1718     GNUNET_STATISTICS_update (GDS_stats,
1719                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"), 1,
1720                               GNUNET_NO);
1721   }
1722
1723   /* then, also consider sending a random HELLO from the closest bucket */
1724   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
1725     bucket_idx = closest_bucket;
1726   else
1727     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
1728   if (bucket_idx == GNUNET_SYSERR)
1729     return;
1730   bucket = &k_buckets[bucket_idx];
1731   if (bucket->peers_size == 0)
1732     return;
1733   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1734                                      bucket->peers_size);
1735   peer = bucket->head;
1736   while (choice > 0)
1737   {
1738     GNUNET_assert (peer != NULL);
1739     peer = peer->next;
1740     choice--;
1741   }
1742   choice = bucket->peers_size;
1743   do
1744     {
1745       peer = peer->next;
1746       if (choice-- == 0)
1747         return; /* no non-masked peer available */
1748       if (peer == NULL)
1749         peer = bucket->head;
1750       GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1751       hello = GDS_HELLO_get (&peer->id);
1752     }
1753   while ( (hello == NULL) ||
1754           (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) );
1755   GDS_NEIGHBOURS_handle_reply (sender,
1756                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1757                                GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1758                                key,
1759                                0, NULL,
1760                                0, NULL,
1761                                hello,
1762                                GNUNET_HELLO_size (hello));    
1763 }
1764
1765
1766 /**
1767  * Core handler for p2p get requests.
1768  *
1769  * @param cls closure
1770  * @param peer sender of the request
1771  * @param message message
1772  * @param peer peer identity this notification is about
1773  * @param atsi performance data
1774  * @return GNUNET_OK to keep the connection open,
1775  *         GNUNET_SYSERR to close it (signal serious error)
1776  */
1777 static int
1778 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1779                     const struct GNUNET_MessageHeader *message,
1780                     const struct GNUNET_TRANSPORT_ATS_Information
1781                     *atsi)
1782 {
1783   struct PeerGetMessage *get;
1784   uint32_t xquery_size;
1785   size_t reply_bf_size;
1786   uint16_t msize;
1787   enum GNUNET_BLOCK_Type type;
1788   enum GNUNET_DHT_RouteOption options;
1789   enum GNUNET_BLOCK_EvaluationResult eval;
1790   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1791   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1792   const char *xquery;
1793
1794   GNUNET_break (0 != memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)));
1795   /* parse and validate message */
1796   msize = ntohs (message->size);
1797   if (msize < sizeof (struct PeerGetMessage))
1798   {
1799     GNUNET_break_op (0);
1800     return GNUNET_YES;
1801   }
1802   get = (struct PeerGetMessage *) message;
1803   xquery_size = ntohl (get->xquery_size);
1804   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1805   {
1806     GNUNET_break_op (0);
1807     return GNUNET_YES;
1808   }
1809   GNUNET_STATISTICS_update (GDS_stats,
1810                             gettext_noop ("# P2P GET requests received"), 1,
1811                             GNUNET_NO);
1812   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1813   type = ntohl (get->type);
1814   options = ntohl (get->options);
1815   xquery = (const char*) &get[1];
1816   reply_bf = NULL;
1817   if (reply_bf_size > 0)
1818     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1819                                                   reply_bf_size,
1820                                                   GNUNET_CONSTANTS_BLOOMFILTER_K);
1821   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1822                                 type,
1823                                 &get->key,
1824                                 &reply_bf,
1825                                 get->bf_mutator,
1826                                 xquery, xquery_size,
1827                                 NULL, 0);
1828   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1829   {
1830     /* request invalid or block type not supported */
1831     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1832     if (NULL != reply_bf)
1833       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1834     return GNUNET_YES;
1835   }
1836   peer_bf =
1837     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1838                                        DHT_BLOOM_SIZE,
1839                                        GNUNET_CONSTANTS_BLOOMFILTER_K);
1840   GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &peer->hashPubKey));
1841   /* remember request for routing replies */
1842   GDS_ROUTING_add (peer,
1843                    type,
1844                    options,
1845                    &get->key,
1846                    xquery, xquery_size,
1847                    reply_bf, get->bf_mutator);
1848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1849               "GET for %s at %s after %u hops\n",
1850               GNUNET_h2s (&get->key),
1851               GNUNET_i2s (&my_identity),
1852               (unsigned int) ntohl (get->hop_count));
1853   /* local lookup (this may update the reply_bf) */
1854   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1855        (am_closest_peer (&get->key,
1856                          peer_bf) ) )
1857     {
1858     if ( (0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1859     {
1860       GNUNET_STATISTICS_update (GDS_stats,
1861                                 gettext_noop ("# P2P FIND PEER requests processed"), 1,
1862                                 GNUNET_NO);
1863       handle_find_peer (peer,
1864                         &get->key,
1865                         reply_bf,
1866                         get->bf_mutator);
1867     }
1868     else
1869     {
1870       eval = GDS_DATACACHE_handle_get (&get->key,
1871                                        type,
1872                                        xquery, xquery_size,
1873                                        &reply_bf, 
1874                                        get->bf_mutator);
1875     }
1876   }
1877   else
1878   {
1879     GNUNET_STATISTICS_update (GDS_stats,
1880                               gettext_noop ("# P2P GET requests ONLY routed"), 1,
1881                               GNUNET_NO);
1882   }
1883   
1884   /* P2P forwarding */
1885   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1886     GDS_NEIGHBOURS_handle_get (type,
1887                                options,
1888                                ntohl (get->desired_replication_level),
1889                                ntohl (get->hop_count),
1890                                &get->key,
1891                                xquery, xquery_size,
1892                                reply_bf,
1893                                get->bf_mutator,
1894                                peer_bf);
1895   /* clean up */
1896   if (NULL != reply_bf)
1897     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1898   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1899   return GNUNET_YES;
1900 }
1901
1902
1903 /**
1904  * Core handler for p2p result messages.
1905  *
1906  * @param cls closure
1907  * @param message message
1908  * @param peer peer identity this notification is about
1909  * @param atsi performance data
1910  * @return GNUNET_YES (do not cut p2p connection)
1911  */
1912 static int
1913 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1914                        const struct GNUNET_MessageHeader *message,
1915                        const struct GNUNET_TRANSPORT_ATS_Information
1916                        *atsi)
1917 {
1918   const struct PeerResultMessage *prm;
1919   const struct GNUNET_PeerIdentity *put_path;
1920   const struct GNUNET_PeerIdentity *get_path;
1921   const void *data;
1922   uint32_t get_path_length;
1923   uint32_t put_path_length;
1924   uint16_t msize;
1925   size_t data_size;
1926   enum GNUNET_BLOCK_Type type;
1927                        
1928   /* parse and validate message */
1929   msize = ntohs (message->size);
1930   if (msize < sizeof (struct PeerResultMessage))
1931   {
1932     GNUNET_break_op (0);
1933     return GNUNET_YES;
1934   }
1935   prm = (struct PeerResultMessage *) message;
1936   put_path_length = ntohl (prm->put_path_length);
1937   get_path_length = ntohl (prm->get_path_length);
1938   if ( (msize < sizeof (struct PeerResultMessage) + 
1939         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1940        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1941        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1942   {
1943     GNUNET_break_op (0);
1944     return GNUNET_YES;
1945   } 
1946   GNUNET_STATISTICS_update (GDS_stats,
1947                             gettext_noop ("# P2P RESULTS received"), 1,
1948                             GNUNET_NO);
1949   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1950   get_path = &put_path[put_path_length];
1951   type = ntohl (prm->type);
1952   data = (const void*) &get_path[get_path_length];
1953   data_size = msize - (sizeof (struct PeerResultMessage) + 
1954                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1955
1956   /* if we got a HELLO, consider it for our own routing table */
1957   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1958   {
1959     const struct GNUNET_MessageHeader *h;
1960     struct GNUNET_PeerIdentity pid;
1961     int bucket;
1962
1963     /* Should be a HELLO, validate and consider using it! */
1964     if (data_size < sizeof (struct GNUNET_MessageHeader))
1965     {
1966       GNUNET_break_op (0);
1967       return GNUNET_YES;
1968     }
1969     h = data;
1970     if (data_size != ntohs (h->size))
1971     {
1972       GNUNET_break_op (0);
1973       return GNUNET_YES;
1974     }
1975     if (GNUNET_OK !=
1976         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) h,
1977                              &pid))
1978     {
1979       GNUNET_break_op (0);
1980       return GNUNET_YES;
1981     }
1982     if (0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity)))
1983     {
1984       bucket = find_bucket (&pid.hashPubKey);
1985       if ( (bucket >= 0) &&
1986            (k_buckets[bucket].peers_size < bucket_size) )
1987         {    
1988           if (NULL != GDS_transport_handle)
1989           {
1990             GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
1991                                           h, NULL, NULL);
1992             GNUNET_TRANSPORT_try_connect (GDS_transport_handle,
1993                                           &pid);
1994           }
1995         }   
1996     }
1997   }
1998
1999   /* append 'peer' to 'get_path' */
2000   {    
2001     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
2002
2003     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
2004     xget_path[get_path_length] = *peer;
2005     get_path_length++;
2006
2007     /* forward to local clients */   
2008     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2009                              &prm->key,
2010                              get_path_length,
2011                              xget_path,
2012                              put_path_length,
2013                              put_path,
2014                              type,
2015                              data_size, 
2016                              data);
2017
2018     /* forward to other peers */
2019     GDS_ROUTING_process (type,
2020                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2021                          &prm->key,
2022                          put_path_length,
2023                          put_path,
2024                          get_path_length,
2025                          xget_path,
2026                          data,
2027                          data_size);                     
2028   }
2029   return GNUNET_YES;
2030 }
2031
2032
2033 /**
2034  * Initialize neighbours subsystem.
2035  *
2036  * @return GNUNET_OK on success, GNUNET_SYSERR on error
2037  */
2038 int
2039 GDS_NEIGHBOURS_init ()
2040 {
2041   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2042     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2043     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2044     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2045     {NULL, 0, 0}
2046   };
2047   unsigned long long temp_config_num;
2048  
2049   if (GNUNET_OK ==
2050       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2051                                              &temp_config_num))
2052     bucket_size = (unsigned int) temp_config_num;  
2053   atsAPI = GNUNET_ATS_init (GDS_cfg, NULL, NULL);
2054   coreAPI = GNUNET_CORE_connect (GDS_cfg,
2055                                  1,
2056                                  NULL,
2057                                  &core_init,
2058                                  &handle_core_connect,
2059                                  &handle_core_disconnect, 
2060                                  NULL, GNUNET_NO,
2061                                  NULL, GNUNET_NO,
2062                                  core_handlers);
2063   if (coreAPI == NULL)
2064     return GNUNET_SYSERR;
2065   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
2066   return GNUNET_OK;
2067 }
2068
2069
2070 /**
2071  * Shutdown neighbours subsystem.
2072  */
2073 void
2074 GDS_NEIGHBOURS_done ()
2075 {
2076   if (coreAPI == NULL)
2077     return;
2078   GNUNET_CORE_disconnect (coreAPI);
2079   coreAPI = NULL;    
2080   GNUNET_ATS_shutdown (atsAPI);
2081   atsAPI = NULL;    
2082   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
2083   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
2084   all_known_peers = NULL;
2085   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
2086   {
2087     GNUNET_SCHEDULER_cancel (find_peer_task);
2088     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
2089   }
2090 }
2091
2092
2093 /* end of gnunet-service-dht_neighbours.c */