2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2012, 2016, 2018 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
37 #define LOG(kind, ...) GNUNET_log_from (kind, "dht-api", __VA_ARGS__)
41 * Handle to a PUT request.
43 struct GNUNET_DHT_PutHandle
48 struct GNUNET_DHT_PutHandle *next;
53 struct GNUNET_DHT_PutHandle *prev;
56 * Continuation to call when done.
58 GNUNET_SCHEDULER_TaskCallback cont;
61 * Main handle to this DHT api
63 struct GNUNET_DHT_Handle *dht_handle;
66 * Closure for @e cont.
71 * Envelope from the PUT operation.
73 struct GNUNET_MQ_Envelope *env;
77 * Handle to a GET request
79 struct GNUNET_DHT_GetHandle
82 * Iterator to call on data receipt
84 GNUNET_DHT_GetIterator iter;
87 * Closure for @e iter.
92 * Main handle to this DHT api
94 struct GNUNET_DHT_Handle *dht_handle;
97 * Array of hash codes over the results that we have already
100 struct GNUNET_HashCode *seen_results;
103 * Key that this get request is for
105 struct GNUNET_HashCode key;
108 * Unique identifier for this request (for key collisions).
113 * Size of the extended query, allocated at the end of this struct.
118 * Desired replication level.
120 uint32_t desired_replication_level;
123 * Type of the block we are looking for.
125 enum GNUNET_BLOCK_Type type;
130 enum GNUNET_DHT_RouteOption options;
133 * Size of the @e seen_results array. Note that not
134 * all positions might be used (as we over-allocate).
136 unsigned int seen_results_size;
139 * Offset into the @e seen_results array marking the
140 * end of the positions that are actually used.
142 unsigned int seen_results_end;
147 * Handle to a monitoring request.
149 struct GNUNET_DHT_MonitorHandle
154 struct GNUNET_DHT_MonitorHandle *next;
159 struct GNUNET_DHT_MonitorHandle *prev;
162 * Main handle to this DHT api.
164 struct GNUNET_DHT_Handle *dht_handle;
167 * Type of block looked for.
169 enum GNUNET_BLOCK_Type type;
172 * Key being looked for, NULL == all.
174 struct GNUNET_HashCode *key;
177 * Callback for each received message of type get.
179 GNUNET_DHT_MonitorGetCB get_cb;
182 * Callback for each received message of type get response.
184 GNUNET_DHT_MonitorGetRespCB get_resp_cb;
187 * Callback for each received message of type put.
189 GNUNET_DHT_MonitorPutCB put_cb;
192 * Closure for @e get_cb, @e put_cb and @e get_resp_cb.
199 * Connection to the DHT service.
201 struct GNUNET_DHT_Handle
204 * Configuration to use.
206 const struct GNUNET_CONFIGURATION_Handle *cfg;
209 * Connection to DHT service.
211 struct GNUNET_MQ_Handle *mq;
214 * Head of linked list of messages we would like to monitor.
216 struct GNUNET_DHT_MonitorHandle *monitor_head;
219 * Tail of linked list of messages we would like to monitor.
221 struct GNUNET_DHT_MonitorHandle *monitor_tail;
224 * Head of active PUT requests.
226 struct GNUNET_DHT_PutHandle *put_head;
229 * Tail of active PUT requests.
231 struct GNUNET_DHT_PutHandle *put_tail;
234 * Hash map containing the current outstanding unique GET requests
235 * (values are of type `struct GNUNET_DHT_GetHandle`).
237 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
240 * Task for trying to reconnect.
242 struct GNUNET_SCHEDULER_Task *reconnect_task;
245 * How quickly should we retry? Used for exponential back-off on
248 struct GNUNET_TIME_Relative retry_time;
251 * Generator for unique ids.
258 * Try to (re)connect to the DHT service.
260 * @param h DHT handle to reconnect
261 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
264 try_connect (struct GNUNET_DHT_Handle *h);
268 * Send GET message for a @a get_handle to DHT.
270 * @param gh GET to generate messages for.
273 send_get (struct GNUNET_DHT_GetHandle *gh)
275 struct GNUNET_DHT_Handle *h = gh->dht_handle;
276 struct GNUNET_MQ_Envelope *env;
277 struct GNUNET_DHT_ClientGetMessage *get_msg;
279 env = GNUNET_MQ_msg_extra (get_msg,
281 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
282 get_msg->options = htonl ((uint32_t) gh->options);
283 get_msg->desired_replication_level = htonl (gh->desired_replication_level);
284 get_msg->type = htonl (gh->type);
285 get_msg->key = gh->key;
286 get_msg->unique_id = gh->unique_id;
287 GNUNET_memcpy (&get_msg[1],
290 GNUNET_MQ_send (h->mq,
296 * Send GET message(s) for indicating which results are already known
297 * for a @a get_handle to DHT. Complex as we need to send the list of
298 * known results, which means we may need mulitple messages to block
299 * known results from the result set.
301 * @param gh GET to generate messages for
302 * @param transmission_offset_start at which offset should we start?
305 send_get_known_results (struct GNUNET_DHT_GetHandle *gh,
306 unsigned int transmission_offset_start)
308 struct GNUNET_DHT_Handle *h = gh->dht_handle;
309 struct GNUNET_MQ_Envelope *env;
310 struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
313 unsigned int transmission_offset;
315 max = (GNUNET_MAX_MESSAGE_SIZE - sizeof(*msg))
316 / sizeof(struct GNUNET_HashCode);
317 transmission_offset = transmission_offset_start;
318 while (transmission_offset < gh->seen_results_end)
320 delta = gh->seen_results_end - transmission_offset;
323 env = GNUNET_MQ_msg_extra (msg,
324 delta * sizeof(struct GNUNET_HashCode),
325 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
327 msg->unique_id = gh->unique_id;
328 GNUNET_memcpy (&msg[1],
329 &gh->seen_results[transmission_offset],
330 sizeof(struct GNUNET_HashCode) * delta);
331 GNUNET_MQ_send (h->mq,
333 transmission_offset += delta;
339 * Add the GET request corresponding to the given route handle
340 * to the pending queue (if it is not already in there).
342 * @param cls the `struct GNUNET_DHT_Handle *`
343 * @param key key for the request (not used)
344 * @param value the `struct GNUNET_DHT_GetHandle *`
345 * @return #GNUNET_YES (always)
348 add_get_request_to_pending (void *cls,
349 const struct GNUNET_HashCode *key,
352 struct GNUNET_DHT_Handle *handle = cls;
353 struct GNUNET_DHT_GetHandle *gh = value;
355 LOG (GNUNET_ERROR_TYPE_DEBUG,
356 "Retransmitting request related to %s to DHT %p\n",
360 send_get_known_results (gh, 0);
366 * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message.
368 * @param mh monitor handle to generate start message for
371 send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh)
373 struct GNUNET_DHT_Handle *h = mh->dht_handle;
374 struct GNUNET_MQ_Envelope *env;
375 struct GNUNET_DHT_MonitorStartStopMessage *m;
377 env = GNUNET_MQ_msg (m,
378 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
379 m->type = htonl (mh->type);
380 m->get = htons (NULL != mh->get_cb);
381 m->get_resp = htons (NULL != mh->get_resp_cb);
382 m->put = htons (NULL != mh->put_cb);
385 m->filter_key = htons (1);
388 GNUNET_MQ_send (h->mq,
394 * Try reconnecting to the dht service.
396 * @param cls a `struct GNUNET_DHT_Handle`
399 try_reconnect (void *cls)
401 struct GNUNET_DHT_Handle *h = cls;
402 struct GNUNET_DHT_MonitorHandle *mh;
404 LOG (GNUNET_ERROR_TYPE_DEBUG,
405 "Reconnecting with DHT %p\n",
407 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
408 h->reconnect_task = NULL;
409 if (GNUNET_YES != try_connect (h))
411 LOG (GNUNET_ERROR_TYPE_WARNING,
412 "DHT reconnect failed!\n");
414 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
419 GNUNET_CONTAINER_multihashmap_iterate (h->active_requests,
420 &add_get_request_to_pending,
422 for (mh = h->monitor_head; NULL != mh; mh = mh->next)
423 send_monitor_start (mh);
428 * Try reconnecting to the DHT service.
430 * @param h handle to dht to (possibly) disconnect and reconnect
433 do_disconnect (struct GNUNET_DHT_Handle *h)
435 struct GNUNET_DHT_PutHandle *ph;
436 GNUNET_SCHEDULER_TaskCallback cont;
441 GNUNET_MQ_destroy (h->mq);
443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
444 "Disconnecting from DHT service, will try to reconnect in %s\n",
445 GNUNET_STRINGS_relative_time_to_string (h->retry_time,
447 /* notify client about all PUTs that (may) have failed due to disconnect */
448 while (NULL != (ph = h->put_head))
451 cont_cls = ph->cont_cls;
453 GNUNET_DHT_put_cancel (ph);
457 GNUNET_assert (NULL == h->reconnect_task);
459 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
466 * Generic error handler, called with the appropriate error code and
467 * the same closure specified at the creation of the message queue.
468 * Not every message queue implementation supports an error handler.
470 * @param cls closure with the `struct GNUNET_DHT_Handle *`
471 * @param error error code
474 mq_error_handler (void *cls,
475 enum GNUNET_MQ_Error error)
477 struct GNUNET_DHT_Handle *h = cls;
484 * Verify integrity of a get monitor message from the service.
486 * @param cls The DHT handle.
487 * @param msg Monitor get message from the service.
488 * @return #GNUNET_OK if everything went fine,
489 * #GNUNET_SYSERR if the message is malformed.
492 check_monitor_get (void *cls,
493 const struct GNUNET_DHT_MonitorGetMessage *msg)
495 uint32_t plen = ntohl (msg->get_path_length);
496 uint16_t msize = ntohs (msg->header.size) - sizeof(*msg);
498 if ((plen > UINT16_MAX) ||
499 (plen * sizeof(struct GNUNET_PeerIdentity) != msize))
502 return GNUNET_SYSERR;
509 * Process a get monitor message from the service.
511 * @param cls The DHT handle.
512 * @param msg Monitor get message from the service.
515 handle_monitor_get (void *cls,
516 const struct GNUNET_DHT_MonitorGetMessage *msg)
518 struct GNUNET_DHT_Handle *handle = cls;
519 struct GNUNET_DHT_MonitorHandle *mh;
521 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
523 if (NULL == mh->get_cb)
525 if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
526 (mh->type == ntohl (msg->type))) &&
527 ((NULL == mh->key) ||
528 (0 == memcmp (mh->key,
530 sizeof(struct GNUNET_HashCode)))))
531 mh->get_cb (mh->cb_cls,
532 ntohl (msg->options),
533 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
534 ntohl (msg->hop_count),
535 ntohl (msg->desired_replication_level),
536 ntohl (msg->get_path_length),
537 (struct GNUNET_PeerIdentity *) &msg[1],
544 * Validate a get response monitor message from the service.
546 * @param cls The DHT handle.
547 * @param msg monitor get response message from the service
548 * @return #GNUNET_OK if everything went fine,
549 * #GNUNET_SYSERR if the message is malformed.
552 check_monitor_get_resp (void *cls,
553 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
555 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
556 uint32_t getl = ntohl (msg->get_path_length);
557 uint32_t putl = ntohl (msg->put_path_length);
559 if ((getl + putl < getl) ||
560 ((msize / sizeof(struct GNUNET_PeerIdentity)) < getl + putl))
563 return GNUNET_SYSERR;
570 * Process a get response monitor message from the service.
572 * @param cls The DHT handle.
573 * @param msg monitor get response message from the service
576 handle_monitor_get_resp (void *cls,
577 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
579 struct GNUNET_DHT_Handle *handle = cls;
580 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
581 const struct GNUNET_PeerIdentity *path;
582 uint32_t getl = ntohl (msg->get_path_length);
583 uint32_t putl = ntohl (msg->put_path_length);
584 struct GNUNET_DHT_MonitorHandle *mh;
586 path = (const struct GNUNET_PeerIdentity *) &msg[1];
587 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
589 if (NULL == mh->get_resp_cb)
591 if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
592 (mh->type == ntohl (msg->type))) &&
593 ((NULL == mh->key) ||
594 (0 == memcmp (mh->key,
596 sizeof(struct GNUNET_HashCode)))))
597 mh->get_resp_cb (mh->cb_cls,
598 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
603 GNUNET_TIME_absolute_ntoh (msg->expiration_time),
605 (const void *) &path[getl + putl],
606 msize - sizeof(struct GNUNET_PeerIdentity) * (putl
613 * Check validity of a put monitor message from the service.
615 * @param cls The DHT handle.
616 * @param msg Monitor put message from the service.
617 * @return #GNUNET_OK if everything went fine,
618 * #GNUNET_SYSERR if the message is malformed.
621 check_monitor_put (void *cls,
622 const struct GNUNET_DHT_MonitorPutMessage *msg)
627 msize = ntohs (msg->header.size) - sizeof(*msg);
628 putl = ntohl (msg->put_path_length);
629 if ((msize / sizeof(struct GNUNET_PeerIdentity)) < putl)
632 return GNUNET_SYSERR;
639 * Process a put monitor message from the service.
641 * @param cls The DHT handle.
642 * @param msg Monitor put message from the service.
645 handle_monitor_put (void *cls,
646 const struct GNUNET_DHT_MonitorPutMessage *msg)
648 struct GNUNET_DHT_Handle *handle = cls;
649 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
650 uint32_t putl = ntohl (msg->put_path_length);
651 const struct GNUNET_PeerIdentity *path;
652 struct GNUNET_DHT_MonitorHandle *mh;
654 path = (const struct GNUNET_PeerIdentity *) &msg[1];
655 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
657 if (NULL == mh->put_cb)
659 if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
660 (mh->type == ntohl (msg->type))) &&
661 ((NULL == mh->key) ||
662 (0 == memcmp (mh->key,
664 sizeof(struct GNUNET_HashCode)))))
665 mh->put_cb (mh->cb_cls,
666 ntohl (msg->options),
667 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
668 ntohl (msg->hop_count),
669 ntohl (msg->desired_replication_level),
672 GNUNET_TIME_absolute_ntoh (msg->expiration_time),
674 (const void *) &path[putl],
675 msize - sizeof(struct GNUNET_PeerIdentity) * putl);
681 * Verify that client result message received from the service is well-formed.
683 * @param cls The DHT handle.
684 * @param msg Monitor put message from the service.
685 * @return #GNUNET_OK if everything went fine,
686 * #GNUNET_SYSERR if the message is malformed.
689 check_client_result (void *cls,
690 const struct GNUNET_DHT_ClientResultMessage *msg)
692 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
693 uint32_t put_path_length = ntohl (msg->put_path_length);
694 uint32_t get_path_length = ntohl (msg->get_path_length);
698 sizeof(struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
699 if ((msize < meta_length) ||
701 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
703 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
706 return GNUNET_SYSERR;
713 * Process a given reply that might match the given request.
715 * @param cls the `struct GNUNET_DHT_ClientResultMessage`
716 * @param key query of the request
717 * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key
718 * @return #GNUNET_YES to continue to iterate over all results
721 process_client_result (void *cls,
722 const struct GNUNET_HashCode *key,
725 const struct GNUNET_DHT_ClientResultMessage *crm = cls;
726 struct GNUNET_DHT_GetHandle *get_handle = value;
727 size_t msize = ntohs (crm->header.size) - sizeof(*crm);
728 uint32_t put_path_length = ntohl (crm->put_path_length);
729 uint32_t get_path_length = ntohl (crm->get_path_length);
730 const struct GNUNET_PeerIdentity *put_path;
731 const struct GNUNET_PeerIdentity *get_path;
732 struct GNUNET_HashCode hc;
737 if (crm->unique_id != get_handle->unique_id)
740 LOG (GNUNET_ERROR_TYPE_DEBUG,
741 "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
744 get_handle->unique_id);
747 /* FIXME: might want to check that type matches */
749 sizeof(struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
750 data_length = msize - meta_length;
751 put_path = (const struct GNUNET_PeerIdentity *) &crm[1];
752 get_path = &put_path[put_path_length];
757 gp = GNUNET_STRINGS_pp2s (get_path,
759 pp = GNUNET_STRINGS_pp2s (put_path,
761 LOG (GNUNET_ERROR_TYPE_DEBUG,
762 "Giving %u byte reply for %s to application (GP: %s, PP: %s)\n",
763 (unsigned int) data_length,
770 data = &get_path[get_path_length];
771 /* remember that we've seen this result */
772 GNUNET_CRYPTO_hash (data,
775 if (get_handle->seen_results_size == get_handle->seen_results_end)
776 GNUNET_array_grow (get_handle->seen_results,
777 get_handle->seen_results_size,
778 get_handle->seen_results_size * 2 + 1);
779 get_handle->seen_results[get_handle->seen_results_end++] = hc;
780 /* no need to block it explicitly, service already knows about it! */
781 get_handle->iter (get_handle->iter_cls,
782 GNUNET_TIME_absolute_ntoh (crm->expiration),
796 * Process a client result message received from the service.
798 * @param cls The DHT handle.
799 * @param msg Monitor put message from the service.
802 handle_client_result (void *cls,
803 const struct GNUNET_DHT_ClientResultMessage *msg)
805 struct GNUNET_DHT_Handle *handle = cls;
807 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
809 &process_client_result,
815 * Process a MQ PUT transmission notification.
817 * @param cls The DHT handle.
820 handle_put_cont (void *cls)
822 struct GNUNET_DHT_PutHandle *ph = cls;
823 GNUNET_SCHEDULER_TaskCallback cont;
827 cont_cls = ph->cont_cls;
829 GNUNET_DHT_put_cancel (ph);
836 * Try to (re)connect to the DHT service.
838 * @param h DHT handle to reconnect
839 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
842 try_connect (struct GNUNET_DHT_Handle *h)
844 struct GNUNET_MQ_MessageHandler handlers[] = {
845 GNUNET_MQ_hd_var_size (monitor_get,
846 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
847 struct GNUNET_DHT_MonitorGetMessage,
849 GNUNET_MQ_hd_var_size (monitor_get_resp,
850 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
851 struct GNUNET_DHT_MonitorGetRespMessage,
853 GNUNET_MQ_hd_var_size (monitor_put,
854 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
855 struct GNUNET_DHT_MonitorPutMessage,
857 GNUNET_MQ_hd_var_size (client_result,
858 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
859 struct GNUNET_DHT_ClientResultMessage,
861 GNUNET_MQ_handler_end ()
866 h->mq = GNUNET_CLIENT_connect (h->cfg,
873 LOG (GNUNET_ERROR_TYPE_WARNING,
874 "Failed to connect to the DHT service!\n");
882 * Initialize the connection with the DHT service.
884 * @param cfg configuration to use
885 * @param ht_len size of the internal hash table to use for
886 * processing multiple GET/FIND requests in parallel
887 * @return handle to the DHT service, or NULL on error
889 struct GNUNET_DHT_Handle *
890 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
893 struct GNUNET_DHT_Handle *handle;
895 handle = GNUNET_new (struct GNUNET_DHT_Handle);
898 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
900 handle->active_requests
901 = GNUNET_CONTAINER_multihashmap_create (ht_len,
903 if (GNUNET_NO == try_connect (handle))
905 GNUNET_DHT_disconnect (handle);
913 * Shutdown connection with the DHT service.
915 * @param handle handle of the DHT connection to stop
918 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
920 struct GNUNET_DHT_PutHandle *ph;
923 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
924 while (NULL != (ph = handle->put_head))
926 if (NULL != ph->cont)
927 ph->cont (ph->cont_cls);
928 GNUNET_DHT_put_cancel (ph);
930 if (NULL != handle->mq)
932 GNUNET_MQ_destroy (handle->mq);
935 if (NULL != handle->reconnect_task)
937 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
938 handle->reconnect_task = NULL;
940 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
941 GNUNET_free (handle);
946 * Perform a PUT operation storing data in the DHT. FIXME: we should
947 * change the protocol to get a confirmation for the PUT from the DHT
948 * and call 'cont' only after getting the confirmation; otherwise, the
949 * client has no good way of telling if the 'PUT' message actually got
950 * to the DHT service!
952 * @param handle handle to DHT service
953 * @param key the key to store under
954 * @param desired_replication_level estimate of how many
955 * nearest peers this request should reach
956 * @param options routing options for this message
957 * @param type type of the value
958 * @param size number of bytes in data; must be less than 64k
959 * @param data the data to store
960 * @param exp desired expiration time for the value
961 * @param cont continuation to call when done (transmitting request to service)
962 * You must not call #GNUNET_DHT_disconnect in this continuation
963 * @param cont_cls closure for @a cont
965 struct GNUNET_DHT_PutHandle *
966 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
967 const struct GNUNET_HashCode *key,
968 uint32_t desired_replication_level,
969 enum GNUNET_DHT_RouteOption options,
970 enum GNUNET_BLOCK_Type type,
973 struct GNUNET_TIME_Absolute exp,
974 GNUNET_SCHEDULER_TaskCallback cont,
977 struct GNUNET_MQ_Envelope *env;
978 struct GNUNET_DHT_ClientPutMessage *put_msg;
980 struct GNUNET_DHT_PutHandle *ph;
982 msize = sizeof(struct GNUNET_DHT_ClientPutMessage) + size;
983 if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
984 (size >= GNUNET_MAX_MESSAGE_SIZE))
989 if (NULL == handle->mq)
991 LOG (GNUNET_ERROR_TYPE_DEBUG,
992 "Sending PUT for %s to DHT via %p\n",
995 ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
996 ph->dht_handle = handle;
998 ph->cont_cls = cont_cls;
999 GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1002 env = GNUNET_MQ_msg_extra (put_msg,
1004 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1005 GNUNET_MQ_notify_sent (env,
1009 put_msg->type = htonl ((uint32_t) type);
1010 put_msg->options = htonl ((uint32_t) options);
1011 put_msg->desired_replication_level = htonl (desired_replication_level);
1012 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1013 put_msg->key = *key;
1014 GNUNET_memcpy (&put_msg[1],
1017 GNUNET_MQ_send (handle->mq,
1024 * Cancels a DHT PUT operation. Note that the PUT request may still
1025 * go out over the network (we can't stop that); However, if the PUT
1026 * has not yet been sent to the service, cancelling the PUT will stop
1027 * this from happening (but there is no way for the user of this API
1028 * to tell if that is the case). The only use for this API is to
1029 * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1030 * the system is shutting down).
1032 * @param ph put operation to cancel ('cont' will no longer be called)
1035 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1037 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1039 if (NULL != ph->env)
1040 GNUNET_MQ_notify_sent (ph->env,
1043 GNUNET_CONTAINER_DLL_remove (handle->put_head,
1051 * Perform an asynchronous GET operation on the DHT identified. See
1052 * also #GNUNET_BLOCK_evaluate.
1054 * @param handle handle to the DHT service
1055 * @param type expected type of the response object
1056 * @param key the key to look up
1057 * @param desired_replication_level estimate of how many
1058 nearest peers this request should reach
1059 * @param options routing options for this message
1060 * @param xquery extended query data (can be NULL, depending on type)
1061 * @param xquery_size number of bytes in @a xquery
1062 * @param iter function to call on each result
1063 * @param iter_cls closure for @a iter
1064 * @return handle to stop the async get
1066 struct GNUNET_DHT_GetHandle *
1067 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1068 enum GNUNET_BLOCK_Type type,
1069 const struct GNUNET_HashCode *key,
1070 uint32_t desired_replication_level,
1071 enum GNUNET_DHT_RouteOption options,
1074 GNUNET_DHT_GetIterator iter,
1077 struct GNUNET_DHT_GetHandle *gh;
1080 msize = sizeof(struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1081 if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
1082 (xquery_size >= GNUNET_MAX_MESSAGE_SIZE))
1087 LOG (GNUNET_ERROR_TYPE_DEBUG,
1088 "Sending query for %s to DHT %p\n",
1091 gh = GNUNET_malloc (sizeof(struct GNUNET_DHT_GetHandle)
1094 gh->iter_cls = iter_cls;
1095 gh->dht_handle = handle;
1097 gh->unique_id = ++handle->uid_gen;
1098 gh->xquery_size = xquery_size;
1099 gh->desired_replication_level = desired_replication_level;
1101 gh->options = options;
1102 GNUNET_memcpy (&gh[1],
1105 GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1108 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1109 if (NULL != handle->mq)
1116 * Tell the DHT not to return any of the following known results
1119 * @param get_handle get operation for which results should be filtered
1120 * @param num_results number of results to be blocked that are
1121 * provided in this call (size of the @a results array)
1122 * @param results array of hash codes over the 'data' of the results
1126 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1127 unsigned int num_results,
1128 const struct GNUNET_HashCode *results)
1130 unsigned int needed;
1133 had = get_handle->seen_results_end;
1134 needed = had + num_results;
1135 if (needed > get_handle->seen_results_size)
1136 GNUNET_array_grow (get_handle->seen_results,
1137 get_handle->seen_results_size,
1139 GNUNET_memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1141 num_results * sizeof(struct GNUNET_HashCode));
1142 get_handle->seen_results_end += num_results;
1143 if (NULL != get_handle->dht_handle->mq)
1144 send_get_known_results (get_handle,
1150 * Stop async DHT-get.
1152 * @param get_handle handle to the GET operation to stop
1155 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1157 struct GNUNET_DHT_Handle *handle = get_handle->dht_handle;
1159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1160 "Sending STOP for %s to DHT via %p\n",
1161 GNUNET_h2s (&get_handle->key),
1163 if (NULL != handle->mq)
1165 struct GNUNET_MQ_Envelope *env;
1166 struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1168 env = GNUNET_MQ_msg (stop_msg,
1169 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1170 stop_msg->reserved = htonl (0);
1171 stop_msg->unique_id = get_handle->unique_id;
1172 stop_msg->key = get_handle->key;
1173 GNUNET_MQ_send (handle->mq,
1176 GNUNET_assert (GNUNET_YES ==
1177 GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1180 GNUNET_array_grow (get_handle->seen_results,
1181 get_handle->seen_results_end,
1183 GNUNET_free (get_handle);
1188 * Start monitoring the local DHT service.
1190 * @param handle Handle to the DHT service.
1191 * @param type Type of blocks that are of interest.
1192 * @param key Key of data of interest, NULL for all.
1193 * @param get_cb Callback to process monitored get messages.
1194 * @param get_resp_cb Callback to process monitored get response messages.
1195 * @param put_cb Callback to process monitored put messages.
1196 * @param cb_cls Closure for callbacks.
1197 * @return Handle to stop monitoring.
1199 struct GNUNET_DHT_MonitorHandle *
1200 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1201 enum GNUNET_BLOCK_Type type,
1202 const struct GNUNET_HashCode *key,
1203 GNUNET_DHT_MonitorGetCB get_cb,
1204 GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1205 GNUNET_DHT_MonitorPutCB put_cb,
1208 struct GNUNET_DHT_MonitorHandle *mh;
1210 mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1211 mh->get_cb = get_cb;
1212 mh->get_resp_cb = get_resp_cb;
1213 mh->put_cb = put_cb;
1214 mh->cb_cls = cb_cls;
1216 mh->dht_handle = handle;
1219 mh->key = GNUNET_new (struct GNUNET_HashCode);
1222 GNUNET_CONTAINER_DLL_insert (handle->monitor_head,
1223 handle->monitor_tail,
1225 if (NULL != handle->mq)
1226 send_monitor_start (mh);
1234 * @param mh The handle to the monitor request returned by monitor_start.
1236 * On return get_handle will no longer be valid, caller must not use again!!!
1239 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh)
1241 struct GNUNET_DHT_Handle *handle = mh->dht_handle;
1242 struct GNUNET_DHT_MonitorStartStopMessage *m;
1243 struct GNUNET_MQ_Envelope *env;
1245 GNUNET_CONTAINER_DLL_remove (handle->monitor_head,
1246 handle->monitor_tail,
1248 env = GNUNET_MQ_msg (m,
1249 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1250 m->type = htonl (mh->type);
1251 m->get = htons (NULL != mh->get_cb);
1252 m->get_resp = htons (NULL != mh->get_resp_cb);
1253 m->put = htons (NULL != mh->put_cb);
1254 if (NULL != mh->key)
1256 m->filter_key = htons (1);
1259 GNUNET_MQ_send (handle->mq,
1261 GNUNET_free_non_null (mh->key);
1266 /* end of dht_api.c */