5fff137a6fbe38fb3f7e8edd8191484d65b5ecab
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define DEBUG_DHT_API GNUNET_EXTRA_LOGGING
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Timeout task for this message
78    */
79   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81   /**
82    * Unique ID for this request
83    */
84   uint64_t unique_id;
85
86   /**
87    * Free the saved message once sent, set to GNUNET_YES for messages
88    * that do not receive responses; GNUNET_NO if this pending message
89    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
90    * from there.
91    */
92   int free_on_send;
93
94   /**
95    * GNUNET_YES if this message is in our pending queue right now.
96    */
97   int in_pending_queue;
98
99 };
100
101
102 /**
103  * Handle to a GET request
104  */
105 struct GNUNET_DHT_GetHandle
106 {
107
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_GetIterator iter;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *iter_cls;
117
118   /**
119    * Main handle to this DHT api
120    */
121   struct GNUNET_DHT_Handle *dht_handle;
122
123   /**
124    * The actual message sent for this request,
125    * used for retransmitting requests on service
126    * failure/reconnect.  Freed on route_stop.
127    */
128   struct PendingMessage *message;
129
130   /**
131    * Key that this get request is for
132    */
133   GNUNET_HashCode key;
134
135   /**
136    * Unique identifier for this request (for key collisions).
137    */
138   uint64_t unique_id;
139
140 };
141
142
143 /**
144  * Connection to the DHT service.
145  */
146 struct GNUNET_DHT_Handle
147 {
148
149   /**
150    * Configuration to use.
151    */
152   const struct GNUNET_CONFIGURATION_Handle *cfg;
153
154   /**
155    * Socket (if available).
156    */
157   struct GNUNET_CLIENT_Connection *client;
158
159   /**
160    * Currently pending transmission request (or NULL).
161    */
162   struct GNUNET_CLIENT_TransmitHandle *th;
163
164   /**
165    * Head of linked list of messages we would like to transmit.
166    */
167   struct PendingMessage *pending_head;
168
169   /**
170    * Tail of linked list of messages we would like to transmit.
171    */
172   struct PendingMessage *pending_tail;
173
174   /**
175    * Hash map containing the current outstanding unique requests
176    * (values are of type 'struct GNUNET_DHT_RouteHandle').
177    */
178   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
179
180   /**
181    * Task for trying to reconnect.
182    */
183   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
184
185   /**
186    * How quickly should we retry?  Used for exponential back-off on
187    * connect-errors.
188    */
189   struct GNUNET_TIME_Relative retry_time;
190
191   /**
192    * Generator for unique ids.
193    */
194   uint64_t uid_gen;
195
196 };
197
198
199 /**
200  * Handler for messages received from the DHT service
201  * a demultiplexer which handles numerous message types
202  *
203  */
204 static void
205 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
206
207
208 /**
209  * Try to (re)connect to the DHT service.
210  *
211  * @return GNUNET_YES on success, GNUNET_NO on failure.
212  */
213 static int
214 try_connect (struct GNUNET_DHT_Handle *handle)
215 {
216   if (handle->client != NULL)
217     return GNUNET_OK;
218   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
219   if (handle->client == NULL)
220   {
221     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
222                 _("Failed to connect to the DHT service!\n"));
223     return GNUNET_NO;
224   }
225 #if DEBUG_DHT
226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
227               "Starting to process replies from DHT\n");
228 #endif
229   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
230                          GNUNET_TIME_UNIT_FOREVER_REL);
231   return GNUNET_YES;
232 }
233
234
235 /**
236  * Add the request corresponding to the given route handle
237  * to the pending queue (if it is not already in there).
238  *
239  * @param cls the 'struct GNUNET_DHT_Handle*'
240  * @param key key for the request (not used)
241  * @param value the 'struct GNUNET_DHT_GetHandle*'
242  * @return GNUNET_YES (always)
243  */
244 static int
245 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
246 {
247   struct GNUNET_DHT_Handle *handle = cls;
248   struct GNUNET_DHT_GetHandle *rh = value;
249
250   if (GNUNET_NO == rh->message->in_pending_queue)
251   {
252     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
253                                  rh->message);
254     rh->message->in_pending_queue = GNUNET_YES;
255   }
256   return GNUNET_YES;
257 }
258
259
260 /**
261  * Try to send messages from list of messages to send
262  * @param handle DHT_Handle
263  */
264 static void
265 process_pending_messages (struct GNUNET_DHT_Handle *handle);
266
267
268 /**
269  * Try reconnecting to the dht service.
270  *
271  * @param cls GNUNET_DHT_Handle
272  * @param tc scheduler context
273  */
274 static void
275 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
276 {
277   struct GNUNET_DHT_Handle *handle = cls;
278
279   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
280   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
281     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
282   else
283     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
284   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
285     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
286   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
287   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
288   if (handle->client == NULL)
289   {
290     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
291     return;
292   }
293   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
294                                          &add_request_to_pending, handle);
295   process_pending_messages (handle);
296 }
297
298
299 /**
300  * Try reconnecting to the DHT service.
301  *
302  * @param handle handle to dht to (possibly) disconnect and reconnect
303  */
304 static void
305 do_disconnect (struct GNUNET_DHT_Handle *handle)
306 {
307   if (handle->client == NULL)
308     return;
309   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
310   if (NULL != handle->th)
311       GNUNET_CLIENT_notify_transmit_ready_cancel(handle->th);
312   handle->th = NULL;
313   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
314   handle->client = NULL;
315   handle->reconnect_task =
316       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
317 }
318
319
320 /**
321  * Transmit the next pending message, called by notify_transmit_ready
322  */
323 static size_t
324 transmit_pending (void *cls, size_t size, void *buf);
325
326
327 /**
328  * Try to send messages from list of messages to send
329  */
330 static void
331 process_pending_messages (struct GNUNET_DHT_Handle *handle)
332 {
333   struct PendingMessage *head;
334
335   if (handle->client == NULL)
336   {
337     do_disconnect (handle);
338     return;
339   }
340   if (handle->th != NULL)
341     return;
342   if (NULL == (head = handle->pending_head))
343     return;
344   handle->th =
345       GNUNET_CLIENT_notify_transmit_ready (handle->client,
346                                            ntohs (head->msg->size),
347                                            GNUNET_TIME_UNIT_FOREVER_REL,
348                                            GNUNET_YES, &transmit_pending,
349                                            handle);
350   if (NULL != handle->th)
351     return;
352   do_disconnect (handle);
353 }
354
355
356 /**
357  * Transmit the next pending message, called by notify_transmit_ready
358  */
359 static size_t
360 transmit_pending (void *cls, size_t size, void *buf)
361 {
362   struct GNUNET_DHT_Handle *handle = cls;
363   struct PendingMessage *head;
364   size_t tsize;
365
366   handle->th = NULL;
367   if (buf == NULL)
368   {
369     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370                 "Transmission to DHT service failed!  Reconnecting!\n");
371     do_disconnect (handle);
372     return 0;
373   }
374   if (NULL == (head = handle->pending_head))
375     return 0;
376
377   tsize = ntohs (head->msg->size);
378   if (size < tsize)
379   {
380     process_pending_messages (handle);
381     return 0;
382   }
383   memcpy (buf, head->msg, tsize);
384   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
385                                head);
386   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
387   {
388     GNUNET_SCHEDULER_cancel (head->timeout_task);
389     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
390   }
391   if (NULL != head->cont)
392   {
393     GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
394                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
395     head->cont = NULL;
396     head->cont_cls = NULL;
397   }
398   head->in_pending_queue = GNUNET_NO;
399   if (GNUNET_YES == head->free_on_send)
400     GNUNET_free (head);
401   process_pending_messages (handle);
402 #if DEBUG_DHT
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Forwarded request of %u bytes to DHT service\n",
405               (unsigned int) tsize);
406 #endif
407   return tsize;
408 }
409
410
411 /**
412  * Process a given reply that might match the given
413  * request.
414  *
415  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
416  * @param key query of the request
417  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
418  * @return GNUNET_YES to continue to iterate over all results,
419  *         GNUNET_NO if the reply is malformed
420  */
421 static int
422 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
423 {
424   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
425   struct GNUNET_DHT_GetHandle *get_handle = value;
426   const struct GNUNET_PeerIdentity *put_path;
427   const struct GNUNET_PeerIdentity *get_path;
428   uint32_t put_path_length;
429   uint32_t get_path_length;
430   size_t data_length;
431   size_t msize;
432   size_t meta_length;
433   const void *data;
434
435   if (dht_msg->unique_id != get_handle->unique_id)
436   {
437     /* UID mismatch */
438     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439                 "Ignoring reply (UID mismatch: %llu/%llu)\n",
440                 dht_msg->unique_id,
441                 get_handle->unique_id);  
442     return GNUNET_YES;
443   }
444   msize = ntohs (dht_msg->header.size);
445   put_path_length = ntohl (dht_msg->put_path_length);
446   get_path_length = ntohl (dht_msg->get_path_length);
447   meta_length = sizeof (struct GNUNET_DHT_ClientResultMessage) +
448     sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
449   if ( (msize < meta_length) ||
450        (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
451        (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
452     {
453       GNUNET_break (0);
454       return GNUNET_NO;
455     }
456   data_length = msize - meta_length;  
457   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
458   get_path = &put_path[put_path_length];
459   data = &get_path[get_path_length];
460   get_handle->iter (get_handle->iter_cls,
461                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
462                     key,
463                     get_path, get_path_length,
464                     put_path, put_path_length,
465                     ntohl (dht_msg->type),
466                     data_length, data);
467   return GNUNET_YES;
468 }
469
470
471 /**
472  * Handler for messages received from the DHT service
473  * a demultiplexer which handles numerous message types
474  *
475  * @param cls the 'struct GNUNET_DHT_Handle'
476  * @param msg the incoming message
477  */
478 static void
479 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
480 {
481   struct GNUNET_DHT_Handle *handle = cls;
482   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
483
484   if (msg == NULL)
485   {
486     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487                 "Error receiving data from DHT service, reconnecting\n");
488     do_disconnect (handle);
489     return;
490   }
491   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
492   {
493     GNUNET_break (0);
494     do_disconnect (handle);
495     return;
496   }
497   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
498   {
499     GNUNET_break (0);
500     do_disconnect (handle);
501     return;
502   }
503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504               "Received reply from DHT service\n");  
505   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
506   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
507                                               &dht_msg->key, &process_reply,
508                                               (void *) dht_msg);
509   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
510                          GNUNET_TIME_UNIT_FOREVER_REL);
511 }
512
513
514 /**
515  * Initialize the connection with the DHT service.
516  *
517  * @param cfg configuration to use
518  * @param ht_len size of the internal hash table to use for
519  *               processing multiple GET/FIND requests in parallel
520  *
521  * @return handle to the DHT service, or NULL on error
522  */
523 struct GNUNET_DHT_Handle *
524 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
525                     unsigned int ht_len)
526 {
527   struct GNUNET_DHT_Handle *handle;
528
529   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
530   handle->cfg = cfg;
531   handle->uid_gen =
532       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
533   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
534   if (GNUNET_NO == try_connect (handle))
535   {
536     GNUNET_DHT_disconnect (handle);
537     return NULL;
538   }
539   return handle;
540 }
541
542
543 /**
544  * Shutdown connection with the DHT service.
545  *
546  * @param handle handle of the DHT connection to stop
547  */
548 void
549 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
550 {
551   struct PendingMessage *pm;
552
553   GNUNET_assert (handle != NULL);
554   GNUNET_assert (0 ==
555                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
556   if (handle->th != NULL)
557   {
558     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
559     handle->th = NULL;
560   }
561   while (NULL != (pm = handle->pending_head))
562   {
563     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
564                                  pm);
565     GNUNET_assert (GNUNET_YES == pm->free_on_send);
566     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
567       GNUNET_SCHEDULER_cancel (pm->timeout_task);
568     if (NULL != pm->cont)
569       GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
570                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
571     pm->in_pending_queue = GNUNET_NO;
572     GNUNET_free (pm);
573   }
574   if (handle->client != NULL)
575   {
576     GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
577     handle->client = NULL;
578   }
579   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
580     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
581   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
582   GNUNET_free (handle);
583 }
584
585
586 /**
587  * Timeout for the transmission of a fire&forget-request.  Clean it up.
588  *
589  * @param cls the 'struct PendingMessage'
590  * @param tc scheduler context
591  */
592 static void
593 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
594 {
595   struct PendingMessage *pending = cls;
596   struct GNUNET_DHT_Handle *handle;
597
598   handle = pending->handle;
599   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
600                                pending);
601   if (pending->cont != NULL)
602     pending->cont (pending->cont_cls, tc);
603   GNUNET_free (pending);
604 }
605
606
607 /**
608  * Perform a PUT operation storing data in the DHT.
609  *
610  * @param handle handle to DHT service
611  * @param key the key to store under
612  * @param desired_replication_level estimate of how many
613  *                nearest peers this request should reach
614  * @param options routing options for this message
615  * @param type type of the value
616  * @param size number of bytes in data; must be less than 64k
617  * @param data the data to store
618  * @param exp desired expiration time for the value
619  * @param timeout how long to wait for transmission of this request
620  * @param cont continuation to call when done (transmitting request to service)
621  * @param cont_cls closure for cont
622  */
623 void
624 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
625                 uint32_t desired_replication_level,
626                 enum GNUNET_DHT_RouteOption options,
627                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
628                 struct GNUNET_TIME_Absolute exp,
629                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
630                 void *cont_cls)
631 {
632   struct GNUNET_DHT_ClientPutMessage *put_msg;
633   size_t msize;
634   struct PendingMessage *pending;
635
636   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
637   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
638        (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
639     {
640       GNUNET_break (0);
641       if (NULL != cont)
642         cont (cont_cls, NULL);
643       return;
644     }
645   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
646   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
647   pending->msg = &put_msg->header;
648   pending->handle = handle;
649   pending->cont = cont;
650   pending->cont_cls = cont_cls;
651   pending->free_on_send = GNUNET_YES;
652   pending->timeout_task =
653     GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
654   put_msg->header.size = htons (msize);
655   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
656   put_msg->type = htonl (type);
657   put_msg->options = htonl ((uint32_t) options);
658   put_msg->desired_replication_level = htonl (desired_replication_level);
659   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
660   put_msg->key = *key;
661   memcpy (&put_msg[1], data, size);
662   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
663                                pending);
664   pending->in_pending_queue = GNUNET_YES;
665   process_pending_messages (handle);
666 }
667
668
669 /**
670  * Perform an asynchronous GET operation on the DHT identified. See
671  * also "GNUNET_BLOCK_evaluate".
672  *
673  * @param handle handle to the DHT service
674  * @param timeout how long to wait for transmission of this request to the service
675  * @param type expected type of the response object
676  * @param key the key to look up
677  * @param desired_replication_level estimate of how many
678                   nearest peers this request should reach
679  * @param options routing options for this message
680  * @param xquery extended query data (can be NULL, depending on type)
681  * @param xquery_size number of bytes in xquery
682  * @param iter function to call on each result
683  * @param iter_cls closure for iter
684  * @return handle to stop the async get
685  */
686 struct GNUNET_DHT_GetHandle *
687 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
688                       struct GNUNET_TIME_Relative timeout,
689                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
690                       uint32_t desired_replication_level,
691                       enum GNUNET_DHT_RouteOption options,
692                       const void *xquery, size_t xquery_size, 
693                       GNUNET_DHT_GetIterator iter, void *iter_cls)
694 {
695   struct GNUNET_DHT_ClientGetMessage *get_msg;
696   struct GNUNET_DHT_GetHandle *get_handle;
697   size_t msize;
698   struct PendingMessage *pending;
699
700   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
701   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
702        (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
703     {
704       GNUNET_break (0);
705       return NULL;
706     }
707   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
708   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
709   pending->msg = &get_msg->header;
710   pending->handle = handle;
711   pending->free_on_send = GNUNET_NO;
712   get_msg->header.size = htons (msize);
713   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
714   get_msg->options = htonl ((uint32_t) options);
715   get_msg->desired_replication_level = htonl (desired_replication_level);
716   get_msg->type = htonl (type);
717   get_msg->key = *key;
718   handle->uid_gen++;
719   get_msg->unique_id = handle->uid_gen;
720   memcpy (&get_msg[1], xquery, xquery_size);
721   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
722                                pending);
723   pending->in_pending_queue = GNUNET_YES;
724   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
725   get_handle->iter = iter;
726   get_handle->iter_cls = iter_cls;
727   get_handle->message = pending;  
728   get_handle->unique_id = get_msg->unique_id;
729   GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
730                                      key, get_handle,
731                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
732   process_pending_messages (handle);
733   return get_handle;
734 }
735
736
737 /**
738  * Stop async DHT-get.
739  *
740  * @param get_handle handle to the GET operation to stop
741  */
742 void
743 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
744 {
745   struct GNUNET_DHT_Handle *handle;
746   const struct GNUNET_DHT_ClientGetMessage *get_msg;
747   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
748   struct PendingMessage *pending;
749
750   handle = get_handle->message->handle;
751   get_msg = (const struct GNUNET_DHT_ClientGetMessage*) get_handle->message->msg;
752
753   /* generate STOP */
754   pending = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct GNUNET_DHT_ClientGetStopMessage));
755   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
756   pending->msg = &stop_msg->header;
757   pending->handle = handle;
758   pending->free_on_send = GNUNET_YES;
759   stop_msg->header.size = htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
760   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
761   stop_msg->reserved = htonl (0);
762   stop_msg->unique_id = get_msg->unique_id;
763   stop_msg->key = get_msg->key;
764   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
765                                pending);
766   pending->in_pending_queue = GNUNET_YES;
767
768   /* remove 'GET' from active status */
769   GNUNET_assert (GNUNET_YES ==
770                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
771                                                        &get_msg->key, get_handle));
772   if (GNUNET_YES == get_handle->message->in_pending_queue)
773     {
774       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
775                                    handle->pending_tail,
776                                    get_handle->message);
777       get_handle->message->in_pending_queue = GNUNET_NO;
778     }
779   GNUNET_free (get_handle->message);
780   GNUNET_free (get_handle);
781
782   process_pending_messages (handle);
783 }
784
785
786 /* end of dht_api.c */