11f13b184b6b9287f1df7f3b10b7f77ec0c0b2bc
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_bandwidth_lib.h"
30 #include "gnunet_client_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_container_lib.h"
33 #include "gnunet_arm_service.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_protocols.h"
36 #include "gnunet_server_lib.h"
37 #include "gnunet_time_lib.h"
38 #include "gnunet_dht_service.h"
39 #include "dht.h"
40
41 #define DEBUG_DHT_API GNUNET_NO
42
43 /**
44  * Entry in our list of messages to be (re-)transmitted.
45  */
46 struct PendingMessage
47 {
48   /**
49    * This is a doubly-linked list.
50    */
51   struct PendingMessage *prev;
52
53   /**
54    * This is a doubly-linked list.
55    */
56   struct PendingMessage *next;
57
58   /**
59    * Message that is pending, allocated at the end
60    * of this struct.
61    */
62   const struct GNUNET_MessageHeader *msg;
63   
64   /**
65    * Handle to the DHT API context.
66    */
67   struct GNUNET_DHT_Handle *handle;
68                        
69   /**
70    * Continuation to call when the request has been
71    * transmitted (for the first time) to the service; can be NULL.
72    */
73   GNUNET_SCHEDULER_Task cont;
74
75   /**
76    * Closure for 'cont'.
77    */
78   void *cont_cls;
79
80   /**
81    * Timeout task for this message
82    */
83   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
84
85   /**
86    * Unique ID for this request
87    */
88   uint64_t unique_id;
89
90   /**
91    * Free the saved message once sent, set to GNUNET_YES for messages
92    * that do not receive responses; GNUNET_NO if this pending message
93    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
94    * from there.
95    */
96   int free_on_send;
97
98   /**
99    * GNUNET_YES if this message is in our pending queue right now.
100    */
101   int in_pending_queue;
102
103 };
104
105
106 /**
107  * Handle to a route request
108  */
109 struct GNUNET_DHT_RouteHandle
110 {
111
112   /**
113    * Iterator to call on data receipt
114    */
115   GNUNET_DHT_ReplyProcessor iter;
116
117   /**
118    * Closure for the iterator callback
119    */
120   void *iter_cls;
121
122   /**
123    * Main handle to this DHT api
124    */
125   struct GNUNET_DHT_Handle *dht_handle;
126
127   /**
128    * The actual message sent for this request,
129    * used for retransmitting requests on service
130    * failure/reconnect.  Freed on route_stop.
131    */
132   struct PendingMessage *message;
133
134   /**
135    * Key that this get request is for
136    */
137   GNUNET_HashCode key;
138
139   /**
140    * Unique identifier for this request (for key collisions). FIXME: redundant!?
141    */
142   uint64_t uid;
143
144 };
145
146
147 /**
148  * Connection to the DHT service.
149  */
150 struct GNUNET_DHT_Handle
151 {
152
153   /**
154    * Configuration to use.
155    */
156   const struct GNUNET_CONFIGURATION_Handle *cfg;
157
158   /**
159    * Socket (if available).
160    */
161   struct GNUNET_CLIENT_Connection *client;
162
163   /**
164    * Currently pending transmission request (or NULL).
165    */
166   struct GNUNET_CLIENT_TransmitHandle *th;
167
168   /**
169    * Head of linked list of messages we would like to transmit.
170    */
171   struct PendingMessage *pending_head;
172
173   /**
174    * Tail of linked list of messages we would like to transmit.
175    */
176   struct PendingMessage *pending_tail;
177
178   /**
179    * Hash map containing the current outstanding unique requests
180    * (values are of type 'struct GNUNET_DHT_RouteHandle').
181    */
182   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
183
184   /**
185    * Task for trying to reconnect.
186    */
187   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
188
189   /**
190    * How quickly should we retry?  Used for exponential back-off on
191    * connect-errors.
192    */
193   struct GNUNET_TIME_Relative retry_time;
194
195   /**
196    * Generator for unique ids.
197    */
198   uint64_t uid_gen;
199
200 };
201
202
203 /**
204  * Transmit the next pending message, called by notify_transmit_ready
205  */
206 static size_t
207 transmit_pending (void *cls,
208                   size_t size, 
209                   void *buf);
210
211
212 /**
213  * Handler for messages received from the DHT service
214  * a demultiplexer which handles numerous message types
215  *
216  */
217 static void
218 service_message_handler (void *cls,
219                          const struct GNUNET_MessageHeader *msg);
220
221
222
223
224 /**
225  * Try to (re)connect to the DHT service.
226  *
227  * @return GNUNET_YES on success, GNUNET_NO on failure.
228  */
229 static int
230 try_connect (struct GNUNET_DHT_Handle *handle)
231 {
232   if (handle->client != NULL)
233     return GNUNET_OK;
234   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
235   if (handle->client == NULL)
236     { 
237       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
238                   _("Failed to connect to the DHT service!\n"));
239       return GNUNET_NO;
240     }
241 #if DEBUG_DHT
242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
243               "Starting to process replies from DHT\n");
244 #endif
245   GNUNET_CLIENT_receive (handle->client,
246                          &service_message_handler,
247                          handle, 
248                          GNUNET_TIME_UNIT_FOREVER_REL);
249   return GNUNET_YES;
250 }
251
252
253 /**
254  * Add the request corresponding to the given route handle
255  * to the pending queue (if it is not already in there).
256  *
257  * @param cls the 'struct GNUNET_DHT_Handle*'
258  * @param key key for the request (not used)
259  * @param value the 'struct GNUNET_DHT_RouteHandle*'
260  * @return GNUNET_YES (always)
261  */
262 static int
263 add_request_to_pending (void *cls,
264                         const GNUNET_HashCode *key,
265                         void *value)
266 {
267   struct GNUNET_DHT_Handle *handle = cls;
268   struct GNUNET_DHT_RouteHandle *rh = value;
269
270   if (GNUNET_NO == rh->message->in_pending_queue)
271     {
272       GNUNET_CONTAINER_DLL_insert (handle->pending_head,
273                                    handle->pending_tail,
274                                    rh->message);
275       rh->message->in_pending_queue = GNUNET_YES;
276     }
277   return GNUNET_YES;
278 }
279
280
281 /**
282  * Try to send messages from list of messages to send
283  * @param handle DHT_Handle
284  */
285 static void
286 process_pending_messages (struct GNUNET_DHT_Handle *handle);
287
288
289 /**
290  * Try reconnecting to the dht service.
291  *
292  * @param cls GNUNET_DHT_Handle
293  * @param tc scheduler context
294  */
295 static void
296 try_reconnect (void *cls,
297                const struct GNUNET_SCHEDULER_TaskContext *tc)
298 {
299   struct GNUNET_DHT_Handle *handle = cls;
300
301   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
302   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
303     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
304   else
305     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
306   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
307     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
308   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
309   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
310   if (handle->client == NULL)
311     {
312       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313                   "dht reconnect failed(!)\n");
314       return;
315     }
316   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
317                                          &add_request_to_pending,
318                                          handle);
319   process_pending_messages (handle);
320 }
321
322
323 /**
324  * Try reconnecting to the DHT service.
325  *
326  * @param handle handle to dht to (possibly) disconnect and reconnect
327  */
328 static void
329 do_disconnect (struct GNUNET_DHT_Handle *handle)
330 {
331   if (handle->client == NULL)
332     return;
333   GNUNET_assert(handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
334   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
335   handle->client = NULL;
336   handle->reconnect_task = GNUNET_SCHEDULER_add_delayed (handle->retry_time,
337                                                          &try_reconnect,
338                                                          handle);
339 }
340
341
342 /**
343  * Try to send messages from list of messages to send
344  */
345 static void
346 process_pending_messages (struct GNUNET_DHT_Handle *handle)
347 {
348   struct PendingMessage *head;
349
350   if (handle->client == NULL)
351     {
352       do_disconnect(handle);
353       return;
354     }
355   if (handle->th != NULL)
356     return;
357   if (NULL == (head = handle->pending_head))
358     return;
359   handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
360                                                     ntohs (head->msg->size),
361                                                     GNUNET_TIME_UNIT_FOREVER_REL, 
362                                                     GNUNET_YES,
363                                                     &transmit_pending,
364                                                     handle);
365   if (NULL == handle->th)    
366     {
367       do_disconnect (handle);
368       return;
369     }
370 }
371
372
373 /**
374  * Transmit the next pending message, called by notify_transmit_ready
375  */
376 static size_t
377 transmit_pending (void *cls,
378                   size_t size, 
379                   void *buf)
380 {
381   struct GNUNET_DHT_Handle *handle = cls;
382   struct PendingMessage *head;
383   size_t tsize;
384
385   handle->th = NULL;
386   if (buf == NULL)
387     {
388       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Transmission to DHT service failed!  Reconnecting!\n");
389       do_disconnect (handle);
390       return 0;
391     }
392   if (NULL == (head = handle->pending_head))
393     return 0;
394   
395   tsize = ntohs (head->msg->size);
396   if (size < tsize)
397     {
398       process_pending_messages (handle);
399       return 0;
400     }
401   memcpy (buf, head->msg, tsize);
402   GNUNET_CONTAINER_DLL_remove (handle->pending_head,
403                                handle->pending_tail,
404                                head);
405   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
406     {
407       GNUNET_SCHEDULER_cancel (head->timeout_task);
408       head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
409     }
410   if (NULL != head->cont)
411     {
412       GNUNET_SCHEDULER_add_continuation (head->cont,
413                                          head->cont_cls,
414                                          GNUNET_SCHEDULER_REASON_PREREQ_DONE);
415       head->cont = NULL;
416       head->cont_cls = NULL;
417     }
418   head->in_pending_queue = GNUNET_NO;
419   if (GNUNET_YES == head->free_on_send)
420     GNUNET_free (head);
421   process_pending_messages (handle);
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "Forwarded request of %u bytes to DHT service\n",
424               (unsigned int) tsize);
425   return tsize;
426 }
427
428
429 /**
430  * Process a given reply that might match the given
431  * request.
432  */
433 static int
434 process_reply (void *cls,
435                const GNUNET_HashCode *key,
436                void *value)
437 {
438   const struct GNUNET_DHT_RouteResultMessage *dht_msg = cls;
439   struct GNUNET_DHT_RouteHandle *rh = value;
440   const struct GNUNET_MessageHeader *enc_msg;
441   size_t enc_size;
442   uint64_t uid;
443   const struct GNUNET_PeerIdentity **outgoing_path;
444   const struct GNUNET_PeerIdentity *pos;
445   uint32_t outgoing_path_length;
446   unsigned int i;
447   char *path_offset;
448
449   uid = GNUNET_ntohll (dht_msg->unique_id);
450   if (uid != rh->uid)
451     {
452       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453                   "Reply UID did not match request UID\n");
454       return GNUNET_YES;
455     }
456   enc_msg = (const struct GNUNET_MessageHeader *)&dht_msg[1];
457   enc_size = ntohs (enc_msg->size);
458   if (enc_size < sizeof (struct GNUNET_MessageHeader))
459     {
460       GNUNET_break (0);
461       return GNUNET_NO;
462     }
463   path_offset = (char *)&dht_msg[1];
464   path_offset += enc_size;
465   pos = (const struct GNUNET_PeerIdentity *) path_offset;
466   outgoing_path_length = ntohl (dht_msg->outgoing_path_length);
467   if (outgoing_path_length * sizeof (struct GNUNET_PeerIdentity) > ntohs(dht_msg->header.size) - enc_size)
468     {
469       GNUNET_break (0);
470       return GNUNET_NO;
471     }
472
473   if (outgoing_path_length > 0)
474     {
475       outgoing_path = GNUNET_malloc ((outgoing_path_length + 1) * sizeof (struct GNUNET_PeerIdentity*));
476       for (i = 0; i < outgoing_path_length; i++)
477         {
478           outgoing_path[i] = pos;
479           pos++;
480         }
481       outgoing_path[outgoing_path_length] = NULL;
482     }
483   else
484     outgoing_path = NULL;
485
486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487               "Processing reply.\n");
488   rh->iter (rh->iter_cls, 
489             &rh->key,
490             outgoing_path,
491             enc_msg);
492   GNUNET_free_non_null (outgoing_path);
493   return GNUNET_YES;
494 }
495
496
497 /**
498  * Handler for messages received from the DHT service
499  * a demultiplexer which handles numerous message types
500  *
501  * @param cls the 'struct GNUNET_DHT_Handle'
502  * @param msg the incoming message
503  */
504 static void
505 service_message_handler (void *cls,
506                          const struct GNUNET_MessageHeader *msg)
507 {
508   struct GNUNET_DHT_Handle *handle = cls;
509   const struct GNUNET_DHT_RouteResultMessage *dht_msg;
510
511   if (msg == NULL)
512     {
513       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514                   "Error receiving data from DHT service, reconnecting\n");
515       do_disconnect (handle);
516       return;
517     }
518   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT)
519     {
520       GNUNET_break (0);
521       do_disconnect (handle);
522       return;
523     }
524   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_RouteResultMessage))
525     {
526       GNUNET_break (0);
527       do_disconnect (handle);
528       return;
529     }
530   dht_msg = (const struct GNUNET_DHT_RouteResultMessage *) msg;
531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532               "Comparing reply `%s' against %u pending requests.\n",
533               GNUNET_h2s (&dht_msg->key),
534               GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
535   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
536                                               &dht_msg->key,
537                                               &process_reply,
538                                               (void*) dht_msg);
539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540               "Continuing to process replies from DHT\n");
541   GNUNET_CLIENT_receive (handle->client,
542                          &service_message_handler,
543                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
544
545 }
546
547
548 /**
549  * Initialize the connection with the DHT service.
550  *
551  * @param cfg configuration to use
552  * @param ht_len size of the internal hash table to use for
553  *               processing multiple GET/FIND requests in parallel
554  *
555  * @return handle to the DHT service, or NULL on error
556  */
557 struct GNUNET_DHT_Handle *
558 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
559                     unsigned int ht_len)
560 {
561   struct GNUNET_DHT_Handle *handle;
562
563   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
564   handle->cfg = cfg;
565   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
566   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
567   if (GNUNET_NO == try_connect (handle))
568     {
569       GNUNET_DHT_disconnect (handle);
570       return NULL;
571     }
572   return handle;
573 }
574
575
576 /**
577  * Shutdown connection with the DHT service.
578  *
579  * @param handle handle of the DHT connection to stop
580  */
581 void
582 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
583 {
584   struct PendingMessage *pm;
585   GNUNET_assert(handle != NULL);
586   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size(handle->active_requests));
587   if (handle->th != NULL)
588     {
589       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
590       handle->th = NULL;
591     }
592   while (NULL != (pm = handle->pending_head))
593     {
594       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
595                                    handle->pending_tail,
596                                    pm);
597       GNUNET_assert (GNUNET_YES == pm->free_on_send);
598       if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
599         GNUNET_SCHEDULER_cancel (pm->timeout_task);
600       if (NULL != pm->cont)
601         GNUNET_SCHEDULER_add_continuation (pm->cont,
602                                            pm->cont_cls,
603                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
604       pm->in_pending_queue = GNUNET_NO;
605       GNUNET_free (pm);
606     }
607   if (handle->client != NULL)
608     {
609       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
610       handle->client = NULL;
611     }  
612   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
613     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
614   GNUNET_CONTAINER_multihashmap_destroy(handle->active_requests);
615   GNUNET_free (handle);
616 }
617
618
619
620
621 /* ***** Special low-level API providing generic routing abstraction ***** */
622
623
624 /**
625  * Timeout for the transmission of a fire&forget-request.  Clean it up.
626  *
627  * @param cls the 'struct PendingMessage'
628  * @param tc scheduler context
629  */
630 static void
631 timeout_route_request (void *cls,
632                        const struct GNUNET_SCHEDULER_TaskContext *tc)
633 {
634   struct PendingMessage *pending = cls;
635   struct GNUNET_DHT_Handle *handle;
636
637   if (pending->free_on_send != GNUNET_YES)
638     {
639       /* timeouts should only apply to fire & forget requests! */
640       GNUNET_break (0);
641       return;
642     }
643   handle = pending->handle;
644   GNUNET_CONTAINER_DLL_remove (handle->pending_head,
645                                handle->pending_tail,
646                                pending);
647   if (pending->cont != NULL)
648     pending->cont (pending->cont_cls,
649                    tc);
650   GNUNET_free (pending);
651 }
652
653
654 /**
655  * Initiate a generic DHT route operation.
656  *
657  * @param handle handle to the DHT service
658  * @param key the key to look up
659  * @param desired_replication_level how many peers should ultimately receive
660  *                this message (advisory only, target may be too high for the
661  *                given DHT or not hit exactly).
662  * @param options options for routing
663  * @param enc send the encapsulated message to a peer close to the key
664  * @param iter function to call on each result, NULL if no replies are expected
665  * @param iter_cls closure for iter
666  * @param timeout when to abort with an error if we fail to get
667  *                a confirmation for the request (when necessary) or how long
668  *                to wait for tramission to the service; only applies
669  *                if 'iter' is NULL
670  * @param cont continuation to call when the request has been transmitted
671  *             the first time to the service
672  * @param cont_cls closure for cont
673  * @return handle to stop the request, NULL if the request is "fire and forget"
674  */
675 struct GNUNET_DHT_RouteHandle *
676 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
677                         const GNUNET_HashCode *key,
678                         uint32_t desired_replication_level,
679                         enum GNUNET_DHT_RouteOption options,
680                         const struct GNUNET_MessageHeader *enc,
681                         struct GNUNET_TIME_Relative timeout,
682                         GNUNET_DHT_ReplyProcessor iter,
683                         void *iter_cls,
684                         GNUNET_SCHEDULER_Task cont,
685                         void *cont_cls)
686 {
687   struct PendingMessage *pending;
688   struct GNUNET_DHT_RouteMessage *message;
689   struct GNUNET_DHT_RouteHandle *route_handle;
690   uint16_t msize;
691   uint16_t esize;
692
693   esize = ntohs (enc->size);
694   if (sizeof (struct GNUNET_DHT_RouteMessage) + esize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
695     {
696       GNUNET_break (0);
697       return NULL;
698     }
699   msize = sizeof (struct GNUNET_DHT_RouteMessage) + esize;
700   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
701   message = (struct GNUNET_DHT_RouteMessage*) &pending[1];
702   pending->msg = &message->header;
703   pending->handle = handle;
704   pending->cont = cont;
705   pending->cont_cls = cont_cls;
706   
707   message->header.size = htons (msize);
708   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
709   message->options = htonl ((uint32_t) options);
710   message->desired_replication_level = htonl (desired_replication_level);
711   message->reserved = 0;
712   message->key = *key;
713   handle->uid_gen++;
714   message->unique_id = GNUNET_htonll (handle->uid_gen);
715   memcpy (&message[1], enc, esize);
716
717   if (iter != NULL)
718     {
719       route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
720       route_handle->key = *key;
721       route_handle->iter = iter;
722       route_handle->iter_cls = iter_cls;
723       route_handle->dht_handle = handle;
724       route_handle->uid = handle->uid_gen;
725       route_handle->message = pending;
726       GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
727                                          key,
728                                          route_handle,
729                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
730     }
731   else
732     {
733       route_handle = NULL;
734       pending->free_on_send = GNUNET_YES;
735       pending->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
736                                                             &timeout_route_request,
737                                                             pending);
738     }
739   GNUNET_CONTAINER_DLL_insert (handle->pending_head,
740                                handle->pending_tail,
741                                pending);
742   pending->in_pending_queue = GNUNET_YES;
743   process_pending_messages (handle);
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "DHT route start request processed, returning %p\n",
746               route_handle);
747   return route_handle;
748 }
749
750
751 /**
752  * Stop a previously issued routing request
753  *
754  * @param route_handle handle to the request to stop
755  */
756 void
757 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
758 {
759   struct GNUNET_DHT_Handle *handle;
760   struct PendingMessage *pending;
761   struct GNUNET_DHT_StopMessage *message;
762   size_t msize;
763
764   handle = route_handle->dht_handle;
765   if (GNUNET_NO == route_handle->message->in_pending_queue)
766     {
767       /* need to send stop message */
768       msize = sizeof (struct GNUNET_DHT_StopMessage);
769       pending = GNUNET_malloc (sizeof (struct PendingMessage) + 
770                                msize);
771       message = (struct GNUNET_DHT_StopMessage*) &pending[1];
772       pending->msg = &message->header;
773       message->header.size = htons (msize);
774       message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
775       message->reserved = 0;
776       message->unique_id = GNUNET_htonll (route_handle->uid);
777       message->key = route_handle->key;
778       pending->handle = handle;
779       pending->free_on_send = GNUNET_YES;
780       pending->in_pending_queue = GNUNET_YES;      
781       GNUNET_CONTAINER_DLL_insert (handle->pending_head,
782                                    handle->pending_tail,
783                                    pending);
784       process_pending_messages (handle);
785     }
786   else
787     {
788       /* simply remove pending request from message queue before
789          transmission, no need to transmit STOP request! */
790       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
791                                    handle->pending_tail,
792                                    route_handle->message);
793     }
794   GNUNET_assert (GNUNET_YES ==
795                  GNUNET_CONTAINER_multihashmap_remove (route_handle->dht_handle->active_requests,
796                                                        &route_handle->key,
797                                                        route_handle));
798   GNUNET_free(route_handle->message);
799   GNUNET_free(route_handle);
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801               "DHT route stop request processed\n");
802 }
803
804
805
806 /* ***** Special API for controlling DHT routing maintenance ******* */
807
808
809 /**
810  * Send a control message to the DHT.
811  *
812  * @param handle handle to the DHT service
813  * @param command command
814  * @param variable variable to the command
815  * @param cont continuation to call when done (transmitting request to service)
816  * @param cont_cls closure for cont
817  */
818 static void
819 send_control_message (struct GNUNET_DHT_Handle *handle,
820                       uint16_t command,
821                       uint16_t variable,
822                       GNUNET_SCHEDULER_Task cont,
823                       void *cont_cls)
824 {
825   struct GNUNET_DHT_ControlMessage *msg;
826   struct PendingMessage *pending;
827
828   pending = GNUNET_malloc (sizeof (struct PendingMessage) + 
829                            sizeof(struct GNUNET_DHT_ControlMessage)); 
830   msg = (struct GNUNET_DHT_ControlMessage*) &pending[1];
831   pending->msg = &msg->header;
832   msg->header.size = htons (sizeof(struct GNUNET_DHT_ControlMessage));
833   msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CONTROL);
834   msg->command = htons (command);
835   msg->variable = htons (variable);
836   pending->free_on_send = GNUNET_YES;
837   pending->cont = cont;
838   pending->cont_cls = cont_cls;
839   pending->in_pending_queue = GNUNET_YES;      
840   GNUNET_CONTAINER_DLL_insert (handle->pending_head,
841                                handle->pending_tail,
842                                pending);
843   process_pending_messages (handle);
844 }
845
846
847 /**
848  * Send a message to the DHT telling it to issue a single find
849  * peer request using the peers unique identifier as key.  This
850  * is used to fill the routing table, and is normally controlled
851  * by the DHT itself.  However, for testing and perhaps more
852  * close control over the DHT, this can be explicitly managed.
853  *
854  * @param handle handle to the DHT service
855  * @param cont continuation to call when done (transmitting request to service)
856  * @param cont_cls closure for cont
857  */
858 void
859 GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
860                        GNUNET_SCHEDULER_Task cont,
861                        void *cont_cls)
862 {
863   send_control_message (handle,
864                         GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0,
865                         cont, cont_cls);
866 }
867
868
869
870 #if HAVE_MALICIOUS
871
872 /**
873  * Send a message to the DHT telling it to start issuing random GET
874  * requests every 'frequency' milliseconds.
875  *
876  * @param handle handle to the DHT service
877  * @param frequency delay between sending malicious messages
878  * @param cont continuation to call when done (transmitting request to service)
879  * @param cont_cls closure for cont
880  */
881 void
882 GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle,
883                                  struct GNUNET_TIME_Relative frequency, GNUNET_SCHEDULER_Task cont,
884                             void *cont_cls)
885 {
886   if (frequency.rel_value > UINT16_MAX)
887     {
888       GNUNET_break (0);
889       return;
890     }
891   send_control_message (handle,
892                         GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET, frequency.rel_value,
893                         cont, cont_cls);
894 }
895
896 /**
897  * Send a message to the DHT telling it to start issuing random PUT
898  * requests every 'frequency' milliseconds.
899  *
900  * @param handle handle to the DHT service
901  * @param frequency delay between sending malicious messages
902  * @param cont continuation to call when done (transmitting request to service)
903  * @param cont_cls closure for cont
904  */
905 void 
906 GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, 
907                                  struct GNUNET_TIME_Relative frequency, GNUNET_SCHEDULER_Task cont,
908                             void *cont_cls)
909 {
910   if (frequency.rel_value > UINT16_MAX)
911     {
912       GNUNET_break (0);
913       return;
914     }
915
916   send_control_message (handle,
917                         GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT, frequency.rel_value,
918                         cont, cont_cls);
919 }
920
921
922 /**
923  * Send a message to the DHT telling it to start dropping
924  * all requests received.
925  *
926  * @param handle handle to the DHT service
927  * @param cont continuation to call when done (transmitting request to service)
928  * @param cont_cls closure for cont
929  *
930  */
931 void 
932 GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont,
933     void *cont_cls)
934 {
935   send_control_message (handle,
936                         GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP, 0,
937                         cont, cont_cls);
938 }
939
940 #endif
941
942 /* end of dht_api.c */