WiP
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_bandwidth_lib.h"
30 #include "gnunet_client_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_container_lib.h"
33 #include "gnunet_arm_service.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_protocols.h"
36 #include "gnunet_server_lib.h"
37 #include "gnunet_time_lib.h"
38 #include "gnunet_dht_service.h"
39 #include "dht.h"
40
41 #define DEBUG_DHT_API GNUNET_NO
42
43 /**
44  * Entry in the queue of messages waiting to be (re-)transmitted to the DHT service.
45  */
46 struct PendingMessage
47 {
48   /**
49    * Previous entry in the doubly-linked pending queue.
50    */
51   struct PendingMessage *prev;
52
53   /**
54    * Next entry in the doubly-linked pending queue.
55    */
56   struct PendingMessage *next;
57
58   /**
59    * Message that is pending; the message bytes live directly
60    * behind this struct, in the same allocation.
61    */
62   const struct GNUNET_MessageHeader *msg;
63   
64   /**
65    * DHT handle this message is queued on (used by the timeout task).
66    */
67   struct GNUNET_DHT_Handle *handle;
68                        
69   /**
70    * Continuation scheduled once the message has been handed
71    * to the service for the first time; can be NULL.
72    */
73   GNUNET_SCHEDULER_Task cont;
74
75   /**
76    * Closure for 'cont'.
77    */
78   void *cont_cls;
79
80   /**
81    * Timeout task for this message (GNUNET_SCHEDULER_NO_TASK if none).
82    */
83   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
84
85   /**
86    * Unique ID for this request (not referenced by the code in this file).
87    */
88   uint64_t unique_id;
89
90   /**
91    * GNUNET_YES if this entry is freed once it has been sent (used for
92    * messages that do not receive responses); GNUNET_NO if this entry
93    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and is freed
94    * together with that route handle.
95    */
96   int free_on_send;
97
98   /**
99    * GNUNET_YES while this entry is linked into the pending queue.
100    */
101   int in_pending_queue;
102
103 };
104
105
106 /**
107  * Handle for a route request for which replies are expected.
108  */
109 struct GNUNET_DHT_RouteHandle
110 {
111
112   /**
113    * Function to call for each reply that matches this request.
114    */
115   GNUNET_DHT_ReplyProcessor iter;
116
117   /**
118    * Closure for 'iter'.
119    */
120   void *iter_cls;
121
122   /**
123    * DHT handle this request was started on.
124    */
125   struct GNUNET_DHT_Handle *dht_handle;
126
127   /**
128    * The request message itself; kept around so it can be
129    * queued again after a reconnect to the service.
130    * Freed in GNUNET_DHT_route_stop.
131    */
132   struct PendingMessage *message;
133
134   /**
135    * Key of the request; also the key under which this handle
136    * is stored in the 'active_requests' map.
137    */
138   GNUNET_HashCode key;
139
140   /**
141    * Unique identifier of the request; replies are matched against it. FIXME: redundant!?
142    */
143   uint64_t uid;
144
145 };
145
146
147 /**
148  * State of one connection to the DHT service.
149  */
150 struct GNUNET_DHT_Handle
151 {
152
153   /**
154    * Configuration used to connect to the service.
155    */
156   const struct GNUNET_CONFIGURATION_Handle *cfg;
157
158   /**
159    * Connection to the service, NULL while disconnected.
160    */
161   struct GNUNET_CLIENT_Connection *client;
162
163   /**
164    * Currently pending transmission request (or NULL).
165    */
166   struct GNUNET_CLIENT_TransmitHandle *th;
167
168   /**
169    * Head of the queue of messages waiting for transmission.
170    */
171   struct PendingMessage *pending_head;
172
173   /**
174    * Tail of the queue of messages waiting for transmission.
175    */
176   struct PendingMessage *pending_tail;
177
178   /**
179    * Map from request key to the outstanding requests for that key
180    * (values are of type 'struct GNUNET_DHT_RouteHandle').
181    */
182   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
183
184   /**
185    * Task that will try to reconnect, or GNUNET_SCHEDULER_NO_TASK.
186    */
187   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
188
189   /**
190    * Delay before the next reconnect attempt; doubled (within
191    * bounds) on each attempt as exponential back-off.
192    */
193   struct GNUNET_TIME_Relative retry_time;
194
195   /**
196    * Counter used to generate unique request ids.
197    */
198   uint64_t uid_gen;
199
200 };
201
202
203 /**
204  * Forward declaration: copy the next pending message into the transmit buffer; callback for notify_transmit_ready.
205  */
206 static size_t
207 transmit_pending (void *cls,
208                   size_t size, 
209                   void *buf);
210
211
212 /**
213  * Forward declaration: handler for messages received from the
214  * DHT service; dispatches route results to the matching requests.
215  *
216  */
217 static void
218 service_message_handler (void *cls,
219                          const struct GNUNET_MessageHeader *msg);
220
221
222
223
224 /**
225  * Try to (re)connect to the DHT service.
226  *
227  * @return GNUNET_YES on success, GNUNET_NO on failure.
228  */
229 static int
230 try_connect (struct GNUNET_DHT_Handle *handle)
231 {
232   if (handle->client != NULL)
233     return GNUNET_OK;
234   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
235   if (handle->client == NULL)
236     { 
237       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
238                   _("Failed to connect to the DHT service!\n"));
239       return GNUNET_NO;
240     }
241 #if DEBUG_DHT
242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
243               "Starting to process replies from DHT\n");
244 #endif
245   GNUNET_CLIENT_receive (handle->client,
246                          &service_message_handler,
247                          handle, 
248                          GNUNET_TIME_UNIT_FOREVER_REL);
249   return GNUNET_YES;
250 }
251
252
253 /**
254  * Add the request corresponding to the given route handle
255  * to the pending queue (if it is not already in there).
256  *
257  * @param cls the 'struct GNUNET_DHT_Handle*'
258  * @param key key for the request (not used)
259  * @param value the 'struct GNUNET_DHT_RouteHandle*'
260  * @return GNUNET_YES (always)
261  */
262 static int
263 add_request_to_pending (void *cls,
264                         const GNUNET_HashCode *key,
265                         void *value)
266 {
267   struct GNUNET_DHT_Handle *handle = cls;
268   struct GNUNET_DHT_RouteHandle *rh = value;
269
270   if (GNUNET_NO == rh->message->in_pending_queue)
271     {
272       GNUNET_CONTAINER_DLL_insert (handle->pending_head,
273                                    handle->pending_tail,
274                                    rh->message);
275       rh->message->in_pending_queue = GNUNET_YES;
276     }
277   return GNUNET_YES;
278 }
279
280
281 /**
282  * Try reconnecting to the dht service.
283  *
284  * @param cls GNUNET_DHT_Handle
285  * @param tc scheduler context
286  */
287 static void
288 try_reconnect (void *cls,
289                const struct GNUNET_SCHEDULER_TaskContext *tc)
290 {
291   struct GNUNET_DHT_Handle *handle = cls;
292   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
293   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
294     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
295   else
296     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
297   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
298     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
299   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
300   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
301   if (handle->client == NULL)
302     {
303       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
304                   "dht reconnect failed(!)\n");
305       return;
306     }
307
308   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
309                                          &add_request_to_pending,
310                                          handle);
311   if (handle->pending_head == NULL)
312     return;
313
314   GNUNET_CLIENT_notify_transmit_ready (handle->client,
315                                        ntohs(handle->pending_head->msg->size),
316                                        GNUNET_TIME_UNIT_FOREVER_REL,
317                                        GNUNET_NO,
318                                        &transmit_pending,
319                                        handle);
320 }
321
322
323 /**
324  * Try reconnecting to the DHT service.
325  *
326  * @param handle handle to dht to (possibly) disconnect and reconnect
327  */
328 static void
329 do_disconnect (struct GNUNET_DHT_Handle *handle)
330 {
331   if (handle->client == NULL)
332     return;
333   GNUNET_assert(handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
334   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
335   handle->client = NULL;
336   handle->reconnect_task = GNUNET_SCHEDULER_add_delayed (handle->retry_time,
337                                                          &try_reconnect,
338                                                          handle);
339 }
340
341
342 /**
343  * Try to send messages from list of messages to send
344  */
345 static void
346 process_pending_messages (struct GNUNET_DHT_Handle *handle)
347 {
348   struct PendingMessage *head;
349
350   if (handle->client == NULL)
351     {
352       do_disconnect(handle);
353       return;
354     }
355   if (handle->th != NULL)
356     return;
357   if (NULL == (head = handle->pending_head))
358     return;
359   handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
360                                                     ntohs (head->msg->size),
361                                                     GNUNET_TIME_UNIT_FOREVER_REL, 
362                                                     GNUNET_YES,
363                                                     &transmit_pending,
364                                                     handle);
365   if (NULL == handle->th)    
366     {
367       do_disconnect (handle);
368       return;
369     }
370 }
371
372
373 /**
374  * Transmit the next pending message, called by notify_transmit_ready
375  */
376 static size_t
377 transmit_pending (void *cls,
378                   size_t size, 
379                   void *buf)
380 {
381   struct GNUNET_DHT_Handle *handle = cls;
382   struct PendingMessage *head;
383   size_t tsize;
384
385   handle->th = NULL;
386   if (buf == NULL)
387     {
388       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Transmission to DHT service failed!  Reconnecting!\n");
389       do_disconnect (handle);
390       return 0;
391     }
392   if (NULL == (head = handle->pending_head))
393     return 0;
394   
395   tsize = ntohs (head->msg->size);
396   if (size < tsize)
397     {
398       process_pending_messages (handle);
399       return 0;
400     }
401   memcpy (buf, head->msg, tsize);
402   GNUNET_CONTAINER_DLL_remove (handle->pending_head,
403                                handle->pending_tail,
404                                head);
405   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
406     {
407       GNUNET_SCHEDULER_cancel (head->timeout_task);
408       head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
409     }
410   if (NULL != head->cont)
411     {
412       GNUNET_SCHEDULER_add_continuation (head->cont,
413                                          head->cont_cls,
414                                          GNUNET_SCHEDULER_REASON_PREREQ_DONE);
415       head->cont = NULL;
416       head->cont_cls = NULL;
417     }
418   head->in_pending_queue = GNUNET_NO;
419   if (GNUNET_YES == head->free_on_send)
420     GNUNET_free (head);
421   process_pending_messages (handle);
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "Forwarded request of %u bytes to DHT service\n",
424               (unsigned int) tsize);
425   return tsize;
426 }
427
428
429 /**
430  * Process a given reply that might match the given
431  * request.
432  */
433 static int
434 process_reply (void *cls,
435                const GNUNET_HashCode *key,
436                void *value)
437 {
438   const struct GNUNET_DHT_RouteResultMessage *dht_msg = cls;
439   struct GNUNET_DHT_RouteHandle *rh = value;
440   const struct GNUNET_MessageHeader *enc_msg;
441   size_t enc_size;
442   uint64_t uid;
443   const struct GNUNET_PeerIdentity **outgoing_path;
444   const struct GNUNET_PeerIdentity *pos;
445   uint32_t outgoing_path_length;
446   unsigned int i;
447   char *path_offset;
448
449   uid = GNUNET_ntohll (dht_msg->unique_id);
450   if (uid != rh->uid)
451     {
452       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453                   "Reply UID did not match request UID\n");
454       return GNUNET_YES;
455     }
456   enc_msg = (const struct GNUNET_MessageHeader *)&dht_msg[1];
457   enc_size = ntohs (enc_msg->size);
458   if (enc_size < sizeof (struct GNUNET_MessageHeader))
459     {
460       GNUNET_break (0);
461       return GNUNET_NO;
462     }
463   path_offset = (char *)&dht_msg[1];
464   path_offset += enc_size;
465   pos = (const struct GNUNET_PeerIdentity *) path_offset;
466   outgoing_path_length = ntohl (dht_msg->outgoing_path_length);
467   if (outgoing_path_length * sizeof (struct GNUNET_PeerIdentity) > ntohs(dht_msg->header.size) - enc_size)
468     {
469       GNUNET_break (0);
470       return GNUNET_NO;
471     }
472
473   if (outgoing_path_length > 0)
474     {
475       outgoing_path = GNUNET_malloc ((outgoing_path_length + 1) * sizeof (struct GNUNET_PeerIdentity*));
476       for (i = 0; i < outgoing_path_length; i++)
477         {
478           outgoing_path[i] = pos;
479           pos++;
480         }
481       outgoing_path[outgoing_path_length] = NULL;
482     }
483   else
484     outgoing_path = NULL;
485
486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487               "Processing reply.\n");
488   rh->iter (rh->iter_cls, 
489             &rh->key,
490             outgoing_path,
491             enc_msg);
492   GNUNET_free_non_null (outgoing_path);
493   return GNUNET_YES;
494 }
495
496
497 /**
498  * Handler for messages received from the DHT service
499  * a demultiplexer which handles numerous message types
500  *
501  * @param cls the 'struct GNUNET_DHT_Handle'
502  * @param msg the incoming message
503  */
504 static void
505 service_message_handler (void *cls,
506                          const struct GNUNET_MessageHeader *msg)
507 {
508   struct GNUNET_DHT_Handle *handle = cls;
509   const struct GNUNET_DHT_RouteResultMessage *dht_msg;
510
511   if (msg == NULL)
512     {
513       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514                   "Error receiving data from DHT service, reconnecting\n");
515       do_disconnect (handle);
516       return;
517     }
518   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT)
519     {
520       GNUNET_break (0);
521       do_disconnect (handle);
522       return;
523     }
524   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_RouteResultMessage))
525     {
526       GNUNET_break (0);
527       do_disconnect (handle);
528       return;
529     }
530   dht_msg = (const struct GNUNET_DHT_RouteResultMessage *) msg;
531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532               "Comparing reply `%s' against %u pending requests.\n",
533               GNUNET_h2s (&dht_msg->key),
534               GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
535   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
536                                               &dht_msg->key,
537                                               &process_reply,
538                                               (void*) dht_msg);
539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540               "Continuing to process replies from DHT\n");
541   GNUNET_CLIENT_receive (handle->client,
542                          &service_message_handler,
543                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
544
545 }
546
547
548 /**
549  * Initialize the connection with the DHT service.
550  *
551  * @param cfg configuration to use
552  * @param ht_len size of the internal hash table to use for
553  *               processing multiple GET/FIND requests in parallel
554  *
555  * @return handle to the DHT service, or NULL on error
556  */
557 struct GNUNET_DHT_Handle *
558 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
559                     unsigned int ht_len)
560 {
561   struct GNUNET_DHT_Handle *handle;
562
563   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
564   handle->cfg = cfg;
565   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
566   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
567   if (GNUNET_NO == try_connect (handle))
568     {
569       GNUNET_DHT_disconnect (handle);
570       return NULL;
571     }
572   return handle;
573 }
574
575
576 /**
577  * Shutdown connection with the DHT service.
578  *
579  * @param handle handle of the DHT connection to stop
580  */
581 void
582 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
583 {
584   struct PendingMessage *pm;
585   GNUNET_assert(handle != NULL);
586   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size(handle->active_requests));
587   if (handle->th != NULL)
588     {
589       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
590       handle->th = NULL;
591     }
592   while (NULL != (pm = handle->pending_head))
593     {
594       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
595                                    handle->pending_tail,
596                                    pm);
597       GNUNET_assert (GNUNET_YES == pm->free_on_send);
598       if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
599         GNUNET_SCHEDULER_cancel (pm->timeout_task);
600       if (NULL != pm->cont)
601         GNUNET_SCHEDULER_add_continuation (pm->cont,
602                                            pm->cont_cls,
603                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
604       pm->in_pending_queue = GNUNET_NO;
605       GNUNET_free (pm);
606     }
607   if (handle->client != NULL)
608     {
609       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
610       handle->client = NULL;
611     }  
612   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
613     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
614   GNUNET_CONTAINER_multihashmap_destroy(handle->active_requests);
615   GNUNET_free (handle);
616 }
617
618
619
620
621 /* ***** Special low-level API providing generic routing abstraction ***** */
622
623
624 /**
625  * Timeout for the transmission of a fire&forget-request.  Clean it up.
626  *
627  * @param cls the 'struct PendingMessage'
628  * @param tc scheduler context
629  */
630 static void
631 timeout_route_request (void *cls,
632                        const struct GNUNET_SCHEDULER_TaskContext *tc)
633 {
634   struct PendingMessage *pending = cls;
635   struct GNUNET_DHT_Handle *handle;
636
637   if (pending->free_on_send != GNUNET_YES)
638     {
639       /* timeouts should only apply to fire & forget requests! */
640       GNUNET_break (0);
641       return;
642     }
643   handle = pending->handle;
644   GNUNET_CONTAINER_DLL_remove (handle->pending_head,
645                                handle->pending_tail,
646                                pending);
647   if (pending->cont != NULL)
648     pending->cont (pending->cont_cls,
649                    tc);
650   GNUNET_free (pending);
651 }
652
653
654 /**
655  * Initiate a generic DHT route operation.
656  *
657  * @param handle handle to the DHT service
658  * @param key the key to look up
659  * @param desired_replication_level how many peers should ultimately receive
660  *                this message (advisory only, target may be too high for the
661  *                given DHT or not hit exactly).
662  * @param options options for routing
663  * @param enc send the encapsulated message to a peer close to the key
664  * @param iter function to call on each result, NULL if no replies are expected
665  * @param iter_cls closure for iter
666  * @param timeout when to abort with an error if we fail to get
667  *                a confirmation for the request (when necessary) or how long
668  *                to wait for tramission to the service; only applies
669  *                if 'iter' is NULL
670  * @param cont continuation to call when the request has been transmitted
671  *             the first time to the service
672  * @param cont_cls closure for cont
673  * @return handle to stop the request, NULL if the request is "fire and forget"
674  */
675 struct GNUNET_DHT_RouteHandle *
676 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
677                         const GNUNET_HashCode *key,
678                         uint32_t desired_replication_level,
679                         enum GNUNET_DHT_RouteOption options,
680                         const struct GNUNET_MessageHeader *enc,
681                         struct GNUNET_TIME_Relative timeout,
682                         GNUNET_DHT_ReplyProcessor iter,
683                         void *iter_cls,
684                         GNUNET_SCHEDULER_Task cont,
685                         void *cont_cls)
686 {
687   struct PendingMessage *pending;
688   struct GNUNET_DHT_RouteMessage *message;
689   struct GNUNET_DHT_RouteHandle *route_handle;
690   uint16_t msize;
691   uint16_t esize;
692
693   esize = ntohs (enc->size);
694   if (sizeof (struct GNUNET_DHT_RouteMessage) + esize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
695     {
696       GNUNET_break (0);
697       return NULL;
698     }
699   msize = sizeof (struct GNUNET_DHT_RouteMessage) + esize;
700   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
701   message = (struct GNUNET_DHT_RouteMessage*) &pending[1];
702   pending->msg = &message->header;
703   pending->handle = handle;
704   pending->cont = cont;
705   pending->cont_cls = cont_cls;
706   
707   message->header.size = htons (msize);
708   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
709   message->key = *key;
710   message->options = htonl ((uint32_t) options);
711   message->desired_replication_level = htonl (desired_replication_level);
712   handle->uid_gen++;
713   message->unique_id = GNUNET_htonll (handle->uid_gen);
714   memcpy (&message[1], enc, esize);
715
716   if (iter != NULL)
717     {
718       route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
719       route_handle->key = *key;
720       route_handle->iter = iter;
721       route_handle->iter_cls = iter_cls;
722       route_handle->dht_handle = handle;
723       route_handle->uid = handle->uid_gen;
724       route_handle->message = pending;
725       GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
726                                          key,
727                                          route_handle,
728                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
729     }
730   else
731     {
732       route_handle = NULL;
733       pending->free_on_send = GNUNET_YES;
734       pending->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
735                                                             &timeout_route_request,
736                                                             pending);
737     }
738   GNUNET_CONTAINER_DLL_insert (handle->pending_head,
739                                handle->pending_tail,
740                                pending);
741   pending->in_pending_queue = GNUNET_YES;
742   process_pending_messages (handle);
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "DHT route start request processed, returning %p\n",
745               route_handle);
746   return route_handle;
747 }
748
749
750 /**
751  * Stop a previously issued routing request
752  *
753  * @param route_handle handle to the request to stop
754  */
755 void
756 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
757 {
758   struct GNUNET_DHT_Handle *handle;
759   struct PendingMessage *pending;
760   struct GNUNET_DHT_StopMessage *message;
761   size_t msize;
762
763   handle = route_handle->dht_handle;
764   if (GNUNET_NO == route_handle->message->in_pending_queue)
765     {
766       /* need to send stop message */
767       msize = sizeof (struct GNUNET_DHT_StopMessage);
768       pending = GNUNET_malloc (sizeof (struct PendingMessage) + 
769                                msize);
770       message = (struct GNUNET_DHT_StopMessage*) &pending[1];
771       pending->msg = &message->header;
772       message->header.size = htons (msize);
773       message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
774       message->unique_id = GNUNET_htonll (route_handle->uid);
775       message->key = route_handle->key;
776       pending->handle = handle;
777       pending->free_on_send = GNUNET_YES;
778       pending->in_pending_queue = GNUNET_YES;      
779       GNUNET_CONTAINER_DLL_insert (handle->pending_head,
780                                    handle->pending_tail,
781                                    pending);
782       process_pending_messages (handle);
783     }
784   else
785     {
786       /* simply remove pending request from message queue before
787          transmission, no need to transmit STOP request! */
788       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
789                                    handle->pending_tail,
790                                    route_handle->message);
791     }
792   GNUNET_assert (GNUNET_YES ==
793                  GNUNET_CONTAINER_multihashmap_remove (route_handle->dht_handle->active_requests,
794                                                        &route_handle->key,
795                                                        route_handle));
796   GNUNET_free(route_handle->message);
797   GNUNET_free(route_handle);
798   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799               "DHT route stop request processed\n");
800 }
801
802
803
804 /* ***** Special API for controlling DHT routing maintenance ******* */
805
806
807 /**
808  * Send a control message to the DHT.
809  *
810  * @param handle handle to the DHT service
811  * @param command command
812  * @param variable variable to the command
813  * @param cont continuation to call when done (transmitting request to service)
814  * @param cont_cls closure for cont
815  */
816 static void
817 send_control_message (struct GNUNET_DHT_Handle *handle,
818                       uint16_t command,
819                       uint16_t variable,
820                       GNUNET_SCHEDULER_Task cont,
821                       void *cont_cls)
822 {
823   struct GNUNET_DHT_ControlMessage *msg;
824   struct PendingMessage *pending;
825
826   pending = GNUNET_malloc (sizeof (struct PendingMessage) + 
827                            sizeof(struct GNUNET_DHT_ControlMessage)); 
828   msg = (struct GNUNET_DHT_ControlMessage*) &pending[1];
829   pending->msg = &msg->header;
830   msg->header.size = htons (sizeof(struct GNUNET_DHT_ControlMessage));
831   msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CONTROL);
832   msg->command = htons (command);
833   msg->variable = htons (variable);
834   pending->free_on_send = GNUNET_YES;
835   pending->cont = cont;
836   pending->cont_cls = cont_cls;
837   pending->in_pending_queue = GNUNET_YES;      
838   GNUNET_CONTAINER_DLL_insert (handle->pending_head,
839                                handle->pending_tail,
840                                pending);
841   process_pending_messages (handle);
842 }
843
844
845 /**
846  * Send a message to the DHT telling it to issue a single find
847  * peer request using the peers unique identifier as key.  This
848  * is used to fill the routing table, and is normally controlled
849  * by the DHT itself.  However, for testing and perhaps more
850  * close control over the DHT, this can be explicitly managed.
851  *
852  * @param handle handle to the DHT service
853  * @param cont continuation to call when done (transmitting request to service)
854  * @param cont_cls closure for cont
855  */
856 void
857 GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
858                        GNUNET_SCHEDULER_Task cont,
859                        void *cont_cls)
860 {
861   send_control_message (handle,
862                         GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0,
863                         cont, cont_cls);
864 }
865
866
867
868 #if HAVE_MALICIOUS
869
870 /**
871  * Send a message to the DHT telling it to start issuing random GET
872  * requests every 'frequency' milliseconds.
873  *
874  * @param handle handle to the DHT service
875  * @param frequency delay between sending malicious messages
876  * @param cont continuation to call when done (transmitting request to service)
877  * @param cont_cls closure for cont
878  */
879 void
880 GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle,
881                                  struct GNUNET_TIME_Relative frequency, GNUNET_SCHEDULER_Task cont,
882                             void *cont_cls)
883 {
884   if (frequency.rel_value > UINT16_MAX)
885     {
886       GNUNET_break (0);
887       return;
888     }
889   send_control_message (handle,
890                         GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET, frequency.rel_value,
891                         cont, cont_cls);
892 }
893
894 /**
895  * Send a message to the DHT telling it to start issuing random PUT
896  * requests every 'frequency' milliseconds.
897  *
898  * @param handle handle to the DHT service
899  * @param frequency delay between sending malicious messages
900  * @param cont continuation to call when done (transmitting request to service)
901  * @param cont_cls closure for cont
902  */
903 void 
904 GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, 
905                                  struct GNUNET_TIME_Relative frequency, GNUNET_SCHEDULER_Task cont,
906                             void *cont_cls)
907 {
908   if (frequency.rel_value > UINT16_MAX)
909     {
910       GNUNET_break (0);
911       return;
912     }
913
914   send_control_message (handle,
915                         GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT, frequency.rel_value,
916                         cont, cont_cls);
917 }
918
919
920 /**
921  * Send a message to the DHT telling it to start dropping
922  * all requests received.
923  *
924  * @param handle handle to the DHT service
925  * @param cont continuation to call when done (transmitting request to service)
926  * @param cont_cls closure for cont
927  *
928  */
929 void 
930 GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont,
931     void *cont_cls)
932 {
933   send_control_message (handle,
934                         GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP, 0,
935                         cont, cont_cls);
936 }
937
938 #endif
939
940 /* end of dht_api.c */