9fb77d5d4fab0c014e0314a24d34288949df1f9c
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: Only allow a single message until confirmed as received by
28  *       the service.  For put messages call continuation as soon as
29  *       receipt acknowledged (then remove), for GET or other messages
30  *       only call continuation when data received.
31  *       Add unique identifier to message types requesting data to be
32  *       returned.
33  */
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_YES
48
49 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
50
51 struct PendingMessage
52 {
53   /**
54    * Message that is pending
55    */
56   struct GNUNET_MessageHeader *msg;
57
58   /**
59    * Timeout for this message
60    */
61   struct GNUNET_TIME_Relative timeout;
62
63   /**
64    * Continuation to call on message send
65    * or message receipt confirmation
66    */
67   GNUNET_SCHEDULER_Task cont;
68
69   /**
70    * Continuation closure
71    */
72   void *cont_cls;
73
74   /**
75    * Whether or not to await verification the message
76    * was received by the service
77    */
78   size_t is_unique;
79
80   /**
81    * Unique ID for this request
82    */
83   uint64_t unique_id;
84
85 };
86
87 struct GNUNET_DHT_GetContext
88 {
89
90
91   /**
92    * Iterator to call on data receipt
93    */
94   GNUNET_DHT_GetIterator iter;
95
96   /**
97    * Closure for the iterator callback
98    */
99   void *iter_cls;
100
101 };
102
103 /**
104  * Handle to control a unique operation (one that is
105  * expected to return results)
106  */
107 struct GNUNET_DHT_RouteHandle
108 {
109
110   /**
111    * Unique identifier for this request (for key collisions)
112    */
113   uint64_t uid;
114
115   /**
116    * Key that this get request is for
117    */
118   GNUNET_HashCode key;
119
120   /**
121    * Iterator to call on data receipt
122    */
123   GNUNET_DHT_ReplyProcessor iter;
124
125   /**
126    * Closure for the iterator callback
127    */
128   void *iter_cls;
129
130   /**
131    * Main handle to this DHT api
132    */
133   struct GNUNET_DHT_Handle *dht_handle;
134 };
135
136 /**
137  * Handle for a non unique request, holds callback
138  * which needs to be called before we allow other
139  * messages to be processed and sent to the DHT service
140  */
141 struct GNUNET_DHT_NonUniqueHandle
142 {
143   /**
144    * Key that this get request is for
145    */
146   GNUNET_HashCode key;
147
148   /**
149    * Type of data get request was for
150    */
151   uint32_t type;
152
153   /**
154    * Continuation to call on service
155    * confirmation of message receipt.
156    */
157   GNUNET_SCHEDULER_Task cont;
158
159   /**
160    * Send continuation cls
161    */
162   void *cont_cls;
163 };
164
165
166 /**
167  * Connection to the DHT service.
168  */
169 struct GNUNET_DHT_Handle
170 {
171   /**
172    * Our scheduler.
173    */
174   struct GNUNET_SCHEDULER_Handle *sched;
175
176   /**
177    * Configuration to use.
178    */
179   const struct GNUNET_CONFIGURATION_Handle *cfg;
180
181   /**
182    * Socket (if available).
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Currently pending transmission request.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Message we are currently sending, only allow
193    * a single message to be queued.  If not unique
194    * (typically a put request), await a confirmation
195    * from the service that the message was received.
196    * If unique, just fire and forget.
197    */
198   struct PendingMessage *current;
199
200   /**
201    * Hash map containing the current outstanding unique requests
202    */
203   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
204
205   /**
206    * Non unique handle.  If set don't schedule another non
207    * unique request.
208    */
209   struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
210
211   /**
212    * Kill off the connection and any pending messages.
213    */
214   int do_destroy;
215
216 };
217
218 static struct GNUNET_TIME_Relative default_request_timeout;
219
220 /* Forward declaration */
221 static void process_pending_message(struct GNUNET_DHT_Handle *handle);
222
223 static GNUNET_HashCode * hash_from_uid(uint64_t uid)
224 {
225   int count;
226   int remaining;
227   GNUNET_HashCode *hash;
228   hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
229   count = 0;
230
231   while (count < sizeof(GNUNET_HashCode))
232     {
233       remaining = sizeof(GNUNET_HashCode) - count;
234       if (remaining > sizeof(uid))
235         remaining = sizeof(uid);
236
237       memcpy(hash, &uid, remaining);
238       count += remaining;
239     }
240
241   return hash;
242 }
243
244 /**
245  * Handler for messages received from the DHT service
246  * a demultiplexer which handles numerous message types
247  *
248  */
249 void service_message_handler (void *cls,
250                               const struct GNUNET_MessageHeader *msg)
251 {
252   struct GNUNET_DHT_Handle *handle = cls;
253   struct GNUNET_DHT_Message *dht_msg;
254   struct GNUNET_DHT_StopMessage *stop_msg;
255   struct GNUNET_MessageHeader *enc_msg;
256   struct GNUNET_DHT_RouteHandle *route_handle;
257   uint64_t uid;
258   GNUNET_HashCode *uid_hash;
259   size_t enc_size;
260   /* TODO: find out message type, handle callbacks for different types of messages.
261    * Should be a non unique acknowledgment, or unique result. */
262
263   if (msg == NULL)
264   {
265 #if DEBUG_DHT_API
266           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267                       "`%s': Received NULL from server, connection down?\n", "DHT API");
268 #endif
269     return;
270   }
271
272   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT)
273   {
274     dht_msg = (struct GNUNET_DHT_Message *)msg;
275     uid = GNUNET_ntohll(dht_msg->unique_id);
276 #if DEBUG_DHT_API
277     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278                 "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
279 #endif
280     if (ntohs(dht_msg->unique))
281       {
282         uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
283         route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
284         GNUNET_free(uid_hash);
285         if (route_handle == NULL) /* We have no recollection of this request */
286           {
287 #if DEBUG_DHT_API
288           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289                       "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
290 #endif
291           }
292         else
293           {
294             enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
295             GNUNET_assert(enc_size > 0);
296             enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
297             route_handle->iter(route_handle->iter_cls, enc_msg);
298           }
299       }
300   }
301   else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP)
302   {
303     stop_msg = (struct GNUNET_DHT_StopMessage *)msg;
304     uid = GNUNET_ntohll(stop_msg->unique_id);
305 #if DEBUG_DHT_API
306     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307                 "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id);
308 #endif
309     if (handle->current->unique_id == uid)
310       {
311 #if DEBUG_DHT_API
312         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313                     "`%s': Have pending confirmation for this message!\n", "DHT API", uid);
314 #endif
315         if (handle->current->cont != NULL)
316           GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
317
318         GNUNET_free(handle->current->msg);
319         GNUNET_free(handle->current);
320         handle->current = NULL;
321       }
322   }
323   else
324   {
325 #if DEBUG_DHT_API
326     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327                 "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type));
328 #endif
329   }
330
331   GNUNET_CLIENT_receive (handle->client,
332                          &service_message_handler,
333                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
334
335 }
336
337
338 /**
339  * Initialize the connection with the DHT service.
340  *
341  * @param sched scheduler to use
342  * @param cfg configuration to use
343  * @param ht_len size of the internal hash table to use for
344  *               processing multiple GET/FIND requests in parallel
345  *
346  * @return handle to the DHT service, or NULL on error
347  */
348 struct GNUNET_DHT_Handle *
349 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
350                     const struct GNUNET_CONFIGURATION_Handle *cfg,
351                     unsigned int ht_len)
352 {
353   struct GNUNET_DHT_Handle *handle;
354
355   handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
356
357   default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
358   handle->cfg = cfg;
359   handle->sched = sched;
360
361   handle->current = NULL;
362   handle->do_destroy = GNUNET_NO;
363   handle->th = NULL;
364
365   handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
366   handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
367
368   if (handle->client == NULL)
369     {
370       GNUNET_free(handle);
371       return NULL;
372     }
373 #if DEBUG_DHT_API
374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375               "`%s': Connection to service in progress\n", "DHT API");
376 #endif
377   GNUNET_CLIENT_receive (handle->client,
378                          &service_message_handler,
379                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
380
381   return handle;
382 }
383
384
385 /**
386  * Shutdown connection with the DHT service.
387  *
388  * @param handle handle of the DHT connection to stop
389  */
390 void
391 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
392 {
393 #if DEBUG_DHT_API
394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
396 #endif
397   GNUNET_assert(handle != NULL);
398
399   if (handle->th != NULL) /* We have a live transmit request in the Aether */
400     {
401       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
402       handle->th = NULL;
403     }
404   if (handle->current != NULL) /* We are trying to send something now, clean it up */
405     GNUNET_free(handle->current);
406
407   if (handle->client != NULL) /* Finally, disconnect from the service */
408     {
409       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
410       handle->client = NULL;
411     }
412
413   GNUNET_free (handle);
414 }
415
416
417 /**
418  * Send complete (or failed), schedule next (or don't)
419  */
420 static void
421 finish (struct GNUNET_DHT_Handle *handle, int code)
422 {
423   /* TODO: if code is not GNUNET_OK, do something! */
424   struct PendingMessage *pos = handle->current;
425 #if DEBUG_DHT_API
426       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427                   "`%s': Finish called!\n", "DHT API");
428 #endif
429   GNUNET_assert(pos != NULL);
430
431   if (pos->is_unique)
432     {
433       if (pos->cont != NULL)
434       {
435         if (code == GNUNET_SYSERR)
436           GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
437         else
438           GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
439       }
440
441       GNUNET_free(pos->msg);
442       handle->current = NULL;
443       GNUNET_free(pos);
444     }
445   /* Otherwise we need to wait for a response to this message! */
446 }
447
448 /**
449  * Transmit the next pending message, called by notify_transmit_ready
450  */
451 static size_t
452 transmit_pending (void *cls, size_t size, void *buf)
453 {
454   struct GNUNET_DHT_Handle *handle = cls;
455   size_t tsize;
456
457 #if DEBUG_DHT_API
458       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459                   "`%s': In transmit_pending\n", "DHT API");
460 #endif
461   if (buf == NULL)
462     {
463 #if DEBUG_DHT_API
464       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
466 #endif
467       /* FIXME: free associated resources or summat */
468       finish(handle, GNUNET_SYSERR);
469       return 0;
470     }
471
472   handle->th = NULL;
473
474   if (handle->current != NULL)
475   {
476     tsize = ntohs(handle->current->msg->size);
477     if (size >= tsize)
478     {
479 #if DEBUG_DHT_API
480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481                   "`%s': Sending message size %d\n", "DHT API", tsize);
482 #endif
483       memcpy(buf, handle->current->msg, tsize);
484       finish(handle, GNUNET_OK);
485       return tsize;
486     }
487     else
488     {
489       return 0;
490     }
491   }
492   /* Have no pending request */
493   return 0;
494 }
495
496
497 /**
498  * Try to (re)connect to the dht service.
499  *
500  * @return GNUNET_YES on success, GNUNET_NO on failure.
501  */
502 static int
503 try_connect (struct GNUNET_DHT_Handle *handle)
504 {
505   if (handle->client != NULL)
506     return GNUNET_OK;
507   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
508   if (handle->client != NULL)
509     return GNUNET_YES;
510 #if DEBUG_STATISTICS
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512               _("Failed to connect to the dht service!\n"));
513 #endif
514   return GNUNET_NO;
515 }
516
517
518 /**
519  * Try to send messages from list of messages to send
520  */
521 static void process_pending_message(struct GNUNET_DHT_Handle *handle)
522 {
523
524   if (handle->current == NULL)
525     return;                     /* action already pending */
526   if (GNUNET_YES != try_connect (handle))
527     {
528       finish (handle, GNUNET_SYSERR);
529       return;
530     }
531
532   /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
533   if (handle->do_destroy)
534     {
535       //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
536     }
537
538
539   if (NULL ==
540       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
541                                                     ntohs(handle->current->msg->size),
542                                                     handle->current->timeout,
543                                                     GNUNET_YES,
544                                                     &transmit_pending, handle)))
545     {
546 #if DEBUG_DHT_API
547       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548                   "Failed to transmit request to dht service.\n");
549 #endif
550       finish (handle, GNUNET_SYSERR);
551     }
552 #if DEBUG_DHT_API
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
555 #endif
556 }
557
558 /**
559  * Iterator called on each result obtained from a generic route
560  * operation
561  */
562 void get_reply_iterator (void *cls,
563                          const struct GNUNET_MessageHeader *reply)
564 {
565
566 }
567
568 /**
569  * Perform an asynchronous FIND_PEER operation on the DHT.
570  *
571  * @param handle handle to the DHT service
572  * @param key the key to look up
573  * @param desired_replication_level how many peers should ultimately receive
574  *                this message (advisory only, target may be too high for the
575  *                given DHT or not hit exactly).
576  * @param options options for routing
577  * @param enc send the encapsulated message to a peer close to the key
578  * @param iter function to call on each result, NULL if no replies are expected
579  * @param iter_cls closure for iter
580  * @param timeout when to abort with an error if we fail to get
581  *                a confirmation for the request (when necessary) or how long
582  *                to wait for tramission to the service
583  * @param cont continuation to call when done;
584  *             reason will be TIMEOUT on error,
585  *             reason will be PREREQ_DONE on success
586  * @param cont_cls closure for cont
587  *
588  * @return handle to stop the request, NULL if the request is "fire and forget"
589  */
590 struct GNUNET_DHT_RouteHandle *
591 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
592                         const GNUNET_HashCode *key,
593                         unsigned int desired_replication_level,
594                         enum GNUNET_DHT_RouteOption options,
595                         const struct GNUNET_MessageHeader *enc,
596                         struct GNUNET_TIME_Relative timeout,
597                         GNUNET_DHT_ReplyProcessor iter,
598                         void *iter_cls,
599                         GNUNET_SCHEDULER_Task cont,
600                         void *cont_cls)
601 {
602   struct GNUNET_DHT_RouteHandle *route_handle;
603   struct PendingMessage *pending;
604   struct GNUNET_DHT_Message *message;
605   size_t is_unique;
606   size_t msize;
607   GNUNET_HashCode *uid_key;
608   uint64_t uid;
609
610   is_unique = GNUNET_YES;
611   if (iter == NULL)
612     is_unique = GNUNET_NO;
613
614   route_handle = NULL;
615   uid_key = NULL;
616
617   do
618   {
619     GNUNET_free_non_null(uid_key);
620     uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
621     uid_key = hash_from_uid(uid);
622   } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES);
623
624   if (is_unique)
625     {
626       route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
627       memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
628       route_handle->iter = iter;
629       route_handle->iter_cls = iter_cls;
630       route_handle->dht_handle = handle;
631       route_handle->uid = uid;
632 #if DEBUG_DHT_API
633   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634               "`%s': Unique ID is %llu\n", "DHT API", uid);
635 #endif
636       /**
637        * Store based on random identifier!
638        */
639       GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
640       msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
641
642     }
643   else
644     {
645       msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
646     }
647
648   GNUNET_free(uid_key);
649   message = GNUNET_malloc(msize);
650   message->header.size = htons(msize);
651   message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
652   memcpy(&message->key, key, sizeof(GNUNET_HashCode));
653   message->options = htons(options);
654   message->desired_replication_level = htons(options);
655   message->unique = htons(is_unique);
656   message->unique_id = GNUNET_htonll(uid);
657   memcpy(&message[1], enc, ntohs(enc->size));
658
659   pending = GNUNET_malloc(sizeof(struct PendingMessage));
660   pending->msg = &message->header;
661   pending->timeout = timeout;
662   pending->cont = cont;
663   pending->cont_cls = cont_cls;
664   pending->is_unique = is_unique;
665   pending->unique_id = uid;
666
667   GNUNET_assert(handle->current == NULL);
668
669   handle->current = pending;
670
671   process_pending_message(handle);
672
673   return route_handle;
674 }
675
676 void
677 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
678
679
680 /**
681  * Perform an asynchronous GET operation on the DHT identified.
682  *
683  * @param handle handle to the DHT service
684  * @param timeout how long to wait for transmission of this request to the service
685  * @param type expected type of the response object
686  * @param key the key to look up
687  * @param iter function to call on each result
688  * @param iter_cls closure for iter
689  * @param cont continuation to call once message sent
690  * @param cont_cls closure for continuation
691  *
692  * @return handle to stop the async get
693  */
694 struct GNUNET_DHT_RouteHandle *
695 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
696                       struct GNUNET_TIME_Relative timeout,
697                       uint32_t type,
698                       const GNUNET_HashCode * key,
699                       GNUNET_DHT_GetIterator iter,
700                       void *iter_cls,
701                       GNUNET_SCHEDULER_Task cont,
702                       void *cont_cls)
703 {
704   struct GNUNET_DHT_GetContext *get_context;
705   struct GNUNET_DHT_GetMessage *get_msg;
706
707   if (handle->current != NULL) /* Can't send right now, we have a pending message... */
708     return NULL;
709
710   get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext));
711   get_context->iter = iter;
712   get_context->iter_cls = iter_cls;
713
714 #if DEBUG_DHT_API
715   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716               "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key));
717 #endif
718
719   get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
720   get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
721   get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
722   get_msg->type = htonl(type);
723
724   return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, cont, cont_cls);
725
726 }
727
728
729 void
730 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
731 {
732   struct PendingMessage *pending;
733   struct GNUNET_DHT_StopMessage *message;
734   size_t msize;
735   GNUNET_HashCode *uid_key;
736
737   msize = sizeof(struct GNUNET_DHT_StopMessage);
738
739   message = GNUNET_malloc(msize);
740   message->header.size = htons(msize);
741   message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
742 #if DEBUG_DHT_API
743       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744                   "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid);
745 #endif
746   message->unique_id = GNUNET_htonll(route_handle->uid);
747
748   GNUNET_assert(route_handle->dht_handle->current == NULL);
749
750   pending = GNUNET_malloc(sizeof(struct PendingMessage));
751   pending->msg = (struct GNUNET_MessageHeader *)message;
752   pending->timeout = DEFAULT_DHT_TIMEOUT;
753   pending->cont = NULL;
754   pending->cont_cls = NULL;
755   pending->is_unique = GNUNET_NO;
756   pending->unique_id = route_handle->uid;
757
758   GNUNET_assert(route_handle->dht_handle->current == NULL);
759
760   route_handle->dht_handle->current = pending;
761
762   process_pending_message(route_handle->dht_handle);
763
764   uid_key = hash_from_uid(route_handle->uid);
765
766   if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES)
767     {
768 #if DEBUG_DHT_API
769       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
770                   "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
771 #endif
772     }
773   GNUNET_free(uid_key);
774   return;
775 }
776
777
778 /**
779  * Stop async DHT-get.
780  *
781  * @param get_handle handle to the GET operation to stop
782  */
783 void
784 GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *get_handle)
785 {
786 #if OLDREMOVE
787   struct GNUNET_DHT_GetMessage *get_msg;
788   struct GNUNET_DHT_Handle *handle;
789   GNUNET_HashCode *uid_key;
790 #endif
791
792   GNUNET_DHT_route_stop(get_handle);
793
794 #if OLDREMOVE
795   uid_key = hash_from_uid(get_handle->uid);
796   GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES);
797
798   if (handle->do_destroy == GNUNET_NO)
799     {
800       get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
801       get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
802       get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
803
804
805     }
806 #endif
807 #if DEBUG_DHT_API
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809               "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&get_handle->key), get_handle->uid);
810 #endif
811 }
812
813
814 /**
815  * Perform a PUT operation storing data in the DHT.
816  *
817  * @param h handle to DHT service
818  * @param key the key to store under
819  * @param type type of the value
820  * @param size number of bytes in data; must be less than 64k
821  * @param data the data to store
822  * @param exp desired expiration time for the value
823  * @param cont continuation to call when done;
824  *             reason will be TIMEOUT on error,
825  *             reason will be PREREQ_DONE on success
826  * @param cont_cls closure for cont
827  *
828  * @return GNUNET_YES if put message is queued for transmission
829  */
830 void
831 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
832                 const GNUNET_HashCode * key,
833                 uint32_t type,
834                 uint32_t size,
835                 const char *data,
836                 struct GNUNET_TIME_Absolute exp,
837                 struct GNUNET_TIME_Relative timeout,
838                 GNUNET_SCHEDULER_Task cont,
839                 void *cont_cls)
840 {
841   struct GNUNET_DHT_PutMessage *put_msg;
842   size_t msize;
843
844   if (handle->current != NULL)
845     {
846       GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
847       return;
848     }
849
850 #if DEBUG_DHT_API
851   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852               "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key));
853 #endif
854
855   msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
856   put_msg = GNUNET_malloc(msize);
857   put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
858   put_msg->header.size = htons(msize);
859   put_msg->type = htonl(type);
860   put_msg->data_size = htons(size);
861   put_msg->expiration = exp;
862   memcpy(&put_msg[1], data, size);
863
864   GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls);
865
866   GNUNET_free(put_msg);
867 }