breaking DHT code
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  */
28
29 #include "platform.h"
30 #include "gnunet_bandwidth_lib.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_container_lib.h"
34 #include "gnunet_arm_service.h"
35 #include "gnunet_hello_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_server_lib.h"
38 #include "gnunet_time_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "dht.h"
41
42 #define DEBUG_DHT_API GNUNET_NO
43
44 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
45
46 struct PendingMessage
47 {
48   /**
49    * Message that is pending
50    */
51   struct GNUNET_MessageHeader *msg;
52
53   /**
54    * Timeout for this message
55    */
56   struct GNUNET_TIME_Relative timeout;
57
58   /**
59    * Continuation to call on message send
60    * or message receipt confirmation
61    */
62   GNUNET_SCHEDULER_Task cont;
63
64   /**
65    * Continuation closure
66    */
67   void *cont_cls;
68
69   /**
70    * Whether or not to await verification the message
71    * was received by the service
72    */
73   size_t is_unique;
74
75   /**
76    * Unique ID for this request
77    */
78   uint64_t unique_id;
79
80 };
81
82 struct GNUNET_DHT_GetContext
83 {
84   /**
85    * Iterator to call on data receipt
86    */
87   GNUNET_DHT_GetIterator iter;
88
89   /**
90    * Closure for the iterator callback
91    */
92   void *iter_cls;
93
94 };
95
96 struct GNUNET_DHT_FindPeerContext
97 {
98   /**
99    * Iterator to call on data receipt
100    */
101   GNUNET_DHT_FindPeerProcessor proc;
102
103   /**
104    * Closure for the iterator callback
105    */
106   void *proc_cls;
107
108 };
109
110 /**
111  * Handle to control a unique operation (one that is
112  * expected to return results)
113  */
114 struct GNUNET_DHT_RouteHandle
115 {
116
117   /**
118    * Unique identifier for this request (for key collisions)
119    */
120   uint64_t uid;
121
122   /**
123    * Key that this get request is for
124    */
125   GNUNET_HashCode key;
126
127   /**
128    * Iterator to call on data receipt
129    */
130   GNUNET_DHT_ReplyProcessor iter;
131
132   /**
133    * Closure for the iterator callback
134    */
135   void *iter_cls;
136
137   /**
138    * Main handle to this DHT api
139    */
140   struct GNUNET_DHT_Handle *dht_handle;
141 };
142
143 /**
144  * Handle for a non unique request, holds callback
145  * which needs to be called before we allow other
146  * messages to be processed and sent to the DHT service
147  */
148 struct GNUNET_DHT_NonUniqueHandle
149 {
150   /**
151    * Key that this get request is for
152    */
153   GNUNET_HashCode key;
154
155   /**
156    * Type of data get request was for
157    */
158   uint32_t type;
159
160   /**
161    * Continuation to call on service
162    * confirmation of message receipt.
163    */
164   GNUNET_SCHEDULER_Task cont;
165
166   /**
167    * Send continuation cls
168    */
169   void *cont_cls;
170 };
171
172 /**
173  * Handle to control a get operation.
174  */
175 struct GNUNET_DHT_GetHandle
176 {
177   /**
178    * Handle to the actual route operation for the get
179    */
180   struct GNUNET_DHT_RouteHandle *route_handle;
181
182   /**
183    * The context of the get request
184    */
185   struct GNUNET_DHT_GetContext get_context;
186 };
187
188 /**
189  * Handle to control a find peer operation.
190  */
191 struct GNUNET_DHT_FindPeerHandle
192 {
193   /**
194      * Handle to the actual route operation for the request
195      */
196   struct GNUNET_DHT_RouteHandle *route_handle;
197
198     /**
199      * The context of the get request
200      */
201   struct GNUNET_DHT_FindPeerContext find_peer_context;
202 };
203
204
205 /**
206  * Connection to the DHT service.
207  */
208 struct GNUNET_DHT_Handle
209 {
210   /**
211    * Our scheduler.
212    */
213   struct GNUNET_SCHEDULER_Handle *sched;
214
215   /**
216    * Configuration to use.
217    */
218   const struct GNUNET_CONFIGURATION_Handle *cfg;
219
220   /**
221    * Socket (if available).
222    */
223   struct GNUNET_CLIENT_Connection *client;
224
225   /**
226    * Currently pending transmission request.
227    */
228   struct GNUNET_CLIENT_TransmitHandle *th;
229
230   /**
231    * Message we are currently sending, only allow
232    * a single message to be queued.  If not unique
233    * (typically a put request), await a confirmation
234    * from the service that the message was received.
235    * If unique, just fire and forget.
236    */
237   struct PendingMessage *current;
238
239   /**
240    * Hash map containing the current outstanding unique requests
241    */
242   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
243
244   /**
245    * Non unique handle.  If set don't schedule another non
246    * unique request.
247    */
248   struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
249
250   /**
251    * Generator for unique ids.
252    */
253   uint64_t uid_gen;
254
255 };
256
257
258 /**
259  * Convert unique ID to hash code.
260  *
261  * @param uid unique ID to convert
262  * @param hash set to uid (extended with zeros)
263  */
264 static void
265 hash_from_uid (uint64_t uid,
266                GNUNET_HashCode *hash)
267 {
268   memset (hash, 0, sizeof(GNUNET_HashCode));
269   *((uint64_t*)hash) = uid;
270 }
271
272
273 /**
274  * Handler for messages received from the DHT service
275  * a demultiplexer which handles numerous message types
276  *
277  */
278 void
279 service_message_handler (void *cls,
280                          const struct GNUNET_MessageHeader *msg)
281 {
282   struct GNUNET_DHT_Handle *handle = cls;
283   struct GNUNET_DHT_Message *dht_msg;
284   struct GNUNET_DHT_StopMessage *stop_msg;
285   struct GNUNET_MessageHeader *enc_msg;
286   struct GNUNET_DHT_RouteHandle *route_handle;
287   uint64_t uid;
288   GNUNET_HashCode uid_hash;
289   size_t enc_size;
290   /* TODO: find out message type, handle callbacks for different types of messages.
291    * Should be a non unique acknowledgment, or unique result. */
292
293   if (msg == NULL)
294     {
295 #if DEBUG_DHT_API
296       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
297                   "`%s': Received NULL from server, connection down!\n",
298                   "DHT API");
299 #endif
300       GNUNET_CLIENT_disconnect (handle->client);
301       handle->client = GNUNET_CLIENT_connect (handle->sched, 
302                                               "dht", 
303                                               handle->cfg);
304       /* FIXME: re-transmit *all* of our GET requests AND re-start
305          receiving responses! */
306       return;
307     }
308
309   switch (ntohs (msg->type))
310     {
311     case GNUNET_MESSAGE_TYPE_DHT:
312       {
313         dht_msg = (struct GNUNET_DHT_Message *) msg;
314         uid = GNUNET_ntohll (dht_msg->unique_id);
315 #if DEBUG_DHT_API
316         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
317                     "`%s': Received response to message (uid %llu)\n",
318                     "DHT API", uid);
319 #endif
320         if (ntohl (dht_msg->unique))
321           {
322             hash_from_uid (uid, &uid_hash);
323             route_handle =
324               GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
325                                                  &uid_hash);
326             if (route_handle == NULL)   /* We have no recollection of this request */
327               {
328 #if DEBUG_DHT_API
329                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330                             "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
331                             "DHT API", uid);
332 #endif
333               }
334             else
335               {
336                 enc_size =
337                   ntohs (dht_msg->header.size) -
338                   sizeof (struct GNUNET_DHT_Message);
339                 GNUNET_assert (enc_size > 0);
340                 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
341                 route_handle->iter (route_handle->iter_cls, enc_msg);
342               }
343           }
344         break;
345       }
346     case GNUNET_MESSAGE_TYPE_DHT_STOP:
347       {
348         stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
349         uid = GNUNET_ntohll (stop_msg->unique_id);
350 #if DEBUG_DHT_API
351         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352                     "`%s': Received response to message (uid %llu), current uid %llu\n",
353                     "DHT API", uid, handle->current->unique_id);
354 #endif
355         if (handle->current->unique_id == uid)
356           {
357 #if DEBUG_DHT_API
358             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359                         "`%s': Have pending confirmation for this message!\n",
360                         "DHT API", uid);
361 #endif
362             if (handle->current->cont != NULL)
363               GNUNET_SCHEDULER_add_continuation (handle->sched,
364                                                  handle->current->cont,
365                                                  handle->current->cont_cls,
366                                                  GNUNET_SCHEDULER_REASON_PREREQ_DONE);
367
368             GNUNET_free (handle->current->msg);
369             GNUNET_free (handle->current);
370             handle->current = NULL;
371           }
372         break;
373       }
374     default:
375       {
376         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
377                     "`%s': Received unknown message type %d\n", "DHT API",
378                     ntohs (msg->type));
379       }
380     }
381   GNUNET_CLIENT_receive (handle->client,
382                          &service_message_handler,
383                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
384
385 }
386
387
388 /**
389  * Initialize the connection with the DHT service.
390  *
391  * @param sched scheduler to use
392  * @param cfg configuration to use
393  * @param ht_len size of the internal hash table to use for
394  *               processing multiple GET/FIND requests in parallel
395  *
396  * @return handle to the DHT service, or NULL on error
397  */
398 struct GNUNET_DHT_Handle *
399 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
400                     const struct GNUNET_CONFIGURATION_Handle *cfg,
401                     unsigned int ht_len)
402 {
403   struct GNUNET_DHT_Handle *handle;
404
405   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
406   handle->cfg = cfg;
407   handle->sched = sched;
408   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
409   if (handle->client == NULL)
410     {
411       GNUNET_free (handle);
412       return NULL;
413     }
414   handle->outstanding_requests =
415     GNUNET_CONTAINER_multihashmap_create (ht_len);
416 #if DEBUG_DHT_API
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418               "`%s': Connection to service in progress\n", "DHT API");
419 #endif
420   GNUNET_CLIENT_receive (handle->client,
421                          &service_message_handler,
422                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
423   return handle;
424 }
425
426
427 /**
428  * Shutdown connection with the DHT service.
429  *
430  * @param handle handle of the DHT connection to stop
431  */
432 void
433 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
434 {
435 #if DEBUG_DHT_API
436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
438 #endif
439   GNUNET_assert (handle != NULL);
440   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
441     {
442       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
443       handle->th = NULL;
444     }
445   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
446     GNUNET_free (handle->current);
447
448   if (handle->client != NULL)   /* Finally, disconnect from the service */
449     {
450       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
451       handle->client = NULL;
452     }
453   /* Either assert that outstanding_requests is empty */
454   /* FIXME: handle->outstanding_requests not freed! */
455   GNUNET_free (handle);
456 }
457
458
459 /**
460  * Send complete (or failed), schedule next (or don't)
461  */
462 static void
463 finish (struct GNUNET_DHT_Handle *handle, int code)
464 {
465   /* TODO: if code is not GNUNET_OK, do something! */
466   struct PendingMessage *pos = handle->current;
467 #if DEBUG_DHT_API
468   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
469 #endif
470   GNUNET_assert (pos != NULL);
471
472   if (pos->is_unique)
473     {
474       if (pos->cont != NULL)
475         {
476           if (code == GNUNET_SYSERR)
477             GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
478                                                pos->cont_cls,
479                                                GNUNET_SCHEDULER_REASON_TIMEOUT);
480           else
481             GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
482                                                pos->cont_cls,
483                                                GNUNET_SCHEDULER_REASON_PREREQ_DONE);
484         }
485
486       GNUNET_free (pos->msg);
487       handle->current = NULL;
488       GNUNET_free (pos);
489     }
490   /* Otherwise we need to wait for a response to this message! */
491 }
492
493
494 /**
495  * Transmit the next pending message, called by notify_transmit_ready
496  */
497 static size_t
498 transmit_pending (void *cls, size_t size, void *buf)
499 {
500   struct GNUNET_DHT_Handle *handle = cls;
501   size_t tsize;
502
503 #if DEBUG_DHT_API
504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
505               "`%s': In transmit_pending\n", "DHT API");
506 #endif
507   if (buf == NULL)
508     {
509 #if DEBUG_DHT_API
510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
512 #endif
513       /* FIXME: free associated resources or summat */
514       finish (handle, GNUNET_SYSERR);
515       return 0;
516     }
517
518   handle->th = NULL;
519
520   if (handle->current != NULL)
521     {
522       tsize = ntohs (handle->current->msg->size);
523       if (size >= tsize)
524         {
525 #if DEBUG_DHT_API
526           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
527                       "`%s': Sending message size %d\n", "DHT API", tsize);
528 #endif
529           memcpy (buf, handle->current->msg, tsize);
530           finish (handle, GNUNET_OK);
531           return tsize;
532         }
533       else
534         {
535           return 0;
536         }
537     }
538   /* Have no pending request */
539   return 0;
540 }
541
542
543 /**
544  * Try to (re)connect to the dht service.
545  *
546  * @return GNUNET_YES on success, GNUNET_NO on failure.
547  */
548 static int
549 try_connect (struct GNUNET_DHT_Handle *handle)
550 {
551   if (handle->client != NULL)
552     return GNUNET_OK;
553   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
554   if (handle->client != NULL)
555     return GNUNET_YES;
556 #if DEBUG_STATISTICS
557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558               _("Failed to connect to the dht service!\n"));
559 #endif
560   return GNUNET_NO;
561 }
562
563
564 /**
565  * Try to send messages from list of messages to send
566  */
567 static void
568 process_pending_message (struct GNUNET_DHT_Handle *handle)
569 {
570
571   if (handle->current == NULL)
572     return;                     /* action already pending */
573   if (GNUNET_YES != try_connect (handle))
574     {
575       finish (handle, GNUNET_SYSERR);
576       return;
577     }
578
579   if (NULL ==
580       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
581                                                          ntohs (handle->
582                                                                 current->msg->
583                                                                 size),
584                                                          handle->current->
585                                                          timeout, GNUNET_YES,
586                                                          &transmit_pending,
587                                                          handle)))
588     {
589 #if DEBUG_DHT_API
590       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591                   "Failed to transmit request to dht service.\n");
592 #endif
593       finish (handle, GNUNET_SYSERR);
594       return;
595     }
596 #if DEBUG_DHT_API
597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598               "`%s': Scheduled sending message of size %d to service\n",
599               "DHT API", ntohs (handle->current->msg->size));
600 #endif
601 }
602
603 /**
604  * Iterator called on each result obtained from a generic route
605  * operation
606  */
607 void
608 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
609 {
610   struct GNUNET_DHT_GetHandle *get_handle = cls;
611   struct GNUNET_DHT_GetResultMessage *result;
612   size_t data_size;
613   char *result_data;
614
615   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
616     return;
617
618   GNUNET_assert (ntohs (reply->size) >=
619                  sizeof (struct GNUNET_DHT_GetResultMessage));
620   result = (struct GNUNET_DHT_GetResultMessage *) reply;
621   data_size = ntohs (result->data_size);
622   GNUNET_assert (ntohs (reply->size) ==
623                  sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
624   result_data = (char *) &result[1];    /* Set data pointer to end of message */
625
626   get_handle->get_context.iter (get_handle->get_context.iter_cls,
627                                 result->expiration, &result->key,
628                                 ntohs (result->type), data_size, result_data);
629 }
630
631
632 /**
633  * Iterator called on each result obtained from a generic route
634  * operation
635  */
636 void
637 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
638 {
639   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
640   struct GNUNET_DHT_FindPeerResultMessage *result;
641   size_t data_size;
642   struct GNUNET_MessageHeader *result_data;
643
644 #if DEBUG_DHT_API
645       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646                   "Find peer iterator called.\n");
647 #endif
648   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
649     return;
650
651   GNUNET_assert (ntohs (reply->size) >=
652                  sizeof (struct GNUNET_DHT_FindPeerResultMessage));
653   result = (struct GNUNET_DHT_FindPeerResultMessage *) reply;
654   data_size = ntohs (result->data_size);
655   GNUNET_assert (ntohs (reply->size) ==
656                  sizeof (struct GNUNET_DHT_FindPeerResultMessage) + data_size);
657
658   if (data_size > 0)
659     result_data = (struct GNUNET_MessageHeader *) &result[1];   /* Set data pointer to end of message */
660   else
661     result_data = NULL;
662
663   find_peer_handle->find_peer_context.proc (find_peer_handle->
664                                             find_peer_context.proc_cls,
665                                             &result->peer, result_data);
666 }
667
668 /**
669  * Perform an asynchronous FIND_PEER operation on the DHT.
670  *
671  * @param handle handle to the DHT service
672  * @param key the key to look up
673  * @param desired_replication_level how many peers should ultimately receive
674  *                this message (advisory only, target may be too high for the
675  *                given DHT or not hit exactly).
676  * @param options options for routing
677  * @param enc send the encapsulated message to a peer close to the key
678  * @param iter function to call on each result, NULL if no replies are expected
679  * @param iter_cls closure for iter
680  * @param timeout when to abort with an error if we fail to get
681  *                a confirmation for the request (when necessary) or how long
682  *                to wait for tramission to the service
683  * @param cont continuation to call when done;
684  *             reason will be TIMEOUT on error,
685  *             reason will be PREREQ_DONE on success
686  * @param cont_cls closure for cont
687  *
688  * @return handle to stop the request, NULL if the request is "fire and forget"
689  */
690 struct GNUNET_DHT_RouteHandle *
691 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
692                         const GNUNET_HashCode * key,
693                         unsigned int desired_replication_level,
694                         enum GNUNET_DHT_RouteOption options,
695                         const struct GNUNET_MessageHeader *enc,
696                         struct GNUNET_TIME_Relative timeout,
697                         GNUNET_DHT_ReplyProcessor iter,
698                         void *iter_cls,
699                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
700 {
701   struct GNUNET_DHT_RouteHandle *route_handle;
702   struct PendingMessage *pending;
703   struct GNUNET_DHT_Message *message;
704   size_t expects_response;
705   uint16_t msize;
706   GNUNET_HashCode uid_key;
707   uint64_t uid;
708
709   if (sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
710     {
711       GNUNET_break (0);
712       return NULL;
713     }
714   expects_response = GNUNET_YES;
715   if (iter == NULL)
716     expects_response = GNUNET_NO;
717   uid = handle->uid_gen++;
718   if (expects_response)
719     {
720       route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
721       memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
722       route_handle->iter = iter;
723       route_handle->iter_cls = iter_cls;
724       route_handle->dht_handle = handle;
725       route_handle->uid = uid;
726 #if DEBUG_DHT_API
727       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728                   "`%s': Unique ID is %llu\n", "DHT API", uid);
729 #endif
730       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
731                                          &uid_key, route_handle,
732                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
733     }
734   msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
735   message = GNUNET_malloc (msize);
736   message->header.size = htons (msize);
737   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
738   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
739   message->options = htonl (options);
740   message->desired_replication_level = htonl (options);
741   message->unique = htonl (expects_response);
742   message->unique_id = GNUNET_htonll (uid);
743   memcpy (&message[1], enc, ntohs (enc->size));
744   pending = GNUNET_malloc (sizeof (struct PendingMessage));
745   pending->msg = &message->header;
746   pending->timeout = timeout;
747   pending->cont = cont;
748   pending->cont_cls = cont_cls;
749   pending->expects_response = expects_response;
750   pending->unique_id = uid;
751   GNUNET_assert (handle->current == NULL);
752   handle->current = pending;
753   process_pending_message (handle);
754   return route_handle;
755 }
756
757
758 /**
759  * Perform an asynchronous GET operation on the DHT identified.
760  *
761  * @param handle handle to the DHT service
762  * @param timeout how long to wait for transmission of this request to the service
763  * @param type expected type of the response object
764  * @param key the key to look up
765  * @param iter function to call on each result
766  * @param iter_cls closure for iter
767  * @param cont continuation to call once message sent
768  * @param cont_cls closure for continuation
769  *
770  * @return handle to stop the async get
771  */
772 struct GNUNET_DHT_GetHandle *
773 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
774                       struct GNUNET_TIME_Relative timeout,
775                       uint32_t type,
776                       const GNUNET_HashCode * key,
777                       GNUNET_DHT_GetIterator iter,
778                       void *iter_cls,
779                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
780 {
781   struct GNUNET_DHT_GetHandle *get_handle;
782   struct GNUNET_DHT_GetMessage *get_msg;
783
784   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
785     return NULL;
786
787   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
788   get_handle->get_context.iter = iter;
789   get_handle->get_context.iter_cls = iter_cls;
790
791 #if DEBUG_DHT_API
792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793               "`%s': Inserting pending get request with key %s\n", "DHT API",
794               GNUNET_h2s (key));
795 #endif
796
797   get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
798   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
799   get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
800   get_msg->type = htons (type);
801
802   get_handle->route_handle =
803     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
804                             &get_reply_iterator, get_handle, cont, cont_cls);
805   return get_handle;
806 }
807
808
809 /**
810  * Stop a previously issued routing request
811  *
812  * @param route_handle handle to the request to stop
813  * @param cont continuation to call once this message is sent to the service or times out
814  * @param cont_cls closure for the continuation
815  */
816 void
817 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
818                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
819 {
820   struct PendingMessage *pending;
821   struct GNUNET_DHT_StopMessage *message;
822   size_t msize;
823   GNUNET_HashCode uid_key;
824
825   msize = sizeof (struct GNUNET_DHT_StopMessage);
826   message = GNUNET_malloc (msize);
827   message->header.size = htons (msize);
828   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
829 #if DEBUG_DHT_API
830   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
832               route_handle->uid);
833 #endif
834   message->unique_id = GNUNET_htonll (route_handle->uid);
835   GNUNET_assert (route_handle->dht_handle->current == NULL);
836   pending = GNUNET_malloc (sizeof (struct PendingMessage));
837   pending->msg = (struct GNUNET_MessageHeader *) message;
838   pending->timeout = DEFAULT_DHT_TIMEOUT;
839   pending->cont = cont;
840   pending->cont_cls = cont_cls;
841   pending->unique_id = route_handle->uid;
842   GNUNET_assert (route_handle->dht_handle->current == NULL);
843   route_handle->dht_handle->current = pending;
844   process_pending_message (route_handle->dht_handle);
845   hash_from_uid (route_handle->uid, &uid_key);
846   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
847                  (route_handle->dht_handle->outstanding_requests, &uid_key,
848                   route_handle) == GNUNET_YES);
849 }
850
851
852 /**
853  * Stop async DHT-get.
854  *
855  * @param get_handle handle to the GET operation to stop
856  * @param cont continuation to call once this message is sent to the service or times out
857  * @param cont_cls closure for the continuation
858  */
859 void
860 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
861                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
862 {
863 #if DEBUG_DHT_API
864   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865               "`%s': Removing pending get request with key %s, uid %llu\n",
866               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
867               get_handle->route_handle->uid);
868 #endif
869   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
870   GNUNET_free (get_handle);
871 }
872
873
874 /**
875  * Perform an asynchronous FIND PEER operation on the DHT.
876  *
877  * @param handle handle to the DHT service
878  * @param timeout timeout for this request to be sent to the
879  *        service
880  * @param options routing options for this message
881  * @param message a message to inject at found peers (may be null)
882  * @param key the key to look up
883  * @param proc function to call on each result
884  * @param proc_cls closure for proc
885  * @param cont continuation to call once message sent
886  * @param cont_cls closure for continuation
887  *
888  * @return handle to stop the async get, NULL on error
889  */
890 struct GNUNET_DHT_FindPeerHandle *
891 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
892                             struct GNUNET_TIME_Relative timeout,
893                             enum GNUNET_DHT_RouteOption options,
894                             struct GNUNET_MessageHeader *message,
895                             const GNUNET_HashCode * key,
896                             GNUNET_DHT_FindPeerProcessor proc,
897                             void *proc_cls,
898                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
899 {
900   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
901   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
902   size_t msize;
903
904   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
905     return NULL;
906
907   if (message != NULL)
908     msize = ntohs (message->size);
909   else
910     msize = 0;
911
912   find_peer_handle =
913     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
914   find_peer_handle->find_peer_context.proc = proc;
915   find_peer_handle->find_peer_context.proc_cls = proc_cls;
916
917 #if DEBUG_DHT_API
918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
920               "FIND PEER", GNUNET_h2s (key));
921 #endif
922
923   find_peer_msg =
924     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + msize);
925   find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
926   find_peer_msg->header.size =
927     htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
928   find_peer_msg->msg_len = msize;
929
930   if (message != NULL)
931     {
932       memcpy (&find_peer_msg[1], message, msize);
933     }
934
935   find_peer_handle->route_handle =
936     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg->header,
937                             timeout, &find_peer_reply_iterator,
938                             find_peer_handle, cont, cont_cls);
939   return find_peer_handle;
940 }
941
942 /**
943  * Stop async find peer.  Frees associated resources.
944  *
945  * @param find_peer_handle GET operation to stop.
946  * @param cont continuation to call once this message is sent to the service or times out
947  * @param cont_cls closure for the continuation
948  */
949 void
950 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
951                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
952 {
953 #if DEBUG_DHT_API
954   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
956               "DHT API", "FIND PEER",
957               GNUNET_h2s (&find_peer_handle->route_handle->key),
958               find_peer_handle->route_handle->uid);
959 #endif
960   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
961   GNUNET_free (find_peer_handle);
962
963 }
964
965
966 /**
967  * Perform a PUT operation storing data in the DHT.
968  *
969  * @param handle handle to DHT service
970  * @param key the key to store under
971  * @param type type of the value
972  * @param size number of bytes in data; must be less than 64k
973  * @param data the data to store
974  * @param exp desired expiration time for the value
975  * @param timeout how long to wait for transmission of this request
976  * @param cont continuation to call when done;
977  *             reason will be TIMEOUT on error,
978  *             reason will be PREREQ_DONE on success
979  * @param cont_cls closure for cont
980  *
981  * @return GNUNET_YES if put message is queued for transmission
982  */
983 void
984 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
985                 const GNUNET_HashCode * key,
986                 uint32_t type,
987                 uint32_t size,
988                 const char *data,
989                 struct GNUNET_TIME_Absolute exp,
990                 struct GNUNET_TIME_Relative timeout,
991                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
992 {
993   struct GNUNET_DHT_PutMessage *put_msg;
994   size_t msize;
995
996   if (handle->current != NULL)
997     {
998       GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
999                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
1000       return;
1001     }
1002
1003 #if DEBUG_DHT_API
1004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1005               "`%s': Inserting pending put request with key %s\n", "DHT API",
1006               GNUNET_h2s (key));
1007 #endif
1008
1009   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1010   put_msg = GNUNET_malloc (msize);
1011   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1012   put_msg->header.size = htons (msize);
1013   put_msg->type = htons (type);
1014   put_msg->data_size = htons (size);
1015   put_msg->expiration = exp;
1016   memcpy (&put_msg[1], data, size);
1017
1018   GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1019                           NULL, cont, cont_cls);
1020
1021   GNUNET_free (put_msg);
1022 }