stuff
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: Only allow a single message until confirmed as received by
28  *       the service.  For put messages call continuation as soon as
29  *       receipt acknowledged (then remove), for GET or other messages
30  *       only call continuation when data received.
31  *       Add unique identifier to message types requesting data to be
32  *       returned.
33  */
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_YES
48
49 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
50
51 struct PendingMessage
52 {
53   /**
54    * Message that is pending
55    */
56   struct GNUNET_MessageHeader *msg;
57
58   /**
59    * Timeout for this message
60    */
61   struct GNUNET_TIME_Relative timeout;
62
63   /**
64    * Continuation to call on message send
65    * or message receipt confirmation
66    */
67   GNUNET_SCHEDULER_Task cont;
68
69   /**
70    * Continuation closure
71    */
72   void *cont_cls;
73
74   /**
75    * Whether or not to await verification the message
76    * was received by the service
77    */
78   size_t is_unique;
79
80   /**
81    * Unique ID for this request
82    */
83   uint64_t unique_id;
84
85 };
86
87 struct GNUNET_DHT_GetContext
88 {
89
90
91   /**
92    * Iterator to call on data receipt
93    */
94   GNUNET_DHT_GetIterator iter;
95
96   /**
97    * Closure for the iterator callback
98    */
99   void *iter_cls;
100
101 };
102
103 /**
104  * Handle to control a unique operation (one that is
105  * expected to return results)
106  */
107 struct GNUNET_DHT_RouteHandle
108 {
109
110   /**
111    * Unique identifier for this request (for key collisions)
112    */
113   uint64_t uid;
114
115   /**
116    * Key that this get request is for
117    */
118   GNUNET_HashCode key;
119
120   /**
121    * Iterator to call on data receipt
122    */
123   GNUNET_DHT_ReplyProcessor iter;
124
125   /**
126    * Closure for the iterator callback
127    */
128   void *iter_cls;
129
130   /**
131    * Main handle to this DHT api
132    */
133   struct GNUNET_DHT_Handle *dht_handle;
134 };
135
136 /**
137  * Handle for a non unique request, holds callback
138  * which needs to be called before we allow other
139  * messages to be processed and sent to the DHT service
140  */
141 struct GNUNET_DHT_NonUniqueHandle
142 {
143   /**
144    * Key that this get request is for
145    */
146   GNUNET_HashCode key;
147
148   /**
149    * Type of data get request was for
150    */
151   uint32_t type;
152
153   /**
154    * Continuation to call on service
155    * confirmation of message receipt.
156    */
157   GNUNET_SCHEDULER_Task cont;
158
159   /**
160    * Send continuation cls
161    */
162   void *cont_cls;
163 };
164
165
166 /**
167  * Connection to the DHT service.
168  */
169 struct GNUNET_DHT_Handle
170 {
171   /**
172    * Our scheduler.
173    */
174   struct GNUNET_SCHEDULER_Handle *sched;
175
176   /**
177    * Configuration to use.
178    */
179   const struct GNUNET_CONFIGURATION_Handle *cfg;
180
181   /**
182    * Socket (if available).
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Currently pending transmission request.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Message we are currently sending, only allow
193    * a single message to be queued.  If not unique
194    * (typically a put request), await a confirmation
195    * from the service that the message was received.
196    * If unique, just fire and forget.
197    */
198   struct PendingMessage *current;
199
200   /**
201    * Hash map containing the current outstanding unique requests
202    */
203   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
204
205   /**
206    * Non unique handle.  If set don't schedule another non
207    * unique request.
208    */
209   struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
210
211   /**
212    * Kill off the connection and any pending messages.
213    */
214   int do_destroy;
215
216 };
217
218 static struct GNUNET_TIME_Relative default_request_timeout;
219
220 /* Forward declaration */
221 static void process_pending_message(struct GNUNET_DHT_Handle *handle);
222
223 static GNUNET_HashCode * hash_from_uid(uint64_t uid)
224 {
225   int count;
226   int remaining;
227   GNUNET_HashCode *hash;
228   hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
229   count = 0;
230
231   while (count < sizeof(GNUNET_HashCode))
232     {
233       remaining = sizeof(GNUNET_HashCode) - count;
234       if (remaining > sizeof(uid))
235         remaining = sizeof(uid);
236
237       memcpy(hash, &uid, remaining);
238       count += remaining;
239     }
240
241   return hash;
242 }
243
244 /**
245  * Handler for messages received from the DHT service
246  * a demultiplexer which handles numerous message types
247  *
248  */
249 void service_message_handler (void *cls,
250                               const struct GNUNET_MessageHeader *msg)
251 {
252   struct GNUNET_DHT_Handle *handle = cls;
253   struct GNUNET_DHT_Message *dht_msg;
254   struct GNUNET_DHT_StopMessage *stop_msg;
255   struct GNUNET_MessageHeader *enc_msg;
256   struct GNUNET_DHT_RouteHandle *route_handle;
257   uint64_t uid;
258   GNUNET_HashCode *uid_hash;
259   size_t enc_size;
260   /* TODO: find out message type, handle callbacks for different types of messages.
261    * Should be a non unique acknowledgment, or unique result. */
262
263   if (msg == NULL)
264   {
265 #if DEBUG_DHT_API
266           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267                       "`%s': Received NULL from server, connection down?\n", "DHT API");
268 #endif
269     return;
270   }
271
272   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT)
273   {
274     dht_msg = (struct GNUNET_DHT_Message *)msg;
275     uid = GNUNET_ntohll(dht_msg->unique_id);
276 #if DEBUG_DHT_API
277     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278                 "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
279 #endif
280     if (ntohs(dht_msg->unique))
281       {
282         uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
283
284         route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
285         if (route_handle == NULL) /* We have no recollection of this request */
286           {
287 #if DEBUG_DHT_API
288           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289                       "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
290 #endif
291           }
292         else
293           {
294             enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
295             GNUNET_assert(enc_size > 0);
296             enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
297             route_handle->iter(route_handle->iter_cls, enc_msg);
298           }
299       }
300   }
301   else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP)
302   {
303     stop_msg = (struct GNUNET_DHT_StopMessage *)msg;
304     uid = GNUNET_ntohll(stop_msg->unique_id);
305 #if DEBUG_DHT_API
306     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307                 "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id);
308 #endif
309     if (handle->current->unique_id == uid)
310       {
311 #if DEBUG_DHT_API
312         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313                     "`%s': Have pending confirmation for this message!\n", "DHT API", uid);
314 #endif
315         if (handle->current->cont != NULL)
316           GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
317
318         GNUNET_free(handle->current->msg);
319         GNUNET_free(handle->current);
320         handle->current = NULL;
321       }
322   }
323   else
324   {
325 #if DEBUG_DHT_API
326     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327                 "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type));
328 #endif
329   }
330
331   GNUNET_CLIENT_receive (handle->client,
332                          &service_message_handler,
333                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
334
335 }
336
337
338 /**
339  * Initialize the connection with the DHT service.
340  *
341  * @param cfg configuration to use
342  * @param sched scheduler to use
343  * @param ht_len size of the internal hash table to use for
344  *               processing multiple GET/FIND requests in parallel
345  * @return NULL on error
346  */
347 struct GNUNET_DHT_Handle *
348 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
349                     const struct GNUNET_CONFIGURATION_Handle *cfg,
350                     unsigned int ht_len)
351 {
352   struct GNUNET_DHT_Handle *handle;
353
354   handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
355
356   default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
357   handle->cfg = cfg;
358   handle->sched = sched;
359
360   handle->current = NULL;
361   handle->do_destroy = GNUNET_NO;
362   handle->th = NULL;
363
364   handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
365   handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
366
367   if (handle->client == NULL)
368     return NULL;
369 #if DEBUG_DHT_API
370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371               "`%s': Connection to service in progress\n", "DHT API");
372 #endif
373   GNUNET_CLIENT_receive (handle->client,
374                          &service_message_handler,
375                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
376
377   return handle;
378 }
379
380
381 /**
382  * Shutdown connection with the DHT service.
383  *
384  * @param h connection to shut down
385  */
386 void
387 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
388 {
389 #if DEBUG_DHT_API
390   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
392 #endif
393   GNUNET_assert(handle != NULL);
394
395   if (handle->th != NULL) /* We have a live transmit request in the Aether */
396     {
397       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
398       handle->th = NULL;
399     }
400   if (handle->current != NULL) /* We are trying to send something now, clean it up */
401     GNUNET_free(handle->current);
402
403   if (handle->client != NULL) /* Finally, disconnect from the service */
404     {
405       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
406       handle->client = NULL;
407     }
408
409   GNUNET_free (handle);
410 }
411
412
413 /**
414  * Send complete (or failed), schedule next (or don't)
415  */
416 static void
417 finish (struct GNUNET_DHT_Handle *handle, int code)
418 {
419   /* TODO: if code is not GNUNET_OK, do something! */
420   struct PendingMessage *pos = handle->current;
421 #if DEBUG_DHT_API
422       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423                   "`%s': Finish called!\n", "DHT API");
424 #endif
425   GNUNET_assert(pos != NULL);
426
427   if (pos->is_unique)
428     {
429       if (pos->cont != NULL)
430       {
431         if (code == GNUNET_SYSERR)
432           GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
433         else
434           GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
435       }
436
437       GNUNET_free(pos->msg);
438       handle->current = NULL;
439       GNUNET_free(pos);
440     }
441   /* Otherwise we need to wait for a response to this message! */
442 }
443
444 /**
445  * Transmit the next pending message, called by notify_transmit_ready
446  */
447 static size_t
448 transmit_pending (void *cls, size_t size, void *buf)
449 {
450   struct GNUNET_DHT_Handle *handle = cls;
451   size_t tsize;
452
453 #if DEBUG_DHT_API
454       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455                   "`%s': In transmit_pending\n", "DHT API");
456 #endif
457   if (buf == NULL)
458     {
459 #if DEBUG_DHT_API
460       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
462 #endif
463       /* FIXME: free associated resources or summat */
464       finish(handle, GNUNET_SYSERR);
465       return 0;
466     }
467
468   handle->th = NULL;
469
470   if (handle->current != NULL)
471   {
472     tsize = ntohs(handle->current->msg->size);
473     if (size >= tsize)
474     {
475 #if DEBUG_DHT_API
476       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
477                   "`%s': Sending message size %d\n", "DHT API", tsize);
478 #endif
479       memcpy(buf, handle->current->msg, tsize);
480       finish(handle, GNUNET_OK);
481       return tsize;
482     }
483     else
484     {
485       return 0;
486     }
487   }
488   /* Have no pending request */
489   return 0;
490 }
491
492
493 /**
494  * Try to (re)connect to the dht service.
495  *
496  * @return GNUNET_YES on success, GNUNET_NO on failure.
497  */
498 static int
499 try_connect (struct GNUNET_DHT_Handle *handle)
500 {
501   if (handle->client != NULL)
502     return GNUNET_OK;
503   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
504   if (handle->client != NULL)
505     return GNUNET_YES;
506 #if DEBUG_STATISTICS
507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508               _("Failed to connect to the dht service!\n"));
509 #endif
510   return GNUNET_NO;
511 }
512
513
514 /**
515  * Try to send messages from list of messages to send
516  */
517 static void process_pending_message(struct GNUNET_DHT_Handle *handle)
518 {
519
520   if (handle->current == NULL)
521     return;                     /* action already pending */
522   if (GNUNET_YES != try_connect (handle))
523     {
524       finish (handle, GNUNET_SYSERR);
525       return;
526     }
527
528   /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
529   if (handle->do_destroy)
530     {
531       //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
532     }
533
534
535   if (NULL ==
536       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
537                                                     ntohs(handle->current->msg->size),
538                                                     handle->current->timeout,
539                                                     GNUNET_YES,
540                                                     &transmit_pending, handle)))
541     {
542 #if DEBUG_DHT_API
543       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544                   "Failed to transmit request to dht service.\n");
545 #endif
546       finish (handle, GNUNET_SYSERR);
547     }
548 #if DEBUG_DHT_API
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
551 #endif
552 }
553
554 /**
555  * Iterator called on each result obtained from a generic route
556  * operation
557  */
558 void get_reply_iterator (void *cls,
559                          const struct GNUNET_MessageHeader *reply)
560 {
561
562 }
563
564 /**
565  * Perform an asynchronous FIND_PEER operation on the DHT.
566  *
567  * @param handle handle to the DHT service
568  * @param key the key to look up
569  * @param desired_replication_level how many peers should ultimately receive
570  *                this message (advisory only, target may be too high for the
571  *                given DHT or not hit exactly).
572  * @param options options for routing
573  * @param enc send the encapsulated message to a peer close to the key
574  * @param iter function to call on each result, NULL if no replies are expected
575  * @param iter_cls closure for iter
576  * @param timeout when to abort with an error if we fail to get
577  *                a confirmation for the PUT from the local DHT service
578  * @param cont continuation to call when done;
579  *             reason will be TIMEOUT on error,
580  *             reason will be PREREQ_DONE on success
581  * @param cont_cls closure for cont
582  * @return handle to stop the request
583  */
584 struct GNUNET_DHT_RouteHandle *
585 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
586                         const GNUNET_HashCode *key,
587                         unsigned int desired_replication_level,
588                         enum GNUNET_DHT_RouteOption options,
589                         const struct GNUNET_MessageHeader *enc,
590                         struct GNUNET_TIME_Relative timeout,
591                         GNUNET_DHT_ReplyProcessor iter,
592                         void *iter_cls,
593                         GNUNET_SCHEDULER_Task cont,
594                         void *cont_cls)
595 {
596   struct GNUNET_DHT_RouteHandle *route_handle;
597   struct PendingMessage *pending;
598   struct GNUNET_DHT_Message *message;
599   size_t is_unique;
600   size_t msize;
601   GNUNET_HashCode *uid_key;
602   uint64_t uid;
603
604   is_unique = GNUNET_YES;
605   if (iter == NULL)
606     is_unique = GNUNET_NO;
607
608   route_handle = NULL;
609   uid_key = NULL;
610
611   do
612   {
613     GNUNET_free_non_null(uid_key);
614     uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
615     uid_key = hash_from_uid(uid);
616   } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES);
617
618   if (is_unique)
619     {
620       route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
621       memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
622       route_handle->iter = iter;
623       route_handle->iter_cls = iter_cls;
624       route_handle->dht_handle = handle;
625       route_handle->uid = uid;
626 #if DEBUG_DHT_API
627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628               "`%s': Unique ID is %llu\n", "DHT API", uid);
629 #endif
630       /**
631        * Store based on random identifier!
632        */
633       GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
634       msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
635
636     }
637   else
638     {
639       msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
640     }
641
642   GNUNET_free(uid_key);
643   message = GNUNET_malloc(msize);
644   message->header.size = htons(msize);
645   message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
646   memcpy(&message->key, key, sizeof(GNUNET_HashCode));
647   message->options = htons(options);
648   message->desired_replication_level = htons(options);
649   message->unique = htons(is_unique);
650   message->unique_id = GNUNET_htonll(uid);
651   memcpy(&message[1], enc, ntohs(enc->size));
652
653   pending = GNUNET_malloc(sizeof(struct PendingMessage));
654   pending->msg = &message->header;
655   pending->timeout = timeout;
656   pending->cont = cont;
657   pending->cont_cls = cont_cls;
658   pending->is_unique = is_unique;
659   pending->unique_id = uid;
660
661   GNUNET_assert(handle->current == NULL);
662
663   handle->current = pending;
664
665   process_pending_message(handle);
666
667   return route_handle;
668 }
669
670 void
671 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
672
673
674 void dht_get_processor (void *cls,
675                         const struct GNUNET_MessageHeader *reply)
676 {
677
678 }
679
680 /**
681  * Perform an asynchronous GET operation on the DHT identified.
682  *
683  * @param h handle to the DHT service
684  * @param type expected type of the response object
685  * @param key the key to look up
686  * @param iter function to call on each result
687  * @param iter_cls closure for iter
688  * @param cont continuation to call once message sent
689  * @param cont_cls closure for continuation
690  *
691  * @return handle to stop the async get
692  */
693 struct GNUNET_DHT_RouteHandle *
694 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
695                       struct GNUNET_TIME_Relative timeout,
696                       uint32_t type,
697                       const GNUNET_HashCode * key,
698                       GNUNET_DHT_GetIterator iter,
699                       void *iter_cls,
700                       GNUNET_SCHEDULER_Task cont,
701                       void *cont_cls)
702 {
703   struct GNUNET_DHT_GetContext *get_context;
704   struct GNUNET_DHT_GetMessage *get_msg;
705
706   if (handle->current != NULL) /* Can't send right now, we have a pending message... */
707     return NULL;
708
709   get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext));
710   get_context->iter = iter;
711   get_context->iter_cls = iter_cls;
712
713 #if DEBUG_DHT_API
714   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715               "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key));
716 #endif
717
718   get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
719   get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
720   get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
721   get_msg->type = htonl(type);
722
723   return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, cont, cont_cls);
724
725 }
726
727
728 void
729 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
730 {
731   struct PendingMessage *pending;
732   struct GNUNET_DHT_StopMessage *message;
733   size_t msize;
734   GNUNET_HashCode *uid_key;
735
736   msize = sizeof(struct GNUNET_DHT_StopMessage);
737
738   message = GNUNET_malloc(msize);
739   message->header.size = htons(msize);
740   message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
741 #if DEBUG_DHT_API
742       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743                   "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid);
744 #endif
745   message->unique_id = GNUNET_htonll(route_handle->uid);
746
747   GNUNET_assert(route_handle->dht_handle->current == NULL);
748
749   pending = GNUNET_malloc(sizeof(struct PendingMessage));
750   pending->msg = (struct GNUNET_MessageHeader *)message;
751   pending->timeout = DEFAULT_DHT_TIMEOUT;
752   pending->cont = NULL;
753   pending->cont_cls = NULL;
754   pending->is_unique = GNUNET_NO;
755   pending->unique_id = route_handle->uid;
756
757   GNUNET_assert(route_handle->dht_handle->current == NULL);
758
759   route_handle->dht_handle->current = pending;
760
761   process_pending_message(route_handle->dht_handle);
762
763   uid_key = hash_from_uid(route_handle->uid);
764
765   if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES)
766     {
767 #if DEBUG_DHT_API
768       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769                   "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
770 #endif
771     }
772
773   return;
774 }
775
776
777 /**
778  * Stop async DHT-get.  Frees associated resources.
779  *
780  * @param record GET operation to stop.
781  */
782 void
783 GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *handle)
784 {
785 #if OLDREMOVE
786   struct GNUNET_DHT_GetMessage *get_msg;
787   struct GNUNET_DHT_Handle *handle;
788   GNUNET_HashCode *uid_key;
789 #endif
790
791   GNUNET_DHT_route_stop(handle);
792
793 #if OLDREMOVE
794   uid_key = hash_from_uid(get_handle->uid);
795   GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES);
796
797   if (handle->do_destroy == GNUNET_NO)
798     {
799       get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
800       get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
801       get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
802
803
804     }
805 #endif
806 #if DEBUG_DHT_API
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&handle->key), handle->uid);
809 #endif
810 }
811
812
813 /**
814  * Perform a PUT operation storing data in the DHT.
815  *
816  * @param h handle to DHT service
817  * @param key the key to store under
818  * @param type type of the value
819  * @param size number of bytes in data; must be less than 64k
820  * @param data the data to store
821  * @param exp desired expiration time for the value
822  * @param cont continuation to call when done;
823  *             reason will be TIMEOUT on error,
824  *             reason will be PREREQ_DONE on success
825  * @param cont_cls closure for cont
826  *
827  * @return GNUNET_YES if put message is queued for transmission
828  */
829 void
830 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
831                 const GNUNET_HashCode * key,
832                 uint32_t type,
833                 uint32_t size,
834                 const char *data,
835                 struct GNUNET_TIME_Absolute exp,
836                 struct GNUNET_TIME_Relative timeout,
837                 GNUNET_SCHEDULER_Task cont,
838                 void *cont_cls)
839 {
840   struct GNUNET_DHT_PutMessage *put_msg;
841   size_t msize;
842
843   if (handle->current != NULL)
844     {
845       GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
846       return;
847     }
848
849 #if DEBUG_DHT_API
850   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851               "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key));
852 #endif
853
854   msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
855   put_msg = GNUNET_malloc(msize);
856   put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
857   put_msg->header.size = htons(msize);
858   put_msg->type = htonl(type);
859   put_msg->data_size = htons(size);
860   put_msg->expiration = exp;
861   memcpy(&put_msg[1], data, size);
862
863   GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls);
864
865 }