a715fec9dd51c8ff79b568b0dfb9115ebd331635
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define DEBUG_DHT_API GNUNET_EXTRA_LOGGING
38
39 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
40
41 /**
42  * Entry in our list of messages to be (re-)transmitted.
43  */
44 struct PendingMessage
45 {
46   /**
47    * This is a doubly-linked list.
48    */
49   struct PendingMessage *prev;
50
51   /**
52    * This is a doubly-linked list.
53    */
54   struct PendingMessage *next;
55
56   /**
57    * Message that is pending, allocated at the end
58    * of this struct.
59    */
60   const struct GNUNET_MessageHeader *msg;
61
62   /**
63    * Handle to the DHT API context.
64    */
65   struct GNUNET_DHT_Handle *handle;
66
67   /**
68    * Continuation to call when the request has been
69    * transmitted (for the first time) to the service; can be NULL.
70    */
71   GNUNET_SCHEDULER_Task cont;
72
73   /**
74    * Closure for 'cont'.
75    */
76   void *cont_cls;
77
78   /**
79    * Timeout task for this message
80    */
81   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82
83   /**
84    * Unique ID for this request
85    */
86   uint64_t unique_id;
87
88   /**
89    * Free the saved message once sent, set to GNUNET_YES for messages
90    * that do not receive responses; GNUNET_NO if this pending message
91    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
92    * from there.
93    */
94   int free_on_send;
95
96   /**
97    * GNUNET_YES if this message is in our pending queue right now.
98    */
99   int in_pending_queue;
100
101 };
102
103
104 /**
105  * Handle to a GET request
106  */
107 struct GNUNET_DHT_GetHandle
108 {
109
110   /**
111    * Iterator to call on data receipt
112    */
113   GNUNET_DHT_GetIterator iter;
114
115   /**
116    * Closure for the iterator callback
117    */
118   void *iter_cls;
119
120   /**
121    * Main handle to this DHT api
122    */
123   struct GNUNET_DHT_Handle *dht_handle;
124
125   /**
126    * The actual message sent for this request,
127    * used for retransmitting requests on service
128    * failure/reconnect.  Freed on route_stop.
129    */
130   struct PendingMessage *message;
131
132   /**
133    * Key that this get request is for
134    */
135   GNUNET_HashCode key;
136
137   /**
138    * Unique identifier for this request (for key collisions).
139    */
140   uint64_t unique_id;
141
142 };
143
144
145 /**
146  * Handle to a monitoring request.
147  */
148 struct GNUNET_DHT_MonitorHandle
149 {
150   /**
151    * DLL.
152    */
153   struct GNUNET_DHT_MonitorHandle *next;
154
155   /**
156    * DLL.
157    */
158   struct GNUNET_DHT_MonitorHandle *prev;
159   
160   /**
161    * Main handle to this DHT api.
162    */
163   struct GNUNET_DHT_Handle *dht_handle;
164
165   /**
166    * Type of block looked for.
167    */
168   enum GNUNET_BLOCK_Type type;
169
170   /**
171    * Key being looked for, NULL == all.
172    */
173   GNUNET_HashCode *key;
174
175   /**
176    * Callback for each received message of interest.
177    */
178   GNUNET_DHT_MonitorCB cb;
179
180   /**
181    * Closure for cb.
182    */
183   void *cb_cls;
184   
185 };
186
187
188 /**
189  * Connection to the DHT service.
190  */
191 struct GNUNET_DHT_Handle
192 {
193
194   /**
195    * Configuration to use.
196    */
197   const struct GNUNET_CONFIGURATION_Handle *cfg;
198
199   /**
200    * Socket (if available).
201    */
202   struct GNUNET_CLIENT_Connection *client;
203
204   /**
205    * Currently pending transmission request (or NULL).
206    */
207   struct GNUNET_CLIENT_TransmitHandle *th;
208
209   /**
210    * Head of linked list of messages we would like to transmit.
211    */
212   struct PendingMessage *pending_head;
213
214   /**
215    * Tail of linked list of messages we would like to transmit.
216    */
217   struct PendingMessage *pending_tail;
218
219   /**
220    * Head of linked list of messages we would like to monitor. 
221    */
222   struct GNUNET_DHT_MonitorHandle *monitor_head;
223
224   /**
225    * Tail of linked list of messages we would like to monitor.
226    */
227   struct GNUNET_DHT_MonitorHandle *monitor_tail;
228
229   /**
230    * Hash map containing the current outstanding unique requests
231    * (values are of type 'struct GNUNET_DHT_RouteHandle').
232    */
233   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
234
235   /**
236    * Task for trying to reconnect.
237    */
238   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
239
240   /**
241    * How quickly should we retry?  Used for exponential back-off on
242    * connect-errors.
243    */
244   struct GNUNET_TIME_Relative retry_time;
245
246   /**
247    * Generator for unique ids.
248    */
249   uint64_t uid_gen;
250
251   /**
252    * Did we start our receive loop yet?
253    */
254   int in_receive;
255 };
256
257
258 /**
259  * Handler for messages received from the DHT service
260  * a demultiplexer which handles numerous message types
261  *
262  */
263 static void
264 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
265
266
267 /**
268  * Try to (re)connect to the DHT service.
269  *
270  * @return GNUNET_YES on success, GNUNET_NO on failure.
271  */
272 static int
273 try_connect (struct GNUNET_DHT_Handle *handle)
274 {
275   if (handle->client != NULL)
276     return GNUNET_OK;
277   handle->in_receive = GNUNET_NO;
278   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
279   if (handle->client == NULL)
280   {
281     LOG (GNUNET_ERROR_TYPE_WARNING,
282          _("Failed to connect to the DHT service!\n"));
283     return GNUNET_NO;
284   }
285   return GNUNET_YES;
286 }
287
288
289 /**
290  * Add the request corresponding to the given route handle
291  * to the pending queue (if it is not already in there).
292  *
293  * @param cls the 'struct GNUNET_DHT_Handle*'
294  * @param key key for the request (not used)
295  * @param value the 'struct GNUNET_DHT_GetHandle*'
296  * @return GNUNET_YES (always)
297  */
298 static int
299 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
300 {
301   struct GNUNET_DHT_Handle *handle = cls;
302   struct GNUNET_DHT_GetHandle *rh = value;
303
304   if (GNUNET_NO == rh->message->in_pending_queue)
305   {
306 #if DEBUG_DHT
307     LOG (GNUNET_ERROR_TYPE_DEBUG,
308          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
309          handle);
310 #endif
311     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
312                                  rh->message);
313     rh->message->in_pending_queue = GNUNET_YES;
314   }
315   return GNUNET_YES;
316 }
317
318
319 /**
320  * Try to send messages from list of messages to send
321  * @param handle DHT_Handle
322  */
323 static void
324 process_pending_messages (struct GNUNET_DHT_Handle *handle);
325
326
327 /**
328  * Try reconnecting to the dht service.
329  *
330  * @param cls GNUNET_DHT_Handle
331  * @param tc scheduler context
332  */
333 static void
334 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
335 {
336   struct GNUNET_DHT_Handle *handle = cls;
337
338 #if DEBUG_DHT
339   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
340 #endif
341   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
342   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
343     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
344   else
345     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
346   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
347     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
348   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
349   if (GNUNET_YES != try_connect (handle))
350   {
351 #if DEBUG_DHT
352     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
353 #endif
354     return;
355   }
356   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
357                                          &add_request_to_pending, handle);
358   process_pending_messages (handle);
359 }
360
361
362 /**
363  * Try reconnecting to the DHT service.
364  *
365  * @param handle handle to dht to (possibly) disconnect and reconnect
366  */
367 static void
368 do_disconnect (struct GNUNET_DHT_Handle *handle)
369 {
370   if (handle->client == NULL)
371     return;
372   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
373   if (NULL != handle->th)
374     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
375   handle->th = NULL;
376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377               "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
378               (unsigned long long) handle->retry_time.rel_value);
379   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
380   handle->client = NULL;
381   handle->reconnect_task =
382       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
383 }
384
385
386 /**
387  * Transmit the next pending message, called by notify_transmit_ready
388  */
389 static size_t
390 transmit_pending (void *cls, size_t size, void *buf);
391
392
393 /**
394  * Try to send messages from list of messages to send
395  */
396 static void
397 process_pending_messages (struct GNUNET_DHT_Handle *handle)
398 {
399   struct PendingMessage *head;
400
401   if (handle->client == NULL)
402   {
403     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404                 "process_pending_messages called, but client is null, reconnecting\n");
405     do_disconnect (handle);
406     return;
407   }
408   if (handle->th != NULL)
409     return;
410   if (NULL == (head = handle->pending_head))
411     return;
412   handle->th =
413       GNUNET_CLIENT_notify_transmit_ready (handle->client,
414                                            ntohs (head->msg->size),
415                                            GNUNET_TIME_UNIT_FOREVER_REL,
416                                            GNUNET_YES, &transmit_pending,
417                                            handle);
418   if (NULL != handle->th)
419     return;
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421               "notify_transmit_ready returned NULL, reconnecting\n");
422   do_disconnect (handle);
423 }
424
425
426 /**
427  * Transmit the next pending message, called by notify_transmit_ready
428  */
429 static size_t
430 transmit_pending (void *cls, size_t size, void *buf)
431 {
432   struct GNUNET_DHT_Handle *handle = cls;
433   struct PendingMessage *head;
434   size_t tsize;
435
436   handle->th = NULL;
437   if (buf == NULL)
438   {
439 #if DEBUG_DHT
440     LOG (GNUNET_ERROR_TYPE_DEBUG,
441          "Transmission to DHT service failed!  Reconnecting!\n");
442 #endif
443     do_disconnect (handle);
444     return 0;
445   }
446   if (NULL == (head = handle->pending_head))
447     return 0;
448
449   tsize = ntohs (head->msg->size);
450   if (size < tsize)
451   {
452     process_pending_messages (handle);
453     return 0;
454   }
455   memcpy (buf, head->msg, tsize);
456   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
457                                head);
458   head->in_pending_queue = GNUNET_NO;
459   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
460   {
461     GNUNET_SCHEDULER_cancel (head->timeout_task);
462     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
463   }
464   if (NULL != head->cont)
465   {
466     GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
467                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
468     head->cont = NULL;
469     head->cont_cls = NULL;
470   }
471   if (GNUNET_YES == head->free_on_send)
472     GNUNET_free (head);
473   process_pending_messages (handle);
474 #if DEBUG_DHT
475   LOG (GNUNET_ERROR_TYPE_DEBUG,
476        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
477 #endif
478   if (GNUNET_NO == handle->in_receive)
479   {
480 #if DEBUG_DHT
481     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
482 #endif
483     handle->in_receive = GNUNET_YES;
484     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
485                            GNUNET_TIME_UNIT_FOREVER_REL);
486   }
487   return tsize;
488 }
489
490
491 /**
492  * Process a given reply that might match the given
493  * request.
494  *
495  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
496  * @param key query of the request
497  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
498  * @return GNUNET_YES to continue to iterate over all results,
499  *         GNUNET_NO if the reply is malformed
500  */
501 static int
502 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
503 {
504   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
505   struct GNUNET_DHT_GetHandle *get_handle = value;
506   const struct GNUNET_PeerIdentity *put_path;
507   const struct GNUNET_PeerIdentity *get_path;
508   uint32_t put_path_length;
509   uint32_t get_path_length;
510   size_t data_length;
511   size_t msize;
512   size_t meta_length;
513   const void *data;
514
515   if (dht_msg->unique_id != get_handle->unique_id)
516   {
517     /* UID mismatch */
518 #if DEBUG_DHT
519     LOG (GNUNET_ERROR_TYPE_DEBUG,
520          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
521          dht_msg->unique_id, get_handle->unique_id);
522 #endif
523     return GNUNET_YES;
524   }
525   msize = ntohs (dht_msg->header.size);
526   put_path_length = ntohl (dht_msg->put_path_length);
527   get_path_length = ntohl (dht_msg->get_path_length);
528   meta_length =
529       sizeof (struct GNUNET_DHT_ClientResultMessage) +
530       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
531   if ((msize < meta_length) ||
532       (get_path_length >
533        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
534       (put_path_length >
535        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
536   {
537     GNUNET_break (0);
538     return GNUNET_NO;
539   }
540   data_length = msize - meta_length;
541 #if DEBUG_DHT
542   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
543        (unsigned int) data_length, GNUNET_h2s (key));
544 #endif
545   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
546   get_path = &put_path[put_path_length];
547   data = &get_path[get_path_length];
548   get_handle->iter (get_handle->iter_cls,
549                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
550                     get_path, get_path_length, put_path, put_path_length,
551                     ntohl (dht_msg->type), data_length, data);
552   return GNUNET_YES;
553 }
554
555
556 /**
557  * Process a monitoring message from the service.
558  *
559  * @param handle The DHT handle.
560  * @param msg Message from the service.
561  * 
562  * @return GNUNET_OK if everything went fine,
563  *         GNUNET_SYSERR if the message is malformed.
564  */
565 static int
566 process_monitor_message (struct GNUNET_DHT_Handle *handle,
567                          const struct GNUNET_MessageHeader *msg)
568 {
569   struct GNUNET_DHT_MonitorMessage *m;
570   struct GNUNET_DHT_MonitorHandle *h;
571   size_t msize;
572
573   if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
574       ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
575     return GNUNET_SYSERR;
576   msize = ntohs (msg->size);
577   if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
578     return GNUNET_SYSERR;
579
580   m = (struct GNUNET_DHT_MonitorMessage *) msg;
581   h = handle->monitor_head;
582   while (NULL != h)
583   {
584     if (h->type == ntohl(m->type) &&
585       (NULL == h->key ||
586        memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
587     {
588       struct GNUNET_PeerIdentity *path;
589       uint32_t getl;
590       uint32_t putl;
591
592       path = (struct GNUNET_PeerIdentity *) &m[1];
593       getl = ntohl (m->get_path_length);
594       putl = ntohl (m->put_path_length);
595       h->cb (h->cb_cls, ntohs(msg->type),
596              GNUNET_TIME_absolute_ntoh(m->expiration),
597              &m->key,
598              &path[getl], putl, path, getl,
599              ntohl (m->desired_replication_level),
600              ntohl (m->options), ntohl (m->type),
601              (void *) &path[getl + putl],
602              ntohs (msg->size) -
603              sizeof (struct GNUNET_DHT_MonitorMessage) -
604              sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
605     }
606     h = h->next;
607   }
608
609   return GNUNET_OK;
610 }
611
612 /**
613  * Handler for messages received from the DHT service
614  * a demultiplexer which handles numerous message types
615  *
616  * @param cls the 'struct GNUNET_DHT_Handle'
617  * @param msg the incoming message
618  */
619 static void
620 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
621 {
622   struct GNUNET_DHT_Handle *handle = cls;
623   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
624
625   if (msg == NULL)
626   {
627 #if DEBUG_DHT
628     LOG (GNUNET_ERROR_TYPE_DEBUG,
629          "Error receiving data from DHT service, reconnecting\n");
630 #endif
631     do_disconnect (handle);
632     return;
633   }
634   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
635   {
636     if (process_monitor_message (handle, msg) == GNUNET_OK)
637       return;
638     GNUNET_break (0);
639     do_disconnect (handle);
640     return;
641   }
642   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
643   {
644     GNUNET_break (0);
645     do_disconnect (handle);
646     return;
647   }
648   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
649 #if DEBUG_DHT
650   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
651        GNUNET_h2s (&dht_msg->key), handle);
652 #endif
653   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
654                                               &dht_msg->key, &process_reply,
655                                               (void *) dht_msg);
656   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
657                          GNUNET_TIME_UNIT_FOREVER_REL);
658 }
659
660
661 /**
662  * Initialize the connection with the DHT service.
663  *
664  * @param cfg configuration to use
665  * @param ht_len size of the internal hash table to use for
666  *               processing multiple GET/FIND requests in parallel
667  *
668  * @return handle to the DHT service, or NULL on error
669  */
670 struct GNUNET_DHT_Handle *
671 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
672                     unsigned int ht_len)
673 {
674   struct GNUNET_DHT_Handle *handle;
675
676   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
677   handle->cfg = cfg;
678   handle->uid_gen =
679       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
680   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
681   if (GNUNET_NO == try_connect (handle))
682   {
683     GNUNET_DHT_disconnect (handle);
684     return NULL;
685   }
686   return handle;
687 }
688
689
690 /**
691  * Shutdown connection with the DHT service.
692  *
693  * @param handle handle of the DHT connection to stop
694  */
695 void
696 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
697 {
698   struct PendingMessage *pm;
699
700   GNUNET_assert (handle != NULL);
701   GNUNET_assert (0 ==
702                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
703   if (handle->th != NULL)
704   {
705     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
706     handle->th = NULL;
707   }
708   while (NULL != (pm = handle->pending_head))
709   {
710     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
711     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
712                                  pm);
713     pm->in_pending_queue = GNUNET_NO;
714     GNUNET_assert (GNUNET_YES == pm->free_on_send);
715     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
716       GNUNET_SCHEDULER_cancel (pm->timeout_task);
717     if (NULL != pm->cont)
718       GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
719                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
720     GNUNET_free (pm);
721   }
722   if (handle->client != NULL)
723   {
724     GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
725     handle->client = NULL;
726   }
727   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
728     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
729   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
730   GNUNET_free (handle);
731 }
732
733
734 /**
735  * Timeout for the transmission of a fire&forget-request.  Clean it up.
736  *
737  * @param cls the 'struct PendingMessage'
738  * @param tc scheduler context
739  */
740 static void
741 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
742 {
743   struct PendingMessage *pending = cls;
744   struct GNUNET_DHT_Handle *handle;
745
746   handle = pending->handle;
747   GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
748   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
749                                pending);
750   pending->in_pending_queue = GNUNET_NO;
751   if (pending->cont != NULL)
752     pending->cont (pending->cont_cls, tc);
753   GNUNET_free (pending);
754 }
755
756
757 /**
758  * Perform a PUT operation storing data in the DHT.
759  *
760  * @param handle handle to DHT service
761  * @param key the key to store under
762  * @param desired_replication_level estimate of how many
763  *                nearest peers this request should reach
764  * @param options routing options for this message
765  * @param type type of the value
766  * @param size number of bytes in data; must be less than 64k
767  * @param data the data to store
768  * @param exp desired expiration time for the value
769  * @param timeout how long to wait for transmission of this request
770  * @param cont continuation to call when done (transmitting request to service)
771  * @param cont_cls closure for cont
772  */
773 void
774 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
775                 uint32_t desired_replication_level,
776                 enum GNUNET_DHT_RouteOption options,
777                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
778                 struct GNUNET_TIME_Absolute exp,
779                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
780                 void *cont_cls)
781 {
782   struct GNUNET_DHT_ClientPutMessage *put_msg;
783   size_t msize;
784   struct PendingMessage *pending;
785
786   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
787   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
788       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
789   {
790     GNUNET_break (0);
791     if (NULL != cont)
792       cont (cont_cls, NULL);
793     return;
794   }
795   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
796   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
797   pending->msg = &put_msg->header;
798   pending->handle = handle;
799   pending->cont = cont;
800   pending->cont_cls = cont_cls;
801   pending->free_on_send = GNUNET_YES;
802   pending->timeout_task =
803       GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
804   put_msg->header.size = htons (msize);
805   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
806   put_msg->type = htonl (type);
807   put_msg->options = htonl ((uint32_t) options);
808   put_msg->desired_replication_level = htonl (desired_replication_level);
809   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
810   put_msg->key = *key;
811   memcpy (&put_msg[1], data, size);
812   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
813                                pending);
814   pending->in_pending_queue = GNUNET_YES;
815   process_pending_messages (handle);
816 }
817
818
819 /**
820  * Perform an asynchronous GET operation on the DHT identified. See
821  * also "GNUNET_BLOCK_evaluate".
822  *
823  * @param handle handle to the DHT service
824  * @param timeout how long to wait for transmission of this request to the service
825  * @param type expected type of the response object
826  * @param key the key to look up
827  * @param desired_replication_level estimate of how many
828                   nearest peers this request should reach
829  * @param options routing options for this message
830  * @param xquery extended query data (can be NULL, depending on type)
831  * @param xquery_size number of bytes in xquery
832  * @param iter function to call on each result
833  * @param iter_cls closure for iter
834  * @return handle to stop the async get
835  */
836 struct GNUNET_DHT_GetHandle *
837 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
838                       struct GNUNET_TIME_Relative timeout,
839                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
840                       uint32_t desired_replication_level,
841                       enum GNUNET_DHT_RouteOption options, const void *xquery,
842                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
843                       void *iter_cls)
844 {
845   struct GNUNET_DHT_ClientGetMessage *get_msg;
846   struct GNUNET_DHT_GetHandle *get_handle;
847   size_t msize;
848   struct PendingMessage *pending;
849
850   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
851   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
852       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
853   {
854     GNUNET_break (0);
855     return NULL;
856   }
857 #if DEBUG_DHT
858   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
859        GNUNET_h2s (key), handle);
860 #endif
861   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
862   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
863   pending->msg = &get_msg->header;
864   pending->handle = handle;
865   pending->free_on_send = GNUNET_NO;
866   get_msg->header.size = htons (msize);
867   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
868   get_msg->options = htonl ((uint32_t) options);
869   get_msg->desired_replication_level = htonl (desired_replication_level);
870   get_msg->type = htonl (type);
871   get_msg->key = *key;
872   handle->uid_gen++;
873   get_msg->unique_id = handle->uid_gen;
874   memcpy (&get_msg[1], xquery, xquery_size);
875   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
876                                pending);
877   pending->in_pending_queue = GNUNET_YES;
878   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
879   get_handle->iter = iter;
880   get_handle->iter_cls = iter_cls;
881   get_handle->message = pending;
882   get_handle->unique_id = get_msg->unique_id;
883   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
884                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
885   process_pending_messages (handle);
886   return get_handle;
887 }
888
889
890 /**
891  * Stop async DHT-get.
892  *
893  * @param get_handle handle to the GET operation to stop
894  */
895 void
896 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
897 {
898   struct GNUNET_DHT_Handle *handle;
899   const struct GNUNET_DHT_ClientGetMessage *get_msg;
900   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
901   struct PendingMessage *pending;
902
903   handle = get_handle->message->handle;
904   get_msg =
905       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
906 #if DEBUG_DHT
907   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
908        GNUNET_h2s (&get_msg->key), handle);
909 #endif
910   /* generate STOP */
911   pending =
912       GNUNET_malloc (sizeof (struct PendingMessage) +
913                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
914   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
915   pending->msg = &stop_msg->header;
916   pending->handle = handle;
917   pending->free_on_send = GNUNET_YES;
918   stop_msg->header.size =
919       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
920   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
921   stop_msg->reserved = htonl (0);
922   stop_msg->unique_id = get_msg->unique_id;
923   stop_msg->key = get_msg->key;
924   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
925                                pending);
926   pending->in_pending_queue = GNUNET_YES;
927
928   /* remove 'GET' from active status */
929   GNUNET_assert (GNUNET_YES ==
930                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
931                                                        &get_msg->key,
932                                                        get_handle));
933   if (GNUNET_YES == get_handle->message->in_pending_queue)
934   {
935     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
936                                  get_handle->message);
937     get_handle->message->in_pending_queue = GNUNET_NO;
938   }
939   GNUNET_free (get_handle->message);
940   GNUNET_free (get_handle);
941
942   process_pending_messages (handle);
943 }
944
945
946 /**
947  * Start monitoring the local DHT service.
948  *
949  * @param handle Handle to the DHT service.
950  * @param type Type of blocks that are of interest.
951  * @param key Key of data of interest, NULL for all.
952  * @param cb Callback to process all monitored data.
953  * @param cb_cls Closure for cb.
954  *
955  * @return Handle to stop monitoring.
956  */
957 struct GNUNET_DHT_MonitorHandle *
958 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
959                           enum GNUNET_BLOCK_Type type,
960                           const GNUNET_HashCode *key,
961                           GNUNET_DHT_MonitorCB cb,
962                           void *cb_cls)
963 {
964   struct GNUNET_DHT_MonitorHandle *h;
965   struct GNUNET_DHT_MonitorMessage *m;
966   struct PendingMessage *pending;
967
968   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
969   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
970
971   h->cb = cb;
972   h->cb_cls = cb_cls;
973   h->type = type;
974   h->dht_handle = handle;
975   if (NULL != key)
976   {
977     h->key = GNUNET_malloc (sizeof(GNUNET_HashCode));
978     memcpy (h->key, key, sizeof(GNUNET_HashCode));
979   }
980
981   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) +
982                            sizeof (struct PendingMessage));
983   m = (struct GNUNET_DHT_MonitorMessage *) &pending[1];
984   pending->msg = &m->header;
985   pending->handle = handle;
986   pending->free_on_send = GNUNET_YES;
987   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
988                                pending);
989   pending->in_pending_queue = GNUNET_YES;
990   process_pending_messages (handle);
991
992   return h;
993 }
994
995
996 /**
997  * Stop monitoring.
998  *
999  * @param handle The handle to the monitor request returned by monitor_start.
1000  *
1001  * On return get_handle will no longer be valid, caller must not use again!!!
1002  */
1003 void
1004 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1005 {
1006   GNUNET_free_non_null (handle->key);
1007   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1008                                handle->dht_handle->monitor_tail,
1009                                handle);
1010   GNUNET_free (handle);
1011 }
1012
1013
1014
1015 /* end of dht_api.c */