ee91c6f0637d6b89dcbd9cfec135034d1dc3fa2e
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define DEBUG_DHT_API GNUNET_EXTRA_LOGGING
38
39 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
40
41 /**
42  * Entry in our list of messages to be (re-)transmitted.
43  */
44 struct PendingMessage
45 {
46   /**
47    * This is a doubly-linked list.
48    */
49   struct PendingMessage *prev;
50
51   /**
52    * This is a doubly-linked list.
53    */
54   struct PendingMessage *next;
55
56   /**
57    * Message that is pending, allocated at the end
58    * of this struct.
59    */
60   const struct GNUNET_MessageHeader *msg;
61
62   /**
63    * Handle to the DHT API context.
64    */
65   struct GNUNET_DHT_Handle *handle;
66
67   /**
68    * Continuation to call when the request has been
69    * transmitted (for the first time) to the service; can be NULL.
70    */
71   GNUNET_SCHEDULER_Task cont;
72
73   /**
74    * Closure for 'cont'.
75    */
76   void *cont_cls;
77
78   /**
79    * Timeout task for this message
80    */
81   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82
83   /**
84    * Unique ID for this request
85    */
86   uint64_t unique_id;
87
88   /**
89    * Free the saved message once sent, set to GNUNET_YES for messages
90    * that do not receive responses; GNUNET_NO if this pending message
91    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
92    * from there.
93    */
94   int free_on_send;
95
96   /**
97    * GNUNET_YES if this message is in our pending queue right now.
98    */
99   int in_pending_queue;
100
101 };
102
103
104 /**
105  * Handle to a GET request
106  */
107 struct GNUNET_DHT_GetHandle
108 {
109
110   /**
111    * Iterator to call on data receipt
112    */
113   GNUNET_DHT_GetIterator iter;
114
115   /**
116    * Closure for the iterator callback
117    */
118   void *iter_cls;
119
120   /**
121    * Main handle to this DHT api
122    */
123   struct GNUNET_DHT_Handle *dht_handle;
124
125   /**
126    * The actual message sent for this request,
127    * used for retransmitting requests on service
128    * failure/reconnect.  Freed on route_stop.
129    */
130   struct PendingMessage *message;
131
132   /**
133    * Key that this get request is for
134    */
135   GNUNET_HashCode key;
136
137   /**
138    * Unique identifier for this request (for key collisions).
139    */
140   uint64_t unique_id;
141
142 };
143
144
145 /**
146  * Connection to the DHT service.
147  */
148 struct GNUNET_DHT_Handle
149 {
150
151   /**
152    * Configuration to use.
153    */
154   const struct GNUNET_CONFIGURATION_Handle *cfg;
155
156   /**
157    * Socket (if available).
158    */
159   struct GNUNET_CLIENT_Connection *client;
160
161   /**
162    * Currently pending transmission request (or NULL).
163    */
164   struct GNUNET_CLIENT_TransmitHandle *th;
165
166   /**
167    * Head of linked list of messages we would like to transmit.
168    */
169   struct PendingMessage *pending_head;
170
171   /**
172    * Tail of linked list of messages we would like to transmit.
173    */
174   struct PendingMessage *pending_tail;
175
176   /**
177    * Hash map containing the current outstanding unique requests
178    * (values are of type 'struct GNUNET_DHT_RouteHandle').
179    */
180   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
181
182   /**
183    * Task for trying to reconnect.
184    */
185   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
186
187   /**
188    * How quickly should we retry?  Used for exponential back-off on
189    * connect-errors.
190    */
191   struct GNUNET_TIME_Relative retry_time;
192
193   /**
194    * Generator for unique ids.
195    */
196   uint64_t uid_gen;
197
198   /**
199    * Did we start our receive loop yet?
200    */
201   int in_receive;
202 };
203
204
205 /**
206  * Handler for messages received from the DHT service
207  * a demultiplexer which handles numerous message types
208  *
209  */
210 static void
211 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
212
213
214 /**
215  * Try to (re)connect to the DHT service.
216  *
217  * @return GNUNET_YES on success, GNUNET_NO on failure.
218  */
219 static int
220 try_connect (struct GNUNET_DHT_Handle *handle)
221 {
222   if (handle->client != NULL)
223     return GNUNET_OK;
224   handle->in_receive = GNUNET_NO;
225   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
226   if (handle->client == NULL)
227   {
228     LOG (GNUNET_ERROR_TYPE_WARNING,
229          _("Failed to connect to the DHT service!\n"));
230     return GNUNET_NO;
231   }
232   return GNUNET_YES;
233 }
234
235
236 /**
237  * Add the request corresponding to the given route handle
238  * to the pending queue (if it is not already in there).
239  *
240  * @param cls the 'struct GNUNET_DHT_Handle*'
241  * @param key key for the request (not used)
242  * @param value the 'struct GNUNET_DHT_GetHandle*'
243  * @return GNUNET_YES (always)
244  */
245 static int
246 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
247 {
248   struct GNUNET_DHT_Handle *handle = cls;
249   struct GNUNET_DHT_GetHandle *rh = value;
250
251   if (GNUNET_NO == rh->message->in_pending_queue)
252   {    
253     LOG (GNUNET_ERROR_TYPE_DEBUG,
254          "Retransmitting request related to %s to DHT %p\n",
255          GNUNET_h2s (key),
256          handle);
257     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
258                                  rh->message);
259     rh->message->in_pending_queue = GNUNET_YES;
260   }
261   return GNUNET_YES;
262 }
263
264
265 /**
266  * Try to send messages from list of messages to send
267  * @param handle DHT_Handle
268  */
269 static void
270 process_pending_messages (struct GNUNET_DHT_Handle *handle);
271
272
273 /**
274  * Try reconnecting to the dht service.
275  *
276  * @param cls GNUNET_DHT_Handle
277  * @param tc scheduler context
278  */
279 static void
280 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
281 {
282   struct GNUNET_DHT_Handle *handle = cls;
283
284   LOG (GNUNET_ERROR_TYPE_DEBUG,
285        "Reconnecting with DHT %p\n",
286        handle);
287   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
288   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
289     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
290   else
291     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
292   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
293     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
294   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
295   if (GNUNET_YES != try_connect (handle))
296   {
297     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
298     return;
299   }
300   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
301                                          &add_request_to_pending, handle);
302   process_pending_messages (handle);
303 }
304
305
306 /**
307  * Try reconnecting to the DHT service.
308  *
309  * @param handle handle to dht to (possibly) disconnect and reconnect
310  */
311 static void
312 do_disconnect (struct GNUNET_DHT_Handle *handle)
313 {
314   if (handle->client == NULL)
315     return;
316   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
317   if (NULL != handle->th)
318     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
319   handle->th = NULL;
320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
321               "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
322               (unsigned long long) handle->retry_time.rel_value);
323   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
324   handle->client = NULL;
325   handle->reconnect_task =
326       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
327 }
328
329
330 /**
331  * Transmit the next pending message, called by notify_transmit_ready
332  */
333 static size_t
334 transmit_pending (void *cls, size_t size, void *buf);
335
336
337 /**
338  * Try to send messages from list of messages to send
339  */
340 static void
341 process_pending_messages (struct GNUNET_DHT_Handle *handle)
342 {
343   struct PendingMessage *head;
344
345   if (handle->client == NULL)
346   {
347     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348                 "process_pending_messages called, but client is null, reconnecting\n");
349     do_disconnect (handle);
350     return;
351   }
352   if (handle->th != NULL)
353     return;
354   if (NULL == (head = handle->pending_head))
355     return;
356   handle->th =
357       GNUNET_CLIENT_notify_transmit_ready (handle->client,
358                                            ntohs (head->msg->size),
359                                            GNUNET_TIME_UNIT_FOREVER_REL,
360                                            GNUNET_YES, &transmit_pending,
361                                            handle);
362   if (NULL != handle->th)
363     return;
364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365               "notify_transmit_ready returned NULL, reconnecting\n");
366   do_disconnect (handle);
367 }
368
369
370 /**
371  * Transmit the next pending message, called by notify_transmit_ready
372  */
373 static size_t
374 transmit_pending (void *cls, size_t size, void *buf)
375 {
376   struct GNUNET_DHT_Handle *handle = cls;
377   struct PendingMessage *head;
378   size_t tsize;
379
380   handle->th = NULL;
381   if (buf == NULL)
382   {
383     LOG (GNUNET_ERROR_TYPE_DEBUG,
384          "Transmission to DHT service failed!  Reconnecting!\n");
385     do_disconnect (handle);
386     return 0;
387   }
388   if (NULL == (head = handle->pending_head))
389     return 0;
390
391   tsize = ntohs (head->msg->size);
392   if (size < tsize)
393   {
394     process_pending_messages (handle);
395     return 0;
396   }
397   memcpy (buf, head->msg, tsize);
398   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
399                                head);
400   head->in_pending_queue = GNUNET_NO;
401   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
402   {
403     GNUNET_SCHEDULER_cancel (head->timeout_task);
404     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
405   }
406   if (NULL != head->cont)
407   {
408     GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
409                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
410     head->cont = NULL;
411     head->cont_cls = NULL;
412   }
413   if (GNUNET_YES == head->free_on_send)
414     GNUNET_free (head);
415   process_pending_messages (handle);
416 #if DEBUG_DHT
417   LOG (GNUNET_ERROR_TYPE_DEBUG,
418        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
419 #endif
420   if (GNUNET_NO == handle->in_receive)
421   {
422 #if DEBUG_DHT
423     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
424 #endif
425     handle->in_receive = GNUNET_YES;
426     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
427                            GNUNET_TIME_UNIT_FOREVER_REL);
428   }
429   return tsize;
430 }
431
432
433 /**
434  * Process a given reply that might match the given
435  * request.
436  *
437  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
438  * @param key query of the request
439  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
440  * @return GNUNET_YES to continue to iterate over all results,
441  *         GNUNET_NO if the reply is malformed
442  */
443 static int
444 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
445 {
446   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
447   struct GNUNET_DHT_GetHandle *get_handle = value;
448   const struct GNUNET_PeerIdentity *put_path;
449   const struct GNUNET_PeerIdentity *get_path;
450   uint32_t put_path_length;
451   uint32_t get_path_length;
452   size_t data_length;
453   size_t msize;
454   size_t meta_length;
455   const void *data;
456
457   if (dht_msg->unique_id != get_handle->unique_id)
458   {
459     /* UID mismatch */
460     LOG (GNUNET_ERROR_TYPE_DEBUG, 
461          "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
462          GNUNET_h2s (key),
463          dht_msg->unique_id, get_handle->unique_id);
464     return GNUNET_YES;
465   }
466   msize = ntohs (dht_msg->header.size);
467   put_path_length = ntohl (dht_msg->put_path_length);
468   get_path_length = ntohl (dht_msg->get_path_length);
469   meta_length =
470       sizeof (struct GNUNET_DHT_ClientResultMessage) +
471       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
472   if ((msize < meta_length) ||
473       (get_path_length >
474        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
475       (put_path_length >
476        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
477   {
478     GNUNET_break (0);
479     return GNUNET_NO;
480   }
481   data_length = msize - meta_length;
482   LOG (GNUNET_ERROR_TYPE_DEBUG, 
483        "Giving %u byte reply for %s to application\n",
484        (unsigned int) data_length,
485        GNUNET_h2s (key));
486   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
487   get_path = &put_path[put_path_length];
488   data = &get_path[get_path_length];
489   get_handle->iter (get_handle->iter_cls,
490                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
491                     get_path, get_path_length, put_path, put_path_length,
492                     ntohl (dht_msg->type), data_length, data);
493   return GNUNET_YES;
494 }
495
496
497 /**
498  * Handler for messages received from the DHT service
499  * a demultiplexer which handles numerous message types
500  *
501  * @param cls the 'struct GNUNET_DHT_Handle'
502  * @param msg the incoming message
503  */
504 static void
505 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
506 {
507   struct GNUNET_DHT_Handle *handle = cls;
508   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
509
510   if (msg == NULL)
511   {
512     LOG (GNUNET_ERROR_TYPE_DEBUG,
513          "Error receiving data from DHT service, reconnecting\n");
514     do_disconnect (handle);
515     return;
516   }
517   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
518   {
519     GNUNET_break (0);
520     do_disconnect (handle);
521     return;
522   }
523   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
524   {
525     GNUNET_break (0);
526     do_disconnect (handle);
527     return;
528   }
529   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
530   LOG (GNUNET_ERROR_TYPE_DEBUG, 
531        "Received reply for `%s' from DHT service %p\n",
532        GNUNET_h2s (&dht_msg->key),
533        handle);
534   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
535                                               &dht_msg->key, &process_reply,
536                                               (void *) dht_msg);
537   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
538                          GNUNET_TIME_UNIT_FOREVER_REL);
539 }
540
541
542 /**
543  * Initialize the connection with the DHT service.
544  *
545  * @param cfg configuration to use
546  * @param ht_len size of the internal hash table to use for
547  *               processing multiple GET/FIND requests in parallel
548  *
549  * @return handle to the DHT service, or NULL on error
550  */
551 struct GNUNET_DHT_Handle *
552 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
553                     unsigned int ht_len)
554 {
555   struct GNUNET_DHT_Handle *handle;
556
557   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
558   handle->cfg = cfg;
559   handle->uid_gen =
560       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
561   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
562   if (GNUNET_NO == try_connect (handle))
563   {
564     GNUNET_DHT_disconnect (handle);
565     return NULL;
566   }
567   return handle;
568 }
569
570
571 /**
572  * Shutdown connection with the DHT service.
573  *
574  * @param handle handle of the DHT connection to stop
575  */
576 void
577 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
578 {
579   struct PendingMessage *pm;
580
581   GNUNET_assert (handle != NULL);
582   GNUNET_assert (0 ==
583                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
584   if (handle->th != NULL)
585   {
586     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
587     handle->th = NULL;
588   }
589   while (NULL != (pm = handle->pending_head))
590   {
591     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
592     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
593                                  pm);
594     pm->in_pending_queue = GNUNET_NO;
595     GNUNET_assert (GNUNET_YES == pm->free_on_send);
596     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
597       GNUNET_SCHEDULER_cancel (pm->timeout_task);
598     if (NULL != pm->cont)
599       GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
600                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
601     GNUNET_free (pm);
602   }
603   if (handle->client != NULL)
604   {
605     GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
606     handle->client = NULL;
607   }
608   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
609     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
610   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
611   GNUNET_free (handle);
612 }
613
614
615 /**
616  * Timeout for the transmission of a fire&forget-request.  Clean it up.
617  *
618  * @param cls the 'struct PendingMessage'
619  * @param tc scheduler context
620  */
621 static void
622 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
623 {
624   struct PendingMessage *pending = cls;
625   struct GNUNET_DHT_Handle *handle;
626
627   handle = pending->handle;
628     GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
629   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
630                                pending);
631   pending->in_pending_queue = GNUNET_NO;
632   if (pending->cont != NULL)
633     pending->cont (pending->cont_cls, tc);
634   GNUNET_free (pending);
635 }
636
637
638 /**
639  * Perform a PUT operation storing data in the DHT.
640  *
641  * @param handle handle to DHT service
642  * @param key the key to store under
643  * @param desired_replication_level estimate of how many
644  *                nearest peers this request should reach
645  * @param options routing options for this message
646  * @param type type of the value
647  * @param size number of bytes in data; must be less than 64k
648  * @param data the data to store
649  * @param exp desired expiration time for the value
650  * @param timeout how long to wait for transmission of this request
651  * @param cont continuation to call when done (transmitting request to service)
652  * @param cont_cls closure for cont
653  */
654 void
655 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
656                 uint32_t desired_replication_level,
657                 enum GNUNET_DHT_RouteOption options,
658                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
659                 struct GNUNET_TIME_Absolute exp,
660                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
661                 void *cont_cls)
662 {
663   struct GNUNET_DHT_ClientPutMessage *put_msg;
664   size_t msize;
665   struct PendingMessage *pending;
666
667   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
668   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
669       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
670   {
671     GNUNET_break (0);
672     if (NULL != cont)
673       cont (cont_cls, NULL);
674     return;
675   }
676   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
677   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
678   pending->msg = &put_msg->header;
679   pending->handle = handle;
680   pending->cont = cont;
681   pending->cont_cls = cont_cls;
682   pending->free_on_send = GNUNET_YES;
683   pending->timeout_task =
684       GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
685   put_msg->header.size = htons (msize);
686   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
687   put_msg->type = htonl (type);
688   put_msg->options = htonl ((uint32_t) options);
689   put_msg->desired_replication_level = htonl (desired_replication_level);
690   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
691   put_msg->key = *key;
692   memcpy (&put_msg[1], data, size);
693   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
694                                pending);
695   pending->in_pending_queue = GNUNET_YES;
696   process_pending_messages (handle);
697 }
698
699
700 /**
701  * Perform an asynchronous GET operation on the DHT identified. See
702  * also "GNUNET_BLOCK_evaluate".
703  *
704  * @param handle handle to the DHT service
705  * @param timeout how long to wait for transmission of this request to the service
706  * @param type expected type of the response object
707  * @param key the key to look up
708  * @param desired_replication_level estimate of how many
709                   nearest peers this request should reach
710  * @param options routing options for this message
711  * @param xquery extended query data (can be NULL, depending on type)
712  * @param xquery_size number of bytes in xquery
713  * @param iter function to call on each result
714  * @param iter_cls closure for iter
715  * @return handle to stop the async get
716  */
717 struct GNUNET_DHT_GetHandle *
718 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
719                       struct GNUNET_TIME_Relative timeout,
720                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
721                       uint32_t desired_replication_level,
722                       enum GNUNET_DHT_RouteOption options, const void *xquery,
723                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
724                       void *iter_cls)
725 {
726   struct GNUNET_DHT_ClientGetMessage *get_msg;
727   struct GNUNET_DHT_GetHandle *get_handle;
728   size_t msize;
729   struct PendingMessage *pending;
730
731   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
732   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
733       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
734   {
735     GNUNET_break (0);
736     return NULL;
737   }
738   LOG (GNUNET_ERROR_TYPE_DEBUG,
739        "Sending query for %s to DHT %p\n",
740        GNUNET_h2s (key),
741        handle);
742   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
743   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
744   pending->msg = &get_msg->header;
745   pending->handle = handle;
746   pending->free_on_send = GNUNET_NO;
747   get_msg->header.size = htons (msize);
748   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
749   get_msg->options = htonl ((uint32_t) options);
750   get_msg->desired_replication_level = htonl (desired_replication_level);
751   get_msg->type = htonl (type);
752   get_msg->key = *key;
753   handle->uid_gen++;
754   get_msg->unique_id = handle->uid_gen;
755   memcpy (&get_msg[1], xquery, xquery_size);
756   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
757                                pending);
758   pending->in_pending_queue = GNUNET_YES;
759   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
760   get_handle->iter = iter;
761   get_handle->iter_cls = iter_cls;
762   get_handle->message = pending;
763   get_handle->unique_id = get_msg->unique_id;
764   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
765                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
766   process_pending_messages (handle);
767   return get_handle;
768 }
769
770
771 /**
772  * Stop async DHT-get.
773  *
774  * @param get_handle handle to the GET operation to stop
775  */
776 void
777 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
778 {
779   struct GNUNET_DHT_Handle *handle;
780   const struct GNUNET_DHT_ClientGetMessage *get_msg;
781   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
782   struct PendingMessage *pending;
783
784   handle = get_handle->message->handle;
785   get_msg =
786       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
787   LOG (GNUNET_ERROR_TYPE_DEBUG,
788        "Sending STOP for %s to DHT via %p\n",
789        GNUNET_h2s (&get_msg->key),
790        handle);
791   /* generate STOP */
792   pending =
793       GNUNET_malloc (sizeof (struct PendingMessage) +
794                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
795   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
796   pending->msg = &stop_msg->header;
797   pending->handle = handle;
798   pending->free_on_send = GNUNET_YES;
799   stop_msg->header.size =
800       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
801   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
802   stop_msg->reserved = htonl (0);
803   stop_msg->unique_id = get_msg->unique_id;
804   stop_msg->key = get_msg->key;
805   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
806                                pending);
807   pending->in_pending_queue = GNUNET_YES;
808
809   /* remove 'GET' from active status */
810   GNUNET_assert (GNUNET_YES ==
811                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
812                                                        &get_msg->key,
813                                                        get_handle));
814   if (GNUNET_YES == get_handle->message->in_pending_queue)
815   {
816     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
817                                  get_handle->message);
818     get_handle->message->in_pending_queue = GNUNET_NO;
819   }
820   GNUNET_free (get_handle->message);
821   GNUNET_free (get_handle);
822
823   process_pending_messages (handle);
824 }
825
826
827 /* end of dht_api.c */