-moving to port 80 as well
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Timeout task for this message
78    */
79   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81   /**
82    * Unique ID for this request
83    */
84   uint64_t unique_id;
85
86   /**
87    * Free the saved message once sent, set to GNUNET_YES for messages
88    * that do not receive responses; GNUNET_NO if this pending message
89    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
90    * from there.
91    */
92   int free_on_send;
93
94   /**
95    * GNUNET_YES if this message is in our pending queue right now.
96    */
97   int in_pending_queue;
98
99 };
100
101
102 /**
103  * Handle to a GET request
104  */
105 struct GNUNET_DHT_GetHandle
106 {
107
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_GetIterator iter;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *iter_cls;
117
118   /**
119    * Main handle to this DHT api
120    */
121   struct GNUNET_DHT_Handle *dht_handle;
122
123   /**
124    * The actual message sent for this request,
125    * used for retransmitting requests on service
126    * failure/reconnect.  Freed on route_stop.
127    */
128   struct PendingMessage *message;
129
130   /**
131    * Key that this get request is for
132    */
133   GNUNET_HashCode key;
134
135   /**
136    * Unique identifier for this request (for key collisions).
137    */
138   uint64_t unique_id;
139
140 };
141
142
143 /**
144  * Handle to a monitoring request.
145  */
146 struct GNUNET_DHT_MonitorHandle
147 {
148   /**
149    * DLL.
150    */
151   struct GNUNET_DHT_MonitorHandle *next;
152
153   /**
154    * DLL.
155    */
156   struct GNUNET_DHT_MonitorHandle *prev;
157   
158   /**
159    * Main handle to this DHT api.
160    */
161   struct GNUNET_DHT_Handle *dht_handle;
162
163   /**
164    * Type of block looked for.
165    */
166   enum GNUNET_BLOCK_Type type;
167
168   /**
169    * Key being looked for, NULL == all.
170    */
171   GNUNET_HashCode *key;
172
173   /**
174    * Callback for each received message of interest.
175    */
176   GNUNET_DHT_MonitorCB cb;
177
178   /**
179    * Closure for cb.
180    */
181   void *cb_cls;
182   
183 };
184
185
186 /**
187  * Connection to the DHT service.
188  */
189 struct GNUNET_DHT_Handle
190 {
191
192   /**
193    * Configuration to use.
194    */
195   const struct GNUNET_CONFIGURATION_Handle *cfg;
196
197   /**
198    * Socket (if available).
199    */
200   struct GNUNET_CLIENT_Connection *client;
201
202   /**
203    * Currently pending transmission request (or NULL).
204    */
205   struct GNUNET_CLIENT_TransmitHandle *th;
206
207   /**
208    * Head of linked list of messages we would like to transmit.
209    */
210   struct PendingMessage *pending_head;
211
212   /**
213    * Tail of linked list of messages we would like to transmit.
214    */
215   struct PendingMessage *pending_tail;
216
217   /**
218    * Head of linked list of messages we would like to monitor. 
219    */
220   struct GNUNET_DHT_MonitorHandle *monitor_head;
221
222   /**
223    * Tail of linked list of messages we would like to monitor.
224    */
225   struct GNUNET_DHT_MonitorHandle *monitor_tail;
226
227   /**
228    * Hash map containing the current outstanding unique requests
229    * (values are of type 'struct GNUNET_DHT_RouteHandle').
230    */
231   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
232
233   /**
234    * Task for trying to reconnect.
235    */
236   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
237
238   /**
239    * How quickly should we retry?  Used for exponential back-off on
240    * connect-errors.
241    */
242   struct GNUNET_TIME_Relative retry_time;
243
244   /**
245    * Generator for unique ids.
246    */
247   uint64_t uid_gen;
248
249   /**
250    * Did we start our receive loop yet?
251    */
252   int in_receive;
253 };
254
255
256 /**
257  * Handler for messages received from the DHT service
258  * a demultiplexer which handles numerous message types
259  *
260  */
261 static void
262 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
263
264
265 /**
266  * Try to (re)connect to the DHT service.
267  *
268  * @return GNUNET_YES on success, GNUNET_NO on failure.
269  */
270 static int
271 try_connect (struct GNUNET_DHT_Handle *handle)
272 {
273   if (handle->client != NULL)
274     return GNUNET_OK;
275   handle->in_receive = GNUNET_NO;
276   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
277   if (handle->client == NULL)
278   {
279     LOG (GNUNET_ERROR_TYPE_WARNING,
280          _("Failed to connect to the DHT service!\n"));
281     return GNUNET_NO;
282   }
283   return GNUNET_YES;
284 }
285
286
287 /**
288  * Add the request corresponding to the given route handle
289  * to the pending queue (if it is not already in there).
290  *
291  * @param cls the 'struct GNUNET_DHT_Handle*'
292  * @param key key for the request (not used)
293  * @param value the 'struct GNUNET_DHT_GetHandle*'
294  * @return GNUNET_YES (always)
295  */
296 static int
297 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
298 {
299   struct GNUNET_DHT_Handle *handle = cls;
300   struct GNUNET_DHT_GetHandle *rh = value;
301
302   if (GNUNET_NO == rh->message->in_pending_queue)
303   {
304     LOG (GNUNET_ERROR_TYPE_DEBUG,
305          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
306          handle);
307     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
308                                  rh->message);
309     rh->message->in_pending_queue = GNUNET_YES;
310   }
311   return GNUNET_YES;
312 }
313
314
315 /**
316  * Try to send messages from list of messages to send
317  * @param handle DHT_Handle
318  */
319 static void
320 process_pending_messages (struct GNUNET_DHT_Handle *handle);
321
322
323 /**
324  * Try reconnecting to the dht service.
325  *
326  * @param cls GNUNET_DHT_Handle
327  * @param tc scheduler context
328  */
329 static void
330 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
331 {
332   struct GNUNET_DHT_Handle *handle = cls;
333
334   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
335   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
336   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
337     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
338   else
339     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
340   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
341     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
342   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
343   if (GNUNET_YES != try_connect (handle))
344   {
345     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
346     return;
347   }
348   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
349                                          &add_request_to_pending, handle);
350   process_pending_messages (handle);
351 }
352
353
354 /**
355  * Try reconnecting to the DHT service.
356  *
357  * @param handle handle to dht to (possibly) disconnect and reconnect
358  */
359 static void
360 do_disconnect (struct GNUNET_DHT_Handle *handle)
361 {
362   if (handle->client == NULL)
363     return;
364   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
365   if (NULL != handle->th)
366     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
367   handle->th = NULL;
368   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
369               "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
370               (unsigned long long) handle->retry_time.rel_value);
371   GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
372   handle->client = NULL;
373   handle->reconnect_task =
374       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
375 }
376
377
378 /**
379  * Transmit the next pending message, called by notify_transmit_ready
380  */
381 static size_t
382 transmit_pending (void *cls, size_t size, void *buf);
383
384
385 /**
386  * Try to send messages from list of messages to send
387  */
388 static void
389 process_pending_messages (struct GNUNET_DHT_Handle *handle)
390 {
391   struct PendingMessage *head;
392
393   if (handle->client == NULL)
394   {
395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396                 "process_pending_messages called, but client is null, reconnecting\n");
397     do_disconnect (handle);
398     return;
399   }
400   if (handle->th != NULL)
401     return;
402   if (NULL == (head = handle->pending_head))
403     return;
404   handle->th =
405       GNUNET_CLIENT_notify_transmit_ready (handle->client,
406                                            ntohs (head->msg->size),
407                                            GNUNET_TIME_UNIT_FOREVER_REL,
408                                            GNUNET_YES, &transmit_pending,
409                                            handle);
410   if (NULL != handle->th)
411     return;
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "notify_transmit_ready returned NULL, reconnecting\n");
414   do_disconnect (handle);
415 }
416
417
418 /**
419  * Transmit the next pending message, called by notify_transmit_ready
420  */
421 static size_t
422 transmit_pending (void *cls, size_t size, void *buf)
423 {
424   struct GNUNET_DHT_Handle *handle = cls;
425   struct PendingMessage *head;
426   size_t tsize;
427
428   handle->th = NULL;
429   if (buf == NULL)
430   {    
431     LOG (GNUNET_ERROR_TYPE_DEBUG,
432          "Transmission to DHT service failed!  Reconnecting!\n");
433     do_disconnect (handle);
434     return 0;
435   }
436   if (NULL == (head = handle->pending_head))
437     return 0;
438
439   tsize = ntohs (head->msg->size);
440   if (size < tsize)
441   {
442     process_pending_messages (handle);
443     return 0;
444   }
445   memcpy (buf, head->msg, tsize);
446   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
447                                head);
448   head->in_pending_queue = GNUNET_NO;
449   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
450   {
451     GNUNET_SCHEDULER_cancel (head->timeout_task);
452     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
453   }
454   if (NULL != head->cont)
455   {
456     head->cont (head->cont_cls, NULL);
457     head->cont = NULL;
458     head->cont_cls = NULL;
459   }
460   if (GNUNET_YES == head->free_on_send)
461     GNUNET_free (head);
462   process_pending_messages (handle);
463   LOG (GNUNET_ERROR_TYPE_DEBUG,
464        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
465   if (GNUNET_NO == handle->in_receive)
466   {
467     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
468     handle->in_receive = GNUNET_YES;
469     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
470                            GNUNET_TIME_UNIT_FOREVER_REL);
471   }
472   return tsize;
473 }
474
475
476 /**
477  * Process a given reply that might match the given
478  * request.
479  *
480  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
481  * @param key query of the request
482  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
483  * @return GNUNET_YES to continue to iterate over all results,
484  *         GNUNET_NO if the reply is malformed
485  */
486 static int
487 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
488 {
489   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
490   struct GNUNET_DHT_GetHandle *get_handle = value;
491   const struct GNUNET_PeerIdentity *put_path;
492   const struct GNUNET_PeerIdentity *get_path;
493   uint32_t put_path_length;
494   uint32_t get_path_length;
495   size_t data_length;
496   size_t msize;
497   size_t meta_length;
498   const void *data;
499
500   if (dht_msg->unique_id != get_handle->unique_id)
501   {
502     /* UID mismatch */
503     LOG (GNUNET_ERROR_TYPE_DEBUG,
504          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
505          dht_msg->unique_id, get_handle->unique_id);
506     return GNUNET_YES;
507   }
508   msize = ntohs (dht_msg->header.size);
509   put_path_length = ntohl (dht_msg->put_path_length);
510   get_path_length = ntohl (dht_msg->get_path_length);
511   meta_length =
512       sizeof (struct GNUNET_DHT_ClientResultMessage) +
513       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
514   if ((msize < meta_length) ||
515       (get_path_length >
516        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
517       (put_path_length >
518        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
519   {
520     GNUNET_break (0);
521     return GNUNET_NO;
522   }
523   data_length = msize - meta_length;
524   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
525        (unsigned int) data_length, GNUNET_h2s (key));
526   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
527   get_path = &put_path[put_path_length];
528   data = &get_path[get_path_length];
529   get_handle->iter (get_handle->iter_cls,
530                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
531                     get_path, get_path_length, put_path, put_path_length,
532                     ntohl (dht_msg->type), data_length, data);
533   return GNUNET_YES;
534 }
535
536
537 /**
538  * Process a monitoring message from the service.
539  *
540  * @param handle The DHT handle.
541  * @param msg Message from the service.
542  * 
543  * @return GNUNET_OK if everything went fine,
544  *         GNUNET_SYSERR if the message is malformed.
545  */
546 static int
547 process_monitor_message (struct GNUNET_DHT_Handle *handle,
548                          const struct GNUNET_MessageHeader *msg)
549 {
550   struct GNUNET_DHT_MonitorMessage *m;
551   struct GNUNET_DHT_MonitorHandle *h;
552   size_t msize;
553
554   if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
555       ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
556     return GNUNET_SYSERR;
557   msize = ntohs (msg->size);
558   if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
559     return GNUNET_SYSERR;
560
561   m = (struct GNUNET_DHT_MonitorMessage *) msg;
562   h = handle->monitor_head;
563   while (NULL != h)
564   {
565     if (h->type == ntohl(m->type) &&
566       (NULL == h->key ||
567        memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
568     {
569       struct GNUNET_PeerIdentity *path;
570       uint32_t getl;
571       uint32_t putl;
572
573       path = (struct GNUNET_PeerIdentity *) &m[1];
574       getl = ntohl (m->get_path_length);
575       putl = ntohl (m->put_path_length);
576       h->cb (h->cb_cls, ntohs(msg->type),
577              GNUNET_TIME_absolute_ntoh(m->expiration),
578              &m->key,
579              &path[getl], putl, path, getl,
580              ntohl (m->desired_replication_level),
581              ntohl (m->options), ntohl (m->type),
582              (void *) &path[getl + putl],
583              ntohs (msg->size) -
584              sizeof (struct GNUNET_DHT_MonitorMessage) -
585              sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
586     }
587     h = h->next;
588   }
589
590   return GNUNET_OK;
591 }
592
593 /**
594  * Handler for messages received from the DHT service
595  * a demultiplexer which handles numerous message types
596  *
597  * @param cls the 'struct GNUNET_DHT_Handle'
598  * @param msg the incoming message
599  */
600 static void
601 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
602 {
603   struct GNUNET_DHT_Handle *handle = cls;
604   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
605
606   if (msg == NULL)
607   {
608     LOG (GNUNET_ERROR_TYPE_DEBUG,
609          "Error receiving data from DHT service, reconnecting\n");
610     do_disconnect (handle);
611     return;
612   }
613   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
614   {
615     if (process_monitor_message (handle, msg) == GNUNET_OK)
616     {
617       GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
618                              GNUNET_TIME_UNIT_FOREVER_REL);
619       return;
620     }
621     GNUNET_break (0);
622     do_disconnect (handle);
623     return;
624   }
625   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
626   {
627     GNUNET_break (0);
628     do_disconnect (handle);
629     return;
630   }
631   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
632   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
633        GNUNET_h2s (&dht_msg->key), handle);
634   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
635                                               &dht_msg->key, &process_reply,
636                                               (void *) dht_msg);
637   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
638                          GNUNET_TIME_UNIT_FOREVER_REL);
639 }
640
641
642 /**
643  * Initialize the connection with the DHT service.
644  *
645  * @param cfg configuration to use
646  * @param ht_len size of the internal hash table to use for
647  *               processing multiple GET/FIND requests in parallel
648  *
649  * @return handle to the DHT service, or NULL on error
650  */
651 struct GNUNET_DHT_Handle *
652 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
653                     unsigned int ht_len)
654 {
655   struct GNUNET_DHT_Handle *handle;
656
657   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
658   handle->cfg = cfg;
659   handle->uid_gen =
660       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
661   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
662   if (GNUNET_NO == try_connect (handle))
663   {
664     GNUNET_DHT_disconnect (handle);
665     return NULL;
666   }
667   return handle;
668 }
669
670
671 /**
672  * Shutdown connection with the DHT service.
673  *
674  * @param handle handle of the DHT connection to stop
675  */
676 void
677 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
678 {
679   struct PendingMessage *pm;
680
681   GNUNET_assert (handle != NULL);
682   GNUNET_assert (0 ==
683                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
684   if (handle->th != NULL)
685   {
686     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
687     handle->th = NULL;
688   }
689   while (NULL != (pm = handle->pending_head))
690   {
691     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
692     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
693                                  pm);
694     pm->in_pending_queue = GNUNET_NO;
695     GNUNET_assert (GNUNET_YES == pm->free_on_send);
696     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
697       GNUNET_SCHEDULER_cancel (pm->timeout_task);
698     if (NULL != pm->cont)
699       pm->cont (pm->cont_cls, NULL);
700     GNUNET_free (pm);
701   }
702   if (handle->client != NULL)
703   {
704     GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
705     handle->client = NULL;
706   }
707   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
708     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
709   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
710   GNUNET_free (handle);
711 }
712
713
714 /**
715  * Timeout for the transmission of a fire&forget-request.  Clean it up.
716  *
717  * @param cls the 'struct PendingMessage'
718  * @param tc scheduler context
719  */
720 static void
721 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
722 {
723   struct PendingMessage *pending = cls;
724   struct GNUNET_DHT_Handle *handle;
725
726   handle = pending->handle;
727   GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
728   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
729                                pending);
730   pending->in_pending_queue = GNUNET_NO;
731   if (pending->cont != NULL)
732     pending->cont (pending->cont_cls, tc);
733   GNUNET_free (pending);
734 }
735
736
737 /**
738  * Perform a PUT operation storing data in the DHT.
739  *
740  * @param handle handle to DHT service
741  * @param key the key to store under
742  * @param desired_replication_level estimate of how many
743  *                nearest peers this request should reach
744  * @param options routing options for this message
745  * @param type type of the value
746  * @param size number of bytes in data; must be less than 64k
747  * @param data the data to store
748  * @param exp desired expiration time for the value
749  * @param timeout how long to wait for transmission of this request
750  * @param cont continuation to call when done (transmitting request to service)
751  * @param cont_cls closure for cont
752  */
753 void
754 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
755                 uint32_t desired_replication_level,
756                 enum GNUNET_DHT_RouteOption options,
757                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
758                 struct GNUNET_TIME_Absolute exp,
759                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
760                 void *cont_cls)
761 {
762   struct GNUNET_DHT_ClientPutMessage *put_msg;
763   size_t msize;
764   struct PendingMessage *pending;
765
766   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
767   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
768       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
769   {
770     GNUNET_break (0);
771     if (NULL != cont)
772       cont (cont_cls, NULL);
773     return;
774   }
775   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
776   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
777   pending->msg = &put_msg->header;
778   pending->handle = handle;
779   pending->cont = cont;
780   pending->cont_cls = cont_cls;
781   pending->free_on_send = GNUNET_YES;
782   pending->timeout_task =
783       GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
784   put_msg->header.size = htons (msize);
785   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
786   put_msg->type = htonl (type);
787   put_msg->options = htonl ((uint32_t) options);
788   put_msg->desired_replication_level = htonl (desired_replication_level);
789   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
790   put_msg->key = *key;
791   memcpy (&put_msg[1], data, size);
792   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
793                                pending);
794   pending->in_pending_queue = GNUNET_YES;
795   process_pending_messages (handle);
796 }
797
798
799 /**
800  * Perform an asynchronous GET operation on the DHT identified. See
801  * also "GNUNET_BLOCK_evaluate".
802  *
803  * @param handle handle to the DHT service
804  * @param timeout how long to wait for transmission of this request to the service
805  * @param type expected type of the response object
806  * @param key the key to look up
807  * @param desired_replication_level estimate of how many
808                   nearest peers this request should reach
809  * @param options routing options for this message
810  * @param xquery extended query data (can be NULL, depending on type)
811  * @param xquery_size number of bytes in xquery
812  * @param iter function to call on each result
813  * @param iter_cls closure for iter
814  * @return handle to stop the async get
815  */
816 struct GNUNET_DHT_GetHandle *
817 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
818                       struct GNUNET_TIME_Relative timeout,
819                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
820                       uint32_t desired_replication_level,
821                       enum GNUNET_DHT_RouteOption options, const void *xquery,
822                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
823                       void *iter_cls)
824 {
825   struct GNUNET_DHT_ClientGetMessage *get_msg;
826   struct GNUNET_DHT_GetHandle *get_handle;
827   size_t msize;
828   struct PendingMessage *pending;
829
830   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
831   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
832       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
833   {
834     GNUNET_break (0);
835     return NULL;
836   }
837   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
838        GNUNET_h2s (key), handle);
839   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
840   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
841   pending->msg = &get_msg->header;
842   pending->handle = handle;
843   pending->free_on_send = GNUNET_NO;
844   get_msg->header.size = htons (msize);
845   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
846   get_msg->options = htonl ((uint32_t) options);
847   get_msg->desired_replication_level = htonl (desired_replication_level);
848   get_msg->type = htonl (type);
849   get_msg->key = *key;
850   handle->uid_gen++;
851   get_msg->unique_id = handle->uid_gen;
852   memcpy (&get_msg[1], xquery, xquery_size);
853   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
854                                pending);
855   pending->in_pending_queue = GNUNET_YES;
856   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
857   get_handle->iter = iter;
858   get_handle->iter_cls = iter_cls;
859   get_handle->message = pending;
860   get_handle->unique_id = get_msg->unique_id;
861   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
862                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
863   process_pending_messages (handle);
864   return get_handle;
865 }
866
867
868 /**
869  * Stop async DHT-get.
870  *
871  * @param get_handle handle to the GET operation to stop
872  */
873 void
874 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
875 {
876   struct GNUNET_DHT_Handle *handle;
877   const struct GNUNET_DHT_ClientGetMessage *get_msg;
878   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
879   struct PendingMessage *pending;
880
881   handle = get_handle->message->handle;
882   get_msg =
883       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
884   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
885        GNUNET_h2s (&get_msg->key), handle);
886   /* generate STOP */
887   pending =
888       GNUNET_malloc (sizeof (struct PendingMessage) +
889                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
890   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
891   pending->msg = &stop_msg->header;
892   pending->handle = handle;
893   pending->free_on_send = GNUNET_YES;
894   stop_msg->header.size =
895       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
896   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
897   stop_msg->reserved = htonl (0);
898   stop_msg->unique_id = get_msg->unique_id;
899   stop_msg->key = get_msg->key;
900   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
901                                pending);
902   pending->in_pending_queue = GNUNET_YES;
903
904   /* remove 'GET' from active status */
905   GNUNET_assert (GNUNET_YES ==
906                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
907                                                        &get_msg->key,
908                                                        get_handle));
909   if (GNUNET_YES == get_handle->message->in_pending_queue)
910   {
911     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
912                                  get_handle->message);
913     get_handle->message->in_pending_queue = GNUNET_NO;
914   }
915   GNUNET_free (get_handle->message);
916   GNUNET_free (get_handle);
917
918   process_pending_messages (handle);
919 }
920
921
922 /**
923  * Start monitoring the local DHT service.
924  *
925  * @param handle Handle to the DHT service.
926  * @param type Type of blocks that are of interest.
927  * @param key Key of data of interest, NULL for all.
928  * @param cb Callback to process all monitored data.
929  * @param cb_cls Closure for cb.
930  *
931  * @return Handle to stop monitoring.
932  */
933 struct GNUNET_DHT_MonitorHandle *
934 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
935                           enum GNUNET_BLOCK_Type type,
936                           const GNUNET_HashCode *key,
937                           GNUNET_DHT_MonitorCB cb,
938                           void *cb_cls)
939 {
940   struct GNUNET_DHT_MonitorHandle *h;
941   struct GNUNET_DHT_MonitorMessage *m;
942   struct PendingMessage *pending;
943
944   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
945   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
946
947   GNUNET_assert (NULL != cb);
948   h->cb = cb;
949   h->cb_cls = cb_cls;
950   h->type = type;
951   h->dht_handle = handle;
952   if (NULL != key)
953   {
954     h->key = GNUNET_malloc (sizeof(GNUNET_HashCode));
955     memcpy (h->key, key, sizeof(GNUNET_HashCode));
956   }
957
958   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) +
959                            sizeof (struct PendingMessage));
960   m = (struct GNUNET_DHT_MonitorMessage *) &pending[1];
961   pending->msg = &m->header;
962   pending->handle = handle;
963   pending->free_on_send = GNUNET_YES;
964   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
965   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage));
966   m->type = htonl(type);
967   if (NULL != key)
968     memcpy (&m->key, key, sizeof(GNUNET_HashCode));
969   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
970                                pending);
971   pending->in_pending_queue = GNUNET_YES;
972   process_pending_messages (handle);
973
974   return h;
975 }
976
977
978 /**
979  * Stop monitoring.
980  *
981  * @param handle The handle to the monitor request returned by monitor_start.
982  *
983  * On return get_handle will no longer be valid, caller must not use again!!!
984  */
985 void
986 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
987 {
988   GNUNET_free_non_null (handle->key);
989   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
990                                handle->dht_handle->monitor_tail,
991                                handle);
992   GNUNET_free (handle);
993 }
994
995
996
997 /* end of dht_api.c */