-getting core, nse, testbed, ats-tool and peerinfo-tool to compile again (part of...
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_block_lib.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_nse_service.h"
35 #include "gnunet_ats_service.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
40 #include "gnunet_dht_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-dht.h"
43 #include "gnunet-service-dht_clients.h"
44 #include "gnunet-service-dht_datacache.h"
45 #include "gnunet-service-dht_hello.h"
46 #include "gnunet-service-dht_neighbours.h"
47 #include "gnunet-service-dht_nse.h"
48 #include "gnunet-service-dht_routing.h"
49 #include <fenv.h>
50 #include "dht.h"
51
52 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
53
54 /**
55  * How many buckets will we allow total.
56  */
57 #define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
58
59 /**
60  * What is the maximum number of peers in a given bucket.
61  */
62 #define DEFAULT_BUCKET_SIZE 8
63
64 /**
65  * Desired replication level for FIND PEER requests
66  */
67 #define FIND_PEER_REPLICATION_LEVEL 4
68
69 /**
70  * Maximum allowed replication level for all requests.
71  */
72 #define MAXIMUM_REPLICATION_LEVEL 16
73
74 /**
75  * Maximum allowed number of pending messages per peer.
76  */
77 #define MAXIMUM_PENDING_PER_PEER 64
78
79 /**
80  * How often to update our preference levels for peers in our routing tables.
81  */
82 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
83
84 /**
85  * How long at least to wait before sending another find peer request.
86  */
87 #define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
88
89 /**
90  * How long at most to wait before sending another find peer request.
91  */
92 #define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10)
93
94 /**
95  * How long at most to wait for transmission of a GET request to another peer?
96  */
97 #define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
98
99 /**
100  * Hello address expiration
101  */
102 extern struct GNUNET_TIME_Relative hello_expiration;
103
104
105 GNUNET_NETWORK_STRUCT_BEGIN
106
107 /**
108  * P2P PUT message
109  */
110 struct PeerPutMessage
111 {
112   /**
113    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
114    */
115   struct GNUNET_MessageHeader header;
116
117   /**
118    * Processing options
119    */
120   uint32_t options GNUNET_PACKED;
121
122   /**
123    * Content type.
124    */
125   uint32_t type GNUNET_PACKED;
126
127   /**
128    * Hop count
129    */
130   uint32_t hop_count GNUNET_PACKED;
131
132   /**
133    * Replication level for this message
134    */
135   uint32_t desired_replication_level GNUNET_PACKED;
136
137   /**
138    * Length of the PUT path that follows (if tracked).
139    */
140   uint32_t put_path_length GNUNET_PACKED;
141
142   /**
143    * When does the content expire?
144    */
145   struct GNUNET_TIME_AbsoluteNBO expiration_time;
146
147   /**
148    * Bloomfilter (for peer identities) to stop circular routes
149    */
150   char bloomfilter[DHT_BLOOM_SIZE];
151
152   /**
153    * The key we are storing under.
154    */
155   struct GNUNET_HashCode key;
156
157   /* put path (if tracked) */
158
159   /* Payload */
160
161 };
162
163
164 /**
165  * P2P Result message
166  */
167 struct PeerResultMessage
168 {
169   /**
170    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
171    */
172   struct GNUNET_MessageHeader header;
173
174   /**
175    * Content type.
176    */
177   uint32_t type GNUNET_PACKED;
178
179   /**
180    * Length of the PUT path that follows (if tracked).
181    */
182   uint32_t put_path_length GNUNET_PACKED;
183
184   /**
185    * Length of the GET path that follows (if tracked).
186    */
187   uint32_t get_path_length GNUNET_PACKED;
188
189   /**
190    * When does the content expire?
191    */
192   struct GNUNET_TIME_AbsoluteNBO expiration_time;
193
194   /**
195    * The key of the corresponding GET request.
196    */
197   struct GNUNET_HashCode key;
198
199   /* put path (if tracked) */
200
201   /* get path (if tracked) */
202
203   /* Payload */
204
205 };
206
207
208 /**
209  * P2P GET message
210  */
211 struct PeerGetMessage
212 {
213   /**
214    * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
215    */
216   struct GNUNET_MessageHeader header;
217
218   /**
219    * Processing options
220    */
221   uint32_t options GNUNET_PACKED;
222
223   /**
224    * Desired content type.
225    */
226   uint32_t type GNUNET_PACKED;
227
228   /**
229    * Hop count
230    */
231   uint32_t hop_count GNUNET_PACKED;
232
233   /**
234    * Desired replication level for this request.
235    */
236   uint32_t desired_replication_level GNUNET_PACKED;
237
238   /**
239    * Size of the extended query.
240    */
241   uint32_t xquery_size;
242
243   /**
244    * Bloomfilter mutator.
245    */
246   uint32_t bf_mutator;
247
248   /**
249    * Bloomfilter (for peer identities) to stop circular routes
250    */
251   char bloomfilter[DHT_BLOOM_SIZE];
252
253   /**
254    * The key we are looking for.
255    */
256   struct GNUNET_HashCode key;
257
258   /* xquery */
259
260   /* result bloomfilter */
261
262 };
263 GNUNET_NETWORK_STRUCT_END
264
265 /**
266  * Linked list of messages to send to a particular other peer.
267  */
268 struct P2PPendingMessage
269 {
270   /**
271    * Pointer to next item in the list
272    */
273   struct P2PPendingMessage *next;
274
275   /**
276    * Pointer to previous item in the list
277    */
278   struct P2PPendingMessage *prev;
279
280   /**
281    * Message importance level.  FIXME: used? useful?
282    */
283   unsigned int importance;
284
285   /**
286    * When does this message time out?
287    */
288   struct GNUNET_TIME_Absolute timeout;
289
290   /**
291    * Actual message to be sent, allocated at the end of the struct:
292    * // msg = (cast) &pm[1];
293    * // memcpy (&pm[1], data, len);
294    */
295   const struct GNUNET_MessageHeader *msg;
296
297 };
298
299
300 /**
301  * Entry for a peer in a bucket.
302  */
303 struct PeerInfo
304 {
305   /**
306    * Next peer entry (DLL)
307    */
308   struct PeerInfo *next;
309
310   /**
311    *  Prev peer entry (DLL)
312    */
313   struct PeerInfo *prev;
314
315   /**
316    * Count of outstanding messages for peer. 
317    */
318   unsigned int pending_count;
319
320   /**
321    * Head of pending messages to be sent to this peer.
322    */
323   struct P2PPendingMessage *head;
324
325   /**
326    * Tail of pending messages to be sent to this peer.
327    */
328   struct P2PPendingMessage *tail;
329
330   /**
331    * Core handle for sending messages to this peer.
332    */
333   struct GNUNET_CORE_TransmitHandle *th;
334
335   /**
336    * Task for scheduling preference updates
337    */
338   GNUNET_SCHEDULER_TaskIdentifier preference_task;
339
340   /**
341    * What is the identity of the peer?
342    */
343   struct GNUNET_PeerIdentity id;
344
345 #if 0
346   /**
347    * What is the average latency for replies received?
348    */
349   struct GNUNET_TIME_Relative latency;
350
351   /**
352    * Transport level distance to peer.
353    */
354   unsigned int distance;
355 #endif
356
357 };
358
359
360 /**
361  * Peers are grouped into buckets.
362  */
363 struct PeerBucket
364 {
365   /**
366    * Head of DLL
367    */
368   struct PeerInfo *head;
369
370   /**
371    * Tail of DLL
372    */
373   struct PeerInfo *tail;
374
375   /**
376    * Number of peers in the bucket.
377    */
378   unsigned int peers_size;
379 };
380
381
382 /**
383  * Do we cache all results that we are routing in the local datacache?
384  */
385 static int cache_results;
386
387 /**
388  * Should routing details be logged to stderr (for debugging)?
389  */
390 static int log_route_details_stderr;
391
392 /**
393  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
394  */
395 static unsigned int closest_bucket;
396
397 /**
398  * How many peers have we added since we sent out our last
399  * find peer request?
400  */
401 static unsigned int newly_found_peers;
402
403 /**
404  * Option for testing that disables the 'connect' function of the DHT.
405  */
406 static int disable_try_connect;
407
408 /**
409  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
410  */
411 static struct PeerBucket k_buckets[MAX_BUCKETS];
412
413 /**
414  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
415  */
416 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
417
418 /**
419  * Maximum size for each bucket.
420  */
421 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
422
423 /**
424  * Task that sends FIND PEER requests.
425  */
426 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
427
428 /**
429  * Identity of this peer.
430  */
431 static struct GNUNET_PeerIdentity my_identity;
432
433 /**
434  * Handle to CORE.
435  */
436 static struct GNUNET_CORE_Handle *coreAPI;
437
438 /**
439  * Handle to ATS.
440  */
441 static struct GNUNET_ATS_PerformanceHandle *atsAPI;
442
443
444
445 /**
446  * Find the optimal bucket for this key.
447  *
448  * @param hc the hashcode to compare our identity to
449  * @return the proper bucket index, or GNUNET_SYSERR
450  *         on error (same hashcode)
451  */
452 static int
453 find_bucket (const struct GNUNET_HashCode *hc)
454 {
455   unsigned int bits;
456
457   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
458   if (bits == MAX_BUCKETS)
459   {
460     /* How can all bits match? Got my own ID? */
461     GNUNET_break (0);
462     return GNUNET_SYSERR;
463   }
464   return MAX_BUCKETS - bits - 1;
465 }
466
467
468 /**
469  * Let GNUnet core know that we like the given peer.
470  *
471  * @param cls the `struct PeerInfo` of the peer
472  * @param tc scheduler context.
473  */
474 static void
475 update_core_preference (void *cls,
476                         const struct GNUNET_SCHEDULER_TaskContext *tc)
477 {
478   struct PeerInfo *peer = cls;
479   uint64_t preference;
480   unsigned int matching;
481   int bucket;
482
483   peer->preference_task = GNUNET_SCHEDULER_NO_TASK;
484   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
485     return;
486   matching =
487       GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
488                                         &peer->id.hashPubKey);
489   if (matching >= 64)
490     matching = 63;
491   bucket = find_bucket (&peer->id.hashPubKey);
492   if (bucket == GNUNET_SYSERR)
493     preference = 0;
494   else
495   {
496     GNUNET_assert (k_buckets[bucket].peers_size != 0);
497     preference = (1LL << matching) / k_buckets[bucket].peers_size;
498   }
499   if (preference == 0)
500   {
501     peer->preference_task =
502         GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
503                                       &update_core_preference, peer);
504     return;
505   }
506   GNUNET_STATISTICS_update (GDS_stats,
507                             gettext_noop ("# Preference updates given to core"),
508                             1, GNUNET_NO);
509   GNUNET_ATS_performance_change_preference (atsAPI, &peer->id,
510                                 GNUNET_ATS_PREFERENCE_BANDWIDTH,
511                                 (double) preference, GNUNET_ATS_PREFERENCE_END);
512   peer->preference_task =
513       GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
514                                     &update_core_preference, peer);
515
516
517 }
518
519
520 /**
521  * Closure for 'add_known_to_bloom'.
522  */
523 struct BloomConstructorContext
524 {
525   /**
526    * Bloom filter under construction.
527    */
528   struct GNUNET_CONTAINER_BloomFilter *bloom;
529
530   /**
531    * Mutator to use.
532    */
533   uint32_t bf_mutator;
534 };
535
536
537 /**
538  * Add each of the peers we already know to the bloom filter of
539  * the request so that we don't get duplicate HELLOs.
540  *
541  * @param cls the 'struct BloomConstructorContext'.
542  * @param key peer identity to add to the bloom filter
543  * @param value value the peer information (unused)
544  * @return #GNUNET_YES (we should continue to iterate)
545  */
546 static int
547 add_known_to_bloom (void *cls, 
548                     const struct GNUNET_HashCode *key, 
549                     void *value)
550 {
551   struct BloomConstructorContext *ctx = cls;
552   struct GNUNET_HashCode mh;
553
554   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
555   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
556               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
557               GNUNET_h2s (key), ctx->bf_mutator);
558   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
559   return GNUNET_YES;
560 }
561
562
563 /**
564  * Task to send a find peer message for our own peer identifier
565  * so that we can find the closest peers in the network to ourselves
566  * and attempt to connect to them.
567  *
568  * @param cls closure for this task
569  * @param tc the context under which the task is running
570  */
571 static void
572 send_find_peer_message (void *cls,
573                         const struct GNUNET_SCHEDULER_TaskContext *tc)
574 {
575   struct GNUNET_TIME_Relative next_send_time;
576   struct BloomConstructorContext bcc;
577   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
578
579   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
580   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
581     return;
582   if (newly_found_peers > bucket_size)
583   {
584     /* If we are finding many peers already, no need to send out our request right now! */
585     find_peer_task =
586         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
587                                       &send_find_peer_message, NULL);
588     newly_found_peers = 0;
589     return;
590   }
591   bcc.bf_mutator =
592       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
593   bcc.bloom =
594       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
595                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
596   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
597                                          &bcc);
598   GNUNET_STATISTICS_update (GDS_stats,
599                             gettext_noop ("# FIND PEER messages initiated"), 1,
600                             GNUNET_NO);
601   peer_bf =
602       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
603                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
604   // FIXME: pass priority!?
605   GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
606                              GNUNET_DHT_RO_FIND_PEER,
607                              FIND_PEER_REPLICATION_LEVEL, 0,
608                              &my_identity.hashPubKey, NULL, 0, bcc.bloom,
609                              bcc.bf_mutator, peer_bf);
610   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
611   GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
612   /* schedule next round */
613   next_send_time.rel_value_us =
614       DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us +
615       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
616                                 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us /
617                                 (newly_found_peers + 1));
618   newly_found_peers = 0;
619   find_peer_task =
620       GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
621                                     NULL);
622 }
623
624
625 /**
626  * Method called whenever a peer connects.
627  *
628  * @param cls closure
629  * @param peer peer identity this notification is about
630  */
631 static void
632 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer)
633 {
634   struct PeerInfo *ret;
635   int peer_bucket;
636
637   /* Check for connect to self message */
638   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
639     return;
640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n",
641               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
642   if (GNUNET_YES ==
643       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
644                                               &peer->hashPubKey))
645   {
646     GNUNET_break (0);
647     return;
648   }
649   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1,
650                             GNUNET_NO);
651   peer_bucket = find_bucket (&peer->hashPubKey);
652   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
653   ret = GNUNET_malloc (sizeof (struct PeerInfo));
654 #if 0
655   ret->latency = latency;
656   ret->distance = distance;
657 #endif
658   ret->id = *peer;
659   GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
660                                     k_buckets[peer_bucket].tail, ret);
661   k_buckets[peer_bucket].peers_size++;
662   closest_bucket = GNUNET_MAX (closest_bucket, peer_bucket);
663   if ((peer_bucket > 0) && (k_buckets[peer_bucket].peers_size <= bucket_size))
664   {
665     ret->preference_task =
666         GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
667     newly_found_peers++;
668   }
669   GNUNET_assert (GNUNET_OK ==
670                  GNUNET_CONTAINER_multihashmap_put (all_known_peers,
671                                                     &peer->hashPubKey, ret,
672                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
673   if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers) &&
674       (GNUNET_YES != disable_try_connect))
675   {
676     /* got a first connection, good time to start with FIND PEER requests... */
677     find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL);
678   }
679 }
680
681
682 /**
683  * Method called whenever a peer disconnects.
684  *
685  * @param cls closure
686  * @param peer peer identity this notification is about
687  */
688 static void
689 handle_core_disconnect (void *cls,
690                         const struct GNUNET_PeerIdentity *peer)
691 {
692   struct PeerInfo *to_remove;
693   int current_bucket;
694   struct P2PPendingMessage *pos;
695   unsigned int discarded;
696
697   /* Check for disconnect from self message */
698   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
699     return;
700   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n",
701               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
702   to_remove =
703       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
704   if (NULL == to_remove)
705   {
706     GNUNET_break (0);
707     return;
708   }
709   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1,
710                             GNUNET_NO);
711   GNUNET_assert (GNUNET_YES ==
712                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
713                                                        &peer->hashPubKey,
714                                                        to_remove));
715   if (GNUNET_SCHEDULER_NO_TASK != to_remove->preference_task)
716   {
717     GNUNET_SCHEDULER_cancel (to_remove->preference_task);
718     to_remove->preference_task = GNUNET_SCHEDULER_NO_TASK;
719   }
720   current_bucket = find_bucket (&to_remove->id.hashPubKey);
721   GNUNET_assert (current_bucket >= 0);
722   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
723                                k_buckets[current_bucket].tail, to_remove);
724   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
725   k_buckets[current_bucket].peers_size--;
726   while ((closest_bucket > 0) && (k_buckets[closest_bucket].peers_size == 0))
727     closest_bucket--;
728
729   if (to_remove->th != NULL)
730   {
731     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
732     to_remove->th = NULL;
733   }
734   discarded = 0;
735   while (NULL != (pos = to_remove->head))
736   {
737     GNUNET_CONTAINER_DLL_remove (to_remove->head, to_remove->tail, pos);
738     discarded++;
739     GNUNET_free (pos);
740   }
741   GNUNET_STATISTICS_update (GDS_stats,
742                             gettext_noop
743                             ("# Queued messages discarded (peer disconnected)"),
744                             discarded, GNUNET_NO);
745   GNUNET_free (to_remove);
746 }
747
748
749 /**
750  * Called when core is ready to send a message we asked for
751  * out to the destination.
752  *
753  * @param cls the 'struct PeerInfo' of the target peer
754  * @param size number of bytes available in buf
755  * @param buf where the callee should write the message
756  * @return number of bytes written to buf
757  */
758 static size_t
759 core_transmit_notify (void *cls, size_t size, void *buf)
760 {
761   struct PeerInfo *peer = cls;
762   char *cbuf = buf;
763   struct P2PPendingMessage *pending;
764   size_t off;
765   size_t msize;
766
767   peer->th = NULL;
768   while ((NULL != (pending = peer->head)) &&
769          (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
770   {
771     peer->pending_count--;
772     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
773     GNUNET_free (pending);
774   }
775   if (pending == NULL)
776   {
777     /* no messages pending */
778     return 0;
779   }
780   if (buf == NULL)
781   {
782     peer->th =
783         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_NO,
784                                            pending->importance,
785                                            GNUNET_TIME_absolute_get_remaining
786                                            (pending->timeout), &peer->id,
787                                            ntohs (pending->msg->size),
788                                            &core_transmit_notify, peer);
789     GNUNET_break (NULL != peer->th);
790     return 0;
791   }
792   off = 0;
793   while ((NULL != (pending = peer->head)) &&
794          (size - off >= (msize = ntohs (pending->msg->size))))
795   {
796     GNUNET_STATISTICS_update (GDS_stats,
797                               gettext_noop
798                               ("# Bytes transmitted to other peers"), msize,
799                               GNUNET_NO);
800     memcpy (&cbuf[off], pending->msg, msize);
801     off += msize;
802     peer->pending_count--;
803     GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
804     GNUNET_free (pending);
805   }
806   if (peer->head != NULL)
807   {
808     peer->th =
809         GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_NO,
810                                            pending->importance,
811                                            GNUNET_TIME_absolute_get_remaining
812                                            (pending->timeout), &peer->id, msize,
813                                            &core_transmit_notify, peer);
814     GNUNET_break (NULL != peer->th);
815   }
816   return off;
817 }
818
819
820 /**
821  * Transmit all messages in the peer's message queue.
822  *
823  * @param peer message queue to process
824  */
825 static void
826 process_peer_queue (struct PeerInfo *peer)
827 {
828   struct P2PPendingMessage *pending;
829
830   if (NULL == (pending = peer->head))
831     return;
832   if (NULL != peer->th)
833     return;
834   GNUNET_STATISTICS_update (GDS_stats,
835                             gettext_noop
836                             ("# Bytes of bandwidth requested from core"),
837                             ntohs (pending->msg->size), GNUNET_NO);
838   peer->th =
839       GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_NO,
840                                          pending->importance,
841                                          GNUNET_TIME_absolute_get_remaining
842                                          (pending->timeout), &peer->id,
843                                          ntohs (pending->msg->size),
844                                          &core_transmit_notify, peer);
845   GNUNET_break (NULL != peer->th);
846 }
847
848
849 /**
850  * To how many peers should we (on average) forward the request to
851  * obtain the desired target_replication count (on average).
852  *
853  * @param hop_count number of hops the message has traversed
854  * @param target_replication the number of total paths desired
855  * @return Some number of peers to forward the message to
856  */
857 static unsigned int
858 get_forward_count (uint32_t hop_count, uint32_t target_replication)
859 {
860   uint32_t random_value;
861   uint32_t forward_count;
862   float target_value;
863
864   if (hop_count > GDS_NSE_get () * 4.0)
865   {
866     /* forcefully terminate */
867     GNUNET_STATISTICS_update (GDS_stats,
868                               gettext_noop ("# requests TTL-dropped"),
869                               1, GNUNET_NO);
870     return 0;
871   }
872   if (hop_count > GDS_NSE_get () * 2.0)
873   {
874     /* Once we have reached our ideal number of hops, only forward to 1 peer */
875     return 1;
876   }
877   /* bound by system-wide maximum */
878   target_replication =
879       GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
880   target_value =
881       1 + (target_replication - 1.0) / (GDS_NSE_get () +
882                                         ((float) (target_replication - 1.0) *
883                                          hop_count));
884   /* Set forward count to floor of target_value */
885   forward_count = (uint32_t) target_value;
886   /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
887   target_value = target_value - forward_count;
888   random_value =
889       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
890   if (random_value < (target_value * UINT32_MAX))
891     forward_count++;
892   return forward_count;
893 }
894
895
896 /**
897  * Compute the distance between have and target as a 32-bit value.
898  * Differences in the lower bits must count stronger than differences
899  * in the higher bits.
900  *
901  * @param target 
902  * @param have
903  * @return 0 if have==target, otherwise a number
904  *           that is larger as the distance between
905  *           the two hash codes increases
906  */
907 static unsigned int
908 get_distance (const struct GNUNET_HashCode *target, 
909               const struct GNUNET_HashCode *have)
910 {
911   unsigned int bucket;
912   unsigned int msb;
913   unsigned int lsb;
914   unsigned int i;
915
916   /* We have to represent the distance between two 2^9 (=512)-bit
917    * numbers as a 2^5 (=32)-bit number with "0" being used for the
918    * two numbers being identical; furthermore, we need to
919    * guarantee that a difference in the number of matching
920    * bits is always represented in the result.
921    *
922    * We use 2^32/2^9 numerical values to distinguish between
923    * hash codes that have the same LSB bit distance and
924    * use the highest 2^9 bits of the result to signify the
925    * number of (mis)matching LSB bits; if we have 0 matching
926    * and hence 512 mismatching LSB bits we return -1 (since
927    * 512 itself cannot be represented with 9 bits) */
928
929   /* first, calculate the most significant 9 bits of our
930    * result, aka the number of LSBs */
931   bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
932   /* bucket is now a value between 0 and 512 */
933   if (bucket == 512)
934     return 0;                   /* perfect match */
935   if (bucket == 0)
936     return (unsigned int) -1;   /* LSB differs; use max (if we did the bit-shifting
937                                  * below, we'd end up with max+1 (overflow)) */
938
939   /* calculate the most significant bits of the final result */
940   msb = (512 - bucket) << (32 - 9);
941   /* calculate the 32-9 least significant bits of the final result by
942    * looking at the differences in the 32-9 bits following the
943    * mismatching bit at 'bucket' */
944   lsb = 0;
945   for (i = bucket + 1;
946        (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
947   {
948     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
949         GNUNET_CRYPTO_hash_get_bit (have, i))
950       lsb |= (1 << (bucket + 32 - 9 - i));      /* first bit set will be 10,
951                                                  * last bit set will be 31 -- if
952                                                  * i does not reach 512 first... */
953   }
954   return msb | lsb;
955 }
956
957
958 /**
959  * Check whether my identity is closer than any known peers.  If a
960  * non-null bloomfilter is given, check if this is the closest peer
961  * that hasn't already been routed to.
962  *
963  * @param key hash code to check closeness to
964  * @param bloom bloomfilter, exclude these entries from the decision
965  * @return GNUNET_YES if node location is closest,
966  *         GNUNET_NO otherwise.
967  */
968 static int
969 am_closest_peer (const struct GNUNET_HashCode * key,
970                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
971 {
972   int bits;
973   int other_bits;
974   int bucket_num;
975   int count;
976   struct PeerInfo *pos;
977
978   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
979     return GNUNET_YES;
980   bucket_num = find_bucket (key);
981   GNUNET_assert (bucket_num >= 0);
982   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
983   pos = k_buckets[bucket_num].head;
984   count = 0;
985   while ((pos != NULL) && (count < bucket_size))
986   {
987     if ((bloom != NULL) &&
988         (GNUNET_YES ==
989          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
990     {
991       pos = pos->next;
992       continue;                 /* Skip already checked entries */
993     }
994     other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
995     if (other_bits > bits)
996       return GNUNET_NO;
997     if (other_bits == bits)     /* We match the same number of bits */
998       return GNUNET_YES;
999     pos = pos->next;
1000   }
1001   /* No peers closer, we are the closest! */
1002   return GNUNET_YES;
1003 }
1004
1005
1006 /**
1007  * Select a peer from the routing table that would be a good routing
1008  * destination for sending a message for "key".  The resulting peer
1009  * must not be in the set of blocked peers.<p>
1010  *
1011  * Note that we should not ALWAYS select the closest peer to the
1012  * target, peers further away from the target should be chosen with
1013  * exponentially declining probability.
1014  *
1015  * FIXME: double-check that this is fine
1016  *
1017  *
1018  * @param key the key we are selecting a peer to route to
1019  * @param bloom a bloomfilter containing entries this request has seen already
1020  * @param hops how many hops has this message traversed thus far
1021  * @return Peer to route to, or NULL on error
1022  */
1023 static struct PeerInfo *
1024 select_peer (const struct GNUNET_HashCode * key,
1025              const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops)
1026 {
1027   unsigned int bc;
1028   unsigned int count;
1029   unsigned int selected;
1030   struct PeerInfo *pos;
1031   unsigned int dist;
1032   unsigned int smallest_distance;
1033   struct PeerInfo *chosen;
1034
1035   if (hops >= GDS_NSE_get ())
1036   {
1037     /* greedy selection (closest peer that is not in bloomfilter) */
1038     smallest_distance = UINT_MAX;
1039     chosen = NULL;
1040     for (bc = 0; bc <= closest_bucket; bc++)
1041     {
1042       pos = k_buckets[bc].head;
1043       count = 0;
1044       while ((pos != NULL) && (count < bucket_size))
1045       {
1046         if ((bloom == NULL) ||
1047             (GNUNET_NO ==
1048              GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1049         {
1050           dist = get_distance (key, &pos->id.hashPubKey);
1051           if (dist < smallest_distance)
1052           {
1053             chosen = pos;
1054             smallest_distance = dist;
1055           }
1056         }
1057         else
1058         {
1059           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1061                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1062           GNUNET_STATISTICS_update (GDS_stats,
1063                                     gettext_noop
1064                                     ("# Peers excluded from routing due to Bloomfilter"),
1065                                     1, GNUNET_NO);
1066           dist = get_distance (key, &pos->id.hashPubKey);
1067           if (dist < smallest_distance)
1068           {
1069             chosen = NULL;
1070             smallest_distance = dist;
1071           }
1072         }
1073         count++;
1074         pos = pos->next;
1075       }
1076     }
1077     if (NULL == chosen)
1078       GNUNET_STATISTICS_update (GDS_stats,
1079                                 gettext_noop ("# Peer selection failed"), 1,
1080                                 GNUNET_NO);
1081     return chosen;
1082   }
1083
1084   /* select "random" peer */
1085   /* count number of peers that are available and not filtered */
1086   count = 0;
1087   for (bc = 0; bc <= closest_bucket; bc++)
1088   {
1089     pos = k_buckets[bc].head;
1090     while ((pos != NULL) && (count < bucket_size))
1091     {
1092       if ((bloom != NULL) &&
1093           (GNUNET_YES ==
1094            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1095       {
1096         GNUNET_STATISTICS_update (GDS_stats,
1097                                   gettext_noop
1098                                   ("# Peers excluded from routing due to Bloomfilter"),
1099                                   1, GNUNET_NO);
1100         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101                     "Excluded peer `%s' due to BF match in random routing for %s\n",
1102                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
1103         pos = pos->next;
1104         continue;               /* Ignore bloomfiltered peers */
1105       }
1106       count++;
1107       pos = pos->next;
1108     }
1109   }
1110   if (0 == count)               /* No peers to select from! */
1111   {
1112     GNUNET_STATISTICS_update (GDS_stats,
1113                               gettext_noop ("# Peer selection failed"), 1,
1114                               GNUNET_NO);
1115     return NULL;
1116   }
1117   /* Now actually choose a peer */
1118   selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1119   count = 0;
1120   for (bc = 0; bc <= closest_bucket; bc++)
1121   {
1122     for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1123     {
1124       if ((bloom != NULL) &&
1125           (GNUNET_YES ==
1126            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1127       {
1128         continue;               /* Ignore bloomfiltered peers */
1129       }
1130       if (0 == selected--)
1131         return pos;
1132     }
1133   }
1134   GNUNET_break (0);
1135   return NULL;
1136 }
1137
1138
1139 /**
1140  * Compute the set of peers that the given request should be
1141  * forwarded to.
1142  *
1143  * @param key routing key
1144  * @param bloom bloom filter excluding peers as targets, all selected
1145  *        peers will be added to the bloom filter
1146  * @param hop_count number of hops the request has traversed so far
1147  * @param target_replication desired number of replicas
1148  * @param targets where to store an array of target peers (to be
1149  *         free'd by the caller)
1150  * @return number of peers returned in 'targets'.
1151  */
1152 static unsigned int
1153 get_target_peers (const struct GNUNET_HashCode *key,
1154                   struct GNUNET_CONTAINER_BloomFilter *bloom,
1155                   uint32_t hop_count, uint32_t target_replication,
1156                   struct PeerInfo ***targets)
1157 {
1158   unsigned int ret;
1159   unsigned int off;
1160   struct PeerInfo **rtargets;
1161   struct PeerInfo *nxt;
1162
1163   GNUNET_assert (NULL != bloom);
1164   ret = get_forward_count (hop_count, target_replication);  
1165   if (0 == ret)
1166   {
1167     *targets = NULL;
1168     return 0;
1169   }
1170   rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret);
1171   for (off = 0; off < ret; off++)
1172   {
1173     nxt = select_peer (key, bloom, hop_count);
1174     if (NULL == nxt)
1175       break;
1176     rtargets[off] = nxt;
1177     GNUNET_break (GNUNET_NO ==
1178                   GNUNET_CONTAINER_bloomfilter_test (bloom,
1179                                                      &nxt->id.hashPubKey));
1180     GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
1181   }
1182   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183               "Selected %u/%u peers at hop %u for %s (target was %u)\n", off,
1184               GNUNET_CONTAINER_multihashmap_size (all_known_peers),
1185               (unsigned int) hop_count, GNUNET_h2s (key), ret);
1186   if (0 == off)
1187   {
1188     GNUNET_free (rtargets);
1189     *targets = NULL;
1190     return 0;
1191   }
1192   *targets = rtargets;
1193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1194               "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1195               GNUNET_h2s (key), 
1196               off,
1197               ret);
1198   return off;
1199 }
1200
1201
1202 /**
1203  * Perform a PUT operation.   Forwards the given request to other
1204  * peers.   Does not store the data locally.  Does not give the
1205  * data to local clients.  May do nothing if this is the only
1206  * peer in the network (or if we are the closest peer in the
1207  * network).
1208  *
1209  * @param type type of the block
1210  * @param options routing options
1211  * @param desired_replication_level desired replication count
1212  * @param expiration_time when does the content expire
1213  * @param hop_count how many hops has this message traversed so far
1214  * @param bf Bloom filter of peers this PUT has already traversed
1215  * @param key key for the content
1216  * @param put_path_length number of entries in @a put_path
1217  * @param put_path peers this request has traversed so far (if tracked)
1218  * @param data payload to store
1219  * @param data_size number of bytes in @a data
1220  */
1221 void
1222 GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1223                            enum GNUNET_DHT_RouteOption options,
1224                            uint32_t desired_replication_level,
1225                            struct GNUNET_TIME_Absolute expiration_time,
1226                            uint32_t hop_count,
1227                            struct GNUNET_CONTAINER_BloomFilter *bf,
1228                            const struct GNUNET_HashCode *key,
1229                            unsigned int put_path_length,
1230                            struct GNUNET_PeerIdentity *put_path,
1231                            const void *data, size_t data_size)
1232 {
1233   unsigned int target_count;
1234   unsigned int i;
1235   struct PeerInfo **targets;
1236   struct PeerInfo *target;
1237   struct P2PPendingMessage *pending;
1238   size_t msize;
1239   struct PeerPutMessage *ppm;
1240   struct GNUNET_PeerIdentity *pp;
1241
1242   GNUNET_assert (NULL != bf);
1243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244               "Adding myself (%s) to PUT bloomfilter for %s\n",
1245               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1246   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
1247   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1248                             1, GNUNET_NO);
1249   target_count =
1250       get_target_peers (key, bf, hop_count, desired_replication_level,
1251                         &targets);
1252   if (0 == target_count)
1253   {
1254     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255                 "Routing PUT for %s terminates after %u hops at %s\n",
1256                 GNUNET_h2s (key), (unsigned int) hop_count,
1257                 GNUNET_i2s (&my_identity));
1258     return;
1259   }
1260   msize =
1261       put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1262       sizeof (struct PeerPutMessage);
1263   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1264   {
1265     put_path_length = 0;
1266     msize = data_size + sizeof (struct PeerPutMessage);
1267   }
1268   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1269   {
1270     GNUNET_break (0);
1271     GNUNET_free (targets);
1272     return;
1273   }
1274   GNUNET_STATISTICS_update (GDS_stats,
1275                             gettext_noop
1276                             ("# PUT messages queued for transmission"),
1277                             target_count, GNUNET_NO);
1278   for (i = 0; i < target_count; i++)
1279   {
1280     target = targets[i];
1281     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1282     {
1283       GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1284                                 1, GNUNET_NO);
1285       continue; /* skip */
1286     }
1287     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
1289                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1290     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1291     pending->importance = 0;    /* FIXME */
1292     pending->timeout = expiration_time;
1293     ppm = (struct PeerPutMessage *) &pending[1];
1294     pending->msg = &ppm->header;
1295     ppm->header.size = htons (msize);
1296     ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1297     ppm->options = htonl (options);
1298     ppm->type = htonl (type);
1299     ppm->hop_count = htonl (hop_count + 1);
1300     ppm->desired_replication_level = htonl (desired_replication_level);
1301     ppm->put_path_length = htonl (put_path_length);
1302     ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1303     GNUNET_break (GNUNET_YES ==
1304                   GNUNET_CONTAINER_bloomfilter_test (bf,
1305                                                      &target->id.hashPubKey));
1306     GNUNET_assert (GNUNET_OK ==
1307                    GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1308                                                               ppm->bloomfilter,
1309                                                               DHT_BLOOM_SIZE));
1310     ppm->key = *key;
1311     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1312     memcpy (pp, put_path,
1313             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1314     memcpy (&pp[put_path_length], data, data_size);
1315     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1316     target->pending_count++;
1317     process_peer_queue (target);
1318   }
1319   GNUNET_free (targets);
1320 }
1321
1322
1323 /**
1324  * Perform a GET operation.  Forwards the given request to other
1325  * peers.  Does not lookup the key locally.  May do nothing if this is
1326  * the only peer in the network (or if we are the closest peer in the
1327  * network).
1328  *
1329  * @param type type of the block
1330  * @param options routing options
1331  * @param desired_replication_level desired replication count
1332  * @param hop_count how many hops did this request traverse so far?
1333  * @param key key for the content
1334  * @param xquery extended query
1335  * @param xquery_size number of bytes in @a xquery
1336  * @param reply_bf bloomfilter to filter duplicates
1337  * @param reply_bf_mutator mutator for @a reply_bf
1338  * @param peer_bf filter for peers not to select (again)
1339  */
1340 void
1341 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1342                            enum GNUNET_DHT_RouteOption options,
1343                            uint32_t desired_replication_level,
1344                            uint32_t hop_count, const struct GNUNET_HashCode * key,
1345                            const void *xquery, size_t xquery_size,
1346                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1347                            uint32_t reply_bf_mutator,
1348                            struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1349 {
1350   unsigned int target_count;
1351   unsigned int i;
1352   struct PeerInfo **targets;
1353   struct PeerInfo *target;
1354   struct P2PPendingMessage *pending;
1355   size_t msize;
1356   struct PeerGetMessage *pgm;
1357   char *xq;
1358   size_t reply_bf_size;
1359
1360   GNUNET_assert (NULL != peer_bf);
1361   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"),
1362                             1, GNUNET_NO);
1363   target_count =
1364       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
1365                         &targets);
1366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1367               "Adding myself (%s) to GET bloomfilter for %s\n",
1368               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
1369   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
1370   if (0 == target_count)
1371   {
1372     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373                 "Routing GET for %s terminates after %u hops at %s\n",
1374                 GNUNET_h2s (key), (unsigned int) hop_count,
1375                 GNUNET_i2s (&my_identity));
1376     return;
1377   }
1378   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1379   msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
1380   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1381   {
1382     GNUNET_break (0);
1383     GNUNET_free (targets);
1384     return;
1385   }
1386   GNUNET_STATISTICS_update (GDS_stats,
1387                             gettext_noop
1388                             ("# GET messages queued for transmission"),
1389                             target_count, GNUNET_NO);
1390   /* forward request */
1391   for (i = 0; i < target_count; i++)
1392   {
1393     target = targets[i];
1394     if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
1395     {
1396       GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1397                                 1, GNUNET_NO);
1398       continue; /* skip */
1399     }
1400     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
1402                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
1403     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1404     pending->importance = 0;    /* FIXME */
1405     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
1406     pgm = (struct PeerGetMessage *) &pending[1];
1407     pending->msg = &pgm->header;
1408     pgm->header.size = htons (msize);
1409     pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1410     pgm->options = htonl (options);
1411     pgm->type = htonl (type);
1412     pgm->hop_count = htonl (hop_count + 1);
1413     pgm->desired_replication_level = htonl (desired_replication_level);
1414     pgm->xquery_size = htonl (xquery_size);
1415     pgm->bf_mutator = reply_bf_mutator;
1416     GNUNET_break (GNUNET_YES ==
1417                   GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1418                                                      &target->id.hashPubKey));
1419     GNUNET_assert (GNUNET_OK ==
1420                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1421                                                               pgm->bloomfilter,
1422                                                               DHT_BLOOM_SIZE));
1423     pgm->key = *key;
1424     xq = (char *) &pgm[1];
1425     memcpy (xq, xquery, xquery_size);
1426     if (NULL != reply_bf)
1427       GNUNET_assert (GNUNET_OK ==
1428                      GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1429                                                                 &xq
1430                                                                 [xquery_size],
1431                                                                 reply_bf_size));
1432     GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
1433     target->pending_count++;
1434     process_peer_queue (target);
1435   }
1436   GNUNET_free (targets);
1437 }
1438
1439
1440 /**
1441  * Handle a reply (route to origin).  Only forwards the reply back to
1442  * the given peer.  Does not do local caching or forwarding to local
1443  * clients.
1444  *
1445  * @param target neighbour that should receive the block (if still connected)
1446  * @param type type of the block
1447  * @param expiration_time when does the content expire
1448  * @param key key for the content
1449  * @param put_path_length number of entries in @a put_path
1450  * @param put_path peers the original PUT traversed (if tracked)
1451  * @param get_path_length number of entries in @a get_path
1452  * @param get_path peers this reply has traversed so far (if tracked)
1453  * @param data payload of the reply
1454  * @param data_size number of bytes in @a data
1455  */
1456 void
1457 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1458                              enum GNUNET_BLOCK_Type type,
1459                              struct GNUNET_TIME_Absolute expiration_time,
1460                              const struct GNUNET_HashCode * key,
1461                              unsigned int put_path_length,
1462                              const struct GNUNET_PeerIdentity *put_path,
1463                              unsigned int get_path_length,
1464                              const struct GNUNET_PeerIdentity *get_path,
1465                              const void *data, size_t data_size)
1466 {
1467   struct PeerInfo *pi;
1468   struct P2PPendingMessage *pending;
1469   size_t msize;
1470   struct PeerResultMessage *prm;
1471   struct GNUNET_PeerIdentity *paths;
1472
1473   msize =
1474       data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1475                                                        put_path_length) *
1476       sizeof (struct GNUNET_PeerIdentity);
1477   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1478       (get_path_length >
1479        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1480       (put_path_length >
1481        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1482       (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE))
1483   {
1484     GNUNET_break (0);
1485     return;
1486   }
1487   pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers, &target->hashPubKey);
1488   if (NULL == pi)
1489   {
1490     /* peer disconnected in the meantime, drop reply */
1491     return;
1492   }
1493   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
1494   {
1495     /* skip */
1496     GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
1497                               1, GNUNET_NO);
1498     return;
1499   }
1500
1501   GNUNET_STATISTICS_update (GDS_stats,
1502                             gettext_noop
1503                             ("# RESULT messages queued for transmission"), 1,
1504                             GNUNET_NO);
1505   pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1506   pending->importance = 0;      /* FIXME */
1507   pending->timeout = expiration_time;
1508   prm = (struct PeerResultMessage *) &pending[1];
1509   pending->msg = &prm->header;
1510   prm->header.size = htons (msize);
1511   prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1512   prm->type = htonl (type);
1513   prm->put_path_length = htonl (put_path_length);
1514   prm->get_path_length = htonl (get_path_length);
1515   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1516   prm->key = *key;
1517   paths = (struct GNUNET_PeerIdentity *) &prm[1];
1518   memcpy (paths, put_path,
1519           put_path_length * sizeof (struct GNUNET_PeerIdentity));
1520   memcpy (&paths[put_path_length], get_path,
1521           get_path_length * sizeof (struct GNUNET_PeerIdentity));
1522   memcpy (&paths[put_path_length + get_path_length], data, data_size);
1523   GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
1524   pi->pending_count++;
1525   process_peer_queue (pi);
1526 }
1527
1528
1529 /**
1530  * To be called on core init/fail.
1531  *
1532  * @param cls service closure
1533  * @param identity the public identity of this peer
1534  */
1535 static void
1536 core_init (void *cls,
1537            const struct GNUNET_PeerIdentity *identity)
1538 {
1539   my_identity = *identity;
1540 }
1541
1542
1543 /**
1544  * Core handler for p2p put requests.
1545  *
1546  * @param cls closure
1547  * @param peer sender of the request
1548  * @param message message
1549  * @param peer peer identity this notification is about
1550  * @return #GNUNET_OK to keep the connection open,
1551  *         #GNUNET_SYSERR to close it (signal serious error)
1552  */
1553 static int
1554 handle_dht_p2p_put (void *cls, 
1555                     const struct GNUNET_PeerIdentity *peer,
1556                     const struct GNUNET_MessageHeader *message)
1557 {
1558   const struct PeerPutMessage *put;
1559   const struct GNUNET_PeerIdentity *put_path;
1560   const void *payload;
1561   uint32_t putlen;
1562   uint16_t msize;
1563   size_t payload_size;
1564   enum GNUNET_DHT_RouteOption options;
1565   struct GNUNET_CONTAINER_BloomFilter *bf;
1566   struct GNUNET_HashCode test_key;
1567
1568   msize = ntohs (message->size);
1569   if (msize < sizeof (struct PeerPutMessage))
1570   {
1571     GNUNET_break_op (0);
1572     return GNUNET_YES;
1573   }
1574   put = (const struct PeerPutMessage *) message;
1575   putlen = ntohl (put->put_path_length);
1576   if ((msize <
1577        sizeof (struct PeerPutMessage) +
1578        putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1579       (putlen >
1580        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1581   {
1582     GNUNET_break_op (0);
1583     return GNUNET_YES;
1584   }
1585   GNUNET_STATISTICS_update (GDS_stats,
1586                             gettext_noop ("# P2P PUT requests received"), 1,
1587                             GNUNET_NO);
1588   GNUNET_STATISTICS_update (GDS_stats,
1589                             gettext_noop ("# P2P PUT bytes received"), msize,
1590                             GNUNET_NO);
1591   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1592   payload = &put_path[putlen];
1593   options = ntohl (put->options);
1594   payload_size =
1595       msize - (sizeof (struct PeerPutMessage) +
1596                putlen * sizeof (struct GNUNET_PeerIdentity));
1597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1598               GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1599   if (GNUNET_YES == log_route_details_stderr)
1600   {
1601     char *tmp;
1602
1603     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1604     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1605                  "XDHT PUT %s: %s->%s (%u, %u=>%u)\n", 
1606                  GNUNET_h2s (&put->key), GNUNET_i2s (peer), tmp,
1607                  ntohl(put->hop_count),
1608                  GNUNET_CRYPTO_hash_matching_bits (&peer->hashPubKey, &put->key),
1609                  GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, &put->key)
1610                 );
1611     GNUNET_free (tmp);
1612   }
1613   switch (GNUNET_BLOCK_get_key
1614           (GDS_block_context, ntohl (put->type), payload, payload_size,
1615            &test_key))
1616   {
1617   case GNUNET_YES:
1618     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
1619     {
1620       char *put_s = GNUNET_strdup (GNUNET_h2s (&put->key));
1621       GNUNET_break_op (0);
1622       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1623                   "PUT with key `%s' for block with key %s\n",
1624                   put_s, GNUNET_h2s (&test_key));
1625       GNUNET_free (put_s);
1626       return GNUNET_YES;
1627     }
1628     break;
1629   case GNUNET_NO:
1630     GNUNET_break_op (0);
1631     return GNUNET_YES;
1632   case GNUNET_SYSERR:
1633     /* cannot verify, good luck */
1634     break;
1635   }
1636   if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1637   {
1638     switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1639                                    ntohl (put->type),
1640                                    NULL,    /* query */
1641                                    NULL, 0, /* bloom filer */
1642                                    NULL, 0, /* xquery */
1643                                    payload, payload_size))
1644     {
1645     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1646     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1647       break;
1648
1649     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1650     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1651     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1652     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1653     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1654     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1655     default:
1656       GNUNET_break_op (0);
1657       return GNUNET_OK;
1658     }
1659   }
1660
1661   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
1662                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
1663   GNUNET_break_op (GNUNET_YES ==
1664                    GNUNET_CONTAINER_bloomfilter_test (bf, &peer->hashPubKey));
1665   {
1666     struct GNUNET_PeerIdentity pp[putlen + 1];
1667
1668     /* extend 'put path' by sender */
1669     if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1670     {
1671       memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1672       pp[putlen] = *peer;
1673       putlen++;
1674     }
1675     else
1676       putlen = 0;
1677
1678     /* give to local clients */
1679     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1680                               &put->key, 0, NULL, putlen, pp, ntohl (put->type),
1681                               payload_size, payload);
1682     /* store locally */
1683     if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1684         (am_closest_peer (&put->key, bf)))
1685       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh
1686                                 (put->expiration_time), &put->key, putlen, pp,
1687                                 ntohl (put->type), payload_size, payload);
1688     /* route to other peers */
1689     GDS_NEIGHBOURS_handle_put (ntohl (put->type), options,
1690                                ntohl (put->desired_replication_level),
1691                                GNUNET_TIME_absolute_ntoh (put->expiration_time),
1692                                ntohl (put->hop_count), bf, &put->key, putlen,
1693                                pp, payload, payload_size);
1694     /* notify monitoring clients */
1695     GDS_CLIENTS_process_put (options,
1696                              ntohl (put->type),
1697                              ntohl (put->hop_count),
1698                              ntohl (put->desired_replication_level),
1699                              putlen, pp,
1700                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
1701                              &put->key,
1702                              payload,
1703                              payload_size);
1704   }
1705   GNUNET_CONTAINER_bloomfilter_free (bf);
1706   return GNUNET_YES;
1707 }
1708
1709
1710 /**
1711  * We have received a FIND PEER request.  Send matching
1712  * HELLOs back.
1713  *
1714  * @param sender sender of the FIND PEER request
1715  * @param key peers close to this key are desired
1716  * @param bf peers matching this bf are excluded
1717  * @param bf_mutator mutator for bf
1718  */
1719 static void
1720 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1721                   const struct GNUNET_HashCode * key,
1722                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
1723 {
1724   int bucket_idx;
1725   struct PeerBucket *bucket;
1726   struct PeerInfo *peer;
1727   unsigned int choice;
1728   struct GNUNET_HashCode mhash;
1729   const struct GNUNET_HELLO_Message *hello;
1730
1731   /* first, check about our own HELLO */
1732   if (NULL != GDS_my_hello)
1733   {
1734     GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash);
1735     if ((NULL == bf) ||
1736         (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
1737     {
1738       GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1739                                    GNUNET_TIME_relative_to_absolute
1740                                    (hello_expiration),
1741                                    key, 0, NULL, 0, NULL, GDS_my_hello,
1742                                    GNUNET_HELLO_size ((const struct
1743                                                        GNUNET_HELLO_Message *)
1744                                                       GDS_my_hello));
1745     }
1746     else
1747     {
1748       GNUNET_STATISTICS_update (GDS_stats,
1749                                 gettext_noop
1750                                 ("# FIND PEER requests ignored due to Bloomfilter"),
1751                                 1, GNUNET_NO);
1752     }
1753   }
1754   else
1755   {
1756     GNUNET_STATISTICS_update (GDS_stats,
1757                               gettext_noop
1758                               ("# FIND PEER requests ignored due to lack of HELLO"),
1759                               1, GNUNET_NO);
1760   }
1761
1762   /* then, also consider sending a random HELLO from the closest bucket */
1763   if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
1764     bucket_idx = closest_bucket;
1765   else
1766     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
1767   if (bucket_idx == GNUNET_SYSERR)
1768     return;
1769   bucket = &k_buckets[bucket_idx];
1770   if (bucket->peers_size == 0)
1771     return;
1772   choice =
1773       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size);
1774   peer = bucket->head;
1775   while (choice > 0)
1776   {
1777     GNUNET_assert (NULL != peer);
1778     peer = peer->next;
1779     choice--;
1780   }
1781   choice = bucket->peers_size;
1782   do
1783   {
1784     peer = peer->next;
1785     if (choice-- == 0)
1786       return;                   /* no non-masked peer available */
1787     if (peer == NULL)
1788       peer = bucket->head;
1789     GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash);
1790     hello = GDS_HELLO_get (&peer->id);
1791   }
1792   while ((hello == NULL) ||
1793          (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)));
1794   GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO,
1795                                GNUNET_TIME_relative_to_absolute
1796                                (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key,
1797                                0, NULL, 0, NULL, hello,
1798                                GNUNET_HELLO_size (hello));
1799 }
1800
1801
1802 /**
1803  * Core handler for p2p get requests.
1804  *
1805  * @param cls closure
1806  * @param peer sender of the request
1807  * @param message message
1808  * @return #GNUNET_OK to keep the connection open,
1809  *         #GNUNET_SYSERR to close it (signal serious error)
1810  */
1811 static int
1812 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1813                     const struct GNUNET_MessageHeader *message)
1814 {
1815   struct PeerGetMessage *get;
1816   uint32_t xquery_size;
1817   size_t reply_bf_size;
1818   uint16_t msize;
1819   enum GNUNET_BLOCK_Type type;
1820   enum GNUNET_DHT_RouteOption options;
1821   enum GNUNET_BLOCK_EvaluationResult eval;
1822   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1823   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1824   const char *xquery;
1825
1826   GNUNET_break (0 !=
1827                 memcmp (peer, &my_identity,
1828                         sizeof (struct GNUNET_PeerIdentity)));
1829   /* parse and validate message */
1830   msize = ntohs (message->size);
1831   if (msize < sizeof (struct PeerGetMessage))
1832   {
1833     GNUNET_break_op (0);
1834     return GNUNET_YES;
1835   }
1836   get = (struct PeerGetMessage *) message;
1837   xquery_size = ntohl (get->xquery_size);
1838   if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1839   {
1840     GNUNET_break_op (0);
1841     return GNUNET_YES;
1842   }
1843   reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1844   type = ntohl (get->type);
1845   options = ntohl (get->options);
1846   xquery = (const char *) &get[1];
1847   reply_bf = NULL;
1848   GNUNET_STATISTICS_update (GDS_stats,
1849                             gettext_noop ("# P2P GET requests received"), 1,
1850                             GNUNET_NO);
1851   GNUNET_STATISTICS_update (GDS_stats,
1852                             gettext_noop ("# P2P GET bytes received"), msize,
1853                             GNUNET_NO);
1854   if (GNUNET_YES == log_route_details_stderr)
1855   {
1856     char *tmp;
1857
1858     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1859     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1860                  "XDHT GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n", 
1861                  GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp,
1862                  ntohl(get->hop_count),
1863                  GNUNET_CRYPTO_hash_matching_bits (&peer->hashPubKey, &get->key),
1864                  GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, &get->key),
1865                  ntohl(get->xquery_size), xquery
1866                 );
1867     GNUNET_free (tmp);
1868   }
1869
1870   if (reply_bf_size > 0)
1871     reply_bf =
1872         GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size,
1873                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
1874   eval =
1875       GNUNET_BLOCK_evaluate (GDS_block_context, type, &get->key, &reply_bf,
1876                              get->bf_mutator, xquery, xquery_size, NULL, 0);
1877   if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1878   {
1879     /* request invalid or block type not supported */
1880     GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1881     if (NULL != reply_bf)
1882       GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1883     return GNUNET_YES;
1884   }
1885   peer_bf =
1886       GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE,
1887                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
1888   GNUNET_break_op (GNUNET_YES ==
1889                    GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1890                                                       &peer->hashPubKey));
1891   /* remember request for routing replies */
1892   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
1893                    reply_bf, get->bf_mutator);
1894   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n",
1895               GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity),
1896               (unsigned int) ntohl (get->hop_count));
1897   /* local lookup (this may update the reply_bf) */
1898   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1899       (am_closest_peer (&get->key, peer_bf)))
1900   {
1901     if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
1902     {
1903       GNUNET_STATISTICS_update (GDS_stats,
1904                                 gettext_noop
1905                                 ("# P2P FIND PEER requests processed"), 1,
1906                                 GNUNET_NO);
1907       handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator);
1908     }
1909     else
1910     {
1911       eval =
1912           GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size,
1913                                     &reply_bf, get->bf_mutator);
1914     }
1915   }
1916   else
1917   {
1918     GNUNET_STATISTICS_update (GDS_stats,
1919                               gettext_noop ("# P2P GET requests ONLY routed"),
1920                               1, GNUNET_NO);
1921   }
1922
1923   GDS_CLIENTS_process_get (options,
1924                            type,
1925                            ntohl(get->hop_count),
1926                            ntohl(get->desired_replication_level),
1927                            0, NULL,
1928                            &get->key);
1929
1930   /* P2P forwarding */
1931   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
1932     GDS_NEIGHBOURS_handle_get (type, options,
1933                                ntohl (get->desired_replication_level),
1934                                ntohl (get->hop_count), &get->key, xquery,
1935                                xquery_size, reply_bf, get->bf_mutator, peer_bf);
1936   /* clean up */
1937   if (NULL != reply_bf)
1938     GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1939   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
1940   return GNUNET_YES;
1941 }
1942
1943
1944 /**
1945  * Core handler for p2p result messages.
1946  *
1947  * @param cls closure
1948  * @param message message
1949  * @param peer peer identity this notification is about
1950  * @return #GNUNET_YES (do not cut p2p connection)
1951  */
1952 static int
1953 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1954                        const struct GNUNET_MessageHeader *message)
1955 {
1956   const struct PeerResultMessage *prm;
1957   const struct GNUNET_PeerIdentity *put_path;
1958   const struct GNUNET_PeerIdentity *get_path;
1959   const void *data;
1960   uint32_t get_path_length;
1961   uint32_t put_path_length;
1962   uint16_t msize;
1963   size_t data_size;
1964   enum GNUNET_BLOCK_Type type;
1965
1966   /* parse and validate message */
1967   msize = ntohs (message->size);
1968   if (msize < sizeof (struct PeerResultMessage))
1969   {
1970     GNUNET_break_op (0);
1971     return GNUNET_YES;
1972   }
1973   prm = (struct PeerResultMessage *) message;
1974   put_path_length = ntohl (prm->put_path_length);
1975   get_path_length = ntohl (prm->get_path_length);
1976   if ((msize <
1977        sizeof (struct PeerResultMessage) + (get_path_length +
1978                                             put_path_length) *
1979        sizeof (struct GNUNET_PeerIdentity)) ||
1980       (get_path_length >
1981        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1982       (put_path_length >
1983        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1984   {
1985     GNUNET_break_op (0);
1986     return GNUNET_YES;
1987   }
1988   put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
1989   get_path = &put_path[put_path_length];
1990   type = ntohl (prm->type);
1991   data = (const void *) &get_path[get_path_length];
1992   data_size =
1993       msize - (sizeof (struct PeerResultMessage) +
1994                (get_path_length +
1995                 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
1996   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"),
1997                             1, GNUNET_NO);
1998   GNUNET_STATISTICS_update (GDS_stats,
1999                             gettext_noop ("# P2P RESULT bytes received"),
2000                             msize, GNUNET_NO);
2001   if (GNUNET_YES == log_route_details_stderr)
2002   {
2003     char *tmp;
2004
2005     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2006     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "XDHT RESULT %s: %s->%s (%u)\n", 
2007                  GNUNET_h2s (&prm->key), GNUNET_i2s (peer), tmp,
2008                  get_path_length + 1);
2009     GNUNET_free (tmp);
2010   }
2011   /* if we got a HELLO, consider it for our own routing table */
2012   if (type == GNUNET_BLOCK_TYPE_DHT_HELLO)
2013   {
2014     const struct GNUNET_MessageHeader *h;
2015     struct GNUNET_PeerIdentity pid;
2016     int bucket;
2017
2018     /* Should be a HELLO, validate and consider using it! */
2019     if (data_size < sizeof (struct GNUNET_MessageHeader))
2020     {
2021       GNUNET_break_op (0);
2022       return GNUNET_YES;
2023     }
2024     h = data;
2025     if (data_size != ntohs (h->size))
2026     {
2027       GNUNET_break_op (0);
2028       return GNUNET_YES;
2029     }
2030     if (GNUNET_OK !=
2031         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid))
2032     {
2033       GNUNET_break_op (0);
2034       return GNUNET_YES;
2035     }
2036     if ((GNUNET_YES != disable_try_connect) &&
2037         0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity)))
2038     {
2039       bucket = find_bucket (&pid.hashPubKey);
2040       if ((bucket >= 0) &&
2041           (k_buckets[bucket].peers_size < bucket_size) &&
2042           (NULL != GDS_transport_handle))
2043       {
2044         GNUNET_TRANSPORT_offer_hello (GDS_transport_handle, h, NULL, NULL);
2045         GNUNET_TRANSPORT_try_connect (GDS_transport_handle, &pid, NULL, NULL); /*FIXME TRY_CONNECT change */
2046       }
2047     }
2048   }
2049
2050   /* append 'peer' to 'get_path' */
2051   {
2052     struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2053
2054     memcpy (xget_path, get_path,
2055             get_path_length * sizeof (struct GNUNET_PeerIdentity));
2056     xget_path[get_path_length] = *peer;
2057     get_path_length++;
2058
2059     /* forward to local clients */
2060     GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2061                               &prm->key, get_path_length, xget_path,
2062                               put_path_length, put_path, type, data_size, data);
2063     GDS_CLIENTS_process_get_resp (type,
2064                                   xget_path, get_path_length,
2065                                   put_path, put_path_length,
2066                                   GNUNET_TIME_absolute_ntoh (
2067                                     prm->expiration_time),
2068                                   &prm->key,
2069                                   data,
2070                                   data_size);
2071     if (GNUNET_YES == cache_results)
2072     {
2073       struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2074
2075       memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
2076       memcpy (&xput_path[put_path_length], 
2077               xget_path, 
2078               get_path_length * sizeof (struct GNUNET_PeerIdentity));
2079       
2080       GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2081                                 &prm->key,
2082                                 get_path_length + put_path_length, xput_path,
2083                                 type, data_size, data);
2084     }
2085     /* forward to other peers */
2086     GDS_ROUTING_process (type, GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2087                          &prm->key, put_path_length, put_path, get_path_length,
2088                          xget_path, data, data_size);
2089   }
2090
2091   return GNUNET_YES;
2092 }
2093
2094
2095 /**
2096  * Initialize neighbours subsystem.
2097  *
2098  * @return GNUNET_OK on success, GNUNET_SYSERR on error
2099  */
2100 int
2101 GDS_NEIGHBOURS_init ()
2102 {
2103   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2104     {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2105     {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2106     {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
2107     {NULL, 0, 0}
2108   };
2109   unsigned long long temp_config_num;
2110
2111   disable_try_connect
2112     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT");
2113   if (GNUNET_OK ==
2114       GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size",
2115                                              &temp_config_num))
2116     bucket_size = (unsigned int) temp_config_num;
2117   cache_results
2118     = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS");
2119
2120   log_route_details_stderr =
2121     (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2122   atsAPI = GNUNET_ATS_performance_init (GDS_cfg, NULL, NULL);
2123   coreAPI =
2124       GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect,
2125                            &handle_core_disconnect, NULL, GNUNET_NO, NULL,
2126                            GNUNET_NO, core_handlers);
2127   if (coreAPI == NULL)
2128     return GNUNET_SYSERR;
2129   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2130   return GNUNET_OK;
2131 }
2132
2133
2134 /**
2135  * Shutdown neighbours subsystem.
2136  */
2137 void
2138 GDS_NEIGHBOURS_done ()
2139 {
2140   if (NULL == coreAPI)
2141     return;
2142   GNUNET_CORE_disconnect (coreAPI);
2143   coreAPI = NULL;
2144   GNUNET_ATS_performance_done (atsAPI);
2145   atsAPI = NULL;
2146   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers));
2147   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
2148   all_known_peers = NULL;
2149   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
2150   {
2151     GNUNET_SCHEDULER_cancel (find_peer_task);
2152     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
2153   }
2154 }
2155
2156 /**
2157  * Get the ID of the local node.
2158  * 
2159  * @return identity of the local node
2160  */
2161 struct GNUNET_PeerIdentity *
2162 GDS_NEIGHBOURS_get_id ()
2163 {
2164     return &my_identity;
2165 }
2166
2167
2168 /* end of gnunet-service-dht_neighbours.c */