service does simple put and get into datacache, test case verifies it works
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  */
28
29 #include "platform.h"
30 #include "gnunet_bandwidth_lib.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_container_lib.h"
34 #include "gnunet_arm_service.h"
35 #include "gnunet_hello_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_server_lib.h"
38 #include "gnunet_time_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "dht.h"
41
42 #define DEBUG_DHT_API GNUNET_YES
43
44 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
45
46 struct PendingMessage
47 {
48   /**
49    * Message that is pending
50    */
51   struct GNUNET_MessageHeader *msg;
52
53   /**
54    * Timeout for this message
55    */
56   struct GNUNET_TIME_Relative timeout;
57
58   /**
59    * Continuation to call on message send
60    * or message receipt confirmation
61    */
62   GNUNET_SCHEDULER_Task cont;
63
64   /**
65    * Continuation closure
66    */
67   void *cont_cls;
68
69   /**
70    * Whether or not to await verification the message
71    * was received by the service
72    */
73   size_t is_unique;
74
75   /**
76    * Unique ID for this request
77    */
78   uint64_t unique_id;
79
80 };
81
82 struct GNUNET_DHT_GetContext
83 {
84   /**
85    * Iterator to call on data receipt
86    */
87   GNUNET_DHT_GetIterator iter;
88
89   /**
90    * Closure for the iterator callback
91    */
92   void *iter_cls;
93
94 };
95
96 struct GNUNET_DHT_FindPeerContext
97 {
98   /**
99    * Iterator to call on data receipt
100    */
101   GNUNET_DHT_FindPeerProcessor proc;
102
103   /**
104    * Closure for the iterator callback
105    */
106   void *proc_cls;
107
108 };
109
110 /**
111  * Handle to control a unique operation (one that is
112  * expected to return results)
113  */
114 struct GNUNET_DHT_RouteHandle
115 {
116
117   /**
118    * Unique identifier for this request (for key collisions)
119    */
120   uint64_t uid;
121
122   /**
123    * Key that this get request is for
124    */
125   GNUNET_HashCode key;
126
127   /**
128    * Iterator to call on data receipt
129    */
130   GNUNET_DHT_ReplyProcessor iter;
131
132   /**
133    * Closure for the iterator callback
134    */
135   void *iter_cls;
136
137   /**
138    * Main handle to this DHT api
139    */
140   struct GNUNET_DHT_Handle *dht_handle;
141 };
142
143 /**
144  * Handle for a non unique request, holds callback
145  * which needs to be called before we allow other
146  * messages to be processed and sent to the DHT service
147  */
148 struct GNUNET_DHT_NonUniqueHandle
149 {
150   /**
151    * Key that this get request is for
152    */
153   GNUNET_HashCode key;
154
155   /**
156    * Type of data get request was for
157    */
158   uint32_t type;
159
160   /**
161    * Continuation to call on service
162    * confirmation of message receipt.
163    */
164   GNUNET_SCHEDULER_Task cont;
165
166   /**
167    * Send continuation cls
168    */
169   void *cont_cls;
170 };
171
172 /**
173  * Handle to control a get operation.
174  */
175 struct GNUNET_DHT_GetHandle
176 {
177   /**
178    * Handle to the actual route operation for the get
179    */
180   struct GNUNET_DHT_RouteHandle *route_handle;
181
182   /**
183    * The context of the get request
184    */
185   struct GNUNET_DHT_GetContext get_context;
186 };
187
188 /**
189  * Handle to control a find peer operation.
190  */
191 struct GNUNET_DHT_FindPeerHandle
192 {
193   /**
194      * Handle to the actual route operation for the request
195      */
196   struct GNUNET_DHT_RouteHandle *route_handle;
197
198     /**
199      * The context of the get request
200      */
201   struct GNUNET_DHT_FindPeerContext find_peer_context;
202 };
203
204
205 /**
206  * Connection to the DHT service.
207  */
208 struct GNUNET_DHT_Handle
209 {
210   /**
211    * Our scheduler.
212    */
213   struct GNUNET_SCHEDULER_Handle *sched;
214
215   /**
216    * Configuration to use.
217    */
218   const struct GNUNET_CONFIGURATION_Handle *cfg;
219
220   /**
221    * Socket (if available).
222    */
223   struct GNUNET_CLIENT_Connection *client;
224
225   /**
226    * Currently pending transmission request.
227    */
228   struct GNUNET_CLIENT_TransmitHandle *th;
229
230   /**
231    * Message we are currently sending, only allow
232    * a single message to be queued.  If not unique
233    * (typically a put request), await a confirmation
234    * from the service that the message was received.
235    * If unique, just fire and forget.
236    */
237   struct PendingMessage *current;
238
239   /**
240    * Hash map containing the current outstanding unique requests
241    */
242   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
243
244   /**
245    * Non unique handle.  If set don't schedule another non
246    * unique request.
247    */
248   struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
249
250   /**
251    * Kill off the connection and any pending messages.
252    */
253   int do_destroy;
254
255 };
256
257 static struct GNUNET_TIME_Relative default_request_timeout;
258
259 /* Forward declaration */
260 static void process_pending_message (struct GNUNET_DHT_Handle *handle);
261
262 static GNUNET_HashCode *
263 hash_from_uid (uint64_t uid)
264 {
265   int count;
266   int remaining;
267   GNUNET_HashCode *hash;
268   hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
269   count = 0;
270
271   while (count < sizeof (GNUNET_HashCode))
272     {
273       remaining = sizeof (GNUNET_HashCode) - count;
274       if (remaining > sizeof (uid))
275         remaining = sizeof (uid);
276
277       memcpy (hash, &uid, remaining);
278       count += remaining;
279     }
280
281   return hash;
282 }
283
284 /**
285  * Handler for messages received from the DHT service
286  * a demultiplexer which handles numerous message types
287  *
288  */
289 void
290 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
291 {
292   struct GNUNET_DHT_Handle *handle = cls;
293   struct GNUNET_DHT_Message *dht_msg;
294   struct GNUNET_DHT_StopMessage *stop_msg;
295   struct GNUNET_MessageHeader *enc_msg;
296   struct GNUNET_DHT_RouteHandle *route_handle;
297   uint64_t uid;
298   GNUNET_HashCode *uid_hash;
299   size_t enc_size;
300   /* TODO: find out message type, handle callbacks for different types of messages.
301    * Should be a non unique acknowledgment, or unique result. */
302
303   if (msg == NULL)
304     {
305 #if DEBUG_DHT_API
306       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307                   "`%s': Received NULL from server, connection down?\n",
308                   "DHT API");
309 #endif
310       return;
311     }
312
313   switch (ntohs (msg->type))
314     {
315     case GNUNET_MESSAGE_TYPE_DHT:
316       {
317         dht_msg = (struct GNUNET_DHT_Message *) msg;
318         uid = GNUNET_ntohll (dht_msg->unique_id);
319 #if DEBUG_DHT_API
320         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
321                     "`%s': Received response to message (uid %llu)\n",
322                     "DHT API", uid);
323 #endif
324         if (ntohs (dht_msg->unique))
325           {
326             uid_hash = hash_from_uid (uid);
327             route_handle =
328               GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
329                                                  uid_hash);
330             GNUNET_free (uid_hash);
331             if (route_handle == NULL)   /* We have no recollection of this request */
332               {
333 #if DEBUG_DHT_API
334                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335                             "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
336                             "DHT API", uid);
337 #endif
338               }
339             else
340               {
341                 enc_size =
342                   ntohs (dht_msg->header.size) -
343                   sizeof (struct GNUNET_DHT_Message);
344                 GNUNET_assert (enc_size > 0);
345                 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
346                 route_handle->iter (route_handle->iter_cls, enc_msg);
347
348               }
349           }
350         break;
351       }
352     case GNUNET_MESSAGE_TYPE_DHT_STOP:
353       {
354         stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
355         uid = GNUNET_ntohll (stop_msg->unique_id);
356 #if DEBUG_DHT_API
357         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
358                     "`%s': Received response to message (uid %llu), current uid %llu\n",
359                     "DHT API", uid, handle->current->unique_id);
360 #endif
361         if (handle->current->unique_id == uid)
362           {
363 #if DEBUG_DHT_API
364             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365                         "`%s': Have pending confirmation for this message!\n",
366                         "DHT API", uid);
367 #endif
368             if (handle->current->cont != NULL)
369               GNUNET_SCHEDULER_add_continuation (handle->sched,
370                                                  handle->current->cont,
371                                                  handle->current->cont_cls,
372                                                  GNUNET_SCHEDULER_REASON_PREREQ_DONE);
373
374             GNUNET_free (handle->current->msg);
375             GNUNET_free (handle->current);
376             handle->current = NULL;
377           }
378         break;
379       }
380     default:
381       {
382         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
383                     "`%s': Received unknown message type %d\n", "DHT API",
384                     ntohs (msg->type));
385       }
386     }
387   GNUNET_CLIENT_receive (handle->client,
388                          &service_message_handler,
389                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
390
391 }
392
393
394 /**
395  * Initialize the connection with the DHT service.
396  *
397  * @param sched scheduler to use
398  * @param cfg configuration to use
399  * @param ht_len size of the internal hash table to use for
400  *               processing multiple GET/FIND requests in parallel
401  *
402  * @return handle to the DHT service, or NULL on error
403  */
404 struct GNUNET_DHT_Handle *
405 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
406                     const struct GNUNET_CONFIGURATION_Handle *cfg,
407                     unsigned int ht_len)
408 {
409   struct GNUNET_DHT_Handle *handle;
410
411   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
412
413   default_request_timeout =
414     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
415   handle->cfg = cfg;
416   handle->sched = sched;
417
418   handle->current = NULL;
419   handle->do_destroy = GNUNET_NO;
420   handle->th = NULL;
421
422   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
423   handle->outstanding_requests =
424     GNUNET_CONTAINER_multihashmap_create (ht_len);
425
426   if (handle->client == NULL)
427     {
428       GNUNET_free (handle);
429       return NULL;
430     }
431 #if DEBUG_DHT_API
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "`%s': Connection to service in progress\n", "DHT API");
434 #endif
435   GNUNET_CLIENT_receive (handle->client,
436                          &service_message_handler,
437                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
438
439   return handle;
440 }
441
442
443 /**
444  * Shutdown connection with the DHT service.
445  *
446  * @param handle handle of the DHT connection to stop
447  */
448 void
449 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
450 {
451 #if DEBUG_DHT_API
452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
454 #endif
455   GNUNET_assert (handle != NULL);
456
457   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
458     {
459       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
460       handle->th = NULL;
461     }
462   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
463     GNUNET_free (handle->current);
464
465   if (handle->client != NULL)   /* Finally, disconnect from the service */
466     {
467       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
468       handle->client = NULL;
469     }
470
471   GNUNET_free (handle);
472 }
473
474
475 /**
476  * Send complete (or failed), schedule next (or don't)
477  */
478 static void
479 finish (struct GNUNET_DHT_Handle *handle, int code)
480 {
481   /* TODO: if code is not GNUNET_OK, do something! */
482   struct PendingMessage *pos = handle->current;
483 #if DEBUG_DHT_API
484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
485 #endif
486   GNUNET_assert (pos != NULL);
487
488   if (pos->is_unique)
489     {
490       if (pos->cont != NULL)
491         {
492           if (code == GNUNET_SYSERR)
493             GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
494                                                pos->cont_cls,
495                                                GNUNET_SCHEDULER_REASON_TIMEOUT);
496           else
497             GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
498                                                pos->cont_cls,
499                                                GNUNET_SCHEDULER_REASON_PREREQ_DONE);
500         }
501
502       GNUNET_free (pos->msg);
503       handle->current = NULL;
504       GNUNET_free (pos);
505     }
506   /* Otherwise we need to wait for a response to this message! */
507 }
508
509 /**
510  * Transmit the next pending message, called by notify_transmit_ready
511  */
512 static size_t
513 transmit_pending (void *cls, size_t size, void *buf)
514 {
515   struct GNUNET_DHT_Handle *handle = cls;
516   size_t tsize;
517
518 #if DEBUG_DHT_API
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520               "`%s': In transmit_pending\n", "DHT API");
521 #endif
522   if (buf == NULL)
523     {
524 #if DEBUG_DHT_API
525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
527 #endif
528       /* FIXME: free associated resources or summat */
529       finish (handle, GNUNET_SYSERR);
530       return 0;
531     }
532
533   handle->th = NULL;
534
535   if (handle->current != NULL)
536     {
537       tsize = ntohs (handle->current->msg->size);
538       if (size >= tsize)
539         {
540 #if DEBUG_DHT_API
541           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                       "`%s': Sending message size %d\n", "DHT API", tsize);
543 #endif
544           memcpy (buf, handle->current->msg, tsize);
545           finish (handle, GNUNET_OK);
546           return tsize;
547         }
548       else
549         {
550           return 0;
551         }
552     }
553   /* Have no pending request */
554   return 0;
555 }
556
557
558 /**
559  * Try to (re)connect to the dht service.
560  *
561  * @return GNUNET_YES on success, GNUNET_NO on failure.
562  */
563 static int
564 try_connect (struct GNUNET_DHT_Handle *handle)
565 {
566   if (handle->client != NULL)
567     return GNUNET_OK;
568   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
569   if (handle->client != NULL)
570     return GNUNET_YES;
571 #if DEBUG_STATISTICS
572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573               _("Failed to connect to the dht service!\n"));
574 #endif
575   return GNUNET_NO;
576 }
577
578
579 /**
580  * Try to send messages from list of messages to send
581  */
582 static void
583 process_pending_message (struct GNUNET_DHT_Handle *handle)
584 {
585
586   if (handle->current == NULL)
587     return;                     /* action already pending */
588   if (GNUNET_YES != try_connect (handle))
589     {
590       finish (handle, GNUNET_SYSERR);
591       return;
592     }
593
594   /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
595   if (handle->do_destroy)
596     {
597       //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
598     }
599
600
601   if (NULL ==
602       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
603                                                          ntohs (handle->
604                                                                 current->msg->
605                                                                 size),
606                                                          handle->current->
607                                                          timeout, GNUNET_YES,
608                                                          &transmit_pending,
609                                                          handle)))
610     {
611 #if DEBUG_DHT_API
612       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613                   "Failed to transmit request to dht service.\n");
614 #endif
615       finish (handle, GNUNET_SYSERR);
616     }
617 #if DEBUG_DHT_API
618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619               "`%s': Scheduled sending message of size %d to service\n",
620               "DHT API", ntohs (handle->current->msg->size));
621 #endif
622 }
623
624 /**
625  * Iterator called on each result obtained from a generic route
626  * operation
627  */
628 void
629 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
630 {
631   struct GNUNET_DHT_GetHandle *get_handle = cls;
632   struct GNUNET_DHT_GetResultMessage *result;
633   size_t data_size;
634   char *result_data;
635
636   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
637     return;
638
639   GNUNET_assert (ntohs (reply->size) >=
640                  sizeof (struct GNUNET_DHT_GetResultMessage));
641   result = (struct GNUNET_DHT_GetResultMessage *) reply;
642   data_size = ntohs (result->data_size);
643   GNUNET_assert (ntohs (reply->size) ==
644                  sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
645   result_data = (char *) &result[1];    /* Set data pointer to end of message */
646
647   get_handle->get_context.iter (get_handle->get_context.iter_cls,
648                                 result->expiration, &result->key,
649                                 ntohs (result->type), data_size, result_data);
650 }
651
652
653 /**
654  * Iterator called on each result obtained from a generic route
655  * operation
656  */
657 void
658 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
659 {
660   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
661   struct GNUNET_DHT_FindPeerResultMessage *result;
662   size_t data_size;
663   struct GNUNET_MessageHeader *result_data;
664
665   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
666     return;
667
668   GNUNET_assert (ntohs (reply->size) >=
669                  sizeof (struct GNUNET_DHT_FindPeerResultMessage));
670   result = (struct GNUNET_DHT_FindPeerResultMessage *) reply;
671   data_size = ntohs (result->data_size);
672   GNUNET_assert (ntohs (reply->size) ==
673                  sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
674
675   if (data_size > 0)
676     result_data = (struct GNUNET_MessageHeader *) &result[1];   /* Set data pointer to end of message */
677   else
678     result_data = NULL;
679
680   find_peer_handle->find_peer_context.proc (find_peer_handle->
681                                             find_peer_context.proc_cls,
682                                             &result->peer, result_data);
683 }
684
685 /**
686  * Perform an asynchronous FIND_PEER operation on the DHT.
687  *
688  * @param handle handle to the DHT service
689  * @param key the key to look up
690  * @param desired_replication_level how many peers should ultimately receive
691  *                this message (advisory only, target may be too high for the
692  *                given DHT or not hit exactly).
693  * @param options options for routing
694  * @param enc send the encapsulated message to a peer close to the key
695  * @param iter function to call on each result, NULL if no replies are expected
696  * @param iter_cls closure for iter
697  * @param timeout when to abort with an error if we fail to get
698  *                a confirmation for the request (when necessary) or how long
699  *                to wait for tramission to the service
700  * @param cont continuation to call when done;
701  *             reason will be TIMEOUT on error,
702  *             reason will be PREREQ_DONE on success
703  * @param cont_cls closure for cont
704  *
705  * @return handle to stop the request, NULL if the request is "fire and forget"
706  */
707 struct GNUNET_DHT_RouteHandle *
708 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
709                         const GNUNET_HashCode * key,
710                         unsigned int desired_replication_level,
711                         enum GNUNET_DHT_RouteOption options,
712                         const struct GNUNET_MessageHeader *enc,
713                         struct GNUNET_TIME_Relative timeout,
714                         GNUNET_DHT_ReplyProcessor iter,
715                         void *iter_cls,
716                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
717 {
718   struct GNUNET_DHT_RouteHandle *route_handle;
719   struct PendingMessage *pending;
720   struct GNUNET_DHT_Message *message;
721   size_t is_unique;
722   size_t msize;
723   GNUNET_HashCode *uid_key;
724   uint64_t uid;
725
726   is_unique = GNUNET_YES;
727   if (iter == NULL)
728     is_unique = GNUNET_NO;
729
730   route_handle = NULL;
731   uid_key = NULL;
732
733   do
734     {
735       GNUNET_free_non_null (uid_key);
736       uid = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
737       uid_key = hash_from_uid (uid);
738     }
739   while (GNUNET_CONTAINER_multihashmap_contains
740          (handle->outstanding_requests, uid_key) == GNUNET_YES);
741
742   if (is_unique)
743     {
744       route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
745       memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
746       route_handle->iter = iter;
747       route_handle->iter_cls = iter_cls;
748       route_handle->dht_handle = handle;
749       route_handle->uid = uid;
750 #if DEBUG_DHT_API
751       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
752                   "`%s': Unique ID is %llu\n", "DHT API", uid);
753 #endif
754       /**
755        * Store based on random identifier!
756        */
757       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
758                                          uid_key, route_handle,
759                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
760       msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
761
762     }
763   else
764     {
765       msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
766     }
767
768   GNUNET_free (uid_key);
769   message = GNUNET_malloc (msize);
770   message->header.size = htons (msize);
771   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
772   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
773   message->options = htons (options);
774   message->desired_replication_level = htons (options);
775   message->unique = htons (is_unique);
776   message->unique_id = GNUNET_htonll (uid);
777   memcpy (&message[1], enc, ntohs (enc->size));
778
779   pending = GNUNET_malloc (sizeof (struct PendingMessage));
780   pending->msg = &message->header;
781   pending->timeout = timeout;
782   pending->cont = cont;
783   pending->cont_cls = cont_cls;
784   pending->is_unique = is_unique;
785   pending->unique_id = uid;
786
787   GNUNET_assert (handle->current == NULL);
788
789   handle->current = pending;
790
791   process_pending_message (handle);
792
793   return route_handle;
794 }
795
796 void
797 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
798                        GNUNET_SCHEDULER_Task cont, void *cont_cls);
799
800 /**
801  * Perform an asynchronous GET operation on the DHT identified.
802  *
803  * @param handle handle to the DHT service
804  * @param timeout how long to wait for transmission of this request to the service
805  * @param type expected type of the response object
806  * @param key the key to look up
807  * @param iter function to call on each result
808  * @param iter_cls closure for iter
809  * @param cont continuation to call once message sent
810  * @param cont_cls closure for continuation
811  *
812  * @return handle to stop the async get
813  */
814 struct GNUNET_DHT_GetHandle *
815 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
816                       struct GNUNET_TIME_Relative timeout,
817                       uint32_t type,
818                       const GNUNET_HashCode * key,
819                       GNUNET_DHT_GetIterator iter,
820                       void *iter_cls,
821                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
822 {
823   struct GNUNET_DHT_GetHandle *get_handle;
824   struct GNUNET_DHT_GetMessage *get_msg;
825
826   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
827     return NULL;
828
829   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
830   get_handle->get_context.iter = iter;
831   get_handle->get_context.iter_cls = iter_cls;
832
833 #if DEBUG_DHT_API
834   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
835               "`%s': Inserting pending get request with key %s\n", "DHT API",
836               GNUNET_h2s (key));
837 #endif
838
839   get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
840   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
841   get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
842   get_msg->type = htonl (type);
843
844   get_handle->route_handle =
845     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
846                             &get_reply_iterator, get_handle, cont, cont_cls);
847   return get_handle;
848 }
849
850
851 void
852 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
853                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
854 {
855   struct PendingMessage *pending;
856   struct GNUNET_DHT_StopMessage *message;
857   size_t msize;
858   GNUNET_HashCode *uid_key;
859
860   msize = sizeof (struct GNUNET_DHT_StopMessage);
861
862   message = GNUNET_malloc (msize);
863   message->header.size = htons (msize);
864   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
865 #if DEBUG_DHT_API
866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
868               route_handle->uid);
869 #endif
870   message->unique_id = GNUNET_htonll (route_handle->uid);
871
872   GNUNET_assert (route_handle->dht_handle->current == NULL);
873
874   pending = GNUNET_malloc (sizeof (struct PendingMessage));
875   pending->msg = (struct GNUNET_MessageHeader *) message;
876   pending->timeout = DEFAULT_DHT_TIMEOUT;
877   pending->cont = cont;
878   pending->cont_cls = cont_cls;
879   pending->is_unique = GNUNET_NO;
880   pending->unique_id = route_handle->uid;
881
882   GNUNET_assert (route_handle->dht_handle->current == NULL);
883
884   route_handle->dht_handle->current = pending;
885
886   process_pending_message (route_handle->dht_handle);
887
888   uid_key = hash_from_uid (route_handle->uid);
889
890   if (GNUNET_CONTAINER_multihashmap_remove
891       (route_handle->dht_handle->outstanding_requests, uid_key,
892        route_handle) != GNUNET_YES)
893     {
894 #if DEBUG_DHT_API
895       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896                   "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n",
897                   "DHT API", GNUNET_h2s (uid_key), route_handle->uid);
898 #endif
899     }
900   GNUNET_free (uid_key);
901   return;
902 }
903
904
905 /**
906  * Stop async DHT-get.
907  *
908  * @param get_handle handle to the GET operation to stop
909  */
910 void
911 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
912                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
913 {
914 #if DEBUG_DHT_API
915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
916               "`%s': Removing pending get request with key %s, uid %llu\n",
917               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
918               get_handle->route_handle->uid);
919 #endif
920   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
921   GNUNET_free (get_handle);
922
923 }
924
925
926 /**
927  * Perform an asynchronous FIND PEER operation on the DHT.
928  *
929  * @param handle handle to the DHT service
930  * @param timeout timeout for this request to be sent to the
931  *        service
932  * @param options routing options for this message
933  * @param message a message to inject at found peers (may be null)
934  * @param key the key to look up
935  * @param iter function to call on each result
936  * @param iter_cls closure for iter
937  * @param cont continuation to call once message sent
938  * @param cont_cls closure for continuation
939  *
940  * @return handle to stop the async get, NULL on error
941  */
942 struct GNUNET_DHT_FindPeerHandle *
943 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
944                             struct GNUNET_TIME_Relative timeout,
945                             enum GNUNET_DHT_RouteOption options,
946                             struct GNUNET_MessageHeader *message,
947                             const GNUNET_HashCode * key,
948                             GNUNET_DHT_FindPeerProcessor proc,
949                             void *proc_cls,
950                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
951 {
952   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
953   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
954   size_t msize;
955
956   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
957     return NULL;
958
959   if (message != NULL)
960     msize = ntohs (message->size);
961   else
962     msize = 0;
963
964   find_peer_handle =
965     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
966   find_peer_handle->find_peer_context.proc = proc;
967   find_peer_handle->find_peer_context.proc_cls = proc_cls;
968
969 #if DEBUG_DHT_API
970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
972               "FIND PEER", GNUNET_h2s (key));
973 #endif
974
975   find_peer_msg =
976     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + msize);
977   find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
978   find_peer_msg->header.size =
979     htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
980   find_peer_msg->msg_len = msize;
981
982   if (message != NULL)
983     {
984       memcpy (&find_peer_msg[1], message, msize);
985     }
986
987   find_peer_handle->route_handle =
988     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg->header,
989                             timeout, &find_peer_reply_iterator,
990                             find_peer_handle, cont, cont_cls);
991   return find_peer_handle;
992 }
993
994 /**
995  * Stop async find peer.  Frees associated resources.
996  *
997  * @param find_peer_handle GET operation to stop.
998  */
999 void
1000 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1001                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1002 {
1003 #if DEBUG_DHT_API
1004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1005               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1006               "DHT API", "FIND PEER",
1007               GNUNET_h2s (&find_peer_handle->route_handle->key),
1008               find_peer_handle->route_handle->uid);
1009 #endif
1010   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1011   GNUNET_free (find_peer_handle);
1012
1013 }
1014
1015
1016 /**
1017  * Perform a PUT operation storing data in the DHT.
1018  *
1019  * @param h handle to DHT service
1020  * @param key the key to store under
1021  * @param type type of the value
1022  * @param size number of bytes in data; must be less than 64k
1023  * @param data the data to store
1024  * @param exp desired expiration time for the value
1025  * @param cont continuation to call when done;
1026  *             reason will be TIMEOUT on error,
1027  *             reason will be PREREQ_DONE on success
1028  * @param cont_cls closure for cont
1029  *
1030  * @return GNUNET_YES if put message is queued for transmission
1031  */
1032 void
1033 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1034                 const GNUNET_HashCode * key,
1035                 uint32_t type,
1036                 uint32_t size,
1037                 const char *data,
1038                 struct GNUNET_TIME_Absolute exp,
1039                 struct GNUNET_TIME_Relative timeout,
1040                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1041 {
1042   struct GNUNET_DHT_PutMessage *put_msg;
1043   size_t msize;
1044
1045   if (handle->current != NULL)
1046     {
1047       GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1048                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
1049       return;
1050     }
1051
1052 #if DEBUG_DHT_API
1053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054               "`%s': Inserting pending put request with key %s\n", "DHT API",
1055               GNUNET_h2s (key));
1056 #endif
1057
1058   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1059   put_msg = GNUNET_malloc (msize);
1060   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1061   put_msg->header.size = htons (msize);
1062   put_msg->type = htonl (type);
1063   put_msg->data_size = htons (size);
1064   put_msg->expiration = exp;
1065   memcpy (&put_msg[1], data, size);
1066
1067   GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1068                           NULL, cont, cont_cls);
1069
1070   GNUNET_free (put_msg);
1071 }