remove 'illegal' (non-reentrant) log logic from signal handler
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2012, 2016, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind, ...) GNUNET_log_from (kind, "dht-api", __VA_ARGS__)
38
39
40 /**
41  * Handle to a PUT request.
42  */
43 struct GNUNET_DHT_PutHandle
44 {
45   /**
46    * Kept in a DLL.
47    */
48   struct GNUNET_DHT_PutHandle *next;
49
50   /**
51    * Kept in a DLL.
52    */
53   struct GNUNET_DHT_PutHandle *prev;
54
55   /**
56    * Continuation to call when done.
57    */
58   GNUNET_SCHEDULER_TaskCallback cont;
59
60   /**
61    * Main handle to this DHT api
62    */
63   struct GNUNET_DHT_Handle *dht_handle;
64
65   /**
66    * Closure for @e cont.
67    */
68   void *cont_cls;
69
70   /**
71    * Envelope from the PUT operation.
72    */
73   struct GNUNET_MQ_Envelope *env;
74 };
75
76 /**
77  * Handle to a GET request
78  */
79 struct GNUNET_DHT_GetHandle
80 {
81   /**
82    * Iterator to call on data receipt
83    */
84   GNUNET_DHT_GetIterator iter;
85
86   /**
87    * Closure for @e iter.
88    */
89   void *iter_cls;
90
91   /**
92    * Main handle to this DHT api
93    */
94   struct GNUNET_DHT_Handle *dht_handle;
95
96   /**
97    * Array of hash codes over the results that we have already
98    * seen.
99    */
100   struct GNUNET_HashCode *seen_results;
101
102   /**
103    * Key that this get request is for
104    */
105   struct GNUNET_HashCode key;
106
107   /**
108    * Unique identifier for this request (for key collisions).
109    */
110   uint64_t unique_id;
111
112   /**
113    * Size of the extended query, allocated at the end of this struct.
114    */
115   size_t xquery_size;
116
117   /**
118    * Desired replication level.
119    */
120   uint32_t desired_replication_level;
121
122   /**
123    * Type of the block we are looking for.
124    */
125   enum GNUNET_BLOCK_Type type;
126
127   /**
128    * Routing options.
129    */
130   enum GNUNET_DHT_RouteOption options;
131
132   /**
133    * Size of the @e seen_results array.  Note that not
134    * all positions might be used (as we over-allocate).
135    */
136   unsigned int seen_results_size;
137
138   /**
139    * Offset into the @e seen_results array marking the
140    * end of the positions that are actually used.
141    */
142   unsigned int seen_results_end;
143 };
144
145
146 /**
147  * Handle to a monitoring request.
148  */
149 struct GNUNET_DHT_MonitorHandle
150 {
151   /**
152    * DLL.
153    */
154   struct GNUNET_DHT_MonitorHandle *next;
155
156   /**
157    * DLL.
158    */
159   struct GNUNET_DHT_MonitorHandle *prev;
160
161   /**
162    * Main handle to this DHT api.
163    */
164   struct GNUNET_DHT_Handle *dht_handle;
165
166   /**
167    * Type of block looked for.
168    */
169   enum GNUNET_BLOCK_Type type;
170
171   /**
172    * Key being looked for, NULL == all.
173    */
174   struct GNUNET_HashCode *key;
175
176   /**
177    * Callback for each received message of type get.
178    */
179   GNUNET_DHT_MonitorGetCB get_cb;
180
181   /**
182    * Callback for each received message of type get response.
183    */
184   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
185
186   /**
187    * Callback for each received message of type put.
188    */
189   GNUNET_DHT_MonitorPutCB put_cb;
190
191   /**
192    * Closure for @e get_cb, @e put_cb and @e get_resp_cb.
193    */
194   void *cb_cls;
195 };
196
197
198 /**
199  * Connection to the DHT service.
200  */
201 struct GNUNET_DHT_Handle
202 {
203   /**
204    * Configuration to use.
205    */
206   const struct GNUNET_CONFIGURATION_Handle *cfg;
207
208   /**
209    * Connection to DHT service.
210    */
211   struct GNUNET_MQ_Handle *mq;
212
213   /**
214    * Head of linked list of messages we would like to monitor.
215    */
216   struct GNUNET_DHT_MonitorHandle *monitor_head;
217
218   /**
219    * Tail of linked list of messages we would like to monitor.
220    */
221   struct GNUNET_DHT_MonitorHandle *monitor_tail;
222
223   /**
224    * Head of active PUT requests.
225    */
226   struct GNUNET_DHT_PutHandle *put_head;
227
228   /**
229    * Tail of active PUT requests.
230    */
231   struct GNUNET_DHT_PutHandle *put_tail;
232
233   /**
234    * Hash map containing the current outstanding unique GET requests
235    * (values are of type `struct GNUNET_DHT_GetHandle`).
236    */
237   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
238
239   /**
240    * Task for trying to reconnect.
241    */
242   struct GNUNET_SCHEDULER_Task *reconnect_task;
243
244   /**
245    * How quickly should we retry?  Used for exponential back-off on
246    * connect-errors.
247    */
248   struct GNUNET_TIME_Relative retry_time;
249
250   /**
251    * Generator for unique ids.
252    */
253   uint64_t uid_gen;
254 };
255
256
257 /**
258  * Try to (re)connect to the DHT service.
259  *
260  * @param h DHT handle to reconnect
261  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
262  */
263 static int
264 try_connect (struct GNUNET_DHT_Handle *h);
265
266
267 /**
268  * Send GET message for a @a get_handle to DHT.
269  *
270  * @param gh GET to generate messages for.
271  */
272 static void
273 send_get (struct GNUNET_DHT_GetHandle *gh)
274 {
275   struct GNUNET_DHT_Handle *h = gh->dht_handle;
276   struct GNUNET_MQ_Envelope *env;
277   struct GNUNET_DHT_ClientGetMessage *get_msg;
278
279   env = GNUNET_MQ_msg_extra (get_msg,
280                              gh->xquery_size,
281                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
282   get_msg->options = htonl ((uint32_t) gh->options);
283   get_msg->desired_replication_level = htonl (gh->desired_replication_level);
284   get_msg->type = htonl (gh->type);
285   get_msg->key = gh->key;
286   get_msg->unique_id = gh->unique_id;
287   GNUNET_memcpy (&get_msg[1],
288                  &gh[1],
289                  gh->xquery_size);
290   GNUNET_MQ_send (h->mq,
291                   env);
292 }
293
294
295 /**
296  * Send GET message(s) for indicating which results are already known
297  * for a @a get_handle to DHT.  Complex as we need to send the list of
298  * known results, which means we may need mulitple messages to block
299  * known results from the result set.
300  *
301  * @param gh GET to generate messages for
302  * @param transmission_offset_start at which offset should we start?
303  */
304 static void
305 send_get_known_results (struct GNUNET_DHT_GetHandle *gh,
306                         unsigned int transmission_offset_start)
307 {
308   struct GNUNET_DHT_Handle *h = gh->dht_handle;
309   struct GNUNET_MQ_Envelope *env;
310   struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
311   unsigned int delta;
312   unsigned int max;
313   unsigned int transmission_offset;
314
315   max = (GNUNET_MAX_MESSAGE_SIZE - sizeof(*msg))
316         / sizeof(struct GNUNET_HashCode);
317   transmission_offset = transmission_offset_start;
318   while (transmission_offset < gh->seen_results_end)
319   {
320     delta = gh->seen_results_end - transmission_offset;
321     if (delta > max)
322       delta = max;
323     env = GNUNET_MQ_msg_extra (msg,
324                                delta * sizeof(struct GNUNET_HashCode),
325                                GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
326     msg->key = gh->key;
327     msg->unique_id = gh->unique_id;
328     GNUNET_memcpy (&msg[1],
329                    &gh->seen_results[transmission_offset],
330                    sizeof(struct GNUNET_HashCode) * delta);
331     GNUNET_MQ_send (h->mq,
332                     env);
333     transmission_offset += delta;
334   }
335 }
336
337
338 /**
339  * Add the GET request corresponding to the given route handle
340  * to the pending queue (if it is not already in there).
341  *
342  * @param cls the `struct GNUNET_DHT_Handle *`
343  * @param key key for the request (not used)
344  * @param value the `struct GNUNET_DHT_GetHandle *`
345  * @return #GNUNET_YES (always)
346  */
347 static int
348 add_get_request_to_pending (void *cls,
349                             const struct GNUNET_HashCode *key,
350                             void *value)
351 {
352   struct GNUNET_DHT_Handle *handle = cls;
353   struct GNUNET_DHT_GetHandle *gh = value;
354
355   LOG (GNUNET_ERROR_TYPE_DEBUG,
356        "Retransmitting request related to %s to DHT %p\n",
357        GNUNET_h2s (key),
358        handle);
359   send_get (gh);
360   send_get_known_results (gh, 0);
361   return GNUNET_YES;
362 }
363
364
365 /**
366  * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message.
367  *
368  * @param mh monitor handle to generate start message for
369  */
370 static void
371 send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh)
372 {
373   struct GNUNET_DHT_Handle *h = mh->dht_handle;
374   struct GNUNET_MQ_Envelope *env;
375   struct GNUNET_DHT_MonitorStartStopMessage *m;
376
377   env = GNUNET_MQ_msg (m,
378                        GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
379   m->type = htonl (mh->type);
380   m->get = htons (NULL != mh->get_cb);
381   m->get_resp = htons (NULL != mh->get_resp_cb);
382   m->put = htons (NULL != mh->put_cb);
383   if (NULL != mh->key)
384   {
385     m->filter_key = htons (1);
386     m->key = *mh->key;
387   }
388   GNUNET_MQ_send (h->mq,
389                   env);
390 }
391
392
393 /**
394  * Try reconnecting to the dht service.
395  *
396  * @param cls a `struct GNUNET_DHT_Handle`
397  */
398 static void
399 try_reconnect (void *cls)
400 {
401   struct GNUNET_DHT_Handle *h = cls;
402   struct GNUNET_DHT_MonitorHandle *mh;
403
404   LOG (GNUNET_ERROR_TYPE_DEBUG,
405        "Reconnecting with DHT %p\n",
406        h);
407   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
408   h->reconnect_task = NULL;
409   if (GNUNET_YES != try_connect (h))
410   {
411     LOG (GNUNET_ERROR_TYPE_WARNING,
412          "DHT reconnect failed!\n");
413     h->reconnect_task
414       = GNUNET_SCHEDULER_add_delayed (h->retry_time,
415                                       &try_reconnect,
416                                       h);
417     return;
418   }
419   GNUNET_CONTAINER_multihashmap_iterate (h->active_requests,
420                                          &add_get_request_to_pending,
421                                          h);
422   for (mh = h->monitor_head; NULL != mh; mh = mh->next)
423     send_monitor_start (mh);
424 }
425
426
427 /**
428  * Try reconnecting to the DHT service.
429  *
430  * @param h handle to dht to (possibly) disconnect and reconnect
431  */
432 static void
433 do_disconnect (struct GNUNET_DHT_Handle *h)
434 {
435   struct GNUNET_DHT_PutHandle *ph;
436   GNUNET_SCHEDULER_TaskCallback cont;
437   void *cont_cls;
438
439   if (NULL == h->mq)
440     return;
441   GNUNET_MQ_destroy (h->mq);
442   h->mq = NULL;
443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
444               "Disconnecting from DHT service, will try to reconnect in %s\n",
445               GNUNET_STRINGS_relative_time_to_string (h->retry_time,
446                                                       GNUNET_YES));
447   /* notify client about all PUTs that (may) have failed due to disconnect */
448   while (NULL != (ph = h->put_head))
449   {
450     cont = ph->cont;
451     cont_cls = ph->cont_cls;
452     ph->env = NULL;
453     GNUNET_DHT_put_cancel (ph);
454     if (NULL != cont)
455       cont (cont_cls);
456   }
457   GNUNET_assert (NULL == h->reconnect_task);
458   h->reconnect_task
459     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
460                                     &try_reconnect,
461                                     h);
462 }
463
464
465 /**
466  * Generic error handler, called with the appropriate error code and
467  * the same closure specified at the creation of the message queue.
468  * Not every message queue implementation supports an error handler.
469  *
470  * @param cls closure with the `struct GNUNET_DHT_Handle *`
471  * @param error error code
472  */
473 static void
474 mq_error_handler (void *cls,
475                   enum GNUNET_MQ_Error error)
476 {
477   struct GNUNET_DHT_Handle *h = cls;
478
479   do_disconnect (h);
480 }
481
482
483 /**
484  * Verify integrity of a get monitor message from the service.
485  *
486  * @param cls The DHT handle.
487  * @param msg Monitor get message from the service.
488  * @return #GNUNET_OK if everything went fine,
489  *         #GNUNET_SYSERR if the message is malformed.
490  */
491 static int
492 check_monitor_get (void *cls,
493                    const struct GNUNET_DHT_MonitorGetMessage *msg)
494 {
495   uint32_t plen = ntohl (msg->get_path_length);
496   uint16_t msize = ntohs (msg->header.size) - sizeof(*msg);
497
498   if ((plen > UINT16_MAX) ||
499       (plen * sizeof(struct GNUNET_PeerIdentity) != msize))
500   {
501     GNUNET_break (0);
502     return GNUNET_SYSERR;
503   }
504   return GNUNET_OK;
505 }
506
507
508 /**
509  * Process a get monitor message from the service.
510  *
511  * @param cls The DHT handle.
512  * @param msg Monitor get message from the service.
513  */
514 static void
515 handle_monitor_get (void *cls,
516                     const struct GNUNET_DHT_MonitorGetMessage *msg)
517 {
518   struct GNUNET_DHT_Handle *handle = cls;
519   struct GNUNET_DHT_MonitorHandle *mh;
520
521   for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
522   {
523     if (NULL == mh->get_cb)
524       continue;
525     if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
526          (mh->type == ntohl (msg->type))) &&
527         ((NULL == mh->key) ||
528          (0 == memcmp (mh->key,
529                        &msg->key,
530                        sizeof(struct GNUNET_HashCode)))))
531       mh->get_cb (mh->cb_cls,
532                   ntohl (msg->options),
533                   (enum GNUNET_BLOCK_Type) ntohl (msg->type),
534                   ntohl (msg->hop_count),
535                   ntohl (msg->desired_replication_level),
536                   ntohl (msg->get_path_length),
537                   (struct GNUNET_PeerIdentity *) &msg[1],
538                   &msg->key);
539   }
540 }
541
542
543 /**
544  * Validate a get response monitor message from the service.
545  *
546  * @param cls The DHT handle.
547  * @param msg monitor get response message from the service
548  * @return #GNUNET_OK if everything went fine,
549  *         #GNUNET_SYSERR if the message is malformed.
550  */
551 static int
552 check_monitor_get_resp (void *cls,
553                         const struct GNUNET_DHT_MonitorGetRespMessage *msg)
554 {
555   size_t msize = ntohs (msg->header.size) - sizeof(*msg);
556   uint32_t getl = ntohl (msg->get_path_length);
557   uint32_t putl = ntohl (msg->put_path_length);
558
559   if ((getl + putl < getl) ||
560       ((msize / sizeof(struct GNUNET_PeerIdentity)) < getl + putl))
561   {
562     GNUNET_break (0);
563     return GNUNET_SYSERR;
564   }
565   return GNUNET_OK;
566 }
567
568
569 /**
570  * Process a get response monitor message from the service.
571  *
572  * @param cls The DHT handle.
573  * @param msg monitor get response message from the service
574  */
575 static void
576 handle_monitor_get_resp (void *cls,
577                          const struct GNUNET_DHT_MonitorGetRespMessage *msg)
578 {
579   struct GNUNET_DHT_Handle *handle = cls;
580   size_t msize = ntohs (msg->header.size) - sizeof(*msg);
581   const struct GNUNET_PeerIdentity *path;
582   uint32_t getl = ntohl (msg->get_path_length);
583   uint32_t putl = ntohl (msg->put_path_length);
584   struct GNUNET_DHT_MonitorHandle *mh;
585
586   path = (const struct GNUNET_PeerIdentity *) &msg[1];
587   for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
588   {
589     if (NULL == mh->get_resp_cb)
590       continue;
591     if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
592          (mh->type == ntohl (msg->type))) &&
593         ((NULL == mh->key) ||
594          (0 == memcmp (mh->key,
595                        &msg->key,
596                        sizeof(struct GNUNET_HashCode)))))
597       mh->get_resp_cb (mh->cb_cls,
598                        (enum GNUNET_BLOCK_Type) ntohl (msg->type),
599                        path,
600                        getl,
601                        &path[getl],
602                        putl,
603                        GNUNET_TIME_absolute_ntoh (msg->expiration_time),
604                        &msg->key,
605                        (const void *) &path[getl + putl],
606                        msize - sizeof(struct GNUNET_PeerIdentity) * (putl
607                                                                      + getl));
608   }
609 }
610
611
612 /**
613  * Check validity of a put monitor message from the service.
614  *
615  * @param cls The DHT handle.
616  * @param msg Monitor put message from the service.
617  * @return #GNUNET_OK if everything went fine,
618  *         #GNUNET_SYSERR if the message is malformed.
619  */
620 static int
621 check_monitor_put (void *cls,
622                    const struct GNUNET_DHT_MonitorPutMessage *msg)
623 {
624   size_t msize;
625   uint32_t putl;
626
627   msize = ntohs (msg->header.size) - sizeof(*msg);
628   putl = ntohl (msg->put_path_length);
629   if ((msize / sizeof(struct GNUNET_PeerIdentity)) < putl)
630   {
631     GNUNET_break (0);
632     return GNUNET_SYSERR;
633   }
634   return GNUNET_OK;
635 }
636
637
638 /**
639  * Process a put monitor message from the service.
640  *
641  * @param cls The DHT handle.
642  * @param msg Monitor put message from the service.
643  */
644 static void
645 handle_monitor_put (void *cls,
646                     const struct GNUNET_DHT_MonitorPutMessage *msg)
647 {
648   struct GNUNET_DHT_Handle *handle = cls;
649   size_t msize = ntohs (msg->header.size) - sizeof(*msg);
650   uint32_t putl = ntohl (msg->put_path_length);
651   const struct GNUNET_PeerIdentity *path;
652   struct GNUNET_DHT_MonitorHandle *mh;
653
654   path = (const struct GNUNET_PeerIdentity *) &msg[1];
655   for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
656   {
657     if (NULL == mh->put_cb)
658       continue;
659     if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
660          (mh->type == ntohl (msg->type))) &&
661         ((NULL == mh->key) ||
662          (0 == memcmp (mh->key,
663                        &msg->key,
664                        sizeof(struct GNUNET_HashCode)))))
665       mh->put_cb (mh->cb_cls,
666                   ntohl (msg->options),
667                   (enum GNUNET_BLOCK_Type) ntohl (msg->type),
668                   ntohl (msg->hop_count),
669                   ntohl (msg->desired_replication_level),
670                   putl,
671                   path,
672                   GNUNET_TIME_absolute_ntoh (msg->expiration_time),
673                   &msg->key,
674                   (const void *) &path[putl],
675                   msize - sizeof(struct GNUNET_PeerIdentity) * putl);
676   }
677 }
678
679
680 /**
681  * Verify that client result  message received from the service is well-formed.
682  *
683  * @param cls The DHT handle.
684  * @param msg Monitor put message from the service.
685  * @return #GNUNET_OK if everything went fine,
686  *         #GNUNET_SYSERR if the message is malformed.
687  */
688 static int
689 check_client_result (void *cls,
690                      const struct GNUNET_DHT_ClientResultMessage *msg)
691 {
692   size_t msize = ntohs (msg->header.size) - sizeof(*msg);
693   uint32_t put_path_length = ntohl (msg->put_path_length);
694   uint32_t get_path_length = ntohl (msg->get_path_length);
695   size_t meta_length;
696
697   meta_length =
698     sizeof(struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
699   if ((msize < meta_length) ||
700       (get_path_length >
701        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
702       (put_path_length >
703        GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
704   {
705     GNUNET_break (0);
706     return GNUNET_SYSERR;
707   }
708   return GNUNET_OK;
709 }
710
711
712 /**
713  * Process a given reply that might match the given request.
714  *
715  * @param cls the `struct GNUNET_DHT_ClientResultMessage`
716  * @param key query of the request
717  * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key
718  * @return #GNUNET_YES to continue to iterate over all results
719  */
720 static int
721 process_client_result (void *cls,
722                        const struct GNUNET_HashCode *key,
723                        void *value)
724 {
725   const struct GNUNET_DHT_ClientResultMessage *crm = cls;
726   struct GNUNET_DHT_GetHandle *get_handle = value;
727   size_t msize = ntohs (crm->header.size) - sizeof(*crm);
728   uint32_t put_path_length = ntohl (crm->put_path_length);
729   uint32_t get_path_length = ntohl (crm->get_path_length);
730   const struct GNUNET_PeerIdentity *put_path;
731   const struct GNUNET_PeerIdentity *get_path;
732   struct GNUNET_HashCode hc;
733   size_t data_length;
734   size_t meta_length;
735   const void *data;
736
737   if (crm->unique_id != get_handle->unique_id)
738   {
739     /* UID mismatch */
740     LOG (GNUNET_ERROR_TYPE_DEBUG,
741          "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
742          GNUNET_h2s (key),
743          crm->unique_id,
744          get_handle->unique_id);
745     return GNUNET_YES;
746   }
747   /* FIXME: might want to check that type matches */
748   meta_length =
749     sizeof(struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
750   data_length = msize - meta_length;
751   put_path = (const struct GNUNET_PeerIdentity *) &crm[1];
752   get_path = &put_path[put_path_length];
753   {
754     char *pp;
755     char *gp;
756
757     gp = GNUNET_STRINGS_pp2s (get_path,
758                               get_path_length);
759     pp = GNUNET_STRINGS_pp2s (put_path,
760                               put_path_length);
761     LOG (GNUNET_ERROR_TYPE_DEBUG,
762          "Giving %u byte reply for %s to application (GP: %s, PP: %s)\n",
763          (unsigned int) data_length,
764          GNUNET_h2s (key),
765          gp,
766          pp);
767     GNUNET_free (gp);
768     GNUNET_free (pp);
769   }
770   data = &get_path[get_path_length];
771   /* remember that we've seen this result */
772   GNUNET_CRYPTO_hash (data,
773                       data_length,
774                       &hc);
775   if (get_handle->seen_results_size == get_handle->seen_results_end)
776     GNUNET_array_grow (get_handle->seen_results,
777                        get_handle->seen_results_size,
778                        get_handle->seen_results_size * 2 + 1);
779   get_handle->seen_results[get_handle->seen_results_end++] = hc;
780   /* no need to block it explicitly, service already knows about it! */
781   get_handle->iter (get_handle->iter_cls,
782                     GNUNET_TIME_absolute_ntoh (crm->expiration),
783                     key,
784                     get_path,
785                     get_path_length,
786                     put_path,
787                     put_path_length,
788                     ntohl (crm->type),
789                     data_length,
790                     data);
791   return GNUNET_YES;
792 }
793
794
795 /**
796  * Process a client result  message received from the service.
797  *
798  * @param cls The DHT handle.
799  * @param msg Monitor put message from the service.
800  */
801 static void
802 handle_client_result (void *cls,
803                       const struct GNUNET_DHT_ClientResultMessage *msg)
804 {
805   struct GNUNET_DHT_Handle *handle = cls;
806
807   GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
808                                               &msg->key,
809                                               &process_client_result,
810                                               (void *) msg);
811 }
812
813
814 /**
815  * Process a MQ PUT transmission notification.
816  *
817  * @param cls The DHT handle.
818  */
819 static void
820 handle_put_cont (void *cls)
821 {
822   struct GNUNET_DHT_PutHandle *ph = cls;
823   GNUNET_SCHEDULER_TaskCallback cont;
824   void *cont_cls;
825
826   cont = ph->cont;
827   cont_cls = ph->cont_cls;
828   ph->env = NULL;
829   GNUNET_DHT_put_cancel (ph);
830   if (NULL != cont)
831     cont (cont_cls);
832 }
833
834
835 /**
836  * Try to (re)connect to the DHT service.
837  *
838  * @param h DHT handle to reconnect
839  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
840  */
841 static int
842 try_connect (struct GNUNET_DHT_Handle *h)
843 {
844   struct GNUNET_MQ_MessageHandler handlers[] = {
845     GNUNET_MQ_hd_var_size (monitor_get,
846                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
847                            struct GNUNET_DHT_MonitorGetMessage,
848                            h),
849     GNUNET_MQ_hd_var_size (monitor_get_resp,
850                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
851                            struct GNUNET_DHT_MonitorGetRespMessage,
852                            h),
853     GNUNET_MQ_hd_var_size (monitor_put,
854                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
855                            struct GNUNET_DHT_MonitorPutMessage,
856                            h),
857     GNUNET_MQ_hd_var_size (client_result,
858                            GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
859                            struct GNUNET_DHT_ClientResultMessage,
860                            h),
861     GNUNET_MQ_handler_end ()
862   };
863
864   if (NULL != h->mq)
865     return GNUNET_OK;
866   h->mq = GNUNET_CLIENT_connect (h->cfg,
867                                  "dht",
868                                  handlers,
869                                  &mq_error_handler,
870                                  h);
871   if (NULL == h->mq)
872   {
873     LOG (GNUNET_ERROR_TYPE_WARNING,
874          "Failed to connect to the DHT service!\n");
875     return GNUNET_NO;
876   }
877   return GNUNET_YES;
878 }
879
880
881 /**
882  * Initialize the connection with the DHT service.
883  *
884  * @param cfg configuration to use
885  * @param ht_len size of the internal hash table to use for
886  *               processing multiple GET/FIND requests in parallel
887  * @return handle to the DHT service, or NULL on error
888  */
889 struct GNUNET_DHT_Handle *
890 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
891                     unsigned int ht_len)
892 {
893   struct GNUNET_DHT_Handle *handle;
894
895   handle = GNUNET_new (struct GNUNET_DHT_Handle);
896   handle->cfg = cfg;
897   handle->uid_gen
898     = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
899                                 UINT64_MAX);
900   handle->active_requests
901     = GNUNET_CONTAINER_multihashmap_create (ht_len,
902                                             GNUNET_YES);
903   if (GNUNET_NO == try_connect (handle))
904   {
905     GNUNET_DHT_disconnect (handle);
906     return NULL;
907   }
908   return handle;
909 }
910
911
912 /**
913  * Shutdown connection with the DHT service.
914  *
915  * @param handle handle of the DHT connection to stop
916  */
917 void
918 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
919 {
920   struct GNUNET_DHT_PutHandle *ph;
921
922   GNUNET_assert (0 ==
923                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
924   while (NULL != (ph = handle->put_head))
925   {
926     if (NULL != ph->cont)
927       ph->cont (ph->cont_cls);
928     GNUNET_DHT_put_cancel (ph);
929   }
930   if (NULL != handle->mq)
931   {
932     GNUNET_MQ_destroy (handle->mq);
933     handle->mq = NULL;
934   }
935   if (NULL != handle->reconnect_task)
936   {
937     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
938     handle->reconnect_task = NULL;
939   }
940   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
941   GNUNET_free (handle);
942 }
943
944
945 /**
946  * Perform a PUT operation storing data in the DHT.  FIXME: we should
947  * change the protocol to get a confirmation for the PUT from the DHT
948  * and call 'cont' only after getting the confirmation; otherwise, the
949  * client has no good way of telling if the 'PUT' message actually got
950  * to the DHT service!
951  *
952  * @param handle handle to DHT service
953  * @param key the key to store under
954  * @param desired_replication_level estimate of how many
955  *                nearest peers this request should reach
956  * @param options routing options for this message
957  * @param type type of the value
958  * @param size number of bytes in data; must be less than 64k
959  * @param data the data to store
960  * @param exp desired expiration time for the value
961  * @param cont continuation to call when done (transmitting request to service)
962  *        You must not call #GNUNET_DHT_disconnect in this continuation
963  * @param cont_cls closure for @a cont
964  */
965 struct GNUNET_DHT_PutHandle *
966 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
967                 const struct GNUNET_HashCode *key,
968                 uint32_t desired_replication_level,
969                 enum GNUNET_DHT_RouteOption options,
970                 enum GNUNET_BLOCK_Type type,
971                 size_t size,
972                 const void *data,
973                 struct GNUNET_TIME_Absolute exp,
974                 GNUNET_SCHEDULER_TaskCallback cont,
975                 void *cont_cls)
976 {
977   struct GNUNET_MQ_Envelope *env;
978   struct GNUNET_DHT_ClientPutMessage *put_msg;
979   size_t msize;
980   struct GNUNET_DHT_PutHandle *ph;
981
982   msize = sizeof(struct GNUNET_DHT_ClientPutMessage) + size;
983   if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
984       (size >= GNUNET_MAX_MESSAGE_SIZE))
985   {
986     GNUNET_break (0);
987     return NULL;
988   }
989   if (NULL == handle->mq)
990     return NULL;
991   LOG (GNUNET_ERROR_TYPE_DEBUG,
992        "Sending PUT for %s to DHT via %p\n",
993        GNUNET_h2s (key),
994        handle);
995   ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
996   ph->dht_handle = handle;
997   ph->cont = cont;
998   ph->cont_cls = cont_cls;
999   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1000                                     handle->put_tail,
1001                                     ph);
1002   env = GNUNET_MQ_msg_extra (put_msg,
1003                              size,
1004                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1005   GNUNET_MQ_notify_sent (env,
1006                          &handle_put_cont,
1007                          ph);
1008   ph->env = env;
1009   put_msg->type = htonl ((uint32_t) type);
1010   put_msg->options = htonl ((uint32_t) options);
1011   put_msg->desired_replication_level = htonl (desired_replication_level);
1012   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1013   put_msg->key = *key;
1014   GNUNET_memcpy (&put_msg[1],
1015                  data,
1016                  size);
1017   GNUNET_MQ_send (handle->mq,
1018                   env);
1019   return ph;
1020 }
1021
1022
1023 /**
1024  * Cancels a DHT PUT operation.  Note that the PUT request may still
1025  * go out over the network (we can't stop that); However, if the PUT
1026  * has not yet been sent to the service, cancelling the PUT will stop
1027  * this from happening (but there is no way for the user of this API
1028  * to tell if that is the case).  The only use for this API is to
1029  * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1030  * the system is shutting down).
1031  *
1032  * @param ph put operation to cancel ('cont' will no longer be called)
1033  */
1034 void
1035 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1036 {
1037   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1038
1039   if (NULL != ph->env)
1040     GNUNET_MQ_notify_sent (ph->env,
1041                            NULL,
1042                            NULL);
1043   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1044                                handle->put_tail,
1045                                ph);
1046   GNUNET_free (ph);
1047 }
1048
1049
1050 /**
1051  * Perform an asynchronous GET operation on the DHT identified. See
1052  * also #GNUNET_BLOCK_evaluate.
1053  *
1054  * @param handle handle to the DHT service
1055  * @param type expected type of the response object
1056  * @param key the key to look up
1057  * @param desired_replication_level estimate of how many
1058                   nearest peers this request should reach
1059  * @param options routing options for this message
1060  * @param xquery extended query data (can be NULL, depending on type)
1061  * @param xquery_size number of bytes in @a xquery
1062  * @param iter function to call on each result
1063  * @param iter_cls closure for @a iter
1064  * @return handle to stop the async get
1065  */
1066 struct GNUNET_DHT_GetHandle *
1067 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1068                       enum GNUNET_BLOCK_Type type,
1069                       const struct GNUNET_HashCode *key,
1070                       uint32_t desired_replication_level,
1071                       enum GNUNET_DHT_RouteOption options,
1072                       const void *xquery,
1073                       size_t xquery_size,
1074                       GNUNET_DHT_GetIterator iter,
1075                       void *iter_cls)
1076 {
1077   struct GNUNET_DHT_GetHandle *gh;
1078   size_t msize;
1079
1080   msize = sizeof(struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1081   if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
1082       (xquery_size >= GNUNET_MAX_MESSAGE_SIZE))
1083   {
1084     GNUNET_break (0);
1085     return NULL;
1086   }
1087   LOG (GNUNET_ERROR_TYPE_DEBUG,
1088        "Sending query for %s to DHT %p\n",
1089        GNUNET_h2s (key),
1090        handle);
1091   gh = GNUNET_malloc (sizeof(struct GNUNET_DHT_GetHandle)
1092                       + xquery_size);
1093   gh->iter = iter;
1094   gh->iter_cls = iter_cls;
1095   gh->dht_handle = handle;
1096   gh->key = *key;
1097   gh->unique_id = ++handle->uid_gen;
1098   gh->xquery_size = xquery_size;
1099   gh->desired_replication_level = desired_replication_level;
1100   gh->type = type;
1101   gh->options = options;
1102   GNUNET_memcpy (&gh[1],
1103                  xquery,
1104                  xquery_size);
1105   GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1106                                      &gh->key,
1107                                      gh,
1108                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1109   if (NULL != handle->mq)
1110     send_get (gh);
1111   return gh;
1112 }
1113
1114
1115 /**
1116  * Tell the DHT not to return any of the following known results
1117  * to this client.
1118  *
1119  * @param get_handle get operation for which results should be filtered
1120  * @param num_results number of results to be blocked that are
1121  *        provided in this call (size of the @a results array)
1122  * @param results array of hash codes over the 'data' of the results
1123  *        to be blocked
1124  */
1125 void
1126 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1127                                      unsigned int num_results,
1128                                      const struct GNUNET_HashCode *results)
1129 {
1130   unsigned int needed;
1131   unsigned int had;
1132
1133   had = get_handle->seen_results_end;
1134   needed = had + num_results;
1135   if (needed > get_handle->seen_results_size)
1136     GNUNET_array_grow (get_handle->seen_results,
1137                        get_handle->seen_results_size,
1138                        needed);
1139   GNUNET_memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1140                  results,
1141                  num_results * sizeof(struct GNUNET_HashCode));
1142   get_handle->seen_results_end += num_results;
1143   if (NULL != get_handle->dht_handle->mq)
1144     send_get_known_results (get_handle,
1145                             had);
1146 }
1147
1148
1149 /**
1150  * Stop async DHT-get.
1151  *
1152  * @param get_handle handle to the GET operation to stop
1153  */
1154 void
1155 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1156 {
1157   struct GNUNET_DHT_Handle *handle = get_handle->dht_handle;
1158
1159   LOG (GNUNET_ERROR_TYPE_DEBUG,
1160        "Sending STOP for %s to DHT via %p\n",
1161        GNUNET_h2s (&get_handle->key),
1162        handle);
1163   if (NULL != handle->mq)
1164   {
1165     struct GNUNET_MQ_Envelope *env;
1166     struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1167
1168     env = GNUNET_MQ_msg (stop_msg,
1169                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1170     stop_msg->reserved = htonl (0);
1171     stop_msg->unique_id = get_handle->unique_id;
1172     stop_msg->key = get_handle->key;
1173     GNUNET_MQ_send (handle->mq,
1174                     env);
1175   }
1176   GNUNET_assert (GNUNET_YES ==
1177                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1178                                                        &get_handle->key,
1179                                                        get_handle));
1180   GNUNET_array_grow (get_handle->seen_results,
1181                      get_handle->seen_results_end,
1182                      0);
1183   GNUNET_free (get_handle);
1184 }
1185
1186
1187 /**
1188  * Start monitoring the local DHT service.
1189  *
1190  * @param handle Handle to the DHT service.
1191  * @param type Type of blocks that are of interest.
1192  * @param key Key of data of interest, NULL for all.
1193  * @param get_cb Callback to process monitored get messages.
1194  * @param get_resp_cb Callback to process monitored get response messages.
1195  * @param put_cb Callback to process monitored put messages.
1196  * @param cb_cls Closure for callbacks.
1197  * @return Handle to stop monitoring.
1198  */
1199 struct GNUNET_DHT_MonitorHandle *
1200 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1201                           enum GNUNET_BLOCK_Type type,
1202                           const struct GNUNET_HashCode *key,
1203                           GNUNET_DHT_MonitorGetCB get_cb,
1204                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1205                           GNUNET_DHT_MonitorPutCB put_cb,
1206                           void *cb_cls)
1207 {
1208   struct GNUNET_DHT_MonitorHandle *mh;
1209
1210   mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1211   mh->get_cb = get_cb;
1212   mh->get_resp_cb = get_resp_cb;
1213   mh->put_cb = put_cb;
1214   mh->cb_cls = cb_cls;
1215   mh->type = type;
1216   mh->dht_handle = handle;
1217   if (NULL != key)
1218   {
1219     mh->key = GNUNET_new (struct GNUNET_HashCode);
1220     *mh->key = *key;
1221   }
1222   GNUNET_CONTAINER_DLL_insert (handle->monitor_head,
1223                                handle->monitor_tail,
1224                                mh);
1225   if (NULL != handle->mq)
1226     send_monitor_start (mh);
1227   return mh;
1228 }
1229
1230
1231 /**
1232  * Stop monitoring.
1233  *
1234  * @param mh The handle to the monitor request returned by monitor_start.
1235  *
1236  * On return get_handle will no longer be valid, caller must not use again!!!
1237  */
1238 void
1239 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh)
1240 {
1241   struct GNUNET_DHT_Handle *handle = mh->dht_handle;
1242   struct GNUNET_DHT_MonitorStartStopMessage *m;
1243   struct GNUNET_MQ_Envelope *env;
1244
1245   GNUNET_CONTAINER_DLL_remove (handle->monitor_head,
1246                                handle->monitor_tail,
1247                                mh);
1248   env = GNUNET_MQ_msg (m,
1249                        GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1250   m->type = htonl (mh->type);
1251   m->get = htons (NULL != mh->get_cb);
1252   m->get_resp = htons (NULL != mh->get_resp_cb);
1253   m->put = htons (NULL != mh->put_cb);
1254   if (NULL != mh->key)
1255   {
1256     m->filter_key = htons (1);
1257     m->key = *mh->key;
1258   }
1259   GNUNET_MQ_send (handle->mq,
1260                   env);
1261   GNUNET_free_non_null (mh->key);
1262   GNUNET_free (mh);
1263 }
1264
1265
1266 /* end of dht_api.c */