print stat
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define DEBUG_DHT_API GNUNET_NO
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Timeout task for this message
78    */
79   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81   /**
82    * Unique ID for this request
83    */
84   uint64_t unique_id;
85
86   /**
87    * Free the saved message once sent, set to GNUNET_YES for messages
88    * that do not receive responses; GNUNET_NO if this pending message
89    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
90    * from there.
91    */
92   int free_on_send;
93
94   /**
95    * GNUNET_YES if this message is in our pending queue right now.
96    */
97   int in_pending_queue;
98
99 };
100
101
102 /**
103  * Handle to a GET request
104  */
105 struct GNUNET_DHT_GetHandle
106 {
107
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_GetIterator iter;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *iter_cls;
117
118   /**
119    * Main handle to this DHT api
120    */
121   struct GNUNET_DHT_Handle *dht_handle;
122
123   /**
124    * The actual message sent for this request,
125    * used for retransmitting requests on service
126    * failure/reconnect.  Freed on route_stop.
127    */
128   struct PendingMessage *message;
129
130   /**
131    * Key that this get request is for
132    */
133   GNUNET_HashCode key;
134
135   /**
136    * Unique identifier for this request (for key collisions).
137    */
138   uint64_t unique_id;
139
140 };
141
142
143 /**
144  * Connection to the DHT service.
145  */
146 struct GNUNET_DHT_Handle
147 {
148
149   /**
150    * Configuration to use.
151    */
152   const struct GNUNET_CONFIGURATION_Handle *cfg;
153
154   /**
155    * Socket (if available).
156    */
157   struct GNUNET_CLIENT_Connection *client;
158
159   /**
160    * Currently pending transmission request (or NULL).
161    */
162   struct GNUNET_CLIENT_TransmitHandle *th;
163
164   /**
165    * Head of linked list of messages we would like to transmit.
166    */
167   struct PendingMessage *pending_head;
168
169   /**
170    * Tail of linked list of messages we would like to transmit.
171    */
172   struct PendingMessage *pending_tail;
173
174   /**
175    * Hash map containing the current outstanding unique requests
176    * (values are of type 'struct GNUNET_DHT_RouteHandle').
177    */
178   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
179
180   /**
181    * Task for trying to reconnect.
182    */
183   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
184
185   /**
186    * How quickly should we retry?  Used for exponential back-off on
187    * connect-errors.
188    */
189   struct GNUNET_TIME_Relative retry_time;
190
191   /**
192    * Generator for unique ids.
193    */
194   uint64_t uid_gen;
195
196 };
197
198
199 /**
200  * Handler for messages received from the DHT service
201  * a demultiplexer which handles numerous message types
202  *
203  */
204 static void
205 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
206
207
208 /**
209  * Try to (re)connect to the DHT service.
210  *
211  * @return GNUNET_YES on success, GNUNET_NO on failure.
212  */
213 static int
214 try_connect (struct GNUNET_DHT_Handle *handle)
215 {
216   if (handle->client != NULL)
217     return GNUNET_OK;
218   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
219   if (handle->client == NULL)
220   {
221     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
222                 _("Failed to connect to the DHT service!\n"));
223     return GNUNET_NO;
224   }
225 #if DEBUG_DHT
226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
227               "Starting to process replies from DHT\n");
228 #endif
229   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
230                          GNUNET_TIME_UNIT_FOREVER_REL);
231   return GNUNET_YES;
232 }
233
234
235 /**
236  * Add the request corresponding to the given route handle
237  * to the pending queue (if it is not already in there).
238  *
239  * @param cls the 'struct GNUNET_DHT_Handle*'
240  * @param key key for the request (not used)
241  * @param value the 'struct GNUNET_DHT_GetHandle*'
242  * @return GNUNET_YES (always)
243  */
244 static int
245 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
246 {
247   struct GNUNET_DHT_Handle *handle = cls;
248   struct GNUNET_DHT_GetHandle *rh = value;
249
250   if (GNUNET_NO == rh->message->in_pending_queue)
251   {
252     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
253                                  rh->message);
254     rh->message->in_pending_queue = GNUNET_YES;
255   }
256   return GNUNET_YES;
257 }
258
259
260 /**
261  * Try to send messages from list of messages to send
262  * @param handle DHT_Handle
263  */
264 static void
265 process_pending_messages (struct GNUNET_DHT_Handle *handle);
266
267
268 /**
269  * Try reconnecting to the dht service.
270  *
271  * @param cls GNUNET_DHT_Handle
272  * @param tc scheduler context
273  */
274 static void
275 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
276 {
277   struct GNUNET_DHT_Handle *handle = cls;
278
279   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
280   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
281     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
282   else
283     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
284   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
285     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
286   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
287   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
288   if (handle->client == NULL)
289   {
290     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
291     return;
292   }
293   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
294                                          &add_request_to_pending, handle);
295   process_pending_messages (handle);
296 }
297
298
299 /**
300  * Try reconnecting to the DHT service.
301  *
302  * @param handle handle to dht to (possibly) disconnect and reconnect
303  */
304 static void
305 do_disconnect (struct GNUNET_DHT_Handle *handle)
306 {
307   if (handle->client == NULL)
308     return;
309   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
310   if (NULL != handle->th)
311       GNUNET_CLIENT_notify_transmit_ready_cancel(handle->th);
312   handle->th = NULL;
313   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
314   handle->client = NULL;
315   handle->reconnect_task =
316       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
317 }
318
319
320 /**
321  * Transmit the next pending message, called by notify_transmit_ready
322  */
323 static size_t
324 transmit_pending (void *cls, size_t size, void *buf);
325
326
327 /**
328  * Try to send messages from list of messages to send
329  */
330 static void
331 process_pending_messages (struct GNUNET_DHT_Handle *handle)
332 {
333   struct PendingMessage *head;
334
335   if (handle->client == NULL)
336   {
337     do_disconnect (handle);
338     return;
339   }
340   if (handle->th != NULL)
341     return;
342   if (NULL == (head = handle->pending_head))
343     return;
344   handle->th =
345       GNUNET_CLIENT_notify_transmit_ready (handle->client,
346                                            ntohs (head->msg->size),
347                                            GNUNET_TIME_UNIT_FOREVER_REL,
348                                            GNUNET_YES, &transmit_pending,
349                                            handle);
350   if (NULL != handle->th)
351     return;
352   do_disconnect (handle);
353 }
354
355
356 /**
357  * Transmit the next pending message, called by notify_transmit_ready
358  */
359 static size_t
360 transmit_pending (void *cls, size_t size, void *buf)
361 {
362   struct GNUNET_DHT_Handle *handle = cls;
363   struct PendingMessage *head;
364   size_t tsize;
365
366   handle->th = NULL;
367   if (buf == NULL)
368   {
369     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370                 "Transmission to DHT service failed!  Reconnecting!\n");
371     do_disconnect (handle);
372     return 0;
373   }
374   if (NULL == (head = handle->pending_head))
375     return 0;
376
377   tsize = ntohs (head->msg->size);
378   if (size < tsize)
379   {
380     process_pending_messages (handle);
381     return 0;
382   }
383   memcpy (buf, head->msg, tsize);
384   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
385                                head);
386   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
387   {
388     GNUNET_SCHEDULER_cancel (head->timeout_task);
389     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
390   }
391   if (NULL != head->cont)
392   {
393     GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
394                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
395     head->cont = NULL;
396     head->cont_cls = NULL;
397   }
398   head->in_pending_queue = GNUNET_NO;
399   if (GNUNET_YES == head->free_on_send)
400     GNUNET_free (head);
401   process_pending_messages (handle);
402   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403               "Forwarded request of %u bytes to DHT service\n",
404               (unsigned int) tsize);
405   return tsize;
406 }
407
408
409 /**
410  * Process a given reply that might match the given
411  * request.
412  *
413  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
414  * @param key query of the request
415  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
416  * @return GNUNET_YES to continue to iterate over all results,
417  *         GNUNET_NO if the reply is malformed
418  */
419 static int
420 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
421 {
422   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
423   struct GNUNET_DHT_GetHandle *get_handle = value;
424   const struct GNUNET_PeerIdentity *put_path;
425   const struct GNUNET_PeerIdentity *get_path;
426   uint32_t put_path_length;
427   uint32_t get_path_length;
428   size_t data_length;
429   size_t msize;
430   size_t meta_length;
431   const void *data;
432
433   if (dht_msg->unique_id != get_handle->unique_id)
434   {
435     /* UID mismatch */
436     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437                 "Ignoring reply (UID mismatch: %llu/%llu)\n",
438                 dht_msg->unique_id,
439                 get_handle->unique_id);  
440     return GNUNET_YES;
441   }
442   msize = ntohs (dht_msg->header.size);
443   put_path_length = ntohl (dht_msg->put_path_length);
444   get_path_length = ntohl (dht_msg->get_path_length);
445   meta_length = sizeof (struct GNUNET_DHT_ClientResultMessage) +
446     sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
447   if ( (msize < meta_length) ||
448        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
449        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
450     {
451       GNUNET_break (0);
452       return GNUNET_NO;
453     }
454   data_length = msize - meta_length;  
455   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
456   get_path = &put_path[put_path_length];
457   data = &get_path[get_path_length];
458   get_handle->iter (get_handle->iter_cls,
459                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
460                     key,
461                     get_path, get_path_length,
462                     put_path, put_path_length,
463                     ntohl (dht_msg->type),
464                     data_length, data);
465   return GNUNET_YES;
466 }
467
468
469 /**
470  * Handler for messages received from the DHT service
471  * a demultiplexer which handles numerous message types
472  *
473  * @param cls the 'struct GNUNET_DHT_Handle'
474  * @param msg the incoming message
475  */
476 static void
477 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
478 {
479   struct GNUNET_DHT_Handle *handle = cls;
480   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
481
482   if (msg == NULL)
483   {
484     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
485                 "Error receiving data from DHT service, reconnecting\n");
486     do_disconnect (handle);
487     return;
488   }
489   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
490   {
491     GNUNET_break (0);
492     do_disconnect (handle);
493     return;
494   }
495   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
496   {
497     GNUNET_break (0);
498     do_disconnect (handle);
499     return;
500   }
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502               "Received reply from DHT service\n");  
503   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
504   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
505                                               &dht_msg->key, &process_reply,
506                                               (void *) dht_msg);
507   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
508                          GNUNET_TIME_UNIT_FOREVER_REL);
509 }
510
511
512 /**
513  * Initialize the connection with the DHT service.
514  *
515  * @param cfg configuration to use
516  * @param ht_len size of the internal hash table to use for
517  *               processing multiple GET/FIND requests in parallel
518  *
519  * @return handle to the DHT service, or NULL on error
520  */
521 struct GNUNET_DHT_Handle *
522 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
523                     unsigned int ht_len)
524 {
525   struct GNUNET_DHT_Handle *handle;
526
527   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
528   handle->cfg = cfg;
529   handle->uid_gen =
530       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
531   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
532   if (GNUNET_NO == try_connect (handle))
533   {
534     GNUNET_DHT_disconnect (handle);
535     return NULL;
536   }
537   return handle;
538 }
539
540
541 /**
542  * Shutdown connection with the DHT service.
543  *
544  * @param handle handle of the DHT connection to stop
545  */
546 void
547 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
548 {
549   struct PendingMessage *pm;
550
551   GNUNET_assert (handle != NULL);
552   GNUNET_assert (0 ==
553                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
554   if (handle->th != NULL)
555   {
556     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
557     handle->th = NULL;
558   }
559   while (NULL != (pm = handle->pending_head))
560   {
561     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
562                                  pm);
563     GNUNET_assert (GNUNET_YES == pm->free_on_send);
564     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
565       GNUNET_SCHEDULER_cancel (pm->timeout_task);
566     if (NULL != pm->cont)
567       GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
568                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
569     pm->in_pending_queue = GNUNET_NO;
570     GNUNET_free (pm);
571   }
572   if (handle->client != NULL)
573   {
574     GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
575     handle->client = NULL;
576   }
577   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
578     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
579   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
580   GNUNET_free (handle);
581 }
582
583
584 /**
585  * Timeout for the transmission of a fire&forget-request.  Clean it up.
586  *
587  * @param cls the 'struct PendingMessage'
588  * @param tc scheduler context
589  */
590 static void
591 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
592 {
593   struct PendingMessage *pending = cls;
594   struct GNUNET_DHT_Handle *handle;
595
596   handle = pending->handle;
597   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
598                                pending);
599   if (pending->cont != NULL)
600     pending->cont (pending->cont_cls, tc);
601   GNUNET_free (pending);
602 }
603
604
605 /**
606  * Perform a PUT operation storing data in the DHT.
607  *
608  * @param handle handle to DHT service
609  * @param key the key to store under
610  * @param desired_replication_level estimate of how many
611  *                nearest peers this request should reach
612  * @param options routing options for this message
613  * @param type type of the value
614  * @param size number of bytes in data; must be less than 64k
615  * @param data the data to store
616  * @param exp desired expiration time for the value
617  * @param timeout how long to wait for transmission of this request
618  * @param cont continuation to call when done (transmitting request to service)
619  * @param cont_cls closure for cont
620  */
621 void
622 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
623                 uint32_t desired_replication_level,
624                 enum GNUNET_DHT_RouteOption options,
625                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
626                 struct GNUNET_TIME_Absolute exp,
627                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
628                 void *cont_cls)
629 {
630   struct GNUNET_DHT_ClientPutMessage *put_msg;
631   size_t msize;
632   struct PendingMessage *pending;
633
634   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
635   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
636        (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
637     {
638       GNUNET_break (0);
639       if (NULL != cont)
640         cont (cont_cls, NULL);
641       return;
642     }
643   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
644   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
645   pending->msg = &put_msg->header;
646   pending->handle = handle;
647   pending->cont = cont;
648   pending->cont_cls = cont_cls;
649   pending->free_on_send = GNUNET_YES;
650   pending->timeout_task =
651     GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
652   put_msg->header.size = htons (msize);
653   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
654   put_msg->type = htonl (type);
655   put_msg->options = htonl ((uint32_t) options);
656   put_msg->desired_replication_level = htonl (desired_replication_level);
657   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
658   put_msg->key = *key;
659   memcpy (&put_msg[1], data, size);
660   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
661                                pending);
662   pending->in_pending_queue = GNUNET_YES;
663   process_pending_messages (handle);
664 }
665
666
667 /**
668  * Perform an asynchronous GET operation on the DHT identified. See
669  * also "GNUNET_BLOCK_evaluate".
670  *
671  * @param handle handle to the DHT service
672  * @param timeout how long to wait for transmission of this request to the service
673  * @param type expected type of the response object
674  * @param key the key to look up
675  * @param desired_replication_level estimate of how many
676                   nearest peers this request should reach
677  * @param options routing options for this message
678  * @param xquery extended query data (can be NULL, depending on type)
679  * @param xquery_size number of bytes in xquery
680  * @param iter function to call on each result
681  * @param iter_cls closure for iter
682  * @return handle to stop the async get
683  */
684 struct GNUNET_DHT_GetHandle *
685 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
686                       struct GNUNET_TIME_Relative timeout,
687                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
688                       uint32_t desired_replication_level,
689                       enum GNUNET_DHT_RouteOption options,
690                       const void *xquery, size_t xquery_size, 
691                       GNUNET_DHT_GetIterator iter, void *iter_cls)
692 {
693   struct GNUNET_DHT_ClientGetMessage *get_msg;
694   struct GNUNET_DHT_GetHandle *get_handle;
695   size_t msize;
696   struct PendingMessage *pending;
697
698   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
699   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
700        (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
701     {
702       GNUNET_break (0);
703       return NULL;
704     }
705   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
706   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
707   pending->msg = &get_msg->header;
708   pending->handle = handle;
709   pending->free_on_send = GNUNET_NO;
710   get_msg->header.size = htons (msize);
711   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
712   get_msg->options = htonl ((uint32_t) options);
713   get_msg->desired_replication_level = htonl (desired_replication_level);
714   get_msg->type = htonl (type);
715   get_msg->key = *key;
716   handle->uid_gen++;
717   get_msg->unique_id = handle->uid_gen;
718   memcpy (&get_msg[1], xquery, xquery_size);
719   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
720                                pending);
721   pending->in_pending_queue = GNUNET_YES;
722   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
723   get_handle->iter = iter;
724   get_handle->iter_cls = iter_cls;
725   get_handle->message = pending;  
726   get_handle->unique_id = get_msg->unique_id;
727   GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
728                                      key, get_handle,
729                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
730   process_pending_messages (handle);
731   return get_handle;
732 }
733
734
735 /**
736  * Stop async DHT-get.
737  *
738  * @param get_handle handle to the GET operation to stop
739  */
740 void
741 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
742 {
743   struct GNUNET_DHT_Handle *handle;
744   const struct GNUNET_DHT_ClientGetMessage *get_msg;
745   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
746   struct PendingMessage *pending;
747
748   handle = get_handle->message->handle;
749   get_msg = (const struct GNUNET_DHT_ClientGetMessage*) get_handle->message->msg;
750
751   /* generate STOP */
752   pending = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct GNUNET_DHT_ClientGetStopMessage));
753   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
754   pending->msg = &stop_msg->header;
755   pending->handle = handle;
756   pending->free_on_send = GNUNET_YES;
757   stop_msg->header.size = htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
758   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
759   stop_msg->reserved = htonl (0);
760   stop_msg->unique_id = get_msg->unique_id;
761   stop_msg->key = get_msg->key;
762   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
763                                pending);
764   pending->in_pending_queue = GNUNET_YES;
765
766   /* remove 'GET' from active status */
767   GNUNET_assert (GNUNET_YES ==
768                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
769                                                        &get_msg->key, get_handle));
770   if (GNUNET_YES == get_handle->message->in_pending_queue)
771     {
772       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
773                                    handle->pending_tail,
774                                    get_handle->message);
775       get_handle->message->in_pending_queue = GNUNET_NO;
776     }
777   GNUNET_free (get_handle->message);
778   GNUNET_free (get_handle);
779
780   process_pending_messages (handle);
781 }
782
783
784 /* end of dht_api.c */