migrate transport_core API to MQ
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2012, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39
40 /**
41  * Handle to a PUT request.
42  */
43 struct GNUNET_DHT_PutHandle
44 {
45   /**
46    * Kept in a DLL.
47    */
48   struct GNUNET_DHT_PutHandle *next;
49
50   /**
51    * Kept in a DLL.
52    */
53   struct GNUNET_DHT_PutHandle *prev;
54
55   /**
56    * Continuation to call when done.
57    */
58   GNUNET_DHT_PutContinuation cont;
59
60   /**
61    * Main handle to this DHT api
62    */
63   struct GNUNET_DHT_Handle *dht_handle;
64
65   /**
66    * Closure for @e cont.
67    */
68   void *cont_cls;
69
70   /**
71    * Unique ID for the PUT operation.
72    */
73   uint64_t unique_id;
74
75 };
76
77 /**
78  * Handle to a GET request
79  */
80 struct GNUNET_DHT_GetHandle
81 {
82
83   /**
84    * Iterator to call on data receipt
85    */
86   GNUNET_DHT_GetIterator iter;
87
88   /**
89    * Closure for @a iter.
90    */
91   void *iter_cls;
92
93   /**
94    * Main handle to this DHT api
95    */
96   struct GNUNET_DHT_Handle *dht_handle;
97
98   /**
99    * Array of hash codes over the results that we have already
100    * seen.
101    */
102   struct GNUNET_HashCode *seen_results;
103
104   /**
105    * Key that this get request is for
106    */
107   struct GNUNET_HashCode key;
108
109   /**
110    * Unique identifier for this request (for key collisions).
111    */
112   uint64_t unique_id;
113
114   /**
115    * Size of the extended query, allocated at the end of this struct.
116    */
117   size_t xquery_size;
118
119   /**
120    * Desired replication level.
121    */
122   uint32_t desired_replication_level;
123
124   /**
125    * Type of the block we are looking for.
126    */
127   enum GNUNET_BLOCK_Type type;
128
129   /**
130    * Routing options.
131    */
132   enum GNUNET_DHT_RouteOption options;
133
134   /**
135    * Size of the @e seen_results array.  Note that not
136    * all positions might be used (as we over-allocate).
137    */
138   unsigned int seen_results_size;
139
140   /**
141    * Offset into the @e seen_results array marking the
142    * end of the positions that are actually used.
143    */
144   unsigned int seen_results_end;
145
146 };
147
148
149 /**
150  * Handle to a monitoring request.
151  */
152 struct GNUNET_DHT_MonitorHandle
153 {
154   /**
155    * DLL.
156    */
157   struct GNUNET_DHT_MonitorHandle *next;
158
159   /**
160    * DLL.
161    */
162   struct GNUNET_DHT_MonitorHandle *prev;
163
164   /**
165    * Main handle to this DHT api.
166    */
167   struct GNUNET_DHT_Handle *dht_handle;
168
169   /**
170    * Type of block looked for.
171    */
172   enum GNUNET_BLOCK_Type type;
173
174   /**
175    * Key being looked for, NULL == all.
176    */
177   struct GNUNET_HashCode *key;
178
179   /**
180    * Callback for each received message of type get.
181    */
182   GNUNET_DHT_MonitorGetCB get_cb;
183
184   /**
185    * Callback for each received message of type get response.
186    */
187   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
188
189   /**
190    * Callback for each received message of type put.
191    */
192   GNUNET_DHT_MonitorPutCB put_cb;
193
194   /**
195    * Closure for @e get_cb, @e put_cb and @e get_resp_cb.
196    */
197   void *cb_cls;
198
199 };
200
201
202 /**
203  * Connection to the DHT service.
204  */
205 struct GNUNET_DHT_Handle
206 {
207
208   /**
209    * Configuration to use.
210    */
211   const struct GNUNET_CONFIGURATION_Handle *cfg;
212
213   /**
214    * Connection to DHT service.
215    */
216   struct GNUNET_MQ_Handle *mq;
217
218   /**
219    * Head of linked list of messages we would like to monitor.
220    */
221   struct GNUNET_DHT_MonitorHandle *monitor_head;
222
223   /**
224    * Tail of linked list of messages we would like to monitor.
225    */
226   struct GNUNET_DHT_MonitorHandle *monitor_tail;
227
228   /**
229    * Head of active PUT requests.
230    */
231   struct GNUNET_DHT_PutHandle *put_head;
232
233   /**
234    * Tail of active PUT requests.
235    */
236   struct GNUNET_DHT_PutHandle *put_tail;
237
238   /**
239    * Hash map containing the current outstanding unique GET requests
240    * (values are of type `struct GNUNET_DHT_GetHandle`).
241    */
242   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
243
244   /**
245    * Task for trying to reconnect.
246    */
247   struct GNUNET_SCHEDULER_Task *reconnect_task;
248
249   /**
250    * How quickly should we retry?  Used for exponential back-off on
251    * connect-errors.
252    */
253   struct GNUNET_TIME_Relative retry_time;
254
255   /**
256    * Generator for unique ids.
257    */
258   uint64_t uid_gen;
259
260
261 };
262
263
264 /**
265  * Try to (re)connect to the DHT service.
266  *
267  * @param h DHT handle to reconnect
268  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
269  */
270 static int
271 try_connect (struct GNUNET_DHT_Handle *h);
272
273
274 /**
275  * Send GET message for a @a get_handle to DHT.
276  *
277  * @param gh GET to generate messages for.
278  */
279 static void
280 send_get (struct GNUNET_DHT_GetHandle *gh)
281 {
282   struct GNUNET_DHT_Handle *h = gh->dht_handle;
283   struct GNUNET_MQ_Envelope *env;
284   struct GNUNET_DHT_ClientGetMessage *get_msg;
285
286   env = GNUNET_MQ_msg_extra (get_msg,
287                              gh->xquery_size,
288                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
289   get_msg->options = htonl ((uint32_t) gh->options);
290   get_msg->desired_replication_level = htonl (gh->desired_replication_level);
291   get_msg->type = htonl (gh->type);
292   get_msg->key = gh->key;
293   get_msg->unique_id = gh->unique_id;
294   memcpy (&get_msg[1],
295           &gh[1],
296           gh->xquery_size);
297   GNUNET_MQ_send (h->mq,
298                   env);
299 }
300
301
302 /**
303  * Send GET message(s) for indicating which results are already known
304  * for a @a get_handle to DHT.  Complex as we need to send the list of
305  * known results, which means we may need mulitple messages to block
306  * known results from the result set.
307  *
308  * @param gh GET to generate messages for
309  * @param transmission_offset_start at which offset should we start?
310  */
311 static void
312 send_get_known_results (struct GNUNET_DHT_GetHandle *gh,
313                         unsigned int transmission_offset_start)
314 {
315   struct GNUNET_DHT_Handle *h = gh->dht_handle;
316   struct GNUNET_MQ_Envelope *env;
317   struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
318   unsigned int delta;
319   unsigned int max;
320   unsigned int transmission_offset;
321
322   max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*msg))
323     / sizeof (struct GNUNET_HashCode);
324   transmission_offset = transmission_offset_start;
325   while (transmission_offset < gh->seen_results_end)
326   {
327     delta = gh->seen_results_end - transmission_offset;
328     if (delta > max)
329       delta = max;
330     env = GNUNET_MQ_msg_extra (msg,
331                                delta * sizeof (struct GNUNET_HashCode),
332                                GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
333     msg->key = gh->key;
334     msg->unique_id = gh->unique_id;
335     memcpy (&msg[1],
336             &gh->seen_results[transmission_offset],
337             sizeof (struct GNUNET_HashCode) * delta);
338     GNUNET_MQ_send (h->mq,
339                     env);
340     transmission_offset += delta;
341   }
342 }
343
344
345 /**
346  * Add the GET request corresponding to the given route handle
347  * to the pending queue (if it is not already in there).
348  *
349  * @param cls the `struct GNUNET_DHT_Handle *`
350  * @param key key for the request (not used)
351  * @param value the `struct GNUNET_DHT_GetHandle *`
352  * @return #GNUNET_YES (always)
353  */
354 static int
355 add_get_request_to_pending (void *cls,
356                             const struct GNUNET_HashCode *key,
357                             void *value)
358 {
359   struct GNUNET_DHT_Handle *handle = cls;
360   struct GNUNET_DHT_GetHandle *gh = value;
361
362   LOG (GNUNET_ERROR_TYPE_DEBUG,
363        "Retransmitting request related to %s to DHT %p\n",
364        GNUNET_h2s (key),
365        handle);
366   send_get (gh);
367   send_get_known_results (gh, 0);
368   return GNUNET_YES;
369 }
370
371
372 /**
373  * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message.
374  *
375  * @param mh monitor handle to generate start message for
376  */
377 static void
378 send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh)
379 {
380   struct GNUNET_DHT_Handle *h = mh->dht_handle;
381   struct GNUNET_MQ_Envelope *env;
382   struct GNUNET_DHT_MonitorStartStopMessage *m;
383
384   env = GNUNET_MQ_msg (m,
385                        GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
386   m->type = htonl (mh->type);
387   m->get = htons (NULL != mh->get_cb);
388   m->get_resp = htons (NULL != mh->get_resp_cb);
389   m->put = htons (NULL != mh->put_cb);
390   if (NULL != mh->key)
391   {
392     m->filter_key = htons(1);
393     m->key = *mh->key;
394   }
395   GNUNET_MQ_send (h->mq,
396                   env);
397 }
398
399
400 /**
401  * Try reconnecting to the dht service.
402  *
403  * @param cls a `struct GNUNET_DHT_Handle`
404  */
405 static void
406 try_reconnect (void *cls)
407 {
408   struct GNUNET_DHT_Handle *h = cls;
409   struct GNUNET_DHT_MonitorHandle *mh;
410
411   LOG (GNUNET_ERROR_TYPE_DEBUG,
412        "Reconnecting with DHT %p\n",
413        h);
414   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
415   h->reconnect_task = NULL;
416   if (GNUNET_YES != try_connect (h))
417   {
418     LOG (GNUNET_ERROR_TYPE_WARNING,
419          "DHT reconnect failed!\n");
420     h->reconnect_task
421       = GNUNET_SCHEDULER_add_delayed (h->retry_time,
422                                       &try_reconnect,
423                                       h);
424     return;
425   }
426   GNUNET_CONTAINER_multihashmap_iterate (h->active_requests,
427                                          &add_get_request_to_pending,
428                                          h);
429   for (mh = h->monitor_head; NULL != mh; mh = mh->next)
430     send_monitor_start (mh);
431 }
432
433
434 /**
435  * Try reconnecting to the DHT service.
436  *
437  * @param h handle to dht to (possibly) disconnect and reconnect
438  */
439 static void
440 do_disconnect (struct GNUNET_DHT_Handle *h)
441 {
442   struct GNUNET_DHT_PutHandle *ph;
443   GNUNET_DHT_PutContinuation cont;
444   void *cont_cls;
445
446   if (NULL == h->mq)
447     return;
448   GNUNET_MQ_destroy (h->mq);
449   h->mq = NULL;
450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451               "Disconnecting from DHT service, will try to reconnect in %s\n",
452               GNUNET_STRINGS_relative_time_to_string (h->retry_time,
453                                                       GNUNET_YES));
454   /* notify client about all PUTs that (may) have failed due to disconnect */
455   while (NULL != (ph = h->put_head))
456   {
457     cont = ph->cont;
458     cont_cls = ph->cont_cls;
459     GNUNET_DHT_put_cancel (ph);
460     if (NULL != cont)
461       cont (cont_cls,
462             GNUNET_SYSERR);
463   }
464   GNUNET_assert (NULL == h->reconnect_task);
465   h->reconnect_task
466     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
467                                     &try_reconnect,
468                                     h);
469 }
470
471
472 /**
473  * Generic error handler, called with the appropriate error code and
474  * the same closure specified at the creation of the message queue.
475  * Not every message queue implementation supports an error handler.
476  *
477  * @param cls closure with the `struct GNUNET_DHT_Handle *`
478  * @param error error code
479  */
480 static void
481 mq_error_handler (void *cls,
482                   enum GNUNET_MQ_Error error)
483 {
484   struct GNUNET_DHT_Handle *h = cls;
485
486   do_disconnect (h);
487 }
488
489
490 /**
491  * Verify integrity of a get monitor message from the service.
492  *
493  * @param cls The DHT handle.
494  * @param msg Monitor get message from the service.
495  * @return #GNUNET_OK if everything went fine,
496  *         #GNUNET_SYSERR if the message is malformed.
497  */
498 static int
499 check_monitor_get (void *cls,
500                    const struct GNUNET_DHT_MonitorGetMessage *msg)
501 {
502   uint32_t plen = ntohl (msg->get_path_length);
503   uint16_t msize = ntohs (msg->header.size) - sizeof (*msg);
504
505   if ( (plen > UINT16_MAX) ||
506        (plen * sizeof (struct GNUNET_HashCode) != msize) )
507   {
508     GNUNET_break (0);
509     return GNUNET_SYSERR;
510   }
511   return GNUNET_OK;
512 }
513
514
515 /**
516  * Process a get monitor message from the service.
517  *
518  * @param cls The DHT handle.
519  * @param msg Monitor get message from the service.
520  */
521 static void
522 handle_monitor_get (void *cls,
523                     const struct GNUNET_DHT_MonitorGetMessage *msg)
524 {
525   struct GNUNET_DHT_Handle *handle = cls;
526   struct GNUNET_DHT_MonitorHandle *mh;
527
528   for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
529   {
530     if (NULL == mh->get_cb)
531       continue;
532     if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
533            (mh->type == ntohl (msg->type)) ) &&
534          ( (NULL == mh->key) ||
535            (0 == memcmp (mh->key,
536                          &msg->key,
537                          sizeof (struct GNUNET_HashCode))) ) )
538       mh->get_cb (mh->cb_cls,
539                   ntohl (msg->options),
540                   (enum GNUNET_BLOCK_Type) ntohl(msg->type),
541                   ntohl (msg->hop_count),
542                   ntohl (msg->desired_replication_level),
543                   ntohl (msg->get_path_length),
544                   (struct GNUNET_PeerIdentity *) &msg[1],
545                   &msg->key);
546   }
547 }
548
549
550 /**
551  * Validate a get response monitor message from the service.
552  *
553  * @param cls The DHT handle.
554  * @param msg monitor get response message from the service
555  * @return #GNUNET_OK if everything went fine,
556  *         #GNUNET_SYSERR if the message is malformed.
557  */
558 static int
559 check_monitor_get_resp (void *cls,
560                         const struct GNUNET_DHT_MonitorGetRespMessage *msg)
561 {
562   size_t msize = ntohs (msg->header.size) - sizeof (*msg);
563   uint32_t getl = ntohl (msg->get_path_length);
564   uint32_t putl = ntohl (msg->put_path_length);
565
566   if ( (getl + putl < getl) ||
567        ( (msize / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
568   {
569     GNUNET_break (0);
570     return GNUNET_SYSERR;
571   }
572   return GNUNET_OK;
573 }
574
575
576 /**
577  * Process a get response monitor message from the service.
578  *
579  * @param cls The DHT handle.
580  * @param msg monitor get response message from the service
581  */
582 static void
583 handle_monitor_get_resp (void *cls,
584                          const struct GNUNET_DHT_MonitorGetRespMessage *msg)
585 {
586   struct GNUNET_DHT_Handle *handle = cls;
587   size_t msize = ntohs (msg->header.size) - sizeof (*msg);
588   const struct GNUNET_PeerIdentity *path;
589   uint32_t getl = ntohl (msg->get_path_length);
590   uint32_t putl = ntohl (msg->put_path_length);
591   struct GNUNET_DHT_MonitorHandle *mh;
592
593   path = (const struct GNUNET_PeerIdentity *) &msg[1];
594   for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
595   {
596     if (NULL == mh->get_resp_cb)
597       continue;
598     if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
599            (mh->type == ntohl(msg->type)) ) &&
600          ( (NULL == mh->key) ||
601            (0 == memcmp (mh->key,
602                          &msg->key,
603                          sizeof (struct GNUNET_HashCode))) ) )
604       mh->get_resp_cb (mh->cb_cls,
605                        (enum GNUNET_BLOCK_Type) ntohl (msg->type),
606                        path,
607                        getl,
608                        &path[getl],
609                        putl,
610                        GNUNET_TIME_absolute_ntoh(msg->expiration_time),
611                        &msg->key,
612                        (const void *) &path[getl + putl],
613                        msize - sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
614   }
615 }
616
617
618 /**
619  * Check validity of a put monitor message from the service.
620  *
621  * @param cls The DHT handle.
622  * @param msg Monitor put message from the service.
623  * @return #GNUNET_OK if everything went fine,
624  *         #GNUNET_SYSERR if the message is malformed.
625  */
626 static int
627 check_monitor_put (void *cls,
628                    const struct GNUNET_DHT_MonitorPutMessage *msg)
629 {
630   size_t msize;
631   uint32_t putl;
632
633   msize = ntohs (msg->header.size) - sizeof (*msg);
634   putl = ntohl (msg->put_path_length);
635   if ((msize / sizeof (struct GNUNET_PeerIdentity)) < putl)
636   {
637     GNUNET_break (0);
638     return GNUNET_SYSERR;
639   }
640   return GNUNET_OK;
641 }
642
643
644 /**
645  * Process a put monitor message from the service.
646  *
647  * @param cls The DHT handle.
648  * @param msg Monitor put message from the service.
649  */
650 static void
651 handle_monitor_put (void *cls,
652                     const struct GNUNET_DHT_MonitorPutMessage *msg)
653 {
654   struct GNUNET_DHT_Handle *handle = cls;
655   size_t msize = ntohs (msg->header.size) - sizeof (*msg);
656   uint32_t putl = ntohl (msg->put_path_length);
657   const struct GNUNET_PeerIdentity *path;
658   struct GNUNET_DHT_MonitorHandle *mh;
659
660   path = (const struct GNUNET_PeerIdentity *) &msg[1];
661   for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
662   {
663     if (NULL == mh->put_cb)
664       continue;
665     if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
666            (mh->type == ntohl(msg->type)) ) &&
667          ( (NULL == mh->key) ||
668            (0 == memcmp (mh->key,
669                          &msg->key,
670                          sizeof (struct GNUNET_HashCode))) ) )
671       mh->put_cb (mh->cb_cls,
672                   ntohl (msg->options),
673                   (enum GNUNET_BLOCK_Type) ntohl(msg->type),
674                   ntohl (msg->hop_count),
675                   ntohl (msg->desired_replication_level),
676                   putl,
677                   path,
678                   GNUNET_TIME_absolute_ntoh(msg->expiration_time),
679                   &msg->key,
680                   (const void *) &path[putl],
681                   msize - sizeof (struct GNUNET_PeerIdentity) * putl);
682   }
683 }
684
685
686 /**
687  * Verify that client result  message received from the service is well-formed.
688  *
689  * @param cls The DHT handle.
690  * @param msg Monitor put message from the service.
691  * @return #GNUNET_OK if everything went fine,
692  *         #GNUNET_SYSERR if the message is malformed.
693  */
694 static int
695 check_client_result (void *cls,
696                      const struct GNUNET_DHT_ClientResultMessage *msg)
697 {
698   size_t msize = ntohs (msg->header.size) - sizeof (*msg);
699   uint32_t put_path_length = ntohl (msg->put_path_length);
700   uint32_t get_path_length = ntohl (msg->get_path_length);
701   size_t meta_length;
702
703   meta_length =
704     sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
705   if ( (msize < meta_length) ||
706        (get_path_length >
707         GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
708        (put_path_length >
709         GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
710   {
711     GNUNET_break (0);
712     return GNUNET_SYSERR;
713   }
714   return GNUNET_OK;
715 }
716
717
718 /**
719  * Process a given reply that might match the given request.
720  *
721  * @param cls the `struct GNUNET_DHT_ClientResultMessage`
722  * @param key query of the request
723  * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key
724  * @return #GNUNET_YES to continue to iterate over all results
725  */
726 static int
727 process_client_result (void *cls,
728                        const struct GNUNET_HashCode *key,
729                        void *value)
730 {
731   const struct GNUNET_DHT_ClientResultMessage *crm = cls;
732   struct GNUNET_DHT_GetHandle *get_handle = value;
733   size_t msize = ntohs (crm->header.size) - sizeof (*crm);
734   uint32_t put_path_length = ntohl (crm->put_path_length);
735   uint32_t get_path_length = ntohl (crm->get_path_length);
736   const struct GNUNET_PeerIdentity *put_path;
737   const struct GNUNET_PeerIdentity *get_path;
738   struct GNUNET_HashCode hc;
739   size_t data_length;
740   size_t meta_length;
741   const void *data;
742
743   if (crm->unique_id != get_handle->unique_id)
744   {
745     /* UID mismatch */
746     LOG (GNUNET_ERROR_TYPE_DEBUG,
747          "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
748          GNUNET_h2s (key),
749          crm->unique_id,
750          get_handle->unique_id);
751     return GNUNET_YES;
752   }
753   /* FIXME: might want to check that type matches */
754   meta_length =
755       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
756   data_length = msize - meta_length;
757   LOG (GNUNET_ERROR_TYPE_DEBUG,
758        "Giving %u byte reply for %s to application\n",
759        (unsigned int) data_length,
760        GNUNET_h2s (key));
761   put_path = (const struct GNUNET_PeerIdentity *) &crm[1];
762   get_path = &put_path[put_path_length];
763   data = &get_path[get_path_length];
764   /* remember that we've seen this result */
765   GNUNET_CRYPTO_hash (data,
766                       data_length,
767                       &hc);
768   if (get_handle->seen_results_size == get_handle->seen_results_end)
769     GNUNET_array_grow (get_handle->seen_results,
770                        get_handle->seen_results_size,
771                        get_handle->seen_results_size * 2 + 1);
772   get_handle->seen_results[get_handle->seen_results_end++] = hc;
773   /* no need to block it explicitly, service already knows about it! */
774   get_handle->iter (get_handle->iter_cls,
775                     GNUNET_TIME_absolute_ntoh (crm->expiration),
776                     key,
777                     get_path,
778                     get_path_length,
779                     put_path,
780                     put_path_length,
781                     ntohl (crm->type),
782                     data_length,
783                     data);
784   return GNUNET_YES;
785 }
786
787
788 /**
789  * Process a client result  message received from the service.
790  *
791  * @param cls The DHT handle.
792  * @param msg Monitor put message from the service.
793  */
794 static void
795 handle_client_result (void *cls,
796                       const struct GNUNET_DHT_ClientResultMessage *msg)
797 {
798   struct GNUNET_DHT_Handle *handle = cls;
799
800   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
801                                               &msg->key,
802                                               &process_client_result,
803                                               (void *) msg);
804 }
805
806
807 /**
808  * Process a put confirmation message from the service.
809  *
810  * @param cls The DHT handle.
811  * @param msg confirmation message from the service.
812  */
813 static void
814 handle_put_confirmation (void *cls,
815                          const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
816 {
817   struct GNUNET_DHT_Handle *handle = cls;
818   struct GNUNET_DHT_PutHandle *ph;
819   GNUNET_DHT_PutContinuation cont;
820   void *cont_cls;
821
822   for (ph = handle->put_head; NULL != ph; ph = ph->next)
823     if (ph->unique_id == msg->unique_id)
824       break;
825   if (NULL == ph)
826     return;
827   cont = ph->cont;
828   cont_cls = ph->cont_cls;
829   GNUNET_DHT_put_cancel (ph);
830   if (NULL != cont)
831     cont (cont_cls,
832           GNUNET_OK);
833 }
834
835
836 /**
837  * Try to (re)connect to the DHT service.
838  *
839  * @param h DHT handle to reconnect
840  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
841  */
842 static int
843 try_connect (struct GNUNET_DHT_Handle *h)
844 {
845   GNUNET_MQ_hd_var_size (monitor_get,
846                          GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
847                          struct GNUNET_DHT_MonitorGetMessage);
848   GNUNET_MQ_hd_var_size (monitor_get_resp,
849                          GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
850                          struct GNUNET_DHT_MonitorGetRespMessage);
851   GNUNET_MQ_hd_var_size (monitor_put,
852                          GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
853                          struct GNUNET_DHT_MonitorPutMessage);
854   GNUNET_MQ_hd_var_size (client_result,
855                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
856                          struct GNUNET_DHT_ClientResultMessage);
857   GNUNET_MQ_hd_fixed_size (put_confirmation,
858                            GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK,
859                            struct GNUNET_DHT_ClientPutConfirmationMessage);
860   struct GNUNET_MQ_MessageHandler handlers[] = {
861     make_monitor_get_handler (h),
862     make_monitor_get_resp_handler (h),
863     make_monitor_put_handler (h),
864     make_client_result_handler (h),
865     make_put_confirmation_handler (h),
866     GNUNET_MQ_handler_end ()
867   };
868   if (NULL != h->mq)
869     return GNUNET_OK;
870   h->mq = GNUNET_CLIENT_connecT (h->cfg,
871                                  "dht",
872                                  handlers,
873                                  &mq_error_handler,
874                                  h);
875   if (NULL == h->mq)
876   {
877     LOG (GNUNET_ERROR_TYPE_WARNING,
878          "Failed to connect to the DHT service!\n");
879     return GNUNET_NO;
880   }
881   return GNUNET_YES;
882 }
883
884
885 /**
886  * Initialize the connection with the DHT service.
887  *
888  * @param cfg configuration to use
889  * @param ht_len size of the internal hash table to use for
890  *               processing multiple GET/FIND requests in parallel
891  * @return handle to the DHT service, or NULL on error
892  */
893 struct GNUNET_DHT_Handle *
894 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
895                     unsigned int ht_len)
896 {
897   struct GNUNET_DHT_Handle *handle;
898
899   handle = GNUNET_new (struct GNUNET_DHT_Handle);
900   handle->cfg = cfg;
901   handle->uid_gen
902     = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
903                                 UINT64_MAX);
904   handle->active_requests
905     = GNUNET_CONTAINER_multihashmap_create (ht_len,
906                                             GNUNET_YES);
907   if (GNUNET_NO == try_connect (handle))
908   {
909     GNUNET_DHT_disconnect (handle);
910     return NULL;
911   }
912   return handle;
913 }
914
915
916 /**
917  * Shutdown connection with the DHT service.
918  *
919  * @param handle handle of the DHT connection to stop
920  */
921 void
922 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
923 {
924   struct GNUNET_DHT_PutHandle *ph;
925
926   GNUNET_assert (0 ==
927                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
928   while (NULL != (ph = handle->put_head))
929   {
930     if (NULL != ph->cont)
931       ph->cont (ph->cont_cls,
932                 GNUNET_SYSERR);
933     GNUNET_DHT_put_cancel (ph);
934   }
935   if (NULL != handle->mq)
936   {
937     GNUNET_MQ_destroy (handle->mq);
938     handle->mq = NULL;
939   }
940   if (NULL != handle->reconnect_task)
941   {
942     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
943     handle->reconnect_task = NULL;
944   }
945   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
946   GNUNET_free (handle);
947 }
948
949
950 /**
951  * Perform a PUT operation storing data in the DHT.  FIXME: we should
952  * change the protocol to get a confirmation for the PUT from the DHT
953  * and call 'cont' only after getting the confirmation; otherwise, the
954  * client has no good way of telling if the 'PUT' message actually got
955  * to the DHT service!
956  *
957  * @param handle handle to DHT service
958  * @param key the key to store under
959  * @param desired_replication_level estimate of how many
960  *                nearest peers this request should reach
961  * @param options routing options for this message
962  * @param type type of the value
963  * @param size number of bytes in data; must be less than 64k
964  * @param data the data to store
965  * @param exp desired expiration time for the value
966  * @param cont continuation to call when done (transmitting request to service)
967  *        You must not call #GNUNET_DHT_disconnect in this continuation
968  * @param cont_cls closure for @a cont
969  */
970 struct GNUNET_DHT_PutHandle *
971 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
972                 const struct GNUNET_HashCode *key,
973                 uint32_t desired_replication_level,
974                 enum GNUNET_DHT_RouteOption options,
975                 enum GNUNET_BLOCK_Type type,
976                 size_t size,
977                 const void *data,
978                 struct GNUNET_TIME_Absolute exp,
979                 GNUNET_DHT_PutContinuation cont,
980                 void *cont_cls)
981 {
982   struct GNUNET_MQ_Envelope *env;
983   struct GNUNET_DHT_ClientPutMessage *put_msg;
984   size_t msize;
985   struct GNUNET_DHT_PutHandle *ph;
986
987   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
988   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
989       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
990   {
991     GNUNET_break (0);
992     return NULL;
993   }
994   if (NULL == handle->mq)
995     return NULL;
996   LOG (GNUNET_ERROR_TYPE_DEBUG,
997        "Sending PUT for %s to DHT via %p\n",
998        GNUNET_h2s (key),
999        handle);
1000   ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
1001   ph->dht_handle = handle;
1002   ph->cont = cont;
1003   ph->cont_cls = cont_cls;
1004   ph->unique_id = ++handle->uid_gen;
1005   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1006                                     handle->put_tail,
1007                                     ph);
1008   env = GNUNET_MQ_msg_extra (put_msg,
1009                              size,
1010                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1011   put_msg->type = htonl ((uint32_t) type);
1012   put_msg->options = htonl ((uint32_t) options);
1013   put_msg->desired_replication_level = htonl (desired_replication_level);
1014   put_msg->unique_id = ph->unique_id;
1015   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1016   put_msg->key = *key;
1017   memcpy (&put_msg[1],
1018           data,
1019           size);
1020   GNUNET_MQ_send (handle->mq,
1021                   env);
1022   return ph;
1023 }
1024
1025
1026 /**
1027  * Cancels a DHT PUT operation.  Note that the PUT request may still
1028  * go out over the network (we can't stop that); However, if the PUT
1029  * has not yet been sent to the service, cancelling the PUT will stop
1030  * this from happening (but there is no way for the user of this API
1031  * to tell if that is the case).  The only use for this API is to
1032  * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1033  * the system is shutting down).
1034  *
1035  * @param ph put operation to cancel ('cont' will no longer be called)
1036  */
1037 void
1038 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1039 {
1040   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1041
1042   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1043                                handle->put_tail,
1044                                ph);
1045   GNUNET_free (ph);
1046 }
1047
1048
1049 /**
1050  * Perform an asynchronous GET operation on the DHT identified. See
1051  * also #GNUNET_BLOCK_evaluate.
1052  *
1053  * @param handle handle to the DHT service
1054  * @param type expected type of the response object
1055  * @param key the key to look up
1056  * @param desired_replication_level estimate of how many
1057                   nearest peers this request should reach
1058  * @param options routing options for this message
1059  * @param xquery extended query data (can be NULL, depending on type)
1060  * @param xquery_size number of bytes in @a xquery
1061  * @param iter function to call on each result
1062  * @param iter_cls closure for @a iter
1063  * @return handle to stop the async get
1064  */
1065 struct GNUNET_DHT_GetHandle *
1066 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1067                       enum GNUNET_BLOCK_Type type,
1068                       const struct GNUNET_HashCode *key,
1069                       uint32_t desired_replication_level,
1070                       enum GNUNET_DHT_RouteOption options,
1071                       const void *xquery,
1072                       size_t xquery_size,
1073                       GNUNET_DHT_GetIterator iter,
1074                       void *iter_cls)
1075 {
1076   struct GNUNET_DHT_GetHandle *gh;
1077   size_t msize;
1078
1079   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1080   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1081       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1082   {
1083     GNUNET_break (0);
1084     return NULL;
1085   }
1086   LOG (GNUNET_ERROR_TYPE_DEBUG,
1087        "Sending query for %s to DHT %p\n",
1088        GNUNET_h2s (key),
1089        handle);
1090   gh = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle) +
1091                       xquery_size);
1092   gh->iter = iter;
1093   gh->iter_cls = iter_cls;
1094   gh->dht_handle = handle;
1095   gh->key = *key;
1096   gh->unique_id = ++handle->uid_gen;
1097   gh->xquery_size = xquery_size;
1098   gh->desired_replication_level = desired_replication_level;
1099   gh->type = type;
1100   gh->options = options;
1101   memcpy (&gh[1],
1102           xquery,
1103           xquery_size);
1104   GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1105                                      &gh->key,
1106                                      gh,
1107                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1108   if (NULL != handle->mq)
1109     send_get (gh);
1110   return gh;
1111 }
1112
1113
1114 /**
1115  * Tell the DHT not to return any of the following known results
1116  * to this client.
1117  *
1118  * @param get_handle get operation for which results should be filtered
1119  * @param num_results number of results to be blocked that are
1120  *        provided in this call (size of the @a results array)
1121  * @param results array of hash codes over the 'data' of the results
1122  *        to be blocked
1123  */
1124 void
1125 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1126                                      unsigned int num_results,
1127                                      const struct GNUNET_HashCode *results)
1128 {
1129   unsigned int needed;
1130   unsigned int had;
1131
1132   had = get_handle->seen_results_end;
1133   needed = had + num_results;
1134   if (needed > get_handle->seen_results_size)
1135     GNUNET_array_grow (get_handle->seen_results,
1136                        get_handle->seen_results_size,
1137                        needed);
1138   memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1139           results,
1140           num_results * sizeof (struct GNUNET_HashCode));
1141   get_handle->seen_results_end += num_results;
1142   if (NULL != get_handle->dht_handle->mq)
1143     send_get_known_results (get_handle,
1144                             had);
1145 }
1146
1147
1148 /**
1149  * Stop async DHT-get.
1150  *
1151  * @param get_handle handle to the GET operation to stop
1152  */
1153 void
1154 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1155 {
1156   struct GNUNET_DHT_Handle *handle = get_handle->dht_handle;
1157
1158   LOG (GNUNET_ERROR_TYPE_DEBUG,
1159        "Sending STOP for %s to DHT via %p\n",
1160        GNUNET_h2s (&get_handle->key),
1161        handle);
1162   if (NULL != handle->mq)
1163   {
1164     struct GNUNET_MQ_Envelope *env;
1165     struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1166
1167     env = GNUNET_MQ_msg (stop_msg,
1168                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1169     stop_msg->reserved = htonl (0);
1170     stop_msg->unique_id = get_handle->unique_id;
1171     stop_msg->key = get_handle->key;
1172     GNUNET_MQ_send (handle->mq,
1173                     env);
1174   }
1175   GNUNET_assert (GNUNET_YES ==
1176                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1177                                                        &get_handle->key,
1178                                                        get_handle));
1179   GNUNET_array_grow (get_handle->seen_results,
1180                      get_handle->seen_results_end,
1181                      0);
1182   GNUNET_free (get_handle);
1183 }
1184
1185
1186 /**
1187  * Start monitoring the local DHT service.
1188  *
1189  * @param handle Handle to the DHT service.
1190  * @param type Type of blocks that are of interest.
1191  * @param key Key of data of interest, NULL for all.
1192  * @param get_cb Callback to process monitored get messages.
1193  * @param get_resp_cb Callback to process monitored get response messages.
1194  * @param put_cb Callback to process monitored put messages.
1195  * @param cb_cls Closure for callbacks.
1196  * @return Handle to stop monitoring.
1197  */
1198 struct GNUNET_DHT_MonitorHandle *
1199 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1200                           enum GNUNET_BLOCK_Type type,
1201                           const struct GNUNET_HashCode *key,
1202                           GNUNET_DHT_MonitorGetCB get_cb,
1203                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1204                           GNUNET_DHT_MonitorPutCB put_cb,
1205                           void *cb_cls)
1206 {
1207   struct GNUNET_DHT_MonitorHandle *mh;
1208
1209   mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1210   mh->get_cb = get_cb;
1211   mh->get_resp_cb = get_resp_cb;
1212   mh->put_cb = put_cb;
1213   mh->cb_cls = cb_cls;
1214   mh->type = type;
1215   mh->dht_handle = handle;
1216   if (NULL != key)
1217   {
1218     mh->key = GNUNET_new (struct GNUNET_HashCode);
1219     *mh->key = *key;
1220   }
1221   GNUNET_CONTAINER_DLL_insert (handle->monitor_head,
1222                                handle->monitor_tail,
1223                                mh);
1224   if (NULL != handle->mq)
1225     send_monitor_start (mh);
1226   return mh;
1227 }
1228
1229
1230 /**
1231  * Stop monitoring.
1232  *
1233  * @param mh The handle to the monitor request returned by monitor_start.
1234  *
1235  * On return get_handle will no longer be valid, caller must not use again!!!
1236  */
1237 void
1238 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh)
1239 {
1240   struct GNUNET_DHT_Handle *handle = mh->dht_handle;
1241   struct GNUNET_DHT_MonitorStartStopMessage *m;
1242   struct GNUNET_MQ_Envelope *env;
1243
1244   GNUNET_CONTAINER_DLL_remove (handle->monitor_head,
1245                                handle->monitor_tail,
1246                                mh);
1247   env = GNUNET_MQ_msg (m,
1248                        GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1249   m->type = htonl (mh->type);
1250   m->get = htons (NULL != mh->get_cb);
1251   m->get_resp = htons(NULL != mh->get_resp_cb);
1252   m->put = htons (NULL != mh->put_cb);
1253   if (NULL != mh->key)
1254   {
1255     m->filter_key = htons (1);
1256     m->key = *mh->key;
1257   }
1258   GNUNET_MQ_send (handle->mq,
1259                   env);
1260   GNUNET_free_non_null (mh->key);
1261   GNUNET_free (mh);
1262 }
1263
1264
1265 /* end of dht_api.c */