stuff
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_neighbours.c
23  * @brief GNUnet DHT service's bucket and neighbour management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht.h"
40 #include <fenv.h>
41
42 /**
43  * How many buckets will we allow total.
44  */
45 #define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
46
47 /**
48  * What is the maximum number of peers in a given bucket.
49  */
50 #define DEFAULT_BUCKET_SIZE 4
51
52 /**
53  * Size of the bloom filter the DHT uses to filter peers.
54  */
55 #define DHT_BLOOM_SIZE 128
56
57
58 /**
59  * P2P PUT message
60  */
61 struct PeerPutMessage
62 {
63   /**
64    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
65    */
66   struct GNUNET_MessageHeader header;
67
68   /**
69    * Processing options
70    */
71   uint32_t options GNUNET_PACKED;
72
73   /**
74    * Content type.
75    */
76   uint32_t type GNUNET_PACKED;
77
78   /**
79    * Hop count
80    */
81   uint32_t hop_count GNUNET_PACKED;
82
83   /**
84    * Replication level for this message
85    */
86   uint32_t desired_replication_level GNUNET_PACKED;
87
88   /**
89    * Generic route path length for a message in the
90    * DHT that arrived at a peer and generated
91    * a reply. Copied to the end of this message.
92    */
93   uint32_t outgoing_path_length GNUNET_PACKED;
94
95   /**
96    * Bloomfilter (for peer identities) to stop circular routes
97    */
98   char bloomfilter[DHT_BLOOM_SIZE];
99
100   /**
101    * The key we are storing under.
102    */
103   GNUNET_HashCode key;
104
105   /* put path (if tracked) */
106
107   /* Payload */
108
109 };
110
111
112 /**
113  * P2P GET message
114  */
115 struct PeerGetMessage
116 {
117   /**
118    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
119    */
120   struct GNUNET_MessageHeader header;
121
122   /**
123    * Processing options
124    */
125   uint32_t options GNUNET_PACKED;
126
127   /**
128    * Desired content type.
129    */
130   uint32_t type GNUNET_PACKED;
131
132   /**
133    * Hop count
134    */
135   uint32_t hop_count GNUNET_PACKED;
136
137   /**
138    * Desired replication level for this request.
139    */
140   uint32_t desired_replication_level GNUNET_PACKED;
141
142   /**
143    * Size of the extended query.
144    */
145   uint32_t xquery_size;
146
147   /**
148    * Bloomfilter mutator.
149    */
150   uint32_t bf_mutator;
151
152   /**
153    * Bloomfilter (for peer identities) to stop circular routes
154    */
155   char bloomfilter[DHT_BLOOM_SIZE];
156
157   /**
158    * The key we are looking for.
159    */
160   GNUNET_HashCode key;
161
162   /* xquery */
163
164   /* result bloomfilter */
165
166 };
167
168
169 /**
170  * Linked list of messages to send to a particular other peer.
171  */
172 struct P2PPendingMessage
173 {
174   /**
175    * Pointer to next item in the list
176    */
177   struct P2PPendingMessage *next;
178
179   /**
180    * Pointer to previous item in the list
181    */
182   struct P2PPendingMessage *prev;
183
184   /**
185    * Message importance level.
186    */
187   unsigned int importance;
188
189   /**
190    * Time when this request was scheduled to be sent.
191    */
192   struct GNUNET_TIME_Absolute scheduled;
193
194   /**
195    * How long to wait before sending message.
196    */
197   struct GNUNET_TIME_Relative timeout;
198
199   /**
200    * Actual message to be sent, allocated at the end of the struct:
201    * // msg = (cast) &pm[1]; 
202    * // memcpy (&pm[1], data, len);
203    */
204   const struct GNUNET_MessageHeader *msg;
205
206 };
207
208
209 /**
210  * Entry for a peer in a bucket.
211  */
212 struct PeerInfo
213 {
214   /**
215    * Next peer entry (DLL)
216    */
217   struct PeerInfo *next;
218
219   /**
220    *  Prev peer entry (DLL)
221    */
222   struct PeerInfo *prev;
223
224   /**
225    * Count of outstanding messages for peer.
226    */
227   unsigned int pending_count;
228
229   /**
230    * Head of pending messages to be sent to this peer.
231    */
232   struct P2PPendingMessage *head;
233
234   /**
235    * Tail of pending messages to be sent to this peer.
236    */
237   struct P2PPendingMessage *tail;
238
239   /**
240    * Core handle for sending messages to this peer.
241    */
242   struct GNUNET_CORE_TransmitHandle *th;
243
244   /**
245    * Preference update context
246    */
247   struct GNUNET_CORE_InformationRequestContext *info_ctx;
248
249   /**
250    * Task for scheduling message sends.
251    */
252   GNUNET_SCHEDULER_TaskIdentifier send_task;
253
254   /**
255    * Task for scheduling preference updates
256    */
257   GNUNET_SCHEDULER_TaskIdentifier preference_task;
258
259   /**
260    * What is the identity of the peer?
261    */
262   struct GNUNET_PeerIdentity id;
263
264 #if 0
265   /**
266    * What is the average latency for replies received?
267    */
268   struct GNUNET_TIME_Relative latency;
269
270   /**
271    * Transport level distance to peer.
272    */
273   unsigned int distance;
274 #endif
275
276 };
277
278
279 /**
280  * Peers are grouped into buckets.
281  */
282 struct PeerBucket
283 {
284   /**
285    * Head of DLL
286    */
287   struct PeerInfo *head;
288
289   /**
290    * Tail of DLL
291    */
292   struct PeerInfo *tail;
293
294   /**
295    * Number of peers in the bucket.
296    */
297   unsigned int peers_size;
298 };
299
300
301 /**
302  * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
303  */
304 static unsigned int closest_bucket;
305
306 /**
307  * How many peers have we added since we sent out our last
308  * find peer request?
309  */
310 static unsigned int newly_found_peers;
311
312 /**
313  * The buckets.  Array of size MAX_BUCKET_SIZE.  Offset 0 means 0 bits matching.
314  */
315 static struct PeerBucket k_buckets[MAX_BUCKETS];
316
317 /**
318  * Hash map of all known peers, for easy removal from k_buckets on disconnect.
319  */
320 static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
321
322 /**
323  * Maximum size for each bucket.
324  */
325 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
326
327 /**
328  * Task that sends FIND PEER requests.
329  */
330 static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
331
332
333 /**
334  * Find the optimal bucket for this key.
335  *
336  * @param hc the hashcode to compare our identity to
337  * @return the proper bucket index, or GNUNET_SYSERR
338  *         on error (same hashcode)
339  */
340 static int
341 find_bucket (const GNUNET_HashCode * hc)
342 {
343   unsigned int bits;
344
345   bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
346   if (bits == MAX_BUCKETS)
347     {
348       /* How can all bits match? Got my own ID? */
349       GNUNET_break (0);
350       return GNUNET_SYSERR; 
351     }
352   return MAX_BUCKETS - bits - 1;
353 }
354
355
356 /**
357  * Method called whenever a peer connects.
358  *
359  * @param cls closure
360  * @param peer peer identity this notification is about
361  * @param atsi performance data
362  */
363 static void
364 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
365                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
366 {
367   struct PeerInfo *ret;
368   int peer_bucket;
369
370   /* Check for connect to self message */
371   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
372     return;
373   if (GNUNET_YES ==
374       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
375                                               &peer->hashPubKey))
376   {
377     GNUNET_break (0);
378     return;
379   }
380   peer_bucket = find_bucket (&peer->hashPubKey);
381   GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
382   ret = GNUNET_malloc (sizeof (struct PeerInfo));
383 #if 0
384   ret->latency = latency;
385   ret->distance = distance;
386 #endif
387   ret->id = *peer;
388   GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
389                                      k_buckets[peer_bucket].tail,
390                                      k_buckets[peer_bucket].tail, ret);
391   k_buckets[peer_bucket].peers_size++;
392   closest_bucket = GNUNET_MAX (closest_bucket,
393                                peer_bucket);
394   if ( (peer_bucket > 0) &&
395        (k_buckets[peer_bucket].peers_size <= bucket_size) )
396     ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
397   newly_found_peers++;
398   GNUNET_assert (GNUNET_OK ==
399                  GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
400                                                     &peer->hashPubKey, ret,
401                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
402   increment_stats (STAT_PEERS_KNOWN);
403 }
404
405
406 /**
407  * Method called whenever a peer disconnects.
408  *
409  * @param cls closure
410  * @param peer peer identity this notification is about
411  */
412 static void
413 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
414 {
415   struct PeerInfo *to_remove;
416   int current_bucket;
417   struct P2PPendingMessage *pos;
418   struct P2PPendingMessage *next;
419
420   /* Check for disconnect from self message */
421   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
422     return;
423   to_remove =
424       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
425   if (NULL == to_remove)
426     {
427       GNUNET_break (0);
428       return;
429     }
430   GNUNET_assert (GNUNET_YES ==
431                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
432                                                        &peer->hashPubKey,
433                                                        to_remove));
434   if (NULL != to_remove->info_ctx)
435   {
436     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
437     to_remove->info_ctx = NULL;
438   }
439   current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
440   GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
441                                k_buckets[current_bucket].tail,
442                                to_remove);
443   GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
444   k_buckets[current_bucket].peers_size--;
445   while ( (lowest_bucket > 0) &&
446           (k_buckets[lowest_bucket].peers_size == 0) )
447     lowest_bucket--;
448
449   if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
450   {
451     GNUNET_SCHEDULER_cancel (peer->send_task);
452     peer->send_task = GNUNET_SCHEDULER_NO_TASK;
453   }
454   if (to_remove->th != NULL) 
455   {
456     GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
457     to_remove->th = NULL;
458   }
459   while (NULL != (pos = to_remove->head))
460   {
461     GNUNET_CONTAINER_DLL_remove (to_remove->head,
462                                  to_remove->tail,
463                                  pos);
464     GNUNET_free (pos);
465   }
466 }
467
468
469 /**
470  * Perform a PUT operation.  // FIXME: document if this is only
471  * routing or also storage and/or even local client notification!
472  *
473  * @param type type of the block
474  * @param options routing options
475  * @param desired_replication_level desired replication count
476  * @param expiration_time when does the content expire
477  * @param key key for the content
478  * @param put_path_length number of entries in put_path
479  * @param put_path peers this request has traversed so far (if tracked)
480  * @param data payload to store
481  * @param data_size number of bytes in data
482  */
483 void
484 GST_NEIGHBOURS_handle_put (uint32_t type,
485                            uint32_t options,
486                            uint32_t desired_replication_level,
487                            GNUNET_TIME_Absolute expiration_time,
488                            const GNUNET_HashCode *key,
489                            unsigned int put_path_length,
490                            struct GNUNET_PeerIdentity *put_path,
491                            const void *data,
492                            size_t data_size)
493 {
494   // FIXME
495 }
496
497
498 /**
499  * Perform a GET operation.  // FIXME: document if this is only
500  * routing or also state-tracking and/or even local lookup!
501  *
502  * @param type type of the block
503  * @param options routing options
504  * @param desired_replication_level desired replication count
505  * @param key key for the content
506  * @param xquery extended query
507  * @param xquery_size number of bytes in xquery
508  * @param reply_bf bloomfilter to filter duplicates
509  * @param reply_bf_mutator mutator for reply_bf
510  * @param peer_bf filter for peers not to select (again)
511  */
512 void
513 GST_NEIGHBOURS_handle_get (uint32_t type,
514                            uint32_t options,
515                            uint32_t desired_replication_level,
516                            const GNUNET_HashCode *key,
517                            const void *xquery,
518                            size_t xquery_size,
519                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
520                            uint32_t reply_bf_mutator,
521                            const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
522 {
523   // FIXME
524 }
525
526
527 /**
528  * Handle a reply (route to origin).  FIXME: should this be here?
529  * (reply-routing table might be better done elsewhere).
530  *
531  * @param type type of the block
532  * @param options routing options
533  * @param expiration_time when does the content expire
534  * @param key key for the content
535  * @param put_path_length number of entries in put_path
536  * @param put_path peers the original PUT traversed (if tracked)
537  * @param get_path_length number of entries in put_path
538  * @param get_path peers this reply has traversed so far (if tracked)
539  * @param data payload of the reply
540  * @param data_size number of bytes in data
541  */
542 void
543 GST_NEIGHBOURS_handle_reply (uint32_t type,
544                              uint32_t options,
545                              GNUNET_TIME_Absolute expiration_time,
546                              const GNUNET_HashCode *key,
547                              unsigned int put_path_length,
548                              struct GNUNET_PeerIdentity *put_path,
549                              unsigned int get_path_length,
550                              struct GNUNET_PeerIdentity *get_path,
551                              const void *data,
552                              size_t data_size)
553 {
554   // FIXME
555 }
556
557
558 /**
559  * Add each of the peers we already know to the bloom filter of
560  * the request so that we don't get duplicate HELLOs.
561  *
562  * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
563  * @param key peer identity to add to the bloom filter
564  * @param value value the peer information (unused)
565  * @return GNUNET_YES (we should continue to iterate)
566  */
567 static int
568 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
569 {
570   struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
571
572   GNUNET_CONTAINER_bloomfilter_add (bloom, key);
573   return GNUNET_YES;
574 }
575
576
577 /**
578  * Task to send a find peer message for our own peer identifier
579  * so that we can find the closest peers in the network to ourselves
580  * and attempt to connect to them.
581  *
582  * @param cls closure for this task
583  * @param tc the context under which the task is running
584  */
585 static void
586 send_find_peer_message (void *cls,
587                         const struct GNUNET_SCHEDULER_TaskContext *tc)
588 {
589   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
590   struct DHT_MessageContext msg_ctx;
591   struct GNUNET_TIME_Relative next_send_time;
592   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
593
594   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
595   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
596     return;
597   if (newly_found_peers > bucket_size) 
598   {
599     /* If we are finding many peers already, no need to send out our request right now! */
600     find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
601                                                    &send_find_peer_message, NULL);
602     newly_found_peers = 0;
603     return;
604   }
605
606   // FIXME: build message...
607   find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
608   find_peer_msg->header.size =
609       htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
610   find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
611   temp_bloom =
612       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
613   GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
614                                          temp_bloom);
615   GNUNET_assert (GNUNET_OK ==
616                  GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
617                                                             find_peer_msg->
618                                                             bloomfilter,
619                                                             DHT_BLOOM_SIZE));
620   GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
621
622   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
623   memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
624   msg_ctx.unique_id =
625       GNUNET_ntohll (GNUNET_CRYPTO_random_u64
626                      (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
627   msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
628   msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
629   msg_ctx.network_size = log_of_network_size_estimate;
630   msg_ctx.peer = my_identity;
631   msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
632   msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
633   // FIXME: transmit message...
634   demultiplex_message (&find_peer_msg->header, &msg_ctx);
635   GNUNET_free (find_peer_msg);
636
637   /* schedule next round */
638   newly_found_peers = 0;
639   next_send_time.rel_value =
640     (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
641     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
642                               DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
643   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
644                                                  &send_find_peer_message,
645                                                  NULL);  
646 }
647
648
649 /**
650  * To be called on core init/fail.
651  *
652  * @param cls service closure
653  * @param server handle to the server for this service
654  * @param identity the public identity of this peer
655  * @param publicKey the public key of this peer
656  */
657 static void
658 core_init (void *cls, struct GNUNET_CORE_Handle *server,
659            const struct GNUNET_PeerIdentity *identity,
660            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
661 {
662   GNUNET_assert (server != NULL);
663   my_identity = *identity;
664   next_send_time.rel_value =
665     DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
666     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
667                               (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
668                                2) -
669                               DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
670   find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
671                                                  &send_find_peer_message,
672                                                  NULL);
673 }
674
675
676 /**
677  * Core handler for p2p get requests.
678  *
679  * @param cls closure
680  * @param message message
681  * @param peer peer identity this notification is about
682  * @param atsi performance data
683  * @return GNUNET_OK to keep the connection open,
684  *         GNUNET_SYSERR to close it (signal serious error)
685  */
686 static int
687 handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
688                     const struct GNUNET_MessageHeader *message,
689                     const struct GNUNET_TRANSPORT_ATS_Information
690                     *atsi)
691 {
692   struct GNUNET_DHT_P2PRouteMessage *incoming =
693       (struct GNUNET_DHT_P2PRouteMessage *) message;
694   struct GNUNET_MessageHeader *enc_msg =
695       (struct GNUNET_MessageHeader *) &incoming[1];
696   struct DHT_MessageContext *msg_ctx;
697   char *route_path;
698   int path_size;
699
700   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
701   {
702     GNUNET_break_op (0);
703     return GNUNET_YES;
704   }
705
706   if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
707   {
708     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709                 "Sending of previous replies took too long, backing off!\n");
710     increment_stats ("# route requests dropped due to high load");
711     decrease_max_send_delay (get_max_send_delay ());
712     return GNUNET_YES;
713   }
714   msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
715   msg_ctx->bloom =
716       GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
717                                          DHT_BLOOM_K);
718   GNUNET_assert (msg_ctx->bloom != NULL);
719   msg_ctx->hop_count = ntohl (incoming->hop_count);
720   memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
721   msg_ctx->replication = ntohl (incoming->desired_replication_level);
722   msg_ctx->msg_options = ntohl (incoming->options);
723   if (GNUNET_DHT_RO_RECORD_ROUTE ==
724       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
725   {
726     path_size =
727         ntohl (incoming->outgoing_path_length) *
728         sizeof (struct GNUNET_PeerIdentity);
729     if (ntohs (message->size) !=
730         (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
731          path_size))
732     {
733       GNUNET_break_op (0);
734       GNUNET_free (msg_ctx);
735       return GNUNET_YES;
736     }
737     route_path = (char *) &incoming[1];
738     route_path = route_path + ntohs (enc_msg->size);
739     msg_ctx->path_history =
740         GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
741     memcpy (msg_ctx->path_history, route_path, path_size);
742     memcpy (&msg_ctx->path_history[path_size], &my_identity,
743             sizeof (struct GNUNET_PeerIdentity));
744     msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
745   }
746   msg_ctx->network_size = ntohl (incoming->network_size);
747   msg_ctx->peer = *peer;
748   msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
749   msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
750   demultiplex_message (enc_msg, msg_ctx);
751   if (msg_ctx->bloom != NULL)
752   {
753     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
754     msg_ctx->bloom = NULL;
755   }
756   GNUNET_free (msg_ctx);
757   return GNUNET_YES;
758 }
759
760
761 /**
762  * Core handler for p2p put requests.
763  *
764  * @param cls closure
765  * @param message message
766  * @param peer peer identity this notification is about
767  * @param atsi performance data
768  * @return GNUNET_OK to keep the connection open,
769  *         GNUNET_SYSERR to close it (signal serious error)
770  */
771 static int
772 handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
773                     const struct GNUNET_MessageHeader *message,
774                     const struct GNUNET_TRANSPORT_ATS_Information
775                     *atsi)
776 {
777   struct GNUNET_DHT_P2PRouteMessage *incoming =
778       (struct GNUNET_DHT_P2PRouteMessage *) message;
779   struct GNUNET_MessageHeader *enc_msg =
780       (struct GNUNET_MessageHeader *) &incoming[1];
781   struct DHT_MessageContext *msg_ctx;
782   char *route_path;
783   int path_size;
784
785   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
786   {
787     GNUNET_break_op (0);
788     return GNUNET_YES;
789   }
790
791   if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
792   {
793     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794                 "Sending of previous replies took too long, backing off!\n");
795     increment_stats ("# route requests dropped due to high load");
796     decrease_max_send_delay (get_max_send_delay ());
797     return GNUNET_YES;
798   }
799   msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
800   msg_ctx->bloom =
801       GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
802                                          DHT_BLOOM_K);
803   GNUNET_assert (msg_ctx->bloom != NULL);
804   msg_ctx->hop_count = ntohl (incoming->hop_count);
805   memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
806   msg_ctx->replication = ntohl (incoming->desired_replication_level);
807   msg_ctx->msg_options = ntohl (incoming->options);
808   if (GNUNET_DHT_RO_RECORD_ROUTE ==
809       (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
810   {
811     path_size =
812         ntohl (incoming->outgoing_path_length) *
813         sizeof (struct GNUNET_PeerIdentity);
814     if (ntohs (message->size) !=
815         (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
816          path_size))
817     {
818       GNUNET_break_op (0);
819       GNUNET_free (msg_ctx);
820       return GNUNET_YES;
821     }
822     route_path = (char *) &incoming[1];
823     route_path = route_path + ntohs (enc_msg->size);
824     msg_ctx->path_history =
825         GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
826     memcpy (msg_ctx->path_history, route_path, path_size);
827     memcpy (&msg_ctx->path_history[path_size], &my_identity,
828             sizeof (struct GNUNET_PeerIdentity));
829     msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
830   }
831   msg_ctx->network_size = ntohl (incoming->network_size);
832   msg_ctx->peer = *peer;
833   msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
834   msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
835   demultiplex_message (enc_msg, msg_ctx);
836   if (msg_ctx->bloom != NULL)
837   {
838     GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
839     msg_ctx->bloom = NULL;
840   }
841   GNUNET_free (msg_ctx);
842   return GNUNET_YES;
843 }
844
845
846 /**
847  * Core handler for p2p route results.
848  *
849  * @param cls closure
850  * @param message message
851  * @param peer peer identity this notification is about
852  * @param atsi performance data
853  *
854  */
855 static int
856 handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
857                        const struct GNUNET_MessageHeader *message,
858                        const struct GNUNET_TRANSPORT_ATS_Information
859                        *atsi)
860 {
861   const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
862       (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
863   struct GNUNET_MessageHeader *enc_msg =
864       (struct GNUNET_MessageHeader *) &incoming[1];
865   struct DHT_MessageContext msg_ctx;
866
867   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
868   {
869     GNUNET_break_op (0);
870     return GNUNET_YES;
871   }
872
873   memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
874   memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
875   msg_ctx.msg_options = ntohl (incoming->options);
876   msg_ctx.hop_count = ntohl (incoming->hop_count);
877   msg_ctx.peer = *peer;
878   msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;  /* Make result routing a higher priority */
879   msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
880   if ((GNUNET_DHT_RO_RECORD_ROUTE ==
881        (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
882       (ntohl (incoming->outgoing_path_length) > 0))
883   {
884     if (ntohs (message->size) -
885         sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
886         ntohs (enc_msg->size) !=
887         ntohl (incoming->outgoing_path_length) *
888         sizeof (struct GNUNET_PeerIdentity))
889     {
890       GNUNET_break_op (0);
891       return GNUNET_NO;
892     }
893     msg_ctx.path_history = (char *) &incoming[1];
894     msg_ctx.path_history += ntohs (enc_msg->size);
895     msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
896   }
897   route_result_message (enc_msg, &msg_ctx);
898   return GNUNET_YES;
899 }
900
901
902 /**
903  * Initialize neighbours subsystem.
904  */
905 int
906 GST_NEIGHBOURS_init ()
907 {
908   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
909     {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
910     {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
911     {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
912     {NULL, 0, 0}
913   };
914   unsigned long long temp_config_num;
915   struct GNUNET_TIME_Relative next_send_time;
916  
917   if (GNUNET_OK ==
918       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
919                                              &temp_config_num))
920     bucket_size = (unsigned int) temp_config_num;  
921   coreAPI = GNUNET_CORE_connect (GDS_cfg,   /* Main configuration */
922                                  DEFAULT_CORE_QUEUE_SIZE,       /* queue size */
923                                  NULL,  /* Closure passed to DHT functions */
924                                  &core_init,    /* Call core_init once connected */
925                                  &handle_core_connect,  /* Handle connects */
926                                  &handle_core_disconnect,       /* remove peers on disconnects */
927                                  NULL,  /* Do we care about "status" updates? */
928                                  NULL,  /* Don't want notified about all incoming messages */
929                                  GNUNET_NO,     /* For header only inbound notification */
930                                  NULL,  /* Don't want notified about all outbound messages */
931                                  GNUNET_NO,     /* For header only outbound notification */
932                                  core_handlers);        /* Register these handlers */  
933   if (coreAPI == NULL)
934     return GNUNET_SYSERR;
935   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
936   return GNUNET_OK;
937 }
938
939
940 /**
941  * Shutdown neighbours subsystem.
942  */
943 void
944 GST_NEIGHBOURS_done ()
945 {
946   GNUNET_assert (coreAPI != NULL);
947   GNUNET_CORE_disconnect (coreAPI);
948   coreAPI = NULL;    
949   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
950   GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
951   all_known_peers = NULL;
952   if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
953   {
954     GNUNET_SCHEDULER_cancel (find_peer_task);
955     find_peer_task = GNUNET_SCHEDULER_NO_TASK;
956   }
957 }
958
959
960 /* end of gnunet-service-dht_neighbours.c */