-fix
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Timeout task for this message
78    */
79   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81   /**
82    * Unique ID for this request
83    */
84   uint64_t unique_id;
85
86   /**
87    * Free the saved message once sent, set to GNUNET_YES for messages
88    * that do not receive responses; GNUNET_NO if this pending message
89    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
90    * from there.
91    */
92   int free_on_send;
93
94   /**
95    * GNUNET_YES if this message is in our pending queue right now.
96    */
97   int in_pending_queue;
98
99 };
100
101
102 /**
103  * Handle to a GET request
104  */
105 struct GNUNET_DHT_GetHandle
106 {
107
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_GetIterator iter;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *iter_cls;
117
118   /**
119    * Main handle to this DHT api
120    */
121   struct GNUNET_DHT_Handle *dht_handle;
122
123   /**
124    * The actual message sent for this request,
125    * used for retransmitting requests on service
126    * failure/reconnect.  Freed on route_stop.
127    */
128   struct PendingMessage *message;
129
130   /**
131    * Key that this get request is for
132    */
133   GNUNET_HashCode key;
134
135   /**
136    * Unique identifier for this request (for key collisions).
137    */
138   uint64_t unique_id;
139
140 };
141
142
143 /**
144  * Handle to a monitoring request.
145  */
146 struct GNUNET_DHT_MonitorHandle
147 {
148   /**
149    * DLL.
150    */
151   struct GNUNET_DHT_MonitorHandle *next;
152
153   /**
154    * DLL.
155    */
156   struct GNUNET_DHT_MonitorHandle *prev;
157   
158   /**
159    * Main handle to this DHT api.
160    */
161   struct GNUNET_DHT_Handle *dht_handle;
162
163   /**
164    * Type of block looked for.
165    */
166   enum GNUNET_BLOCK_Type type;
167
168   /**
169    * Key being looked for, NULL == all.
170    */
171   GNUNET_HashCode *key;
172
173   /**
174    * Callback for each received message of type get.
175    */
176   GNUNET_DHT_MonitorGetCB get_cb;
177
178   /**
179    * Callback for each received message of type get response.
180    */
181   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
182
183   /**
184    * Callback for each received message of type put.
185    */
186   GNUNET_DHT_MonitorPutCB put_cb;
187
188   /**
189    * Closure for cb.
190    */
191   void *cb_cls;
192   
193 };
194
195
196 /**
197  * Connection to the DHT service.
198  */
199 struct GNUNET_DHT_Handle
200 {
201
202   /**
203    * Configuration to use.
204    */
205   const struct GNUNET_CONFIGURATION_Handle *cfg;
206
207   /**
208    * Socket (if available).
209    */
210   struct GNUNET_CLIENT_Connection *client;
211
212   /**
213    * Currently pending transmission request (or NULL).
214    */
215   struct GNUNET_CLIENT_TransmitHandle *th;
216
217   /**
218    * Head of linked list of messages we would like to transmit.
219    */
220   struct PendingMessage *pending_head;
221
222   /**
223    * Tail of linked list of messages we would like to transmit.
224    */
225   struct PendingMessage *pending_tail;
226
227   /**
228    * Head of linked list of messages we would like to monitor. 
229    */
230   struct GNUNET_DHT_MonitorHandle *monitor_head;
231
232   /**
233    * Tail of linked list of messages we would like to monitor.
234    */
235   struct GNUNET_DHT_MonitorHandle *monitor_tail;
236
237   /**
238    * Hash map containing the current outstanding unique requests
239    * (values are of type 'struct GNUNET_DHT_RouteHandle').
240    */
241   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
242
243   /**
244    * Task for trying to reconnect.
245    */
246   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
247
248   /**
249    * How quickly should we retry?  Used for exponential back-off on
250    * connect-errors.
251    */
252   struct GNUNET_TIME_Relative retry_time;
253
254   /**
255    * Generator for unique ids.
256    */
257   uint64_t uid_gen;
258
259   /**
260    * Did we start our receive loop yet?
261    */
262   int in_receive;
263 };
264
265
266 /**
267  * Handler for messages received from the DHT service
268  * a demultiplexer which handles numerous message types
269  *
270  */
271 static void
272 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
273
274
275 /**
276  * Try to (re)connect to the DHT service.
277  *
278  * @return GNUNET_YES on success, GNUNET_NO on failure.
279  */
280 static int
281 try_connect (struct GNUNET_DHT_Handle *handle)
282 {
283   if (handle->client != NULL)
284     return GNUNET_OK;
285   handle->in_receive = GNUNET_NO;
286   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
287   if (handle->client == NULL)
288   {
289     LOG (GNUNET_ERROR_TYPE_WARNING,
290          _("Failed to connect to the DHT service!\n"));
291     return GNUNET_NO;
292   }
293   return GNUNET_YES;
294 }
295
296
297 /**
298  * Add the request corresponding to the given route handle
299  * to the pending queue (if it is not already in there).
300  *
301  * @param cls the 'struct GNUNET_DHT_Handle*'
302  * @param key key for the request (not used)
303  * @param value the 'struct GNUNET_DHT_GetHandle*'
304  * @return GNUNET_YES (always)
305  */
306 static int
307 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
308 {
309   struct GNUNET_DHT_Handle *handle = cls;
310   struct GNUNET_DHT_GetHandle *rh = value;
311
312   if (GNUNET_NO == rh->message->in_pending_queue)
313   {
314     LOG (GNUNET_ERROR_TYPE_DEBUG,
315          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
316          handle);
317     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
318                                  rh->message);
319     rh->message->in_pending_queue = GNUNET_YES;
320   }
321   return GNUNET_YES;
322 }
323
324
325 /**
326  * Try to send messages from list of messages to send
327  * @param handle DHT_Handle
328  */
329 static void
330 process_pending_messages (struct GNUNET_DHT_Handle *handle);
331
332
333 /**
334  * Try reconnecting to the dht service.
335  *
336  * @param cls GNUNET_DHT_Handle
337  * @param tc scheduler context
338  */
339 static void
340 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
341 {
342   struct GNUNET_DHT_Handle *handle = cls;
343
344   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
345   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
346   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
347     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
348   else
349     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
350   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
351     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
352   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
353   if (GNUNET_YES != try_connect (handle))
354   {
355     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
356     return;
357   }
358   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
359                                          &add_request_to_pending, handle);
360   process_pending_messages (handle);
361 }
362
363
364 /**
365  * Try reconnecting to the DHT service.
366  *
367  * @param handle handle to dht to (possibly) disconnect and reconnect
368  */
369 static void
370 do_disconnect (struct GNUNET_DHT_Handle *handle)
371 {
372   if (handle->client == NULL)
373     return;
374   GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
375   if (NULL != handle->th)
376     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
377   handle->th = NULL;
378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379               "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
380               (unsigned long long) handle->retry_time.rel_value);
381   GNUNET_CLIENT_disconnect (handle->client);
382   handle->client = NULL;
383   handle->reconnect_task =
384       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
385 }
386
387
388 /**
389  * Transmit the next pending message, called by notify_transmit_ready
390  */
391 static size_t
392 transmit_pending (void *cls, size_t size, void *buf);
393
394
395 /**
396  * Try to send messages from list of messages to send
397  */
398 static void
399 process_pending_messages (struct GNUNET_DHT_Handle *handle)
400 {
401   struct PendingMessage *head;
402
403   if (handle->client == NULL)
404   {
405     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406                 "process_pending_messages called, but client is null, reconnecting\n");
407     do_disconnect (handle);
408     return;
409   }
410   if (handle->th != NULL)
411     return;
412   if (NULL == (head = handle->pending_head))
413     return;
414   handle->th =
415       GNUNET_CLIENT_notify_transmit_ready (handle->client,
416                                            ntohs (head->msg->size),
417                                            GNUNET_TIME_UNIT_FOREVER_REL,
418                                            GNUNET_YES, &transmit_pending,
419                                            handle);
420   if (NULL != handle->th)
421     return;
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "notify_transmit_ready returned NULL, reconnecting\n");
424   do_disconnect (handle);
425 }
426
427
428 /**
429  * Transmit the next pending message, called by notify_transmit_ready
430  */
431 static size_t
432 transmit_pending (void *cls, size_t size, void *buf)
433 {
434   struct GNUNET_DHT_Handle *handle = cls;
435   struct PendingMessage *head;
436   size_t tsize;
437
438   handle->th = NULL;
439   if (buf == NULL)
440   {    
441     LOG (GNUNET_ERROR_TYPE_DEBUG,
442          "Transmission to DHT service failed!  Reconnecting!\n");
443     do_disconnect (handle);
444     return 0;
445   }
446   if (NULL == (head = handle->pending_head))
447     return 0;
448
449   tsize = ntohs (head->msg->size);
450   if (size < tsize)
451   {
452     process_pending_messages (handle);
453     return 0;
454   }
455   memcpy (buf, head->msg, tsize);
456   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
457                                head);
458   head->in_pending_queue = GNUNET_NO;
459   if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
460   {
461     GNUNET_SCHEDULER_cancel (head->timeout_task);
462     head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
463   }
464   if (NULL != head->cont)
465   {
466     head->cont (head->cont_cls, NULL);
467     head->cont = NULL;
468     head->cont_cls = NULL;
469   }
470   if (GNUNET_YES == head->free_on_send)
471     GNUNET_free (head);
472   process_pending_messages (handle);
473   LOG (GNUNET_ERROR_TYPE_DEBUG,
474        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
475   if (GNUNET_NO == handle->in_receive)
476   {
477     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
478     handle->in_receive = GNUNET_YES;
479     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
480                            GNUNET_TIME_UNIT_FOREVER_REL);
481   }
482   return tsize;
483 }
484
485
486 /**
487  * Process a given reply that might match the given
488  * request.
489  *
490  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
491  * @param key query of the request
492  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
493  * @return GNUNET_YES to continue to iterate over all results,
494  *         GNUNET_NO if the reply is malformed
495  */
496 static int
497 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
498 {
499   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
500   struct GNUNET_DHT_GetHandle *get_handle = value;
501   const struct GNUNET_PeerIdentity *put_path;
502   const struct GNUNET_PeerIdentity *get_path;
503   uint32_t put_path_length;
504   uint32_t get_path_length;
505   size_t data_length;
506   size_t msize;
507   size_t meta_length;
508   const void *data;
509
510   if (dht_msg->unique_id != get_handle->unique_id)
511   {
512     /* UID mismatch */
513     LOG (GNUNET_ERROR_TYPE_DEBUG,
514          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
515          dht_msg->unique_id, get_handle->unique_id);
516     return GNUNET_YES;
517   }
518   msize = ntohs (dht_msg->header.size);
519   put_path_length = ntohl (dht_msg->put_path_length);
520   get_path_length = ntohl (dht_msg->get_path_length);
521   meta_length =
522       sizeof (struct GNUNET_DHT_ClientResultMessage) +
523       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
524   if ((msize < meta_length) ||
525       (get_path_length >
526        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
527       (put_path_length >
528        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
529   {
530     GNUNET_break (0);
531     return GNUNET_NO;
532   }
533   data_length = msize - meta_length;
534   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
535        (unsigned int) data_length, GNUNET_h2s (key));
536   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
537   get_path = &put_path[put_path_length];
538   data = &get_path[get_path_length];
539   get_handle->iter (get_handle->iter_cls,
540                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
541                     get_path, get_path_length, put_path, put_path_length,
542                     ntohl (dht_msg->type), data_length, data);
543   return GNUNET_YES;
544 }
545
546 /**
547  * Process a get monitor message from the service.
548  *
549  * @param handle The DHT handle.
550  * @param msg Monitor get message from the service.
551  * 
552  * @return GNUNET_OK if everything went fine,
553  *         GNUNET_SYSERR if the message is malformed.
554  */
555 static int
556 process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
557                              const struct GNUNET_DHT_MonitorGetMessage *msg)
558 {
559   struct GNUNET_DHT_MonitorHandle *h;
560   size_t msize;
561
562   msize = ntohs (msg->header.size);
563   if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
564     return GNUNET_SYSERR;
565
566   h = handle->monitor_head;
567   while (NULL != h)
568   {
569     int type_ok;
570     int key_ok;
571
572     type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
573     key_ok = NULL == h->key || memcmp (h->key, &msg->key,
574                                        sizeof (GNUNET_HashCode)) == 0;
575     if (type_ok && key_ok && NULL != h->get_cb)
576     {
577       h->get_cb (h->cb_cls,
578                 ntohl (msg->options),
579                 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
580                 ntohl (msg->hop_count),
581                 ntohl (msg->desired_replication_level),
582                 ntohl (msg->get_path_length),
583                 (struct GNUNET_PeerIdentity *) &msg[1],
584                 &msg->key);
585     }
586     h = h->next;
587   }
588   return GNUNET_OK;
589 }
590
591
592 /**
593  * Process a get response monitor message from the service.
594  *
595  * @param handle The DHT handle.
596  * @param msg Monitor get response message from the service.
597  * 
598  * @return GNUNET_OK if everything went fine,
599  *         GNUNET_SYSERR if the message is malformed.
600  */
601 static int
602 process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
603                                   const struct GNUNET_DHT_MonitorGetRespMessage
604                                   *msg)
605 {
606   struct GNUNET_DHT_MonitorHandle *h;
607   size_t msize;
608
609   msize = ntohs (msg->header.size);
610   if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
611     return GNUNET_SYSERR;
612
613   h = handle->monitor_head;
614   while (NULL != h)
615   {
616     int type_ok;
617     int key_ok;
618
619     type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
620     key_ok = NULL == h->key || memcmp (h->key, &msg->key,
621                                        sizeof (GNUNET_HashCode)) == 0;
622     if (type_ok && key_ok && NULL != h->get_resp_cb)
623     {
624       struct GNUNET_PeerIdentity *path;
625       uint32_t getl;
626       uint32_t putl;
627
628       path = (struct GNUNET_PeerIdentity *) &msg[1];
629       getl = ntohl (msg->get_path_length);
630       putl = ntohl (msg->put_path_length);
631       h->get_resp_cb (h->cb_cls,
632                       (enum GNUNET_BLOCK_Type) ntohl(msg->type),
633                       path, getl,
634                       &path[getl], putl,
635                       GNUNET_TIME_absolute_ntoh(msg->expiration_time),
636                       &msg->key,
637                       (void *) &path[getl + putl],
638                       msize -
639                       sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
640                       sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
641     }
642     h = h->next;
643   }
644   return GNUNET_OK;
645 }
646
647
648 /**
649  * Process a put monitor message from the service.
650  *
651  * @param handle The DHT handle.
652  * @param msg Monitor put message from the service.
653  * 
654  * @return GNUNET_OK if everything went fine,
655  *         GNUNET_SYSERR if the message is malformed.
656  */
657 static int
658 process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
659                              const struct GNUNET_DHT_MonitorPutMessage *msg)
660 {
661   struct GNUNET_DHT_MonitorHandle *h;
662   size_t msize;
663
664   msize = ntohs (msg->header.size);
665   if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
666     return GNUNET_SYSERR;
667
668   h = handle->monitor_head;
669   while (NULL != h)
670   {
671     int type_ok;
672     int key_ok;
673
674     type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
675     key_ok = NULL == h->key || memcmp (h->key, &msg->key,
676                                        sizeof (GNUNET_HashCode)) == 0;
677     if (type_ok && key_ok && NULL != h->put_cb)
678     {
679       struct GNUNET_PeerIdentity *path;
680       uint32_t putl;
681
682       path = (struct GNUNET_PeerIdentity *) &msg[1];
683       putl = ntohl (msg->put_path_length);
684       h->put_cb (h->cb_cls,
685                  ntohl (msg->options),
686                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
687                  ntohl (msg->hop_count),
688                  ntohl (msg->desired_replication_level),
689                  putl, path,
690                  GNUNET_TIME_absolute_ntoh(msg->expiration_time),
691                  &msg->key,
692                  (void *) &path[putl],
693                  msize -
694                  sizeof (struct GNUNET_DHT_MonitorPutMessage) -
695                  sizeof (struct GNUNET_PeerIdentity) * putl);
696     }
697     h = h->next;
698   }
699   return GNUNET_OK;
700 }
701
702
703 /**
704  * Process a monitoring message from the service: demultiplex for proper type.
705  *
706  * @param handle The DHT handle.
707  * @param msg Message from the service.
708  * 
709  * @return GNUNET_OK if everything went fine,
710  *         GNUNET_SYSERR if the message is malformed.
711  */
712 static int
713 process_monitor_message (struct GNUNET_DHT_Handle *handle,
714                          const struct GNUNET_MessageHeader *msg)
715 {
716   switch (ntohs (msg->type))
717   {
718     case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
719       return process_monitor_get_message(handle,
720                                          (struct GNUNET_DHT_MonitorGetMessage *)
721                                          msg);
722
723     case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
724     {
725       return process_monitor_get_resp_message(
726         handle,
727         (struct GNUNET_DHT_MonitorGetRespMessage *) msg);
728     }
729     case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
730     {
731       return process_monitor_put_message(handle,
732                                          (struct GNUNET_DHT_MonitorPutMessage *)
733                                          msg);
734     }
735     case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
736       /* Not implemented yet */
737       GNUNET_break(0);
738       /* Fall through */
739     default:
740       GNUNET_break(0);
741       return GNUNET_SYSERR;
742   }
743 }
744
745 /**
746  * Handler for messages received from the DHT service
747  * a demultiplexer which handles numerous message types
748  *
749  * @param cls the 'struct GNUNET_DHT_Handle'
750  * @param msg the incoming message
751  */
752 static void
753 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
754 {
755   struct GNUNET_DHT_Handle *handle = cls;
756   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
757
758   if (msg == NULL)
759   {
760     LOG (GNUNET_ERROR_TYPE_DEBUG,
761          "Error receiving data from DHT service, reconnecting\n");
762     do_disconnect (handle);
763     return;
764   }
765   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
766   {
767     if (process_monitor_message (handle, msg) == GNUNET_OK)
768     {
769       GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
770                              GNUNET_TIME_UNIT_FOREVER_REL);
771       return;
772     }
773     GNUNET_break (0);
774     do_disconnect (handle);
775     return;
776   }
777   if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
778   {
779     GNUNET_break (0);
780     do_disconnect (handle);
781     return;
782   }
783   dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
784   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
785        GNUNET_h2s (&dht_msg->key), handle);
786   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
787                                               &dht_msg->key, &process_reply,
788                                               (void *) dht_msg);
789   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
790                          GNUNET_TIME_UNIT_FOREVER_REL);
791 }
792
793
794 /**
795  * Initialize the connection with the DHT service.
796  *
797  * @param cfg configuration to use
798  * @param ht_len size of the internal hash table to use for
799  *               processing multiple GET/FIND requests in parallel
800  *
801  * @return handle to the DHT service, or NULL on error
802  */
803 struct GNUNET_DHT_Handle *
804 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
805                     unsigned int ht_len)
806 {
807   struct GNUNET_DHT_Handle *handle;
808
809   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
810   handle->cfg = cfg;
811   handle->uid_gen =
812       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
813   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
814   if (GNUNET_NO == try_connect (handle))
815   {
816     GNUNET_DHT_disconnect (handle);
817     return NULL;
818   }
819   return handle;
820 }
821
822
823 /**
824  * Shutdown connection with the DHT service.
825  *
826  * @param handle handle of the DHT connection to stop
827  */
828 void
829 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
830 {
831   struct PendingMessage *pm;
832
833   GNUNET_assert (handle != NULL);
834   GNUNET_assert (0 ==
835                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
836   if (handle->th != NULL)
837   {
838     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
839     handle->th = NULL;
840   }
841   while (NULL != (pm = handle->pending_head))
842   {
843     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
844     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
845                                  pm);
846     pm->in_pending_queue = GNUNET_NO;
847     GNUNET_assert (GNUNET_YES == pm->free_on_send);
848     if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
849       GNUNET_SCHEDULER_cancel (pm->timeout_task);
850     if (NULL != pm->cont)
851       pm->cont (pm->cont_cls, NULL);
852     GNUNET_free (pm);
853   }
854   if (handle->client != NULL)
855   {
856     GNUNET_CLIENT_disconnect (handle->client);
857     handle->client = NULL;
858   }
859   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
860     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
861   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
862   GNUNET_free (handle);
863 }
864
865
866 /**
867  * Timeout for the transmission of a fire&forget-request.  Clean it up.
868  *
869  * @param cls the 'struct PendingMessage'
870  * @param tc scheduler context
871  */
872 static void
873 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
874 {
875   struct PendingMessage *pending = cls;
876   struct GNUNET_DHT_Handle *handle;
877
878   handle = pending->handle;
879   GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
880   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
881                                pending);
882   pending->in_pending_queue = GNUNET_NO;
883   if (pending->cont != NULL)
884     pending->cont (pending->cont_cls, tc);
885   GNUNET_free (pending);
886 }
887
888
889 /**
890  * Perform a PUT operation storing data in the DHT.  FIXME: we should
891  * change the protocol to get a confirmation for the PUT from the DHT
892  * and call 'cont' only after getting the confirmation; otherwise, the
893  * client has no good way of telling if the 'PUT' message actually got
894  * to the DHT service!
895  *
896  * @param handle handle to DHT service
897  * @param key the key to store under
898  * @param desired_replication_level estimate of how many
899  *                nearest peers this request should reach
900  * @param options routing options for this message
901  * @param type type of the value
902  * @param size number of bytes in data; must be less than 64k
903  * @param data the data to store
904  * @param exp desired expiration time for the value
905  * @param timeout how long to wait for transmission of this request
906  * @param cont continuation to call when done (transmitting request to service)
907  *        You must not call GNUNET_DHT_DISCONNECT in this continuation
908  * @param cont_cls closure for cont
909  */
910 void
911 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
912                 uint32_t desired_replication_level,
913                 enum GNUNET_DHT_RouteOption options,
914                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
915                 struct GNUNET_TIME_Absolute exp,
916                 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
917                 void *cont_cls)
918 {
919   struct GNUNET_DHT_ClientPutMessage *put_msg;
920   size_t msize;
921   struct PendingMessage *pending;
922
923   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
924   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
925       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
926   {
927     GNUNET_break (0);
928     if (NULL != cont)
929       cont (cont_cls, NULL);
930     return;
931   }
932   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
933   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
934   pending->msg = &put_msg->header;
935   pending->handle = handle;
936   pending->cont = cont;
937   pending->cont_cls = cont_cls;
938   pending->free_on_send = GNUNET_YES;
939   pending->timeout_task =
940       GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
941   put_msg->header.size = htons (msize);
942   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
943   put_msg->type = htonl (type);
944   put_msg->options = htonl ((uint32_t) options);
945   put_msg->desired_replication_level = htonl (desired_replication_level);
946   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
947   put_msg->key = *key;
948   memcpy (&put_msg[1], data, size);
949   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
950                                pending);
951   pending->in_pending_queue = GNUNET_YES;
952   process_pending_messages (handle);
953 }
954
955
956 /**
957  * Perform an asynchronous GET operation on the DHT identified. See
958  * also "GNUNET_BLOCK_evaluate".
959  *
960  * @param handle handle to the DHT service
961  * @param timeout how long to wait for transmission of this request to the service
962  * @param type expected type of the response object
963  * @param key the key to look up
964  * @param desired_replication_level estimate of how many
965                   nearest peers this request should reach
966  * @param options routing options for this message
967  * @param xquery extended query data (can be NULL, depending on type)
968  * @param xquery_size number of bytes in xquery
969  * @param iter function to call on each result
970  * @param iter_cls closure for iter
971  * @return handle to stop the async get
972  */
973 struct GNUNET_DHT_GetHandle *
974 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
975                       struct GNUNET_TIME_Relative timeout,
976                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
977                       uint32_t desired_replication_level,
978                       enum GNUNET_DHT_RouteOption options, const void *xquery,
979                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
980                       void *iter_cls)
981 {
982   struct GNUNET_DHT_ClientGetMessage *get_msg;
983   struct GNUNET_DHT_GetHandle *get_handle;
984   size_t msize;
985   struct PendingMessage *pending;
986
987   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
988   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
989       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
990   {
991     GNUNET_break (0);
992     return NULL;
993   }
994   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
995        GNUNET_h2s (key), handle);
996   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
997   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
998   pending->msg = &get_msg->header;
999   pending->handle = handle;
1000   pending->free_on_send = GNUNET_NO;
1001   get_msg->header.size = htons (msize);
1002   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
1003   get_msg->options = htonl ((uint32_t) options);
1004   get_msg->desired_replication_level = htonl (desired_replication_level);
1005   get_msg->type = htonl (type);
1006   get_msg->key = *key;
1007   handle->uid_gen++;
1008   get_msg->unique_id = handle->uid_gen;
1009   memcpy (&get_msg[1], xquery, xquery_size);
1010   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1011                                pending);
1012   pending->in_pending_queue = GNUNET_YES;
1013   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1014   get_handle->iter = iter;
1015   get_handle->iter_cls = iter_cls;
1016   get_handle->message = pending;
1017   get_handle->unique_id = get_msg->unique_id;
1018   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
1019                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1020   process_pending_messages (handle);
1021   return get_handle;
1022 }
1023
1024
1025 /**
1026  * Stop async DHT-get.
1027  *
1028  * @param get_handle handle to the GET operation to stop
1029  */
1030 void
1031 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1032 {
1033   struct GNUNET_DHT_Handle *handle;
1034   const struct GNUNET_DHT_ClientGetMessage *get_msg;
1035   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1036   struct PendingMessage *pending;
1037
1038   handle = get_handle->message->handle;
1039   get_msg =
1040       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
1041   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
1042        GNUNET_h2s (&get_msg->key), handle);
1043   /* generate STOP */
1044   pending =
1045       GNUNET_malloc (sizeof (struct PendingMessage) +
1046                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1047   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
1048   pending->msg = &stop_msg->header;
1049   pending->handle = handle;
1050   pending->free_on_send = GNUNET_YES;
1051   stop_msg->header.size =
1052       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1053   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1054   stop_msg->reserved = htonl (0);
1055   stop_msg->unique_id = get_msg->unique_id;
1056   stop_msg->key = get_msg->key;
1057   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1058                                pending);
1059   pending->in_pending_queue = GNUNET_YES;
1060
1061   /* remove 'GET' from active status */
1062   GNUNET_assert (GNUNET_YES ==
1063                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1064                                                        &get_msg->key,
1065                                                        get_handle));
1066   if (GNUNET_YES == get_handle->message->in_pending_queue)
1067   {
1068     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1069                                  get_handle->message);
1070     get_handle->message->in_pending_queue = GNUNET_NO;
1071   }
1072   GNUNET_free (get_handle->message);
1073   GNUNET_free (get_handle);
1074
1075   process_pending_messages (handle);
1076 }
1077
1078
1079 /**
1080  * Start monitoring the local DHT service.
1081  *
1082  * @param handle Handle to the DHT service.
1083  * @param type Type of blocks that are of interest.
1084  * @param key Key of data of interest, NULL for all.
1085  * @param get_cb Callback to process monitored get messages.
1086  * @param get_resp_cb Callback to process monitored get response messages.
1087  * @param put_cb Callback to process monitored put messages.
1088  * @param cb_cls Closure for cb.
1089  *
1090  * @return Handle to stop monitoring.
1091  */
1092 struct GNUNET_DHT_MonitorHandle *
1093 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1094                           enum GNUNET_BLOCK_Type type,
1095                           const GNUNET_HashCode *key,
1096                           GNUNET_DHT_MonitorGetCB get_cb,
1097                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1098                           GNUNET_DHT_MonitorPutCB put_cb,
1099                           void *cb_cls)
1100 {
1101   struct GNUNET_DHT_MonitorHandle *h;
1102   struct GNUNET_DHT_MonitorStartMessage *m;
1103   struct PendingMessage *pending;
1104
1105   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
1106   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
1107
1108   h->get_cb = get_cb;
1109   h->get_resp_cb = get_resp_cb;
1110   h->put_cb = put_cb;
1111   h->cb_cls = cb_cls;
1112   h->type = type;
1113   h->dht_handle = handle;
1114   if (NULL != key)
1115   {
1116     h->key = GNUNET_malloc (sizeof(GNUNET_HashCode));
1117     memcpy (h->key, key, sizeof(GNUNET_HashCode));
1118   }
1119
1120   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) +
1121                            sizeof (struct PendingMessage));
1122   m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1];
1123   pending->msg = &m->header;
1124   pending->handle = handle;
1125   pending->free_on_send = GNUNET_YES;
1126   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
1127   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage));
1128   m->type = htonl(type);
1129   m->get = (NULL != get_cb);
1130   m->get_resp = (NULL != get_resp_cb);
1131   m->put = (NULL != put_cb);
1132   if (NULL != key) {
1133     m->filter_key = 1;
1134     memcpy (&m->key, key, sizeof(GNUNET_HashCode));
1135   }
1136   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1137                                pending);
1138   pending->in_pending_queue = GNUNET_YES;
1139   process_pending_messages (handle);
1140
1141   return h;
1142 }
1143
1144
1145 /**
1146  * Stop monitoring.
1147  *
1148  * @param handle The handle to the monitor request returned by monitor_start.
1149  *
1150  * On return get_handle will no longer be valid, caller must not use again!!!
1151  */
1152 void
1153 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1154 {
1155   GNUNET_free_non_null (handle->key);
1156   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1157                                handle->dht_handle->monitor_tail,
1158                                handle);
1159   /* FIXME notify service of stop */
1160   GNUNET_free (handle);
1161 }
1162
1163
1164
1165 /* end of dht_api.c */