inexorably closer to perfection
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  */
28
29 #include "platform.h"
30 #include "gnunet_bandwidth_lib.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_container_lib.h"
34 #include "gnunet_arm_service.h"
35 #include "gnunet_hello_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_server_lib.h"
38 #include "gnunet_time_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "dht.h"
41
42 #define DEBUG_DHT_API GNUNET_NO
43
44 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
45
46 struct PendingMessage
47 {
48   /**
49    * Message that is pending
50    */
51   struct GNUNET_MessageHeader *msg;
52
53   /**
54    * Timeout for this message
55    */
56   struct GNUNET_TIME_Relative timeout;
57
58   /**
59    * Continuation to call on message send
60    * or message receipt confirmation
61    */
62   GNUNET_SCHEDULER_Task cont;
63
64   /**
65    * Continuation closure
66    */
67   void *cont_cls;
68
69   /**
70    * Whether or not to await verification the message
71    * was received by the service
72    */
73   size_t is_unique;
74
75   /**
76    * Unique ID for this request
77    */
78   uint64_t unique_id;
79
80 };
81
82 struct GNUNET_DHT_GetContext
83 {
84   /**
85    * Iterator to call on data receipt
86    */
87   GNUNET_DHT_GetIterator iter;
88
89   /**
90    * Closure for the iterator callback
91    */
92   void *iter_cls;
93
94 };
95
96 struct GNUNET_DHT_FindPeerContext
97 {
98   /**
99    * Iterator to call on data receipt
100    */
101   GNUNET_DHT_FindPeerProcessor proc;
102
103   /**
104    * Closure for the iterator callback
105    */
106   void *proc_cls;
107
108 };
109
110 /**
111  * Handle to control a unique operation (one that is
112  * expected to return results)
113  */
114 struct GNUNET_DHT_RouteHandle
115 {
116
117   /**
118    * Unique identifier for this request (for key collisions)
119    */
120   uint64_t uid;
121
122   /**
123    * Key that this get request is for
124    */
125   GNUNET_HashCode key;
126
127   /**
128    * Iterator to call on data receipt
129    */
130   GNUNET_DHT_ReplyProcessor iter;
131
132   /**
133    * Closure for the iterator callback
134    */
135   void *iter_cls;
136
137   /**
138    * Main handle to this DHT api
139    */
140   struct GNUNET_DHT_Handle *dht_handle;
141 };
142
143 /**
144  * Handle for a non unique request, holds callback
145  * which needs to be called before we allow other
146  * messages to be processed and sent to the DHT service
147  */
148 struct GNUNET_DHT_NonUniqueHandle
149 {
150   /**
151    * Key that this get request is for
152    */
153   GNUNET_HashCode key;
154
155   /**
156    * Type of data get request was for
157    */
158   uint32_t type;
159
160   /**
161    * Continuation to call on service
162    * confirmation of message receipt.
163    */
164   GNUNET_SCHEDULER_Task cont;
165
166   /**
167    * Send continuation cls
168    */
169   void *cont_cls;
170 };
171
172 /**
173  * Handle to control a get operation.
174  */
175 struct GNUNET_DHT_GetHandle
176 {
177   /**
178    * Handle to the actual route operation for the get
179    */
180   struct GNUNET_DHT_RouteHandle *route_handle;
181
182   /**
183    * The context of the get request
184    */
185   struct GNUNET_DHT_GetContext get_context;
186 };
187
188 /**
189  * Handle to control a find peer operation.
190  */
191 struct GNUNET_DHT_FindPeerHandle
192 {
193   /**
194      * Handle to the actual route operation for the request
195      */
196   struct GNUNET_DHT_RouteHandle *route_handle;
197
198     /**
199      * The context of the find peer request
200      */
201   struct GNUNET_DHT_FindPeerContext find_peer_context;
202 };
203
204
205 /**
206  * Connection to the DHT service.
207  */
208 struct GNUNET_DHT_Handle
209 {
210   /**
211    * Our scheduler.
212    */
213   struct GNUNET_SCHEDULER_Handle *sched;
214
215   /**
216    * Configuration to use.
217    */
218   const struct GNUNET_CONFIGURATION_Handle *cfg;
219
220   /**
221    * Socket (if available).
222    */
223   struct GNUNET_CLIENT_Connection *client;
224
225   /**
226    * Currently pending transmission request.
227    */
228   struct GNUNET_CLIENT_TransmitHandle *th;
229
230   /**
231    * Message we are currently sending, only allow
232    * a single message to be queued.  If not unique
233    * (typically a put request), await a confirmation
234    * from the service that the message was received.
235    * If unique, just fire and forget.
236    */
237   struct PendingMessage *current;
238
239   /**
240    * Hash map containing the current outstanding unique requests
241    */
242   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
243
244   /**
245    * Non unique handle.  If set don't schedule another non
246    * unique request.
247    */
248   struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
249
250   /**
251    * Generator for unique ids.
252    */
253   uint64_t uid_gen;
254
255 };
256
257
258 /**
259  * Convert unique ID to hash code.
260  *
261  * @param uid unique ID to convert
262  * @param hash set to uid (extended with zeros)
263  */
264 static void
265 hash_from_uid (uint64_t uid,
266                GNUNET_HashCode *hash)
267 {
268   memset (hash, 0, sizeof(GNUNET_HashCode));
269   *((uint64_t*)hash) = uid;
270 }
271
272
273 /**
274  * Handler for messages received from the DHT service
275  * a demultiplexer which handles numerous message types
276  *
277  */
278 void
279 service_message_handler (void *cls,
280                          const struct GNUNET_MessageHeader *msg)
281 {
282   struct GNUNET_DHT_Handle *handle = cls;
283   struct GNUNET_DHT_RouteResultMessage *dht_msg;
284   struct GNUNET_MessageHeader *enc_msg;
285   struct GNUNET_DHT_RouteHandle *route_handle;
286   uint64_t uid;
287   GNUNET_HashCode uid_hash;
288   size_t enc_size;
289   /* TODO: find out message type, handle callbacks for different types of messages.
290    * Should be a non unique acknowledgment, or unique result. */
291
292   if (msg == NULL)
293     {
294 #if DEBUG_DHT_API
295       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
296                   "`%s': Received NULL from server, connection down!\n",
297                   "DHT API");
298 #endif
299       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
300       handle->client = GNUNET_CLIENT_connect (handle->sched, 
301                                               "dht", 
302                                               handle->cfg);
303       /* FIXME: re-transmit *all* of our GET requests AND re-start
304          receiving responses! */
305       return;
306     }
307
308   switch (ntohs (msg->type))
309     {
310     case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT:
311       {
312         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
313         uid = GNUNET_ntohll (dht_msg->unique_id);
314 #if DEBUG_DHT_API
315         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
316                     "`%s': Received response to message (uid %llu)\n",
317                     "DHT API", uid);
318 #endif
319
320         hash_from_uid (uid, &uid_hash);
321         route_handle =
322           GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
323                                              &uid_hash);
324         if (route_handle == NULL)   /* We have no recollection of this request */
325           {
326 #if DEBUG_DHT_API
327             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328                         "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
329                         "DHT API", uid);
330 #endif
331           }
332         else
333           {
334             enc_size =
335               ntohs (dht_msg->header.size) -
336               sizeof (struct GNUNET_DHT_RouteResultMessage);
337             GNUNET_assert (enc_size > 0);
338             enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
339             route_handle->iter (route_handle->iter_cls, enc_msg);
340           }
341
342         break;
343       }
344       /* FIXME: we don't want these anymore, call continuation once message is sent. */
345       /*
346     case GNUNET_MESSAGE_TYPE_DHT_STOP:
347       {
348         stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
349         uid = GNUNET_ntohll (stop_msg->unique_id);
350 #if DEBUG_DHT_API
351         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352                     "`%s': Received response to message (uid %llu), current uid %llu\n",
353                     "DHT API", uid, handle->current->unique_id);
354 #endif
355         if (handle->current->unique_id == uid)
356           {
357 #if DEBUG_DHT_API
358             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359                         "`%s': Have pending confirmation for this message!\n",
360                         "DHT API", uid);
361 #endif
362             if (handle->current->cont != NULL)
363               GNUNET_SCHEDULER_add_continuation (handle->sched,
364                                                  handle->current->cont,
365                                                  handle->current->cont_cls,
366                                                  GNUNET_SCHEDULER_REASON_PREREQ_DONE);
367
368             GNUNET_free (handle->current->msg);
369             GNUNET_free (handle->current);
370             handle->current = NULL;
371           }
372         break;
373       }
374       */
375     default:
376       {
377         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
378                     "`%s': Received unknown message type %d\n", "DHT API",
379                     ntohs (msg->type));
380       }
381     }
382   GNUNET_CLIENT_receive (handle->client,
383                          &service_message_handler,
384                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
385
386 }
387
388
389 /**
390  * Initialize the connection with the DHT service.
391  *
392  * @param sched scheduler to use
393  * @param cfg configuration to use
394  * @param ht_len size of the internal hash table to use for
395  *               processing multiple GET/FIND requests in parallel
396  *
397  * @return handle to the DHT service, or NULL on error
398  */
399 struct GNUNET_DHT_Handle *
400 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
401                     const struct GNUNET_CONFIGURATION_Handle *cfg,
402                     unsigned int ht_len)
403 {
404   struct GNUNET_DHT_Handle *handle;
405
406   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
407   handle->cfg = cfg;
408   handle->sched = sched;
409   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
410   if (handle->client == NULL)
411     {
412       GNUNET_free (handle);
413       return NULL;
414     }
415   handle->outstanding_requests =
416     GNUNET_CONTAINER_multihashmap_create (ht_len);
417 #if DEBUG_DHT_API
418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419               "`%s': Connection to service in progress\n", "DHT API");
420 #endif
421   GNUNET_CLIENT_receive (handle->client,
422                          &service_message_handler,
423                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
424   return handle;
425 }
426
427
428 /**
429  * Shutdown connection with the DHT service.
430  *
431  * @param handle handle of the DHT connection to stop
432  */
433 void
434 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
435 {
436 #if DEBUG_DHT_API
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
439 #endif
440   GNUNET_assert (handle != NULL);
441   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
442     {
443       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
444       handle->th = NULL;
445     }
446   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
447     GNUNET_free (handle->current);
448
449   if (handle->client != NULL)   /* Finally, disconnect from the service */
450     {
451       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
452       handle->client = NULL;
453     }
454   /* Either assert that outstanding_requests is empty */
455   /* FIXME: handle->outstanding_requests not freed! */
456   GNUNET_free (handle);
457 }
458
459
460 /**
461  * Send complete (or failed), call continuation if we have one.
462  */
463 static void
464 finish (struct GNUNET_DHT_Handle *handle, int code)
465 {
466   struct PendingMessage *pos = handle->current;
467 #if DEBUG_DHT_API
468   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
469 #endif
470   GNUNET_assert (pos != NULL);
471
472
473   if (pos->cont != NULL)
474     {
475       if (code == GNUNET_SYSERR)
476         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
477                                            pos->cont_cls,
478                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
479       else
480         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
481                                            pos->cont_cls,
482                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
483     }
484
485   GNUNET_free (pos->msg);
486   GNUNET_free (pos);
487   handle->current = NULL;
488 }
489
490
491 /**
492  * Transmit the next pending message, called by notify_transmit_ready
493  */
494 static size_t
495 transmit_pending (void *cls, size_t size, void *buf)
496 {
497   struct GNUNET_DHT_Handle *handle = cls;
498   size_t tsize;
499
500 #if DEBUG_DHT_API
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502               "`%s': In transmit_pending\n", "DHT API");
503 #endif
504   if (buf == NULL)
505     {
506 #if DEBUG_DHT_API
507       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
509 #endif
510       /* FIXME: free associated resources or summat */
511       finish (handle, GNUNET_SYSERR);
512       return 0;
513     }
514
515   handle->th = NULL;
516
517   if (handle->current != NULL)
518     {
519       tsize = ntohs (handle->current->msg->size);
520       if (size >= tsize)
521         {
522 #if DEBUG_DHT_API
523           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                       "`%s': Sending message size %d\n", "DHT API", tsize);
525 #endif
526           memcpy (buf, handle->current->msg, tsize);
527           finish (handle, GNUNET_OK);
528           return tsize;
529         }
530       else
531         {
532           return 0;
533         }
534     }
535   /* Have no pending request */
536   return 0;
537 }
538
539
540 /**
541  * Try to (re)connect to the dht service.
542  *
543  * @return GNUNET_YES on success, GNUNET_NO on failure.
544  */
545 static int
546 try_connect (struct GNUNET_DHT_Handle *handle)
547 {
548   if (handle->client != NULL)
549     return GNUNET_OK;
550   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
551   if (handle->client != NULL)
552     return GNUNET_YES;
553 #if DEBUG_STATISTICS
554   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555               _("Failed to connect to the dht service!\n"));
556 #endif
557   return GNUNET_NO;
558 }
559
560
561 /**
562  * Try to send messages from list of messages to send
563  */
564 static void
565 process_pending_message (struct GNUNET_DHT_Handle *handle)
566 {
567
568   if (handle->current == NULL)
569     return;                     /* action already pending */
570   if (GNUNET_YES != try_connect (handle))
571     {
572       finish (handle, GNUNET_SYSERR);
573       return;
574     }
575
576   if (NULL ==
577       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
578                                                          ntohs (handle->
579                                                                 current->msg->
580                                                                 size),
581                                                          handle->current->
582                                                          timeout, GNUNET_YES,
583                                                          &transmit_pending,
584                                                          handle)))
585     {
586 #if DEBUG_DHT_API
587       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588                   "Failed to transmit request to dht service.\n");
589 #endif
590       finish (handle, GNUNET_SYSERR);
591       return;
592     }
593 #if DEBUG_DHT_API
594   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595               "`%s': Scheduled sending message of size %d to service\n",
596               "DHT API", ntohs (handle->current->msg->size));
597 #endif
598 }
599
600 /**
601  * Iterator called on each result obtained from a generic route
602  * operation
603  */
604 void
605 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
606 {
607   struct GNUNET_DHT_GetHandle *get_handle = cls;
608   struct GNUNET_DHT_GetResultMessage *result;
609   size_t data_size;
610   char *result_data;
611
612   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
613     return;
614
615   GNUNET_assert (ntohs (reply->size) >=
616                  sizeof (struct GNUNET_DHT_GetResultMessage));
617   result = (struct GNUNET_DHT_GetResultMessage *) reply;
618   data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
619
620   result_data = (char *) &result[1];    /* Set data pointer to end of message */
621
622   get_handle->get_context.iter (get_handle->get_context.iter_cls,
623                                 result->expiration, &result->key,
624                                 ntohs (result->type), data_size, result_data);
625 }
626
627
628 /**
629  * Iterator called on each result obtained from a generic route
630  * operation
631  */
632 void
633 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
634 {
635   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
636
637 #if DEBUG_DHT_API
638       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
639                   "Find peer iterator called.\n");
640 #endif
641   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_HELLO)
642     return;
643
644   GNUNET_assert (ntohs (reply->size) >=
645                  sizeof (struct GNUNET_MessageHeader));
646
647   find_peer_handle->find_peer_context.proc (find_peer_handle->
648                                             find_peer_context.proc_cls,
649                                             (struct GNUNET_HELLO_Message *)reply);
650 }
651
652 /**
653  * Perform an asynchronous FIND_PEER operation on the DHT.
654  *
655  * @param handle handle to the DHT service
656  * @param key the key to look up
657  * @param desired_replication_level how many peers should ultimately receive
658  *                this message (advisory only, target may be too high for the
659  *                given DHT or not hit exactly).
660  * @param options options for routing
661  * @param enc send the encapsulated message to a peer close to the key
662  * @param iter function to call on each result, NULL if no replies are expected
663  * @param iter_cls closure for iter
664  * @param timeout when to abort with an error if we fail to get
665  *                a confirmation for the request (when necessary) or how long
666  *                to wait for tramission to the service
667  * @param cont continuation to call when done;
668  *             reason will be TIMEOUT on error,
669  *             reason will be PREREQ_DONE on success
670  * @param cont_cls closure for cont
671  *
672  * @return handle to stop the request, NULL if the request is "fire and forget"
673  */
674 struct GNUNET_DHT_RouteHandle *
675 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
676                         const GNUNET_HashCode * key,
677                         unsigned int desired_replication_level,
678                         enum GNUNET_DHT_RouteOption options,
679                         const struct GNUNET_MessageHeader *enc,
680                         struct GNUNET_TIME_Relative timeout,
681                         GNUNET_DHT_ReplyProcessor iter,
682                         void *iter_cls,
683                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
684 {
685   struct GNUNET_DHT_RouteHandle *route_handle;
686   struct PendingMessage *pending;
687   struct GNUNET_DHT_RouteMessage *message;
688   size_t expects_response;
689   uint16_t msize;
690   GNUNET_HashCode uid_key;
691   uint64_t uid;
692
693   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
694     {
695       GNUNET_break (0);
696       return NULL;
697     }
698   expects_response = GNUNET_YES;
699   if (iter == NULL)
700     expects_response = GNUNET_NO;
701   uid = handle->uid_gen++;
702   if (expects_response)
703     {
704       route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
705       memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
706       route_handle->iter = iter;
707       route_handle->iter_cls = iter_cls;
708       route_handle->dht_handle = handle;
709       route_handle->uid = uid;
710 #if DEBUG_DHT_API
711       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
712                   "`%s': Unique ID is %llu\n", "DHT API", uid);
713 #endif
714       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
715                                          &uid_key, route_handle,
716                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
717     }
718   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
719   message = GNUNET_malloc (msize);
720   message->header.size = htons (msize);
721   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE);
722   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
723   message->options = htonl (options);
724   message->desired_replication_level = htonl (options);
725   message->unique = htonl (expects_response);
726   message->unique_id = GNUNET_htonll (uid);
727   memcpy (&message[1], enc, ntohs (enc->size));
728   pending = GNUNET_malloc (sizeof (struct PendingMessage));
729   pending->msg = &message->header;
730   pending->timeout = timeout;
731   pending->cont = cont;
732   pending->cont_cls = cont_cls;
733   pending->unique_id = uid;
734   GNUNET_assert (handle->current == NULL);
735   handle->current = pending;
736   process_pending_message (handle);
737   return route_handle;
738 }
739
740
741 /**
742  * Perform an asynchronous GET operation on the DHT identified.
743  *
744  * @param handle handle to the DHT service
745  * @param timeout how long to wait for transmission of this request to the service
746  * @param type expected type of the response object
747  * @param key the key to look up
748  * @param iter function to call on each result
749  * @param iter_cls closure for iter
750  * @param cont continuation to call once message sent
751  * @param cont_cls closure for continuation
752  *
753  * @return handle to stop the async get
754  */
755 struct GNUNET_DHT_GetHandle *
756 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
757                       struct GNUNET_TIME_Relative timeout,
758                       uint32_t type,
759                       const GNUNET_HashCode * key,
760                       GNUNET_DHT_GetIterator iter,
761                       void *iter_cls,
762                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
763 {
764   struct GNUNET_DHT_GetHandle *get_handle;
765   struct GNUNET_DHT_GetMessage *get_msg;
766
767   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
768     return NULL;
769
770   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
771   get_handle->get_context.iter = iter;
772   get_handle->get_context.iter_cls = iter_cls;
773
774 #if DEBUG_DHT_API
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "`%s': Inserting pending get request with key %s\n", "DHT API",
777               GNUNET_h2s (key));
778 #endif
779
780   get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
781   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
782   get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
783   get_msg->type = htons (type);
784
785   get_handle->route_handle =
786     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
787                             &get_reply_iterator, get_handle, cont, cont_cls);
788   return get_handle;
789 }
790
791
792 /**
793  * Stop a previously issued routing request
794  *
795  * @param route_handle handle to the request to stop
796  * @param cont continuation to call once this message is sent to the service or times out
797  * @param cont_cls closure for the continuation
798  */
799 void
800 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
801                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
802 {
803   struct PendingMessage *pending;
804   struct GNUNET_DHT_StopMessage *message;
805   size_t msize;
806   GNUNET_HashCode uid_key;
807
808   msize = sizeof (struct GNUNET_DHT_StopMessage);
809   message = GNUNET_malloc (msize);
810   message->header.size = htons (msize);
811   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
812 #if DEBUG_DHT_API
813   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
814               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
815               route_handle->uid);
816 #endif
817   message->unique_id = GNUNET_htonll (route_handle->uid);
818   GNUNET_assert (route_handle->dht_handle->current == NULL);
819   pending = GNUNET_malloc (sizeof (struct PendingMessage));
820   pending->msg = (struct GNUNET_MessageHeader *) message;
821   pending->timeout = DEFAULT_DHT_TIMEOUT;
822   pending->cont = cont;
823   pending->cont_cls = cont_cls;
824   pending->unique_id = route_handle->uid;
825   GNUNET_assert (route_handle->dht_handle->current == NULL);
826   route_handle->dht_handle->current = pending;
827   process_pending_message (route_handle->dht_handle);
828   hash_from_uid (route_handle->uid, &uid_key);
829   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
830                  (route_handle->dht_handle->outstanding_requests, &uid_key,
831                   route_handle) == GNUNET_YES);
832 }
833
834
835 /**
836  * Stop async DHT-get.
837  *
838  * @param get_handle handle to the GET operation to stop
839  * @param cont continuation to call once this message is sent to the service or times out
840  * @param cont_cls closure for the continuation
841  */
842 void
843 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
844                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
845 {
846 #if DEBUG_DHT_API
847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848               "`%s': Removing pending get request with key %s, uid %llu\n",
849               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
850               get_handle->route_handle->uid);
851 #endif
852   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
853   GNUNET_free (get_handle);
854 }
855
856
857 /**
858  * Perform an asynchronous FIND PEER operation on the DHT.
859  *
860  * @param handle handle to the DHT service
861  * @param timeout timeout for this request to be sent to the
862  *        service
863  * @param options routing options for this message
864  * @param key the key to look up
865  * @param proc function to call on each result
866  * @param proc_cls closure for proc
867  * @param cont continuation to call once message sent
868  * @param cont_cls closure for continuation
869  *
870  * @return handle to stop the async get, NULL on error
871  */
872 struct GNUNET_DHT_FindPeerHandle *
873 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
874                             struct GNUNET_TIME_Relative timeout,
875                             enum GNUNET_DHT_RouteOption options,
876                             const GNUNET_HashCode * key,
877                             GNUNET_DHT_FindPeerProcessor proc,
878                             void *proc_cls,
879                             GNUNET_SCHEDULER_Task cont,
880                             void *cont_cls)
881 {
882   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
883   struct GNUNET_MessageHeader *find_peer_msg;
884
885   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
886     return NULL;
887
888   find_peer_handle =
889     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
890   find_peer_handle->find_peer_context.proc = proc;
891   find_peer_handle->find_peer_context.proc_cls = proc_cls;
892
893 #if DEBUG_DHT_API
894   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
896               "FIND PEER", GNUNET_h2s (key));
897 #endif
898
899   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
900   find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader));
901   find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
902   find_peer_handle->route_handle =
903     GNUNET_DHT_route_start (handle, key, 0, options, find_peer_msg,
904                             timeout, &find_peer_reply_iterator,
905                             find_peer_handle, cont, cont_cls);
906   return find_peer_handle;
907 }
908
909 /**
910  * Stop async find peer.  Frees associated resources.
911  *
912  * @param find_peer_handle GET operation to stop.
913  * @param cont continuation to call once this message is sent to the service or times out
914  * @param cont_cls closure for the continuation
915  */
916 void
917 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
918                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
919 {
920 #if DEBUG_DHT_API
921   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
923               "DHT API", "FIND PEER",
924               GNUNET_h2s (&find_peer_handle->route_handle->key),
925               find_peer_handle->route_handle->uid);
926 #endif
927   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
928   GNUNET_free (find_peer_handle);
929
930 }
931
932
933 /**
934  * Perform a PUT operation storing data in the DHT.
935  *
936  * @param handle handle to DHT service
937  * @param key the key to store under
938  * @param type type of the value
939  * @param size number of bytes in data; must be less than 64k
940  * @param data the data to store
941  * @param exp desired expiration time for the value
942  * @param timeout how long to wait for transmission of this request
943  * @param cont continuation to call when done;
944  *             reason will be TIMEOUT on error,
945  *             reason will be PREREQ_DONE on success
946  * @param cont_cls closure for cont
947  *
948  * @return GNUNET_YES if put message is queued for transmission
949  */
950 void
951 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
952                 const GNUNET_HashCode * key,
953                 uint32_t type,
954                 uint32_t size,
955                 const char *data,
956                 struct GNUNET_TIME_Absolute exp,
957                 struct GNUNET_TIME_Relative timeout,
958                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
959 {
960   struct GNUNET_DHT_PutMessage *put_msg;
961   size_t msize;
962
963   if (handle->current != NULL)
964     {
965       GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
966                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
967       return;
968     }
969
970 #if DEBUG_DHT_API
971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972               "`%s': Inserting pending put request with key %s\n", "DHT API",
973               GNUNET_h2s (key));
974 #endif
975
976   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
977   put_msg = GNUNET_malloc (msize);
978   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
979   put_msg->header.size = htons (msize);
980   put_msg->type = htons (type);
981   put_msg->data_size = htons (size);
982   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
983   memcpy (&put_msg[1], data, size);
984
985   GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
986                           NULL, cont, cont_cls);
987
988   GNUNET_free (put_msg);
989 }