-fixing source port randomization for DNS service
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define DEBUG_DHT_API GNUNET_EXTRA_LOGGING
38
39 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
40
41 /**
42  * Entry in our list of messages to be (re-)transmitted.
43  */
44 struct PendingMessage
45 {
46   /**
47    * This is a doubly-linked list.
48    */
49   struct PendingMessage *prev;
50
51   /**
52    * This is a doubly-linked list.
53    */
54   struct PendingMessage *next;
55
56   /**
57    * Message that is pending, allocated at the end
58    * of this struct.
59    */
60   const struct GNUNET_MessageHeader *msg;
61
62   /**
63    * Handle to the DHT API context.
64    */
65   struct GNUNET_DHT_Handle *handle;
66
67   /**
68    * Continuation to call when the request has been
69    * transmitted (for the first time) to the service; can be NULL.
70    */
71   GNUNET_SCHEDULER_Task cont;
72
73   /**
74    * Closure for 'cont'.
75    */
76   void *cont_cls;
77
78   /**
79    * Timeout task for this message
80    */
81   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82
83   /**
84    * Unique ID for this request
85    */
86   uint64_t unique_id;
87
88   /**
89    * Free the saved message once sent, set to GNUNET_YES for messages
90    * that do not receive responses; GNUNET_NO if this pending message
91    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
92    * from there.
93    */
94   int free_on_send;
95
96   /**
97    * GNUNET_YES if this message is in our pending queue right now.
98    */
99   int in_pending_queue;
100
101 };
102
103
104 /**
105  * Handle to a GET request
106  */
107 struct GNUNET_DHT_GetHandle
108 {
109
110   /**
111    * Iterator to call on data receipt
112    */
113   GNUNET_DHT_GetIterator iter;
114
115   /**
116    * Closure for the iterator callback
117    */
118   void *iter_cls;
119
120   /**
121    * Main handle to this DHT api
122    */
123   struct GNUNET_DHT_Handle *dht_handle;
124
125   /**
126    * The actual message sent for this request,
127    * used for retransmitting requests on service
128    * failure/reconnect.  Freed on route_stop.
129    */
130   struct PendingMessage *message;
131
132   /**
133    * Key that this get request is for
134    */
135   GNUNET_HashCode key;
136
137   /**
138    * Unique identifier for this request (for key collisions).
139    */
140   uint64_t unique_id;
141
142 };
143
144
145 /**
146  * Handle to a monitoring request.
147  */
148 struct GNUNET_DHT_MonitorHandle
149 {
150   /**
151    * DLL.
152    */
153   struct GNUNET_DHT_MonitorHandle *next;
154
155   /**
156    * DLL.
157    */
158   struct GNUNET_DHT_MonitorHandle *prev;
159   
160   /**
161    * Main handle to this DHT api.
162    */
163   struct GNUNET_DHT_Handle *dht_handle;
164
165   /**
166    * Type of block looked for.
167    */
168   enum GNUNET_BLOCK_Type type;
169
170   /**
171    * Key being looked for, NULL == all.
172    */
173   GNUNET_HashCode *key;
174
175   /**
176    * Callback for each received message of interest.
177    */
178   GNUNET_DHT_MonitorCB cb;
179
180   /**
181    * Closure for cb.
182    */
183   void *cb_cls;
184   
185 };
186
187
188 /**
189  * Connection to the DHT service.
190  */
191 struct GNUNET_DHT_Handle
192 {
193
194   /**
195    * Configuration to use.
196    */
197   const struct GNUNET_CONFIGURATION_Handle *cfg;
198
199   /**
200    * Socket (if available).
201    */
202   struct GNUNET_CLIENT_Connection *client;
203
204   /**
205    * Currently pending transmission request (or NULL).
206    */
207   struct GNUNET_CLIENT_TransmitHandle *th;
208
209   /**
210    * Head of linked list of messages we would like to transmit.
211    */
212   struct PendingMessage *pending_head;
213
214   /**
215    * Tail of linked list of messages we would like to transmit.
216    */
217   struct PendingMessage *pending_tail;
218
219   /**
220    * Head of linked list of messages we would like to monitor. 
221    */
222   struct GNUNET_DHT_MonitorHandle *monitor_head;
223
224   /**
225    * Tail of linked list of messages we would like to monitor.
226    */
227   struct GNUNET_DHT_MonitorHandle *monitor_tail;
228
229   /**
230    * Hash map containing the current outstanding unique requests
231    * (values are of type 'struct GNUNET_DHT_RouteHandle').
232    */
233   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
234
235   /**
236    * Task for trying to reconnect.
237    */
238   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
239
240   /**
241    * How quickly should we retry?  Used for exponential back-off on
242    * connect-errors.
243    */
244   struct GNUNET_TIME_Relative retry_time;
245
246   /**
247    * Generator for unique ids.
248    */
249   uint64_t uid_gen;
250
251   /**
252    * Did we start our receive loop yet?
253    */
254   int in_receive;
255 };
256
257
258 /**
259  * Handler for messages received from the DHT service
260  * a demultiplexer which handles numerous message types
261  *
262  */
263 static void
264 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
265
266
267 /**
268  * Try to (re)connect to the DHT service.
269  *
270  * @return GNUNET_YES on success, GNUNET_NO on failure.
271  */
272 static int
273 try_connect (struct GNUNET_DHT_Handle *handle)
274 {
275   if (handle->client != NULL)
276     return GNUNET_OK;
277   handle->in_receive = GNUNET_NO;
278   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
279   if (handle->client == NULL)
280   {
281     LOG (GNUNET_ERROR_TYPE_WARNING,
282          _("Failed to connect to the DHT service!\n"));
283     return GNUNET_NO;
284   }
285   return GNUNET_YES;
286 }
287
288
289 /**
290  * Add the request corresponding to the given route handle
291  * to the pending queue (if it is not already in there).
292  *
293  * @param cls the 'struct GNUNET_DHT_Handle*'
294  * @param key key for the request (not used)
295  * @param value the 'struct GNUNET_DHT_GetHandle*'
296  * @return GNUNET_YES (always)
297  */
298 static int
299 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
300 {
301   struct GNUNET_DHT_Handle *handle = cls;
302   struct GNUNET_DHT_GetHandle *rh = value;
303
304   if (GNUNET_NO == rh->message->in_pending_queue)
305   {
306 #if DEBUG_DHT
307     LOG (GNUNET_ERROR_TYPE_DEBUG,
308          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
309          handle);
310 #endif
311     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
312                                  rh->message);
313     rh->message->in_pending_queue = GNUNET_YES;
314   }
315   return GNUNET_YES;
316 }
317
318
319 /**
320  * Try to send messages from list of messages to send
321  * @param handle DHT_Handle
322  */
323 static void
324 process_pending_messages (struct GNUNET_DHT_Handle *handle);
325
326
327 /**
328  * Try reconnecting to the dht service.
329  *
330  * @param cls GNUNET_DHT_Handle
331  * @param tc scheduler context
332  */
333 static void
334 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
335 {
336   struct GNUNET_DHT_Handle *handle = cls;
337
338 #if DEBUG_DHT
339   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
340 #endif
341   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
342   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
343     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
344   else
345     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
346   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
347     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
348   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
349   if (GNUNET_YES != try_connect (handle))
350   {
351 #if DEBUG_DHT
352     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
353 #endif
354     return;
355   }
356   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
357                                          &add_request_to_pending, handle);
358   process_pending_messages (handle);
359 }
360
361
362 /**
363  * Try reconnecting to the DHT service.
364  *
365  * @param handle handle to dht to (possibly) disconnect and reconnect
366  */
367 static void
368 do_disconnect (struct GNUNET_DHT_Handle *handle)
369 {
370   if (handle->client == NULL)
371     return;
372   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
373   if (NULL != handle->th)
374     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
375   handle->th = NULL;
376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377               "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
378               (unsigned long long) handle->retry_time.rel_value);
379   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
380   handle->client = NULL;
381   handle->reconnect_task =
382       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
383 }
384
385
386 /**
387  * Transmit the next pending message, called by notify_transmit_ready
388  */
389 static size_t
390 transmit_pending (void *cls, size_t size, void *buf);
391
392
393 /**
394  * Try to send messages from list of messages to send
395  */
396 static void
397 process_pending_messages (struct GNUNET_DHT_Handle *handle)
398 {
399   struct PendingMessage *head;
400
401   if (handle->client == NULL)
402   {
403     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404                 "process_pending_messages called, but client is null, reconnecting\n");
405     do_disconnect (handle);
406     return;
407   }
408   if (handle->th != NULL)
409     return;
410   if (NULL == (head = handle->pending_head))
411     return;
412   handle->th =
413       GNUNET_CLIENT_notify_transmit_ready (handle->client,
414                                            ntohs (head->msg->size),
415                                            GNUNET_TIME_UNIT_FOREVER_REL,
416                                            GNUNET_YES, &transmit_pending,
417                                            handle);
418   if (NULL != handle->th)
419     return;
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421               "notify_transmit_ready returned NULL, reconnecting\n");
422   do_disconnect (handle);
423 }
424
425
426 /**
427  * Transmit the next pending message, called by notify_transmit_ready
428  */
429 static size_t
430 transmit_pending (void *cls, size_t size, void *buf)
431 {
432   struct GNUNET_DHT_Handle *handle = cls;
433   struct PendingMessage *head;
434   size_t tsize;
435
436   handle->th = NULL;
437   if (buf == NULL)
438   {
439 #if DEBUG_DHT
440     LOG (GNUNET_ERROR_TYPE_DEBUG,
441          "Transmission to DHT service failed!  Reconnecting!\n");
442 #endif
443     do_disconnect (handle);
444     return 0;
445   }
446   if (NULL == (head = handle->pending_head))
447     return 0;
448
449   tsize = ntohs (head->msg->size);
450   if (size < tsize)
451   {
452     process_pending_messages (handle);
453     return 0;
454   }
455   memcpy (buf, head->msg, tsize);
456   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
457                                head);
458   head->in_pending_queue = GNUNET_NO;
459   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
460   {
461     GNUNET_SCHEDULER_cancel (head->timeout_task);
462     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
463   }
464   if (NULL != head->cont)
465   {
466     GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
467                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
468     head->cont = NULL;
469     head->cont_cls = NULL;
470   }
471   if (GNUNET_YES == head->free_on_send)
472     GNUNET_free (head);
473   process_pending_messages (handle);
474 #if DEBUG_DHT
475   LOG (GNUNET_ERROR_TYPE_DEBUG,
476        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
477 #endif
478   if (GNUNET_NO == handle->in_receive)
479   {
480 #if DEBUG_DHT
481     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
482 #endif
483     handle->in_receive = GNUNET_YES;
484     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
485                            GNUNET_TIME_UNIT_FOREVER_REL);
486   }
487   return tsize;
488 }
489
490
491 /**
492  * Process a given reply that might match the given
493  * request.
494  *
495  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
496  * @param key query of the request
497  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
498  * @return GNUNET_YES to continue to iterate over all results,
499  *         GNUNET_NO if the reply is malformed
500  */
501 static int
502 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
503 {
504   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
505   struct GNUNET_DHT_GetHandle *get_handle = value;
506   const struct GNUNET_PeerIdentity *put_path;
507   const struct GNUNET_PeerIdentity *get_path;
508   uint32_t put_path_length;
509   uint32_t get_path_length;
510   size_t data_length;
511   size_t msize;
512   size_t meta_length;
513   const void *data;
514
515   if (dht_msg->unique_id != get_handle->unique_id)
516   {
517     /* UID mismatch */
518 #if DEBUG_DHT
519     LOG (GNUNET_ERROR_TYPE_DEBUG,
520          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
521          dht_msg->unique_id, get_handle->unique_id);
522 #endif
523     return GNUNET_YES;
524   }
525   msize = ntohs (dht_msg->header.size);
526   put_path_length = ntohl (dht_msg->put_path_length);
527   get_path_length = ntohl (dht_msg->get_path_length);
528   meta_length =
529       sizeof (struct GNUNET_DHT_ClientResultMessage) +
530       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
531   if ((msize < meta_length) ||
532       (get_path_length >
533        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
534       (put_path_length >
535        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
536   {
537     GNUNET_break (0);
538     return GNUNET_NO;
539   }
540   data_length = msize - meta_length;
541 #if DEBUG_DHT
542   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
543        (unsigned int) data_length, GNUNET_h2s (key));
544 #endif
545   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
546   get_path = &put_path[put_path_length];
547   data = &get_path[get_path_length];
548   get_handle->iter (get_handle->iter_cls,
549                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
550                     get_path, get_path_length, put_path, put_path_length,
551                     ntohl (dht_msg->type), data_length, data);
552   return GNUNET_YES;
553 }
554
555
556 /**
557  * Process a monitoring message from the service.
558  *
559  * @param handle The DHT handle.
560  * @param msg Message from the service.
561  * 
562  * @return GNUNET_OK if everything went fine,
563  *         GNUNET_SYSERR if the message is malformed.
564  */
565 static int
566 process_monitor_message (struct GNUNET_DHT_Handle *handle,
567                          const struct GNUNET_MessageHeader *msg)
568 {
569   struct GNUNET_DHT_MonitorMessage *m;
570   struct GNUNET_DHT_MonitorHandle *h;
571   size_t msize;
572
573   if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
574       ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
575     return GNUNET_SYSERR;
576   msize = ntohs (msg->size);
577   if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
578     return GNUNET_SYSERR;
579
580   m = (struct GNUNET_DHT_MonitorMessage *) msg;
581   h = handle->monitor_head;
582   while (NULL != h)
583   {
584     if (h->type == ntohl(m->type) &&
585       (NULL == h->key ||
586        memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
587     {
588       struct GNUNET_PeerIdentity *path;
589       uint32_t getl;
590       uint32_t putl;
591
592       path = (struct GNUNET_PeerIdentity *) &m[1];
593       getl = ntohl (m->get_path_length);
594       putl = ntohl (m->put_path_length);
595       h->cb (h->cb_cls, ntohs(msg->type),
596              GNUNET_TIME_absolute_ntoh(m->expiration),
597              &m->key,
598              &path[getl], putl, path, getl,
599              ntohl (m->desired_replication_level),
600              ntohl (m->options), ntohl (m->type),
601              (void *) &path[getl + putl],
602              ntohs (msg->size) -
603              sizeof (struct GNUNET_DHT_MonitorMessage) -
604              sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
605     }
606     h = h->next;
607   }
608
609   return GNUNET_OK;
610 }
611
612 /**
613  * Handler for messages received from the DHT service
614  * a demultiplexer which handles numerous message types
615  *
616  * @param cls the 'struct GNUNET_DHT_Handle'
617  * @param msg the incoming message
618  */
619 static void
620 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
621 {
622   struct GNUNET_DHT_Handle *handle = cls;
623   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
624
625   if (msg == NULL)
626   {
627 #if DEBUG_DHT
628     LOG (GNUNET_ERROR_TYPE_DEBUG,
629          "Error receiving data from DHT service, reconnecting\n");
630 #endif
631     do_disconnect (handle);
632     return;
633   }
634   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
635   {
636     if (process_monitor_message (handle, msg) == GNUNET_OK)
637     {
638       GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
639                              GNUNET_TIME_UNIT_FOREVER_REL);
640       return;
641     }
642     GNUNET_break (0);
643     do_disconnect (handle);
644     return;
645   }
646   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
647   {
648     GNUNET_break (0);
649     do_disconnect (handle);
650     return;
651   }
652   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
653 #if DEBUG_DHT
654   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
655        GNUNET_h2s (&dht_msg->key), handle);
656 #endif
657   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
658                                               &dht_msg->key, &process_reply,
659                                               (void *) dht_msg);
660   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
661                          GNUNET_TIME_UNIT_FOREVER_REL);
662 }
663
664
665 /**
666  * Initialize the connection with the DHT service.
667  *
668  * @param cfg configuration to use
669  * @param ht_len size of the internal hash table to use for
670  *               processing multiple GET/FIND requests in parallel
671  *
672  * @return handle to the DHT service, or NULL on error
673  */
674 struct GNUNET_DHT_Handle *
675 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
676                     unsigned int ht_len)
677 {
678   struct GNUNET_DHT_Handle *handle;
679
680   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
681   handle->cfg = cfg;
682   handle->uid_gen =
683       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
684   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
685   if (GNUNET_NO == try_connect (handle))
686   {
687     GNUNET_DHT_disconnect (handle);
688     return NULL;
689   }
690   return handle;
691 }
692
693
694 /**
695  * Shutdown connection with the DHT service.
696  *
697  * @param handle handle of the DHT connection to stop
698  */
699 void
700 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
701 {
702   struct PendingMessage *pm;
703
704   GNUNET_assert (handle != NULL);
705   GNUNET_assert (0 ==
706                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
707   if (handle->th != NULL)
708   {
709     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
710     handle->th = NULL;
711   }
712   while (NULL != (pm = handle->pending_head))
713   {
714     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
715     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
716                                  pm);
717     pm->in_pending_queue = GNUNET_NO;
718     GNUNET_assert (GNUNET_YES == pm->free_on_send);
719     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
720       GNUNET_SCHEDULER_cancel (pm->timeout_task);
721     if (NULL != pm->cont)
722       GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
723                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
724     GNUNET_free (pm);
725   }
726   if (handle->client != NULL)
727   {
728     GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
729     handle->client = NULL;
730   }
731   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
732     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
733   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
734   GNUNET_free (handle);
735 }
736
737
738 /**
739  * Timeout for the transmission of a fire&forget-request.  Clean it up.
740  *
741  * @param cls the 'struct PendingMessage'
742  * @param tc scheduler context
743  */
744 static void
745 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
746 {
747   struct PendingMessage *pending = cls;
748   struct GNUNET_DHT_Handle *handle;
749
750   handle = pending->handle;
751   GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
752   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
753                                pending);
754   pending->in_pending_queue = GNUNET_NO;
755   if (pending->cont != NULL)
756     pending->cont (pending->cont_cls, tc);
757   GNUNET_free (pending);
758 }
759
760
761 /**
762  * Perform a PUT operation storing data in the DHT.
763  *
764  * @param handle handle to DHT service
765  * @param key the key to store under
766  * @param desired_replication_level estimate of how many
767  *                nearest peers this request should reach
768  * @param options routing options for this message
769  * @param type type of the value
770  * @param size number of bytes in data; must be less than 64k
771  * @param data the data to store
772  * @param exp desired expiration time for the value
773  * @param timeout how long to wait for transmission of this request
774  * @param cont continuation to call when done (transmitting request to service)
775  * @param cont_cls closure for cont
776  */
777 void
778 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
779                 uint32_t desired_replication_level,
780                 enum GNUNET_DHT_RouteOption options,
781                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
782                 struct GNUNET_TIME_Absolute exp,
783                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
784                 void *cont_cls)
785 {
786   struct GNUNET_DHT_ClientPutMessage *put_msg;
787   size_t msize;
788   struct PendingMessage *pending;
789
790   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
791   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
792       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
793   {
794     GNUNET_break (0);
795     if (NULL != cont)
796       cont (cont_cls, NULL);
797     return;
798   }
799   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
800   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
801   pending->msg = &put_msg->header;
802   pending->handle = handle;
803   pending->cont = cont;
804   pending->cont_cls = cont_cls;
805   pending->free_on_send = GNUNET_YES;
806   pending->timeout_task =
807       GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
808   put_msg->header.size = htons (msize);
809   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
810   put_msg->type = htonl (type);
811   put_msg->options = htonl ((uint32_t) options);
812   put_msg->desired_replication_level = htonl (desired_replication_level);
813   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
814   put_msg->key = *key;
815   memcpy (&put_msg[1], data, size);
816   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
817                                pending);
818   pending->in_pending_queue = GNUNET_YES;
819   process_pending_messages (handle);
820 }
821
822
823 /**
824  * Perform an asynchronous GET operation on the DHT identified. See
825  * also "GNUNET_BLOCK_evaluate".
826  *
827  * @param handle handle to the DHT service
828  * @param timeout how long to wait for transmission of this request to the service
829  * @param type expected type of the response object
830  * @param key the key to look up
831  * @param desired_replication_level estimate of how many
832                   nearest peers this request should reach
833  * @param options routing options for this message
834  * @param xquery extended query data (can be NULL, depending on type)
835  * @param xquery_size number of bytes in xquery
836  * @param iter function to call on each result
837  * @param iter_cls closure for iter
838  * @return handle to stop the async get
839  */
840 struct GNUNET_DHT_GetHandle *
841 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
842                       struct GNUNET_TIME_Relative timeout,
843                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
844                       uint32_t desired_replication_level,
845                       enum GNUNET_DHT_RouteOption options, const void *xquery,
846                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
847                       void *iter_cls)
848 {
849   struct GNUNET_DHT_ClientGetMessage *get_msg;
850   struct GNUNET_DHT_GetHandle *get_handle;
851   size_t msize;
852   struct PendingMessage *pending;
853
854   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
855   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
856       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
857   {
858     GNUNET_break (0);
859     return NULL;
860   }
861 #if DEBUG_DHT
862   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
863        GNUNET_h2s (key), handle);
864 #endif
865   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
866   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
867   pending->msg = &get_msg->header;
868   pending->handle = handle;
869   pending->free_on_send = GNUNET_NO;
870   get_msg->header.size = htons (msize);
871   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
872   get_msg->options = htonl ((uint32_t) options);
873   get_msg->desired_replication_level = htonl (desired_replication_level);
874   get_msg->type = htonl (type);
875   get_msg->key = *key;
876   handle->uid_gen++;
877   get_msg->unique_id = handle->uid_gen;
878   memcpy (&get_msg[1], xquery, xquery_size);
879   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
880                                pending);
881   pending->in_pending_queue = GNUNET_YES;
882   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
883   get_handle->iter = iter;
884   get_handle->iter_cls = iter_cls;
885   get_handle->message = pending;
886   get_handle->unique_id = get_msg->unique_id;
887   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
888                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
889   process_pending_messages (handle);
890   return get_handle;
891 }
892
893
894 /**
895  * Stop async DHT-get.
896  *
897  * @param get_handle handle to the GET operation to stop
898  */
899 void
900 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
901 {
902   struct GNUNET_DHT_Handle *handle;
903   const struct GNUNET_DHT_ClientGetMessage *get_msg;
904   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
905   struct PendingMessage *pending;
906
907   handle = get_handle->message->handle;
908   get_msg =
909       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
910 #if DEBUG_DHT
911   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
912        GNUNET_h2s (&get_msg->key), handle);
913 #endif
914   /* generate STOP */
915   pending =
916       GNUNET_malloc (sizeof (struct PendingMessage) +
917                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
918   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
919   pending->msg = &stop_msg->header;
920   pending->handle = handle;
921   pending->free_on_send = GNUNET_YES;
922   stop_msg->header.size =
923       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
924   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
925   stop_msg->reserved = htonl (0);
926   stop_msg->unique_id = get_msg->unique_id;
927   stop_msg->key = get_msg->key;
928   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
929                                pending);
930   pending->in_pending_queue = GNUNET_YES;
931
932   /* remove 'GET' from active status */
933   GNUNET_assert (GNUNET_YES ==
934                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
935                                                        &get_msg->key,
936                                                        get_handle));
937   if (GNUNET_YES == get_handle->message->in_pending_queue)
938   {
939     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
940                                  get_handle->message);
941     get_handle->message->in_pending_queue = GNUNET_NO;
942   }
943   GNUNET_free (get_handle->message);
944   GNUNET_free (get_handle);
945
946   process_pending_messages (handle);
947 }
948
949
950 /**
951  * Start monitoring the local DHT service.
952  *
953  * @param handle Handle to the DHT service.
954  * @param type Type of blocks that are of interest.
955  * @param key Key of data of interest, NULL for all.
956  * @param cb Callback to process all monitored data.
957  * @param cb_cls Closure for cb.
958  *
959  * @return Handle to stop monitoring.
960  */
961 struct GNUNET_DHT_MonitorHandle *
962 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
963                           enum GNUNET_BLOCK_Type type,
964                           const GNUNET_HashCode *key,
965                           GNUNET_DHT_MonitorCB cb,
966                           void *cb_cls)
967 {
968   struct GNUNET_DHT_MonitorHandle *h;
969   struct GNUNET_DHT_MonitorMessage *m;
970   struct PendingMessage *pending;
971
972   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
973   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
974
975   GNUNET_assert (NULL != cb);
976   h->cb = cb;
977   h->cb_cls = cb_cls;
978   h->type = type;
979   h->dht_handle = handle;
980   if (NULL != key)
981   {
982     h->key = GNUNET_malloc (sizeof(GNUNET_HashCode));
983     memcpy (h->key, key, sizeof(GNUNET_HashCode));
984   }
985
986   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) +
987                            sizeof (struct PendingMessage));
988   m = (struct GNUNET_DHT_MonitorMessage *) &pending[1];
989   pending->msg = &m->header;
990   pending->handle = handle;
991   pending->free_on_send = GNUNET_YES;
992   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
993   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage));
994   m->type = htonl(type);
995   if (NULL != key)
996     memcpy (&m->key, key, sizeof(GNUNET_HashCode));
997   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
998                                pending);
999   pending->in_pending_queue = GNUNET_YES;
1000   process_pending_messages (handle);
1001
1002   return h;
1003 }
1004
1005
1006 /**
1007  * Stop monitoring.
1008  *
1009  * @param handle The handle to the monitor request returned by monitor_start.
1010  *
1011  * On return get_handle will no longer be valid, caller must not use again!!!
1012  */
1013 void
1014 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1015 {
1016   GNUNET_free_non_null (handle->key);
1017   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1018                                handle->dht_handle->monitor_tail,
1019                                handle);
1020   GNUNET_free (handle);
1021 }
1022
1023
1024
1025 /* end of dht_api.c */