leak
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_ats_service.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50 #include "dht.h"
51
52 /**
53  * How many buckets will we allow total.
54  */
55 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
56
57 /**
58  * What is the maximum number of peers in a given bucket.
59  */
60 #define DEFAULT_BUCKET_SIZE 8
61
62 /**
63  * Desired replication level for FIND PEER requests
64  */
65 #define FIND_PEER_REPLICATION_LEVEL 4
66
67 /**
68  * Maximum allowed replication level for all requests.
69  */
70 #define MAXIMUM_REPLICATION_LEVEL 16
71
72 /**
73  * How often to update our preference levels for peers in our routing tables.
74  */
75 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
76
77 /**
78  * How long at least to wait before sending another find peer request.
79  */
80 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
81
82 /**
83  * How long at most to wait before sending another find peer request.
84  */
85 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
86
87 /**
88  * How long at most to wait for transmission of a GET request to another peer?
89  */
90 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
91
92
93 /**
94  * P2P PUT message
95  */
96 struct PeerPutMessage
97 {
98   /**
99    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
100    */
101   struct GNUNET_MessageHeader header;
102
103   /**
104    * Processing options
105    */
106   uint32_t options GNUNET_PACKED;
107
108   /**
109    * Content type.
110    */
111   uint32_t type GNUNET_PACKED;
112
113   /**
114    * Hop count
115    */
116   uint32_t hop_count GNUNET_PACKED;
117
118   /**
119    * Replication level for this message
120    */
121   uint32_t desired_replication_level GNUNET_PACKED;
122
123   /**
124    * Length of the PUT path that follows (if tracked).
125    */
126   uint32_t put_path_length GNUNET_PACKED;
127
128   /**
129    * When does the content expire?
130    */
131   struct GNUNET_TIME_AbsoluteNBO expiration_time;
132
133   /**
134    * Bloomfilter (for peer identities) to stop circular routes
135    */
136   char bloomfilter[DHT_BLOOM_SIZE];
137
138   /**
139    * The key we are storing under.
140    */
141   GNUNET_HashCode key;
142
143   /* put path (if tracked) */
144
145   /* Payload */
146
147 };
148
149
150 /**
151  * P2P Result message
152  */
153 struct PeerResultMessage
154 {
155   /**
156    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
157    */
158   struct GNUNET_MessageHeader header;
159
160   /**
161    * Content type.
162    */
163   uint32_t type GNUNET_PACKED;
164
165   /**
166    * Length of the PUT path that follows (if tracked).
167    */
168   uint32_t put_path_length GNUNET_PACKED;
169
170   /**
171    * Length of the GET path that follows (if tracked).
172    */
173   uint32_t get_path_length GNUNET_PACKED;
174
175   /**
176    * When does the content expire?
177    */
178   struct GNUNET_TIME_AbsoluteNBO expiration_time;
179
180   /**
181    * The key of the corresponding GET request.
182    */
183   GNUNET_HashCode key;
184
185   /* put path (if tracked) */
186
187   /* get path (if tracked) */
188
189   /* Payload */
190
191 };
192
193
194 /**
195  * P2P GET message
196  */
197 struct PeerGetMessage
198 {
199   /**
200    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
201    */
202   struct GNUNET_MessageHeader header;
203
204   /**
205    * Processing options
206    */
207   uint32_t options GNUNET_PACKED;
208
209   /**
210    * Desired content type.
211    */
212   uint32_t type GNUNET_PACKED;
213
214   /**
215    * Hop count
216    */
217   uint32_t hop_count GNUNET_PACKED;
218
219   /**
220    * Desired replication level for this request.
221    */
222   uint32_t desired_replication_level GNUNET_PACKED;
223
224   /**
225    * Size of the extended query.
226    */
227   uint32_t xquery_size;
228
229   /**
230    * Bloomfilter mutator.
231    */
232   uint32_t bf_mutator;
233
234   /**
235    * Bloomfilter (for peer identities) to stop circular routes
236    */
237   char bloomfilter[DHT_BLOOM_SIZE];
238
239   /**
240    * The key we are looking for.
241    */
242   GNUNET_HashCode key;
243
244   /* xquery */
245
246   /* result bloomfilter */
247
248 };
249
250
251 /**
252  * Linked list of messages to send to a particular other peer.
253  */
254 struct P2PPendingMessage
255 {
256   /**
257    * Pointer to next item in the list
258    */
259   struct P2PPendingMessage *next;
260
261   /**
262    * Pointer to previous item in the list
263    */
264   struct P2PPendingMessage *prev;
265
266   /**
267    * Message importance level.  FIXME: used? useful?
268    */
269   unsigned int importance;
270
271   /**
272    * When does this message time out?
273    */
274   struct GNUNET_TIME_Absolute timeout;
275
276   /**
277    * Actual message to be sent, allocated at the end of the struct:
278    * // msg = (cast) &pm[1]; 
279    * // memcpy (&pm[1], data, len);
280    */
281   const struct GNUNET_MessageHeader *msg;
282
283 };
284
285
286 /**
287  * Entry for a peer in a bucket.
288  */
289 struct PeerInfo
290 {
291   /**
292    * Next peer entry (DLL)
293    */
294   struct PeerInfo *next;
295
296   /**
297    *  Prev peer entry (DLL)
298    */
299   struct PeerInfo *prev;
300
301   /**
302    * Count of outstanding messages for peer.  FIXME: NEEDED?
303    * FIXME: bound queue size!?
304    */
305   unsigned int pending_count;
306
307   /**
308    * Head of pending messages to be sent to this peer.
309    */
310   struct P2PPendingMessage *head;
311
312   /**
313    * Tail of pending messages to be sent to this peer.
314    */
315   struct P2PPendingMessage *tail;
316
317   /**
318    * Core handle for sending messages to this peer.
319    */
320   struct GNUNET_CORE_TransmitHandle *th;
321
322   /**
323    * Preference update context
324    */
325   struct GNUNET_ATS_InformationRequestContext *info_ctx;
326
327   /**
328    * Task for scheduling preference updates
329    */
330   GNUNET_SCHEDULER_TaskIdentifier preference_task;
331
332   /**
333    * What is the identity of the peer?
334    */
335   struct GNUNET_PeerIdentity id;
336
337 #if 0
338   /**
339    * What is the average latency for replies received?
340    */
341   struct GNUNET_TIME_Relative latency;
342
343   /**
344    * Transport level distance to peer.
345    */
346   unsigned int distance;
347 #endif
348
349 };
350
351
352 /**
353  * Peers are grouped into buckets.
354  */
355 struct PeerBucket
356 {
357   /**
358    * Head of DLL
359    */
360   struct PeerInfo *head;
361
362   /**
363    * Tail of DLL
364    */
365   struct PeerInfo *tail;
366
367   /**
368    * Number of peers in the bucket.
369    */
370   unsigned int peers_size;
371 };
372
373
374 /**
375  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
376  */
377 static unsigned int closest_bucket;
378
379 /**
380  * How many peers have we added since we sent out our last
381  * find peer request?
382  */
383 static unsigned int newly_found_peers;
384
385 /**
386  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
387  */
388 static struct PeerBucket k_buckets[MAX_BUCKETS];
389
390 /**
391  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
392  */
393 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
394
395 /**
396  * Maximum size for each bucket.
397  */
398 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
399
400 /**
401  * Task that sends FIND PEER requests.
402  */
403 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
404
405 /**
406  * Identity of this peer.
407  */ 
408 static struct GNUNET_PeerIdentity my_identity;
409
410 /**
411  * Handle to CORE.
412  */
413 static struct GNUNET_CORE_Handle *coreAPI;
414
415 /**
416  * Handle to ATS.
417  */
418 static struct GNUNET_ATS_Handle *atsAPI;
419
420
421
422 /**
423  * Find the optimal bucket for this key.
424  *
425  * @param hc the hashcode to compare our identity to
426  * @return the proper bucket index, or GNUNET_SYSERR
427  *         on error (same hashcode)
428  */
429 static int
430 find_bucket (const GNUNET_HashCode * hc)
431 {
432   unsigned int bits;
433
434   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
435   if (bits == MAX_BUCKETS)
436     {
437       /* How can all bits match? Got my own ID? */
438       GNUNET_break (0);
439       return GNUNET_SYSERR; 
440     }
441   return MAX_BUCKETS - bits - 1;
442 }
443
444
445 /**
446  * Let GNUnet core know that we like the given peer.
447  *
448  * @param cls the 'struct PeerInfo' of the peer
449  * @param tc scheduler context.
450  */ 
451 static void
452 update_core_preference (void *cls,
453                         const struct GNUNET_SCHEDULER_TaskContext *tc);
454
455
456 /**
457  * Function called with statistics about the given peer.
458  *
459  * @param cls closure
460  * @param peer identifies the peer
461  * @param amount set to the amount that was actually reserved or unreserved;
462  *               either the full requested amount or zero (no partial reservations)
463  * @param res_delay if the reservation could not be satisfied (amount was 0), how
464  *        long should the client wait until re-trying?
465  */
466 static void
467 update_core_preference_finish (void *cls,
468                                const struct GNUNET_PeerIdentity *peer,
469                                int32_t amount,
470                                struct GNUNET_TIME_Relative res_delay)
471 {
472   struct PeerInfo *peer_info = cls;
473
474   peer_info->info_ctx = NULL;
475   peer_info->preference_task
476     = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
477                                     &update_core_preference, peer_info);
478 }
479
480
481 /**
482  * Let GNUnet core know that we like the given peer.
483  *
484  * @param cls the 'struct PeerInfo' of the peer
485  * @param tc scheduler context.
486  */ 
487 static void
488 update_core_preference (void *cls,
489                         const struct GNUNET_SCHEDULER_TaskContext *tc)
490 {
491   struct PeerInfo *peer = cls;
492   uint64_t preference;
493   unsigned int matching;
494   int bucket;
495
496   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
497   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
498     return;  
499   matching =
500     GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
501                                       &peer->id.hashPubKey);
502   if (matching >= 64)
503     matching = 63;
504   bucket = find_bucket (&peer->id.hashPubKey);
505   if (bucket == GNUNET_SYSERR)
506     preference = 0;
507   else
508   {
509     GNUNET_assert (k_buckets[bucket].peers_size != 0);
510     preference = (1LL << matching) / k_buckets[bucket].peers_size;
511   }
512   if (preference == 0)
513     {
514       peer->preference_task
515         = GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
516                                         &update_core_preference, peer);
517       return;
518     }
519   GNUNET_STATISTICS_update (GDS_stats,
520                             gettext_noop ("# Preference updates given to core"), 1,
521                             GNUNET_NO);
522   peer->info_ctx =
523     GNUNET_ATS_peer_change_preference (atsAPI, &peer->id,
524                                        0,
525                                        preference,
526                                        &update_core_preference_finish, peer);
527 }
528
529
530 /**
531  * Closure for 'add_known_to_bloom'.
532  */
533 struct BloomConstructorContext
534 {
535   /**
536    * Bloom filter under construction.
537    */
538   struct GNUNET_CONTAINER_BloomFilter *bloom;
539
540   /**
541    * Mutator to use.
542    */
543   uint32_t bf_mutator;
544 };
545
546
547 /**
548  * Add each of the peers we already know to the bloom filter of
549  * the request so that we don't get duplicate HELLOs.
550  *
551  * @param cls the 'struct BloomConstructorContext'.
552  * @param key peer identity to add to the bloom filter
553  * @param value value the peer information (unused)
554  * @return GNUNET_YES (we should continue to iterate)
555  */
556 static int
557 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
558 {
559   struct BloomConstructorContext *ctx = cls;
560   GNUNET_HashCode mh;
561
562   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
565               GNUNET_h2s (key),
566               ctx->bf_mutator);
567   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
568   return GNUNET_YES;
569 }
570
571
572 /**
573  * Task to send a find peer message for our own peer identifier
574  * so that we can find the closest peers in the network to ourselves
575  * and attempt to connect to them.
576  *
577  * @param cls closure for this task
578  * @param tc the context under which the task is running
579  */
580 static void
581 send_find_peer_message (void *cls,
582                         const struct GNUNET_SCHEDULER_TaskContext *tc)
583 {
584   struct GNUNET_TIME_Relative next_send_time;
585   struct BloomConstructorContext bcc;
586   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
587
588   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
589   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
590     return;
591   if (newly_found_peers > bucket_size) 
592   {
593     /* If we are finding many peers already, no need to send out our request right now! */
594     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
595                                                    &send_find_peer_message, NULL);
596     newly_found_peers = 0;
597     return;
598   }
599   bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
600   bcc.bloom =
601     GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K);
602   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
603                                          &add_known_to_bloom,
604                                          &bcc);
605   GNUNET_STATISTICS_update (GDS_stats,
606                             gettext_noop ("# FIND PEER messages initiated"), 1,
607                             GNUNET_NO);
608   peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
609                                                DHT_BLOOM_SIZE,
610                                                GNUNET_CONSTANTS_BLOOMFILTER_K);
611   // FIXME: pass priority!?
612   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
613                              GNUNET_DHT_RO_FIND_PEER,
614                              FIND_PEER_REPLICATION_LEVEL,
615                              0,
616                              &my_identity.hashPubKey,
617                              NULL, 0,
618                              bcc.bloom, bcc.bf_mutator, 
619                              peer_bf);
620   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
621   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
622   /* schedule next round */
623   next_send_time.rel_value =
624     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
625     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
626                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / (newly_found_peers+1));
627   newly_found_peers = 0;
628   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
629                                                  &send_find_peer_message,
630                                                  NULL);  
631 }
632
633
634 /**
635  * Method called whenever a peer connects.
636  *
637  * @param cls closure
638  * @param peer peer identity this notification is about
639  * @param atsi performance data
640  */
641 static void
642 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
643                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
644 {
645   struct PeerInfo *ret;
646   int peer_bucket;
647
648   /* Check for connect to self message */
649   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
650     return;
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652               "Connected %s to %s\n",
653               GNUNET_i2s (&my_identity),
654               GNUNET_h2s (&peer->hashPubKey));
655   if (GNUNET_YES ==
656       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
657                                               &peer->hashPubKey))
658   {
659     GNUNET_break (0);
660     return;
661   }
662   GNUNET_STATISTICS_update (GDS_stats,
663                             gettext_noop ("# Peers connected"), 1,
664                             GNUNET_NO);
665   peer_bucket = find_bucket (&peer->hashPubKey);
666   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
667   ret = GNUNET_malloc (sizeof (struct PeerInfo));
668 #if 0
669   ret->latency = latency;
670   ret->distance = distance;
671 #endif
672   ret->id = *peer;
673   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
674                                     k_buckets[peer_bucket].tail, ret);
675   k_buckets[peer_bucket].peers_size++;
676   closest_bucket = GNUNET_MAX (closest_bucket,
677                                peer_bucket);
678   if ( (peer_bucket > 0) &&
679        (k_buckets[peer_bucket].peers_size <= bucket_size) )
680   {
681     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
682     newly_found_peers++;
683   }
684   GNUNET_assert (GNUNET_OK ==
685                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
686                                                     &peer->hashPubKey, ret,
687                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
688   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers))
689   {
690     /* got a first connection, good time to start with FIND PEER requests... */
691     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
692                                                NULL);    
693   }
694 }
695
696
697 /**
698  * Method called whenever a peer disconnects.
699  *
700  * @param cls closure
701  * @param peer peer identity this notification is about
702  */
703 static void
704 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
705 {
706   struct PeerInfo *to_remove;
707   int current_bucket;
708   struct P2PPendingMessage *pos;
709   unsigned int discarded;
710
711   /* Check for disconnect from self message */
712   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
713     return;
714   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715               "Disconnected %s from %s\n",
716               GNUNET_i2s (&my_identity),
717               GNUNET_h2s (&peer->hashPubKey));
718   to_remove =
719       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
720   if (NULL == to_remove)
721     {
722       GNUNET_break (0);
723       return;
724     }
725   GNUNET_STATISTICS_update (GDS_stats,
726                             gettext_noop ("# Peers connected"), -1,
727                             GNUNET_NO);
728   GNUNET_assert (GNUNET_YES ==
729                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
730                                                        &peer->hashPubKey,
731                                                        to_remove));
732   if (NULL != to_remove->info_ctx)
733   {
734     GNUNET_ATS_peer_change_preference_cancel (to_remove->info_ctx);
735     to_remove->info_ctx = NULL;
736   }
737   if (GNUNET_SCHEDULER_NO_TASK != to_remove->preference_task)
738   {
739     GNUNET_SCHEDULER_cancel (to_remove->preference_task);
740     to_remove->preference_task = GNUNET_SCHEDULER_NO_TASK;
741   }
742   current_bucket = find_bucket (&to_remove->id.hashPubKey);
743   GNUNET_assert (current_bucket >= 0);
744   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
745                                k_buckets[current_bucket].tail,
746                                to_remove);
747   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
748   k_buckets[current_bucket].peers_size--;
749   while ( (closest_bucket > 0) &&
750           (k_buckets[closest_bucket].peers_size == 0) )
751     closest_bucket--;
752
753   if (to_remove->th != NULL) 
754   {
755     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
756     to_remove->th = NULL;
757   }
758   discarded = 0;
759   while (NULL != (pos = to_remove->head))
760   {
761     GNUNET_CONTAINER_DLL_remove (to_remove->head,
762                                  to_remove->tail,
763                                  pos);
764     discarded++;
765     GNUNET_free (pos);
766   }
767   GNUNET_STATISTICS_update (GDS_stats,
768                             gettext_noop ("# Queued messages discarded (peer disconnected)"), discarded,
769                             GNUNET_NO);
770   GNUNET_free (to_remove);
771 }
772
773
774 /**
775  * Called when core is ready to send a message we asked for
776  * out to the destination.
777  *
778  * @param cls the 'struct PeerInfo' of the target peer
779  * @param size number of bytes available in buf
780  * @param buf where the callee should write the message
781  * @return number of bytes written to buf
782  */
783 static size_t
784 core_transmit_notify (void *cls, size_t size, void *buf)
785 {
786   struct PeerInfo *peer = cls;
787   char *cbuf = buf;
788   struct P2PPendingMessage *pending;
789   size_t off;
790   size_t msize;
791
792   peer->th = NULL;
793   while ( (NULL != (pending = peer->head)) &&
794           (GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value == 0) )
795   {
796     peer->pending_count--;
797     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
798     GNUNET_free (pending);
799   }
800   if (pending == NULL)
801   {
802     /* no messages pending */
803     return 0;
804   }
805   if (buf == NULL)
806   {
807     peer->th 
808       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
809                                            pending->importance,
810                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
811                                            &peer->id, ntohs (pending->msg->size),
812                                            &core_transmit_notify, peer);
813     GNUNET_break (NULL != peer->th);
814     return 0;
815   }
816   off = 0;
817   while ( (NULL != (pending = peer->head)) &&
818           (size - off >= (msize = ntohs (pending->msg->size))) )
819   {
820     GNUNET_STATISTICS_update (GDS_stats,
821                               gettext_noop ("# Bytes transmitted to other peers"), msize,
822                               GNUNET_NO);
823     memcpy (&cbuf[off], pending->msg, msize);
824     off += msize;
825     peer->pending_count--;
826     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
827     GNUNET_free (pending);
828   }
829   if (peer->head != NULL)
830   {
831     peer->th 
832       = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
833                                            pending->importance,
834                                            GNUNET_TIME_absolute_get_remaining (pending->timeout),
835                                            &peer->id, msize,
836                                            &core_transmit_notify, peer);
837     GNUNET_break (NULL != peer->th);
838   }
839   return off;
840 }
841
842
843 /**
844  * Transmit all messages in the peer's message queue.
845  *
846  * @param peer message queue to process
847  */
848 static void
849 process_peer_queue (struct PeerInfo *peer)
850 {
851   struct P2PPendingMessage *pending;
852
853   if (NULL == (pending = peer->head))
854     return;
855   if (NULL != peer->th)
856     return;
857   GNUNET_STATISTICS_update (GDS_stats,
858                             gettext_noop ("# Bytes of bandwdith requested from core"),
859                             ntohs (pending->msg->size),
860                             GNUNET_NO);
861   peer->th 
862     = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
863                                          pending->importance,
864                                          GNUNET_TIME_absolute_get_remaining (pending->timeout),
865                                          &peer->id,
866                                          ntohs (pending->msg->size),
867                                          &core_transmit_notify, peer);
868   GNUNET_break (NULL != peer->th);
869 }
870
871
872 /**
873  * To how many peers should we (on average) forward the request to
874  * obtain the desired target_replication count (on average).
875  *
876  * @param hop_count number of hops the message has traversed
877  * @param target_replication the number of total paths desired
878  * @return Some number of peers to forward the message to
879  */
880 static unsigned int
881 get_forward_count (uint32_t hop_count, 
882                    uint32_t target_replication)
883 {
884   uint32_t random_value;
885   uint32_t forward_count;
886   float target_value;
887
888   if (hop_count > GDS_NSE_get () * 6.0)
889   {
890     /* forcefully terminate */
891     return 0;
892   }
893   if (hop_count > GDS_NSE_get () * 4.0)
894   {
895     /* Once we have reached our ideal number of hops, only forward to 1 peer */
896     return 1;
897   }
898   /* bound by system-wide maximum */
899   target_replication = GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL,
900                                    target_replication);
901   target_value =
902     1 + (target_replication - 1.0) / (GDS_NSE_get () +
903                                       ((float) (target_replication - 1.0) *
904                                        hop_count));
905   /* Set forward count to floor of target_value */
906   forward_count = (uint32_t) target_value;
907   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
908   target_value = target_value - forward_count;
909   random_value =
910     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); 
911   if (random_value < (target_value * UINT32_MAX))
912     forward_count++;
913   return forward_count;
914 }
915
916
917 /**
918  * Compute the distance between have and target as a 32-bit value.
919  * Differences in the lower bits must count stronger than differences
920  * in the higher bits.
921  *
922  * @return 0 if have==target, otherwise a number
923  *           that is larger as the distance between
924  *           the two hash codes increases
925  */
926 static unsigned int
927 get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
928 {
929   unsigned int bucket;
930   unsigned int msb;
931   unsigned int lsb;
932   unsigned int i;
933
934   /* We have to represent the distance between two 2^9 (=512)-bit
935    * numbers as a 2^5 (=32)-bit number with "0" being used for the
936    * two numbers being identical; furthermore, we need to
937    * guarantee that a difference in the number of matching
938    * bits is always represented in the result.
939    *
940    * We use 2^32/2^9 numerical values to distinguish between
941    * hash codes that have the same LSB bit distance and
942    * use the highest 2^9 bits of the result to signify the
943    * number of (mis)matching LSB bits; if we have 0 matching
944    * and hence 512 mismatching LSB bits we return -1 (since
945    * 512 itself cannot be represented with 9 bits) */
946
947   /* first, calculate the most significant 9 bits of our
948    * result, aka the number of LSBs */
949   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
950   /* bucket is now a value between 0 and 512 */
951   if (bucket == 512)
952     return 0;                   /* perfect match */
953   if (bucket == 0)
954     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
955                                  * below, we'd end up with max+1 (overflow)) */
956
957   /* calculate the most significant bits of the final result */
958   msb = (512 - bucket) << (32 - 9);
959   /* calculate the 32-9 least significant bits of the final result by
960    * looking at the differences in the 32-9 bits following the
961    * mismatching bit at 'bucket' */
962   lsb = 0;
963   for (i = bucket + 1;
964        (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
965   {
966     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
967         GNUNET_CRYPTO_hash_get_bit (have, i))
968       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
969                                                  * last bit set will be 31 -- if
970                                                  * i does not reach 512 first... */
971   }
972   return msb | lsb;
973 }
974
975
976 /**
977  * Check whether my identity is closer than any known peers.  If a
978  * non-null bloomfilter is given, check if this is the closest peer
979  * that hasn't already been routed to.
980  *
981  * @param key hash code to check closeness to
982  * @param bloom bloomfilter, exclude these entries from the decision
983  * @return GNUNET_YES if node location is closest,
984  *         GNUNET_NO otherwise.
985  */
986 static int
987 am_closest_peer (const GNUNET_HashCode *key,
988                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
989 {
990   int bits;
991   int other_bits;
992   int bucket_num;
993   int count;
994   struct PeerInfo *pos;
995
996   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
997     return GNUNET_YES;
998   bucket_num = find_bucket (key);
999   GNUNET_assert (bucket_num >= 0);
1000   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
1001   pos = k_buckets[bucket_num].head;
1002   count = 0;
1003   while ((pos != NULL) && (count < bucket_size))
1004   {
1005     if ((bloom != NULL) &&
1006         (GNUNET_YES ==
1007          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1008     {
1009       pos = pos->next;
1010       continue;                 /* Skip already checked entries */
1011     }
1012     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
1013     if (other_bits > bits)
1014       return GNUNET_NO;
1015     if (other_bits == bits)        /* We match the same number of bits */
1016       return GNUNET_YES;
1017     pos = pos->next;
1018   }
1019   /* No peers closer, we are the closest! */
1020   return GNUNET_YES;
1021 }
1022
1023
1024 /**
1025  * Select a peer from the routing table that would be a good routing
1026  * destination for sending a message for "key".  The resulting peer
1027  * must not be in the set of blocked peers.<p>
1028  *
1029  * Note that we should not ALWAYS select the closest peer to the
1030  * target, peers further away from the target should be chosen with
1031  * exponentially declining probability.
1032  *
1033  * FIXME: double-check that this is fine
1034  * 
1035  *
1036  * @param key the key we are selecting a peer to route to
1037  * @param bloom a bloomfilter containing entries this request has seen already
1038  * @param hops how many hops has this message traversed thus far
1039  * @return Peer to route to, or NULL on error
1040  */
1041 static struct PeerInfo *
1042 select_peer (const GNUNET_HashCode *key,
1043              const struct GNUNET_CONTAINER_BloomFilter *bloom, 
1044              uint32_t hops)
1045 {
1046   unsigned int bc;
1047   unsigned int count;
1048   unsigned int selected;
1049   struct PeerInfo *pos;
1050   unsigned int dist;
1051   unsigned int smallest_distance;
1052   struct PeerInfo *chosen;
1053
1054   if (hops >= GDS_NSE_get ())
1055   {
1056     /* greedy selection (closest peer that is not in bloomfilter) */
1057     smallest_distance = UINT_MAX;
1058     chosen = NULL;
1059     for (bc = 0; bc < closest_bucket; bc++)
1060     {
1061       pos = k_buckets[bc].head;
1062       count = 0;
1063       while ((pos != NULL) && (count < bucket_size))
1064       {
1065         if ( (bloom == NULL) ||
1066              (GNUNET_NO ==
1067               GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) )
1068         {
1069           dist = get_distance (key, &pos->id.hashPubKey);
1070           if (dist < smallest_distance)
1071           {
1072             chosen = pos;
1073             smallest_distance = dist;
1074           }
1075         }
1076         else
1077         {
1078           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1080                       GNUNET_i2s (&pos->id),
1081                       GNUNET_h2s (key));
1082           GNUNET_STATISTICS_update (GDS_stats,
1083                                     gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1,
1084                                     GNUNET_NO);
1085         }
1086         count++;
1087         pos = pos->next;
1088       }
1089     }
1090     if (NULL == chosen)
1091       GNUNET_STATISTICS_update (GDS_stats,
1092                                 gettext_noop ("# Peer selection failed"), 1,
1093                                 GNUNET_NO);
1094     return chosen;
1095   }
1096
1097   /* select "random" peer */
1098   /* count number of peers that are available and not filtered */
1099   count = 0;
1100   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1101   {
1102     pos = k_buckets[bc].head;
1103     while ((pos != NULL) && (count < bucket_size))
1104     {
1105       if ( (bloom != NULL) &&
1106            (GNUNET_YES ==
1107             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) )
1108       {
1109         GNUNET_STATISTICS_update (GDS_stats,
1110                                   gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1,
1111                                   GNUNET_NO);
1112         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1114                     GNUNET_i2s (&pos->id),
1115                     GNUNET_h2s (key));
1116         pos = pos->next;
1117         continue;               /* Ignore bloomfiltered peers */
1118       }
1119       count++;
1120       pos = pos->next;
1121     }
1122   }
1123   if (count == 0)               /* No peers to select from! */
1124   {
1125     GNUNET_STATISTICS_update (GDS_stats,
1126                               gettext_noop ("# Peer selection failed"), 1,
1127                               GNUNET_NO);
1128     return NULL;
1129   }
1130   /* Now actually choose a peer */
1131   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1132   count = 0;
1133   for (bc = closest_bucket; bc < MAX_BUCKETS; bc++)
1134   {
1135     pos = k_buckets[bc].head;
1136     while ((pos != NULL) && (count < bucket_size))
1137     {
1138       if ( (bloom != NULL) &&
1139            (GNUNET_YES ==
1140             GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) )
1141       {
1142         pos = pos->next;
1143         continue;               /* Ignore bloomfiltered peers */
1144       }
1145       if (0 == selected--)
1146         return pos;
1147       pos = pos->next;
1148     }
1149   }
1150   GNUNET_break (0);
1151   return NULL;
1152 }
1153
1154
1155 /**
1156  * Compute the set of peers that the given request should be
1157  * forwarded to.
1158  *
1159  * @param key routing key
1160  * @param bloom bloom filter excluding peers as targets, all selected
1161  *        peers will be added to the bloom filter
1162  * @param hop_count number of hops the request has traversed so far
1163  * @param target_replication desired number of replicas
1164  * @param targets where to store an array of target peers (to be
1165  *         free'd by the caller)
1166  * @return number of peers returned in 'targets'.
1167  */
1168 static unsigned int
1169 get_target_peers (const GNUNET_HashCode *key,
1170                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1171                   uint32_t hop_count,
1172                   uint32_t target_replication,
1173                   struct PeerInfo ***targets)
1174 {
1175   unsigned int ret;
1176   unsigned int off;
1177   struct PeerInfo **rtargets;
1178   struct PeerInfo *nxt;
1179
1180   GNUNET_assert (NULL != bloom);
1181   ret = get_forward_count (hop_count, target_replication);
1182   if (ret == 0)
1183   {
1184     *targets = NULL;
1185     return 0;
1186   }
1187   rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
1188   for (off = 0; off < ret; off++)
1189   {
1190     nxt = select_peer (key, bloom, hop_count);
1191     if (nxt == NULL)
1192       break;      
1193     rtargets[off] = nxt;
1194     GNUNET_break (GNUNET_NO ==
1195                   GNUNET_CONTAINER_bloomfilter_test (bloom, &nxt->id.hashPubKey));
1196     GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
1197   }
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1200               off,
1201               GNUNET_CONTAINER_multihashmap_size (all_known_peers),
1202               (unsigned int) hop_count,
1203               GNUNET_h2s (key),
1204               ret);
1205   if (0 == off)
1206   {
1207     GNUNET_free (rtargets);
1208     *targets = NULL;
1209     return 0;
1210   }
1211   *targets = rtargets;
1212   return off;
1213 }
1214
1215
1216 /**
1217  * Perform a PUT operation.   Forwards the given request to other
1218  * peers.   Does not store the data locally.  Does not give the
1219  * data to local clients.  May do nothing if this is the only
1220  * peer in the network (or if we are the closest peer in the
1221  * network).
1222  *
1223  * @param type type of the block
1224  * @param options routing options
1225  * @param desired_replication_level desired replication count
1226  * @param expiration_time when does the content expire
1227  * @param hop_count how many hops has this message traversed so far
1228  * @param bf Bloom filter of peers this PUT has already traversed
1229  * @param key key for the content
1230  * @param put_path_length number of entries in put_path
1231  * @param put_path peers this request has traversed so far (if tracked)
1232  * @param data payload to store
1233  * @param data_size number of bytes in data
1234  */
1235 void
1236 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1237                            enum GNUNET_DHT_RouteOption options,
1238                            uint32_t desired_replication_level,
1239                            struct GNUNET_TIME_Absolute expiration_time,
1240                            uint32_t hop_count,
1241                            struct GNUNET_CONTAINER_BloomFilter *bf,
1242                            const GNUNET_HashCode *key,
1243                            unsigned int put_path_length,
1244                            struct GNUNET_PeerIdentity *put_path,
1245                            const void *data,
1246                            size_t data_size)
1247 {
1248   unsigned int target_count;
1249   unsigned int i;
1250   struct PeerInfo **targets;
1251   struct PeerInfo *target;
1252   struct P2PPendingMessage *pending;
1253   size_t msize;
1254   struct PeerPutMessage *ppm;
1255   struct GNUNET_PeerIdentity *pp;
1256   
1257   GNUNET_assert (NULL != bf);
1258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259               "Adding myself (%s) to PUT bloomfilter for %s\n",
1260               GNUNET_i2s (&my_identity),
1261               GNUNET_h2s (key));
1262   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
1263   GNUNET_STATISTICS_update (GDS_stats,
1264                             gettext_noop ("# PUT requests routed"), 1,
1265                             GNUNET_NO);
1266   target_count = get_target_peers (key, bf, hop_count,
1267                                    desired_replication_level,
1268                                    &targets);
1269   if (0 == target_count)
1270     { 
1271       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272                   "Routing PUT for %s terminates after %u hops at %s\n",
1273                   GNUNET_h2s (key),
1274                   (unsigned int) hop_count,
1275                   GNUNET_i2s (&my_identity));
1276       return;
1277     }
1278   msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
1279   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1280   {
1281     put_path_length = 0;
1282     msize = data_size + sizeof (struct PeerPutMessage);
1283   }
1284   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1285   {
1286     GNUNET_break (0);
1287     return;
1288   }
1289   GNUNET_STATISTICS_update (GDS_stats,
1290                             gettext_noop ("# PUT messages queued for transmission"), target_count,
1291                             GNUNET_NO);
1292   for (i=0;i<target_count;i++)
1293   {
1294     target = targets[i];
1295     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296                 "Routing PUT for %s after %u hops to %s\n",
1297                 GNUNET_h2s (key),
1298                 (unsigned int) hop_count,
1299                 GNUNET_i2s (&target->id));
1300     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1301     pending->importance = 0; /* FIXME */
1302     pending->timeout = expiration_time;   
1303     ppm = (struct PeerPutMessage*) &pending[1];
1304     pending->msg = &ppm->header;
1305     ppm->header.size = htons (msize);
1306     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1307     ppm->options = htonl (options);
1308     ppm->type = htonl (type);
1309     ppm->hop_count = htonl (hop_count + 1);
1310     ppm->desired_replication_level = htonl (desired_replication_level);
1311     ppm->put_path_length = htonl (put_path_length);
1312     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1313     GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &target->id.hashPubKey));
1314     GNUNET_assert (GNUNET_OK ==
1315                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1316                                                               ppm->bloomfilter,
1317                                                               DHT_BLOOM_SIZE));
1318     ppm->key = *key;
1319     pp = (struct GNUNET_PeerIdentity*) &ppm[1];
1320     memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1321     memcpy (&pp[put_path_length], data, data_size);
1322     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1323                                       target->tail,
1324                                       pending);
1325     target->pending_count++;
1326     process_peer_queue (target);
1327   }
1328   GNUNET_free (targets);
1329 }
1330
1331
1332 /**
1333  * Perform a GET operation.  Forwards the given request to other
1334  * peers.  Does not lookup the key locally.  May do nothing if this is
1335  * the only peer in the network (or if we are the closest peer in the
1336  * network).
1337  *
1338  * @param type type of the block
1339  * @param options routing options
1340  * @param desired_replication_level desired replication count
1341  * @param hop_count how many hops did this request traverse so far?
1342  * @param key key for the content
1343  * @param xquery extended query
1344  * @param xquery_size number of bytes in xquery
1345  * @param reply_bf bloomfilter to filter duplicates
1346  * @param reply_bf_mutator mutator for reply_bf
1347  * @param peer_bf filter for peers not to select (again)
1348  */
1349 void
1350 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1351                            enum GNUNET_DHT_RouteOption options,
1352                            uint32_t desired_replication_level,
1353                            uint32_t hop_count,
1354                            const GNUNET_HashCode *key,
1355                            const void *xquery,
1356                            size_t xquery_size,
1357                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1358                            uint32_t reply_bf_mutator,
1359                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1360 {
1361   unsigned int target_count;
1362   unsigned int i;
1363   struct PeerInfo **targets;
1364   struct PeerInfo *target;
1365   struct P2PPendingMessage *pending;
1366   size_t msize;
1367   struct PeerGetMessage *pgm;
1368   char *xq;
1369   size_t reply_bf_size;
1370
1371   GNUNET_assert (NULL != peer_bf);  
1372   GNUNET_STATISTICS_update (GDS_stats,
1373                             gettext_noop ("# GET requests routed"), 1,
1374                             GNUNET_NO);
1375   target_count = get_target_peers (key, peer_bf, hop_count,
1376                                    desired_replication_level,
1377                                    &targets);
1378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1379               "Adding myself (%s) to GET bloomfilter for %s\n",
1380               GNUNET_i2s (&my_identity),
1381               GNUNET_h2s (key));
1382   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
1383   if (0 == target_count)
1384     {
1385       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386                   "Routing GET for %s terminates after %u hops at %s\n",
1387                   GNUNET_h2s (key),
1388                   (unsigned int) hop_count,
1389                   GNUNET_i2s (&my_identity));
1390       return;
1391     }
1392   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1393   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1394   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1395   {
1396     GNUNET_break (0);
1397     GNUNET_free (targets);
1398     return;
1399   }
1400   GNUNET_STATISTICS_update (GDS_stats,
1401                             gettext_noop ("# GET messages queued for transmission"), target_count,
1402                             GNUNET_NO);
1403   /* forward request */
1404   for (i=0;i<target_count;i++)
1405   {
1406     target = targets[i];
1407     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1408                 "Routing GET for %s after %u hops to %s\n",
1409                 GNUNET_h2s (key),
1410                 (unsigned int) hop_count,
1411                 GNUNET_i2s (&target->id));
1412     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1413     pending->importance = 0; /* FIXME */
1414     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1415     pgm = (struct PeerGetMessage*) &pending[1];
1416     pending->msg = &pgm->header;
1417     pgm->header.size = htons (msize);
1418     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1419     pgm->options = htonl (options);
1420     pgm->type = htonl (type);
1421     pgm->hop_count = htonl (hop_count + 1);
1422     pgm->desired_replication_level = htonl (desired_replication_level);
1423     pgm->xquery_size = htonl (xquery_size);
1424     pgm->bf_mutator = reply_bf_mutator; 
1425     GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &target->id.hashPubKey));
1426     GNUNET_assert (GNUNET_OK ==
1427                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1428                                                               pgm->bloomfilter,
1429                                                               DHT_BLOOM_SIZE));
1430     pgm->key = *key;
1431     xq = (char *) &pgm[1];
1432     memcpy (xq, xquery, xquery_size);
1433     if (NULL != reply_bf)
1434       GNUNET_assert (GNUNET_OK ==
1435                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1436                                                                 &xq[xquery_size],
1437                                                                 reply_bf_size));
1438     GNUNET_CONTAINER_DLL_insert_tail (target->head,
1439                                       target->tail,
1440                                       pending);
1441     target->pending_count++;
1442     process_peer_queue (target);
1443   }
1444   GNUNET_free (targets);
1445 }
1446
1447
1448 /**
1449  * Handle a reply (route to origin).  Only forwards the reply back to
1450  * the given peer.  Does not do local caching or forwarding to local
1451  * clients.
1452  *
1453  * @param target neighbour that should receive the block (if still connected)
1454  * @param type type of the block
1455  * @param expiration_time when does the content expire
1456  * @param key key for the content
1457  * @param put_path_length number of entries in put_path
1458  * @param put_path peers the original PUT traversed (if tracked)
1459  * @param get_path_length number of entries in put_path
1460  * @param get_path peers this reply has traversed so far (if tracked)
1461  * @param data payload of the reply
1462  * @param data_size number of bytes in data
1463  */
1464 void
1465 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1466                              enum GNUNET_BLOCK_Type type,
1467                              struct GNUNET_TIME_Absolute expiration_time,
1468                              const GNUNET_HashCode *key,
1469                              unsigned int put_path_length,
1470                              const struct GNUNET_PeerIdentity *put_path,
1471                              unsigned int get_path_length,
1472                              const struct GNUNET_PeerIdentity *get_path,
1473                              const void *data,
1474                              size_t data_size)
1475 {
1476   struct PeerInfo *pi;
1477   struct P2PPendingMessage *pending;
1478   size_t msize;
1479   struct PeerResultMessage *prm;
1480   struct GNUNET_PeerIdentity *paths;
1481   
1482   msize = data_size + sizeof (struct PeerResultMessage) + 
1483     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1484   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1485        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1486        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1487        (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1488   {
1489     GNUNET_break (0);
1490     return;
1491   }
1492   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1493                                           &target->hashPubKey);
1494   if (NULL == pi)
1495   {
1496     /* peer disconnected in the meantime, drop reply */
1497     return;
1498   }
1499   GNUNET_STATISTICS_update (GDS_stats,
1500                             gettext_noop ("# RESULT messages queued for transmission"), 1,
1501                             GNUNET_NO);
1502   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 
1503   pending->importance = 0; /* FIXME */
1504   pending->timeout = expiration_time;
1505   prm = (struct PeerResultMessage*) &pending[1];
1506   pending->msg = &prm->header;
1507   prm->header.size = htons (msize);
1508   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1509   prm->type = htonl (type);
1510   prm->put_path_length = htonl (put_path_length);
1511   prm->get_path_length = htonl (get_path_length);
1512   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1513   prm->key = *key;
1514   paths = (struct GNUNET_PeerIdentity*) &prm[1];
1515   memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1516   memcpy (&paths[put_path_length],
1517           get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1518   memcpy (&paths[put_path_length + get_path_length],
1519           data, data_size);
1520   GNUNET_CONTAINER_DLL_insert (pi->head,
1521                                pi->tail,
1522                                pending);
1523   pi->pending_count++;
1524   process_peer_queue (pi);
1525 }
1526
1527
1528 /**
1529  * To be called on core init/fail.
1530  *
1531  * @param cls service closure
1532  * @param server handle to the server for this service
1533  * @param identity the public identity of this peer
1534  */
1535 static void
1536 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1537            const struct GNUNET_PeerIdentity *identity)
1538 {
1539   GNUNET_assert (server != NULL);
1540   my_identity = *identity;
1541 }
1542
1543
1544 /**
1545  * Core handler for p2p put requests.
1546  *
1547  * @param cls closure
1548  * @param peer sender of the request
1549  * @param message message
1550  * @param peer peer identity this notification is about
1551  * @param atsi performance data
1552  * @return GNUNET_OK to keep the connection open,
1553  *         GNUNET_SYSERR to close it (signal serious error)
1554  */
1555 static int
1556 handle_dht_p2p_put (void *cls,
1557                     const struct GNUNET_PeerIdentity *peer,
1558                     const struct GNUNET_MessageHeader *message,
1559                     const struct GNUNET_TRANSPORT_ATS_Information
1560                     *atsi)
1561 {
1562   const struct PeerPutMessage *put;
1563   const struct GNUNET_PeerIdentity *put_path;
1564   const void *payload;
1565   uint32_t putlen;
1566   uint16_t msize;
1567   size_t payload_size;
1568   enum GNUNET_DHT_RouteOption options;
1569   struct GNUNET_CONTAINER_BloomFilter *bf;
1570   GNUNET_HashCode test_key;
1571   
1572   msize = ntohs (message->size);
1573   if (msize < sizeof (struct PeerPutMessage))
1574   {
1575     GNUNET_break_op (0);
1576     return GNUNET_YES;
1577   }
1578   put = (const struct PeerPutMessage*) message;
1579   putlen = ntohl (put->put_path_length);
1580   if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1581        (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1582     {
1583       GNUNET_break_op (0);
1584       return GNUNET_YES;
1585     }
1586   GNUNET_STATISTICS_update (GDS_stats,
1587                             gettext_noop ("# P2P PUT requests received"), 1,
1588                             GNUNET_NO);
1589   put_path = (const struct GNUNET_PeerIdentity*) &put[1];  
1590   payload = &put_path[putlen];
1591   options = ntohl (put->options);
1592   payload_size = msize - (sizeof (struct PeerPutMessage) + 
1593                           putlen * sizeof (struct GNUNET_PeerIdentity));
1594   switch (GNUNET_BLOCK_get_key (GDS_block_context,
1595                                 ntohl (put->type),
1596                                 payload, payload_size,
1597                                 &test_key))
1598   {
1599   case GNUNET_YES:
1600     if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
1601     {
1602       GNUNET_break_op (0);
1603       return GNUNET_YES;
1604     }
1605     break;
1606   case GNUNET_NO:
1607     GNUNET_break_op (0);
1608     return GNUNET_YES;
1609   case GNUNET_SYSERR:
1610     /* cannot verify, good luck */
1611     break;
1612   }
1613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614               "PUT for %s at %s\n",
1615               GNUNET_h2s (&put->key),
1616               GNUNET_i2s (&my_identity));
1617   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1618                                           DHT_BLOOM_SIZE,
1619                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1620   GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &peer->hashPubKey));
1621   {
1622     struct GNUNET_PeerIdentity pp[putlen+1];
1623   
1624     /* extend 'put path' by sender */
1625     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1626     {
1627       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1628       pp[putlen] = *peer;
1629       putlen++;
1630     }
1631     else
1632       putlen = 0;
1633     
1634     /* give to local clients */
1635     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1636                              &put->key,
1637                              0, NULL,
1638                              putlen,
1639                              pp,
1640                              ntohl (put->type),
1641                              payload_size,
1642                              payload);
1643     /* store locally */
1644     if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1645          (am_closest_peer (&put->key,
1646                            bf) ) )
1647       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1648                                 &put->key,
1649                                 putlen, pp,
1650                                 ntohl (put->type),
1651                                 payload_size,
1652                                 payload);
1653     /* route to other peers */
1654     GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1655                                options,
1656                                ntohl (put->desired_replication_level),
1657                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1658                                ntohl (put->hop_count),
1659                                bf,
1660                                &put->key,
1661                                putlen, pp,
1662                                payload,
1663                                payload_size);
1664   }
1665   GNUNET_CONTAINER_bloomfilter_free (bf);
1666   return GNUNET_YES;
1667 }
1668
1669
1670 /**
1671  * We have received a FIND PEER request.  Send matching
1672  * HELLOs back.
1673  *
1674  * @param sender sender of the FIND PEER request
1675  * @param key peers close to this key are desired
1676  * @param bf peers matching this bf are excluded
1677  * @param bf_mutator mutator for bf
1678  */
1679 static void
1680 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1681                   const GNUNET_HashCode *key,
1682                   struct GNUNET_CONTAINER_BloomFilter *bf,
1683                   uint32_t bf_mutator)
1684 {
1685   int bucket_idx;
1686   struct PeerBucket *bucket;
1687   struct PeerInfo *peer;
1688   unsigned int choice;
1689   GNUNET_HashCode mhash;
1690   const struct GNUNET_HELLO_Message *hello;
1691
1692   /* first, check about our own HELLO */
1693   if (NULL != GDS_my_hello)
1694   {
1695     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1696     if ( (NULL == bf) ||
1697          (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) )
1698     {
1699       GDS_NEIGHBOURS_handle_reply (sender,
1700                                    GNUNET_BLOCK_TYPE_DHT_HELLO,
1701                                    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1702                                    key,
1703                                    0, NULL,
1704                                    0, NULL,
1705                                    GDS_my_hello,
1706                                    GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message*) GDS_my_hello));
1707     }
1708     else
1709     {
1710       GNUNET_STATISTICS_update (GDS_stats,
1711                                 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"), 1,
1712                                 GNUNET_NO);
1713     }
1714   }
1715   else
1716   {
1717     GNUNET_STATISTICS_update (GDS_stats,
1718                               gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"), 1,
1719                               GNUNET_NO);
1720   }
1721
1722   /* then, also consider sending a random HELLO from the closest bucket */
1723   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
1724     bucket_idx = closest_bucket;
1725   else
1726     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
1727   if (bucket_idx == GNUNET_SYSERR)
1728     return;
1729   bucket = &k_buckets[bucket_idx];
1730   if (bucket->peers_size == 0)
1731     return;
1732   choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1733                                      bucket->peers_size);
1734   peer = bucket->head;
1735   while (choice > 0)
1736   {
1737     GNUNET_assert (peer != NULL);
1738     peer = peer->next;
1739     choice--;
1740   }
1741   choice = bucket->peers_size;
1742   do
1743     {
1744       peer = peer->next;
1745       if (choice-- == 0)
1746         return; /* no non-masked peer available */
1747       if (peer == NULL)
1748         peer = bucket->head;
1749       GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1750       hello = GDS_HELLO_get (&peer->id);
1751     }
1752   while ( (hello == NULL) ||
1753           (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)) );
1754   GDS_NEIGHBOURS_handle_reply (sender,
1755                                GNUNET_BLOCK_TYPE_DHT_HELLO,
1756                                GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1757                                key,
1758                                0, NULL,
1759                                0, NULL,
1760                                hello,
1761                                GNUNET_HELLO_size (hello));    
1762 }
1763
1764
1765 /**
1766  * Core handler for p2p get requests.
1767  *
1768  * @param cls closure
1769  * @param peer sender of the request
1770  * @param message message
1771  * @param peer peer identity this notification is about
1772  * @param atsi performance data
1773  * @return GNUNET_OK to keep the connection open,
1774  *         GNUNET_SYSERR to close it (signal serious error)
1775  */
1776 static int
1777 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1778                     const struct GNUNET_MessageHeader *message,
1779                     const struct GNUNET_TRANSPORT_ATS_Information
1780                     *atsi)
1781 {
1782   struct PeerGetMessage *get;
1783   uint32_t xquery_size;
1784   size_t reply_bf_size;
1785   uint16_t msize;
1786   enum GNUNET_BLOCK_Type type;
1787   enum GNUNET_DHT_RouteOption options;
1788   enum GNUNET_BLOCK_EvaluationResult eval;
1789   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1790   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1791   const char *xquery;
1792
1793   GNUNET_break (0 != memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)));
1794   /* parse and validate message */
1795   msize = ntohs (message->size);
1796   if (msize < sizeof (struct PeerGetMessage))
1797   {
1798     GNUNET_break_op (0);
1799     return GNUNET_YES;
1800   }
1801   get = (struct PeerGetMessage *) message;
1802   xquery_size = ntohl (get->xquery_size);
1803   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1804   {
1805     GNUNET_break_op (0);
1806     return GNUNET_YES;
1807   }
1808   GNUNET_STATISTICS_update (GDS_stats,
1809                             gettext_noop ("# P2P GET requests received"), 1,
1810                             GNUNET_NO);
1811   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1812   type = ntohl (get->type);
1813   options = ntohl (get->options);
1814   xquery = (const char*) &get[1];
1815   reply_bf = NULL;
1816   if (reply_bf_size > 0)
1817     reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1818                                                   reply_bf_size,
1819                                                   GNUNET_CONSTANTS_BLOOMFILTER_K);
1820   eval = GNUNET_BLOCK_evaluate (GDS_block_context,
1821                                 type,
1822                                 &get->key,
1823                                 &reply_bf,
1824                                 get->bf_mutator,
1825                                 xquery, xquery_size,
1826                                 NULL, 0);
1827   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1828   {
1829     /* request invalid or block type not supported */
1830     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1831     if (NULL != reply_bf)
1832       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1833     return GNUNET_YES;
1834   }
1835   peer_bf =
1836     GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, 
1837                                        DHT_BLOOM_SIZE,
1838                                        GNUNET_CONSTANTS_BLOOMFILTER_K);
1839   GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &peer->hashPubKey));
1840   /* remember request for routing replies */
1841   GDS_ROUTING_add (peer,
1842                    type,
1843                    options,
1844                    &get->key,
1845                    xquery, xquery_size,
1846                    reply_bf, get->bf_mutator);
1847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1848               "GET for %s at %s after %u hops\n",
1849               GNUNET_h2s (&get->key),
1850               GNUNET_i2s (&my_identity),
1851               (unsigned int) ntohl (get->hop_count));
1852   /* local lookup (this may update the reply_bf) */
1853   if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1854        (am_closest_peer (&get->key,
1855                          peer_bf) ) )
1856     {
1857     if ( (0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1858     {
1859       GNUNET_STATISTICS_update (GDS_stats,
1860                                 gettext_noop ("# P2P FIND PEER requests processed"), 1,
1861                                 GNUNET_NO);
1862       handle_find_peer (peer,
1863                         &get->key,
1864                         reply_bf,
1865                         get->bf_mutator);
1866     }
1867     else
1868     {
1869       eval = GDS_DATACACHE_handle_get (&get->key,
1870                                        type,
1871                                        xquery, xquery_size,
1872                                        &reply_bf, 
1873                                        get->bf_mutator);
1874     }
1875   }
1876   else
1877   {
1878     GNUNET_STATISTICS_update (GDS_stats,
1879                               gettext_noop ("# P2P GET requests ONLY routed"), 1,
1880                               GNUNET_NO);
1881   }
1882   
1883   /* P2P forwarding */
1884   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1885     GDS_NEIGHBOURS_handle_get (type,
1886                                options,
1887                                ntohl (get->desired_replication_level),
1888                                ntohl (get->hop_count),
1889                                &get->key,
1890                                xquery, xquery_size,
1891                                reply_bf,
1892                                get->bf_mutator,
1893                                peer_bf);
1894   /* clean up */
1895   if (NULL != reply_bf)
1896     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1897   GNUNET_CONTAINER_bloomfilter_free (peer_bf);  
1898   return GNUNET_YES;
1899 }
1900
1901
1902 /**
1903  * Core handler for p2p result messages.
1904  *
1905  * @param cls closure
1906  * @param message message
1907  * @param peer peer identity this notification is about
1908  * @param atsi performance data
1909  * @return GNUNET_YES (do not cut p2p connection)
1910  */
1911 static int
1912 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1913                        const struct GNUNET_MessageHeader *message,
1914                        const struct GNUNET_TRANSPORT_ATS_Information
1915                        *atsi)
1916 {
1917   const struct PeerResultMessage *prm;
1918   const struct GNUNET_PeerIdentity *put_path;
1919   const struct GNUNET_PeerIdentity *get_path;
1920   const void *data;
1921   uint32_t get_path_length;
1922   uint32_t put_path_length;
1923   uint16_t msize;
1924   size_t data_size;
1925   enum GNUNET_BLOCK_Type type;
1926                        
1927   /* parse and validate message */
1928   msize = ntohs (message->size);
1929   if (msize < sizeof (struct PeerResultMessage))
1930   {
1931     GNUNET_break_op (0);
1932     return GNUNET_YES;
1933   }
1934   prm = (struct PeerResultMessage *) message;
1935   put_path_length = ntohl (prm->put_path_length);
1936   get_path_length = ntohl (prm->get_path_length);
1937   if ( (msize < sizeof (struct PeerResultMessage) + 
1938         (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) ||
1939        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1940        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
1941   {
1942     GNUNET_break_op (0);
1943     return GNUNET_YES;
1944   } 
1945   GNUNET_STATISTICS_update (GDS_stats,
1946                             gettext_noop ("# P2P RESULTS received"), 1,
1947                             GNUNET_NO);
1948   put_path = (const struct GNUNET_PeerIdentity*) &prm[1];
1949   get_path = &put_path[put_path_length];
1950   type = ntohl (prm->type);
1951   data = (const void*) &get_path[get_path_length];
1952   data_size = msize - (sizeof (struct PeerResultMessage) + 
1953                        (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1954
1955   /* if we got a HELLO, consider it for our own routing table */
1956   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
1957   {
1958     const struct GNUNET_MessageHeader *h;
1959     struct GNUNET_PeerIdentity pid;
1960     int bucket;
1961
1962     /* Should be a HELLO, validate and consider using it! */
1963     if (data_size < sizeof (struct GNUNET_MessageHeader))
1964     {
1965       GNUNET_break_op (0);
1966       return GNUNET_YES;
1967     }
1968     h = data;
1969     if (data_size != ntohs (h->size))
1970     {
1971       GNUNET_break_op (0);
1972       return GNUNET_YES;
1973     }
1974     if (GNUNET_OK !=
1975         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) h,
1976                              &pid))
1977     {
1978       GNUNET_break_op (0);
1979       return GNUNET_YES;
1980     }
1981     if (0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity)))
1982     {
1983       bucket = find_bucket (&pid.hashPubKey);
1984       if ( (bucket >= 0) &&
1985            (k_buckets[bucket].peers_size < bucket_size) )
1986         {    
1987           if (NULL != GDS_transport_handle)
1988           {
1989             GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
1990                                           h, NULL, NULL);
1991             GNUNET_TRANSPORT_try_connect (GDS_transport_handle,
1992                                           &pid);
1993           }
1994         }   
1995     }
1996   }
1997
1998   /* append 'peer' to 'get_path' */
1999   {    
2000     struct GNUNET_PeerIdentity xget_path[get_path_length+1];
2001
2002     memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
2003     xget_path[get_path_length] = *peer;
2004     get_path_length++;
2005
2006     /* forward to local clients */   
2007     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2008                              &prm->key,
2009                              get_path_length,
2010                              xget_path,
2011                              put_path_length,
2012                              put_path,
2013                              type,
2014                              data_size, 
2015                              data);
2016
2017     /* forward to other peers */
2018     GDS_ROUTING_process (type,
2019                          GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2020                          &prm->key,
2021                          put_path_length,
2022                          put_path,
2023                          get_path_length,
2024                          xget_path,
2025                          data,
2026                          data_size);                     
2027   }
2028   return GNUNET_YES;
2029 }
2030
2031
2032 /**
2033  * Initialize neighbours subsystem.
2034  *
2035  * @return GNUNET_OK on success, GNUNET_SYSERR on error
2036  */
2037 int
2038 GDS_NEIGHBOURS_init ()
2039 {
2040   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2041     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2042     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2043     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2044     {NULL, 0, 0}
2045   };
2046   unsigned long long temp_config_num;
2047  
2048   if (GNUNET_OK ==
2049       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2050                                              &temp_config_num))
2051     bucket_size = (unsigned int) temp_config_num;  
2052   atsAPI = GNUNET_ATS_init (GDS_cfg, NULL, NULL);
2053   coreAPI = GNUNET_CORE_connect (GDS_cfg,
2054                                  1,
2055                                  NULL,
2056                                  &core_init,
2057                                  &handle_core_connect,
2058                                  &handle_core_disconnect, 
2059                                  NULL, GNUNET_NO,
2060                                  NULL, GNUNET_NO,
2061                                  core_handlers);
2062   if (coreAPI == NULL)
2063     return GNUNET_SYSERR;
2064   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
2065   return GNUNET_OK;
2066 }
2067
2068
2069 /**
2070  * Shutdown neighbours subsystem.
2071  */
2072 void
2073 GDS_NEIGHBOURS_done ()
2074 {
2075   if (coreAPI == NULL)
2076     return;
2077   GNUNET_CORE_disconnect (coreAPI);
2078   coreAPI = NULL;    
2079   GNUNET_ATS_shutdown (atsAPI);
2080   atsAPI = NULL;    
2081   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
2082   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
2083   all_known_peers = NULL;
2084   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
2085   {
2086     GNUNET_SCHEDULER_cancel (find_peer_task);
2087     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
2088   }
2089 }
2090
2091
2092 /* end of gnunet-service-dht_neighbours.c */