2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2012, 2016, 2018 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL-3.0-or-later
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
41 * Handle to a PUT request.
43 struct GNUNET_DHT_PutHandle
48 struct GNUNET_DHT_PutHandle *next;
53 struct GNUNET_DHT_PutHandle *prev;
56 * Continuation to call when done.
58 GNUNET_SCHEDULER_TaskCallback cont;
61 * Main handle to this DHT api
63 struct GNUNET_DHT_Handle *dht_handle;
66 * Closure for @e cont.
71 * Envelope from the PUT operation.
73 struct GNUNET_MQ_Envelope *env;
78 * Handle to a GET request
80 struct GNUNET_DHT_GetHandle
84 * Iterator to call on data receipt
86 GNUNET_DHT_GetIterator iter;
89 * Closure for @e iter.
94 * Main handle to this DHT api
96 struct GNUNET_DHT_Handle *dht_handle;
99 * Array of hash codes over the results that we have already
102 struct GNUNET_HashCode *seen_results;
105 * Key that this get request is for
107 struct GNUNET_HashCode key;
110 * Unique identifier for this request (for key collisions).
115 * Size of the extended query, allocated at the end of this struct.
120 * Desired replication level.
122 uint32_t desired_replication_level;
125 * Type of the block we are looking for.
127 enum GNUNET_BLOCK_Type type;
132 enum GNUNET_DHT_RouteOption options;
135 * Size of the @e seen_results array. Note that not
136 * all positions might be used (as we over-allocate).
138 unsigned int seen_results_size;
141 * Offset into the @e seen_results array marking the
142 * end of the positions that are actually used.
144 unsigned int seen_results_end;
150 * Handle to a monitoring request.
152 struct GNUNET_DHT_MonitorHandle
157 struct GNUNET_DHT_MonitorHandle *next;
162 struct GNUNET_DHT_MonitorHandle *prev;
165 * Main handle to this DHT api.
167 struct GNUNET_DHT_Handle *dht_handle;
170 * Type of block looked for.
172 enum GNUNET_BLOCK_Type type;
175 * Key being looked for, NULL == all.
177 struct GNUNET_HashCode *key;
180 * Callback for each received message of type get.
182 GNUNET_DHT_MonitorGetCB get_cb;
185 * Callback for each received message of type get response.
187 GNUNET_DHT_MonitorGetRespCB get_resp_cb;
190 * Callback for each received message of type put.
192 GNUNET_DHT_MonitorPutCB put_cb;
195 * Closure for @e get_cb, @e put_cb and @e get_resp_cb.
203 * Connection to the DHT service.
205 struct GNUNET_DHT_Handle
209 * Configuration to use.
211 const struct GNUNET_CONFIGURATION_Handle *cfg;
214 * Connection to DHT service.
216 struct GNUNET_MQ_Handle *mq;
219 * Head of linked list of messages we would like to monitor.
221 struct GNUNET_DHT_MonitorHandle *monitor_head;
224 * Tail of linked list of messages we would like to monitor.
226 struct GNUNET_DHT_MonitorHandle *monitor_tail;
229 * Head of active PUT requests.
231 struct GNUNET_DHT_PutHandle *put_head;
234 * Tail of active PUT requests.
236 struct GNUNET_DHT_PutHandle *put_tail;
239 * Hash map containing the current outstanding unique GET requests
240 * (values are of type `struct GNUNET_DHT_GetHandle`).
242 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
245 * Task for trying to reconnect.
247 struct GNUNET_SCHEDULER_Task *reconnect_task;
250 * How quickly should we retry? Used for exponential back-off on
253 struct GNUNET_TIME_Relative retry_time;
256 * Generator for unique ids.
265 * Try to (re)connect to the DHT service.
267 * @param h DHT handle to reconnect
268 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
271 try_connect (struct GNUNET_DHT_Handle *h);
275 * Send GET message for @a gh to the DHT.
277 * @param gh GET to generate messages for.
280 send_get (struct GNUNET_DHT_GetHandle *gh)
282 struct GNUNET_DHT_Handle *h = gh->dht_handle;
283 struct GNUNET_MQ_Envelope *env;
284 struct GNUNET_DHT_ClientGetMessage *get_msg;
286 env = GNUNET_MQ_msg_extra (get_msg,
288 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
289 get_msg->options = htonl ((uint32_t) gh->options);
290 get_msg->desired_replication_level = htonl (gh->desired_replication_level);
291 get_msg->type = htonl (gh->type);
292 get_msg->key = gh->key;
293 get_msg->unique_id = gh->unique_id;
294 GNUNET_memcpy (&get_msg[1],
297 GNUNET_MQ_send (h->mq,
303 * Send GET message(s) indicating which results are already known
304 * for @a gh to the DHT. Complex as we need to send the list of
305 * known results, which means we may need multiple messages to block
306 * known results from the result set.
308 * @param gh GET to generate messages for
309 * @param transmission_offset_start at which offset should we start?
312 send_get_known_results (struct GNUNET_DHT_GetHandle *gh,
313 unsigned int transmission_offset_start)
315 struct GNUNET_DHT_Handle *h = gh->dht_handle;
316 struct GNUNET_MQ_Envelope *env;
317 struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
320 unsigned int transmission_offset;
322 max = (GNUNET_MAX_MESSAGE_SIZE - sizeof (*msg))
323 / sizeof (struct GNUNET_HashCode);
324 transmission_offset = transmission_offset_start;
325 while (transmission_offset < gh->seen_results_end)
327 delta = gh->seen_results_end - transmission_offset;
330 env = GNUNET_MQ_msg_extra (msg,
331 delta * sizeof (struct GNUNET_HashCode),
332 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
334 msg->unique_id = gh->unique_id;
335 GNUNET_memcpy (&msg[1],
336 &gh->seen_results[transmission_offset],
337 sizeof (struct GNUNET_HashCode) * delta);
338 GNUNET_MQ_send (h->mq,
340 transmission_offset += delta;
346 * Add the GET request corresponding to the given route handle
347 * to the pending queue (if it is not already in there).
349 * @param cls the `struct GNUNET_DHT_Handle *`
350 * @param key key for the request (not used)
351 * @param value the `struct GNUNET_DHT_GetHandle *`
352 * @return #GNUNET_YES (always)
355 add_get_request_to_pending (void *cls,
356 const struct GNUNET_HashCode *key,
359 struct GNUNET_DHT_Handle *handle = cls;
360 struct GNUNET_DHT_GetHandle *gh = value;
362 LOG (GNUNET_ERROR_TYPE_DEBUG,
363 "Retransmitting request related to %s to DHT %p\n",
367 send_get_known_results (gh, 0);
373 * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message.
375 * @param mh monitor handle to generate start message for
378 send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh)
380 struct GNUNET_DHT_Handle *h = mh->dht_handle;
381 struct GNUNET_MQ_Envelope *env;
382 struct GNUNET_DHT_MonitorStartStopMessage *m;
384 env = GNUNET_MQ_msg (m,
385 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
386 m->type = htonl (mh->type);
387 m->get = htons (NULL != mh->get_cb);
388 m->get_resp = htons (NULL != mh->get_resp_cb);
389 m->put = htons (NULL != mh->put_cb);
392 m->filter_key = htons(1);
395 GNUNET_MQ_send (h->mq,
401 * Try reconnecting to the dht service.
403 * @param cls a `struct GNUNET_DHT_Handle`
406 try_reconnect (void *cls)
408 struct GNUNET_DHT_Handle *h = cls;
409 struct GNUNET_DHT_MonitorHandle *mh;
411 LOG (GNUNET_ERROR_TYPE_DEBUG,
412 "Reconnecting with DHT %p\n",
414 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
415 h->reconnect_task = NULL;
416 if (GNUNET_YES != try_connect (h))
418 LOG (GNUNET_ERROR_TYPE_WARNING,
419 "DHT reconnect failed!\n");
421 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
426 GNUNET_CONTAINER_multihashmap_iterate (h->active_requests,
427 &add_get_request_to_pending,
429 for (mh = h->monitor_head; NULL != mh; mh = mh->next)
430 send_monitor_start (mh);
435 * Disconnect from the DHT service and schedule a reconnect attempt.
437 * @param h handle to dht to (possibly) disconnect and reconnect
440 do_disconnect (struct GNUNET_DHT_Handle *h)
442 struct GNUNET_DHT_PutHandle *ph;
443 GNUNET_SCHEDULER_TaskCallback cont;
448 GNUNET_MQ_destroy (h->mq);
450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451 "Disconnecting from DHT service, will try to reconnect in %s\n",
452 GNUNET_STRINGS_relative_time_to_string (h->retry_time,
454 /* notify client about all PUTs that (may) have failed due to disconnect */
455 while (NULL != (ph = h->put_head))
458 cont_cls = ph->cont_cls;
460 GNUNET_DHT_put_cancel (ph);
464 GNUNET_assert (NULL == h->reconnect_task);
466 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
473 * Generic error handler, called with the appropriate error code and
474 * the same closure specified at the creation of the message queue.
475 * Not every message queue implementation supports an error handler.
477 * @param cls closure with the `struct GNUNET_DHT_Handle *`
478 * @param error error code
481 mq_error_handler (void *cls,
482 enum GNUNET_MQ_Error error)
484 struct GNUNET_DHT_Handle *h = cls;
491 * Verify integrity of a get monitor message from the service.
493 * @param cls The DHT handle.
494 * @param msg Monitor get message from the service.
495 * @return #GNUNET_OK if everything went fine,
496 * #GNUNET_SYSERR if the message is malformed.
499 check_monitor_get (void *cls,
500 const struct GNUNET_DHT_MonitorGetMessage *msg)
502 uint32_t plen = ntohl (msg->get_path_length);
503 uint16_t msize = ntohs (msg->header.size) - sizeof (*msg);
505 if ( (plen > UINT16_MAX) ||
506 (plen * sizeof (struct GNUNET_PeerIdentity) != msize) )
509 return GNUNET_SYSERR;
516 * Process a get monitor message from the service.
518 * @param cls The DHT handle.
519 * @param msg Monitor get message from the service.
522 handle_monitor_get (void *cls,
523 const struct GNUNET_DHT_MonitorGetMessage *msg)
525 struct GNUNET_DHT_Handle *handle = cls;
526 struct GNUNET_DHT_MonitorHandle *mh;
528 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
530 if (NULL == mh->get_cb)
532 if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
533 (mh->type == ntohl (msg->type)) ) &&
534 ( (NULL == mh->key) ||
535 (0 == memcmp (mh->key,
537 sizeof (struct GNUNET_HashCode))) ) )
538 mh->get_cb (mh->cb_cls,
539 ntohl (msg->options),
540 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
541 ntohl (msg->hop_count),
542 ntohl (msg->desired_replication_level),
543 ntohl (msg->get_path_length),
544 (struct GNUNET_PeerIdentity *) &msg[1],
551 * Validate a get response monitor message from the service.
553 * @param cls The DHT handle.
554 * @param msg monitor get response message from the service
555 * @return #GNUNET_OK if everything went fine,
556 * #GNUNET_SYSERR if the message is malformed.
559 check_monitor_get_resp (void *cls,
560 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
562 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
563 uint32_t getl = ntohl (msg->get_path_length);
564 uint32_t putl = ntohl (msg->put_path_length);
566 if ( (getl + putl < getl) ||
567 ( (msize / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
570 return GNUNET_SYSERR;
577 * Process a get response monitor message from the service.
579 * @param cls The DHT handle.
580 * @param msg monitor get response message from the service
583 handle_monitor_get_resp (void *cls,
584 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
586 struct GNUNET_DHT_Handle *handle = cls;
587 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
588 const struct GNUNET_PeerIdentity *path;
589 uint32_t getl = ntohl (msg->get_path_length);
590 uint32_t putl = ntohl (msg->put_path_length);
591 struct GNUNET_DHT_MonitorHandle *mh;
593 path = (const struct GNUNET_PeerIdentity *) &msg[1];
594 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
596 if (NULL == mh->get_resp_cb)
598 if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
599 (mh->type == ntohl(msg->type)) ) &&
600 ( (NULL == mh->key) ||
601 (0 == memcmp (mh->key,
603 sizeof (struct GNUNET_HashCode))) ) )
604 mh->get_resp_cb (mh->cb_cls,
605 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
610 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
612 (const void *) &path[getl + putl],
613 msize - sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
619 * Check validity of a put monitor message from the service.
621 * @param cls The DHT handle.
622 * @param msg Monitor put message from the service.
623 * @return #GNUNET_OK if everything went fine,
624 * #GNUNET_SYSERR if the message is malformed.
627 check_monitor_put (void *cls,
628 const struct GNUNET_DHT_MonitorPutMessage *msg)
633 msize = ntohs (msg->header.size) - sizeof (*msg);
634 putl = ntohl (msg->put_path_length);
635 if ((msize / sizeof (struct GNUNET_PeerIdentity)) < putl)
638 return GNUNET_SYSERR;
645 * Process a put monitor message from the service.
647 * @param cls The DHT handle.
648 * @param msg Monitor put message from the service.
651 handle_monitor_put (void *cls,
652 const struct GNUNET_DHT_MonitorPutMessage *msg)
654 struct GNUNET_DHT_Handle *handle = cls;
655 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
656 uint32_t putl = ntohl (msg->put_path_length);
657 const struct GNUNET_PeerIdentity *path;
658 struct GNUNET_DHT_MonitorHandle *mh;
660 path = (const struct GNUNET_PeerIdentity *) &msg[1];
661 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
663 if (NULL == mh->put_cb)
665 if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
666 (mh->type == ntohl(msg->type)) ) &&
667 ( (NULL == mh->key) ||
668 (0 == memcmp (mh->key,
670 sizeof (struct GNUNET_HashCode))) ) )
671 mh->put_cb (mh->cb_cls,
672 ntohl (msg->options),
673 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
674 ntohl (msg->hop_count),
675 ntohl (msg->desired_replication_level),
678 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
680 (const void *) &path[putl],
681 msize - sizeof (struct GNUNET_PeerIdentity) * putl);
687 * Verify that client result message received from the service is well-formed.
689 * @param cls The DHT handle.
690 * @param msg Client result message from the service.
691 * @return #GNUNET_OK if everything went fine,
692 * #GNUNET_SYSERR if the message is malformed.
695 check_client_result (void *cls,
696 const struct GNUNET_DHT_ClientResultMessage *msg)
698 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
699 uint32_t put_path_length = ntohl (msg->put_path_length);
700 uint32_t get_path_length = ntohl (msg->get_path_length);
704 sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
705 if ( (msize < meta_length) ||
707 GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
709 GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
712 return GNUNET_SYSERR;
719 * Process a given reply that might match the given request.
721 * @param cls the `struct GNUNET_DHT_ClientResultMessage`
722 * @param key query of the request
723 * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key
724 * @return #GNUNET_YES to continue to iterate over all results
727 process_client_result (void *cls,
728 const struct GNUNET_HashCode *key,
731 const struct GNUNET_DHT_ClientResultMessage *crm = cls;
732 struct GNUNET_DHT_GetHandle *get_handle = value;
733 size_t msize = ntohs (crm->header.size) - sizeof (*crm);
734 uint32_t put_path_length = ntohl (crm->put_path_length);
735 uint32_t get_path_length = ntohl (crm->get_path_length);
736 const struct GNUNET_PeerIdentity *put_path;
737 const struct GNUNET_PeerIdentity *get_path;
738 struct GNUNET_HashCode hc;
743 if (crm->unique_id != get_handle->unique_id)
746 LOG (GNUNET_ERROR_TYPE_DEBUG,
747 "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
750 get_handle->unique_id);
753 /* FIXME: might want to check that type matches */
755 sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
756 data_length = msize - meta_length;
757 put_path = (const struct GNUNET_PeerIdentity *) &crm[1];
758 get_path = &put_path[put_path_length];
763 gp = GNUNET_STRINGS_pp2s (get_path,
765 pp = GNUNET_STRINGS_pp2s (put_path,
767 LOG (GNUNET_ERROR_TYPE_DEBUG,
768 "Giving %u byte reply for %s to application (GP: %s, PP: %s)\n",
769 (unsigned int) data_length,
776 data = &get_path[get_path_length];
777 /* remember that we've seen this result */
778 GNUNET_CRYPTO_hash (data,
781 if (get_handle->seen_results_size == get_handle->seen_results_end)
782 GNUNET_array_grow (get_handle->seen_results,
783 get_handle->seen_results_size,
784 get_handle->seen_results_size * 2 + 1);
785 get_handle->seen_results[get_handle->seen_results_end++] = hc;
786 /* no need to block it explicitly, service already knows about it! */
787 get_handle->iter (get_handle->iter_cls,
788 GNUNET_TIME_absolute_ntoh (crm->expiration),
802 * Process a client result message received from the service.
804 * @param cls The DHT handle.
805 * @param msg Client result message from the service.
808 handle_client_result (void *cls,
809 const struct GNUNET_DHT_ClientResultMessage *msg)
811 struct GNUNET_DHT_Handle *handle = cls;
813 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
815 &process_client_result,
821 * Process a MQ PUT transmission notification.
823 * @param cls The `struct GNUNET_DHT_PutHandle *` of the PUT.
826 handle_put_cont (void *cls)
828 struct GNUNET_DHT_PutHandle *ph = cls;
829 GNUNET_SCHEDULER_TaskCallback cont;
833 cont_cls = ph->cont_cls;
835 GNUNET_DHT_put_cancel (ph);
842 * Try to (re)connect to the DHT service.
844 * @param h DHT handle to reconnect
845 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
848 try_connect (struct GNUNET_DHT_Handle *h)
850 struct GNUNET_MQ_MessageHandler handlers[] = {
851 GNUNET_MQ_hd_var_size (monitor_get,
852 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
853 struct GNUNET_DHT_MonitorGetMessage,
855 GNUNET_MQ_hd_var_size (monitor_get_resp,
856 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
857 struct GNUNET_DHT_MonitorGetRespMessage,
859 GNUNET_MQ_hd_var_size (monitor_put,
860 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
861 struct GNUNET_DHT_MonitorPutMessage,
863 GNUNET_MQ_hd_var_size (client_result,
864 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
865 struct GNUNET_DHT_ClientResultMessage,
867 GNUNET_MQ_handler_end ()
871 h->mq = GNUNET_CLIENT_connect (h->cfg,
878 LOG (GNUNET_ERROR_TYPE_WARNING,
879 "Failed to connect to the DHT service!\n");
887 * Initialize the connection with the DHT service.
889 * @param cfg configuration to use
890 * @param ht_len size of the internal hash table to use for
891 * processing multiple GET/FIND requests in parallel
892 * @return handle to the DHT service, or NULL on error
894 struct GNUNET_DHT_Handle *
895 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
898 struct GNUNET_DHT_Handle *handle;
900 handle = GNUNET_new (struct GNUNET_DHT_Handle);
903 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
905 handle->active_requests
906 = GNUNET_CONTAINER_multihashmap_create (ht_len,
908 if (GNUNET_NO == try_connect (handle))
910 GNUNET_DHT_disconnect (handle);
918 * Shutdown connection with the DHT service.
920 * @param handle handle of the DHT connection to stop
923 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
925 struct GNUNET_DHT_PutHandle *ph;
928 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
929 while (NULL != (ph = handle->put_head))
931 if (NULL != ph->cont)
932 ph->cont (ph->cont_cls);
933 GNUNET_DHT_put_cancel (ph);
935 if (NULL != handle->mq)
937 GNUNET_MQ_destroy (handle->mq);
940 if (NULL != handle->reconnect_task)
942 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
943 handle->reconnect_task = NULL;
945 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
946 GNUNET_free (handle);
951 * Perform a PUT operation storing data in the DHT. FIXME: we should
952 * change the protocol to get a confirmation for the PUT from the DHT
953 * and call 'cont' only after getting the confirmation; otherwise, the
954 * client has no good way of telling if the 'PUT' message actually got
955 * to the DHT service!
957 * @param handle handle to DHT service
958 * @param key the key to store under
959 * @param desired_replication_level estimate of how many
960 * nearest peers this request should reach
961 * @param options routing options for this message
962 * @param type type of the value
963 * @param size number of bytes in data; must be less than 64k
964 * @param data the data to store
965 * @param exp desired expiration time for the value
966 * @param cont continuation to call when done (transmitting request to service)
967 * You must not call #GNUNET_DHT_disconnect in this continuation
968 * @param cont_cls closure for @a cont
970 struct GNUNET_DHT_PutHandle *
971 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
972 const struct GNUNET_HashCode *key,
973 uint32_t desired_replication_level,
974 enum GNUNET_DHT_RouteOption options,
975 enum GNUNET_BLOCK_Type type,
978 struct GNUNET_TIME_Absolute exp,
979 GNUNET_SCHEDULER_TaskCallback cont,
982 struct GNUNET_MQ_Envelope *env;
983 struct GNUNET_DHT_ClientPutMessage *put_msg;
985 struct GNUNET_DHT_PutHandle *ph;
987 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
988 if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
989 (size >= GNUNET_MAX_MESSAGE_SIZE))
994 if (NULL == handle->mq)
996 LOG (GNUNET_ERROR_TYPE_DEBUG,
997 "Sending PUT for %s to DHT via %p\n",
1000 ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
1001 ph->dht_handle = handle;
1003 ph->cont_cls = cont_cls;
1004 GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1007 env = GNUNET_MQ_msg_extra (put_msg,
1009 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1010 GNUNET_MQ_notify_sent (env,
1014 put_msg->type = htonl ((uint32_t) type);
1015 put_msg->options = htonl ((uint32_t) options);
1016 put_msg->desired_replication_level = htonl (desired_replication_level);
1017 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1018 put_msg->key = *key;
1019 GNUNET_memcpy (&put_msg[1],
1022 GNUNET_MQ_send (handle->mq,
1029 * Cancels a DHT PUT operation. Note that the PUT request may still
1030 * go out over the network (we can't stop that); however, if the PUT
1031 * has not yet been sent to the service, cancelling the PUT will stop
1032 * this from happening (but there is no way for the user of this API
1033 * to tell if that is the case). The only use for this API is to
1034 * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1035 * the system is shutting down).
1037 * @param ph put operation to cancel ('cont' will no longer be called)
1040 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1042 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1044 if (NULL != ph->env)
1045 GNUNET_MQ_notify_sent (ph->env,
1048 GNUNET_CONTAINER_DLL_remove (handle->put_head,
1056 * Perform an asynchronous GET operation on the DHT. See
1057 * also #GNUNET_BLOCK_evaluate.
1059 * @param handle handle to the DHT service
1060 * @param type expected type of the response object
1061 * @param key the key to look up
1062 * @param desired_replication_level estimate of how many
1063 * nearest peers this request should reach
1064 * @param options routing options for this message
1065 * @param xquery extended query data (can be NULL, depending on type)
1066 * @param xquery_size number of bytes in @a xquery
1067 * @param iter function to call on each result
1068 * @param iter_cls closure for @a iter
1069 * @return handle to stop the async get
1071 struct GNUNET_DHT_GetHandle *
1072 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1073 enum GNUNET_BLOCK_Type type,
1074 const struct GNUNET_HashCode *key,
1075 uint32_t desired_replication_level,
1076 enum GNUNET_DHT_RouteOption options,
1079 GNUNET_DHT_GetIterator iter,
1082 struct GNUNET_DHT_GetHandle *gh;
1085 msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1086 if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
1087 (xquery_size >= GNUNET_MAX_MESSAGE_SIZE))
1092 LOG (GNUNET_ERROR_TYPE_DEBUG,
1093 "Sending query for %s to DHT %p\n",
1096 gh = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle) +
1099 gh->iter_cls = iter_cls;
1100 gh->dht_handle = handle;
1102 gh->unique_id = ++handle->uid_gen;
1103 gh->xquery_size = xquery_size;
1104 gh->desired_replication_level = desired_replication_level;
1106 gh->options = options;
1107 GNUNET_memcpy (&gh[1],
1110 GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1113 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1114 if (NULL != handle->mq)
1121 * Tell the DHT not to return any of the following known results
1124 * @param get_handle get operation for which results should be filtered
1125 * @param num_results number of results to be blocked that are
1126 * provided in this call (size of the @a results array)
1127 * @param results array of hash codes over the 'data' of the results
1131 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1132 unsigned int num_results,
1133 const struct GNUNET_HashCode *results)
1135 unsigned int needed;
1138 had = get_handle->seen_results_end;
1139 needed = had + num_results;
1140 if (needed > get_handle->seen_results_size)
1141 GNUNET_array_grow (get_handle->seen_results,
1142 get_handle->seen_results_size,
1144 GNUNET_memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1146 num_results * sizeof (struct GNUNET_HashCode));
1147 get_handle->seen_results_end += num_results;
1148 if (NULL != get_handle->dht_handle->mq)
1149 send_get_known_results (get_handle,
1155 * Stop async DHT-get.
1157 * @param get_handle handle to the GET operation to stop
1160 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1162 struct GNUNET_DHT_Handle *handle = get_handle->dht_handle;
1164 LOG (GNUNET_ERROR_TYPE_DEBUG,
1165 "Sending STOP for %s to DHT via %p\n",
1166 GNUNET_h2s (&get_handle->key),
1168 if (NULL != handle->mq)
1170 struct GNUNET_MQ_Envelope *env;
1171 struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1173 env = GNUNET_MQ_msg (stop_msg,
1174 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1175 stop_msg->reserved = htonl (0);
1176 stop_msg->unique_id = get_handle->unique_id;
1177 stop_msg->key = get_handle->key;
1178 GNUNET_MQ_send (handle->mq,
1181 GNUNET_assert (GNUNET_YES ==
1182 GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1185 GNUNET_array_grow (get_handle->seen_results,
1186 get_handle->seen_results_end,
1188 GNUNET_free (get_handle);
1193 * Start monitoring the local DHT service.
1195 * @param handle Handle to the DHT service.
1196 * @param type Type of blocks that are of interest.
1197 * @param key Key of data of interest, NULL for all.
1198 * @param get_cb Callback to process monitored get messages.
1199 * @param get_resp_cb Callback to process monitored get response messages.
1200 * @param put_cb Callback to process monitored put messages.
1201 * @param cb_cls Closure for callbacks.
1202 * @return Handle to stop monitoring.
1204 struct GNUNET_DHT_MonitorHandle *
1205 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1206 enum GNUNET_BLOCK_Type type,
1207 const struct GNUNET_HashCode *key,
1208 GNUNET_DHT_MonitorGetCB get_cb,
1209 GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1210 GNUNET_DHT_MonitorPutCB put_cb,
1213 struct GNUNET_DHT_MonitorHandle *mh;
1215 mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1216 mh->get_cb = get_cb;
1217 mh->get_resp_cb = get_resp_cb;
1218 mh->put_cb = put_cb;
1219 mh->cb_cls = cb_cls;
1221 mh->dht_handle = handle;
1224 mh->key = GNUNET_new (struct GNUNET_HashCode);
1227 GNUNET_CONTAINER_DLL_insert (handle->monitor_head,
1228 handle->monitor_tail,
1230 if (NULL != handle->mq)
1231 send_monitor_start (mh);
1239 * @param mh The handle to the monitor request returned by monitor_start.
1241 * On return @a mh will no longer be valid; the caller must not use it again.
1244 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh)
1246 struct GNUNET_DHT_Handle *handle = mh->dht_handle;
1247 struct GNUNET_DHT_MonitorStartStopMessage *m;
1248 struct GNUNET_MQ_Envelope *env;
1250 GNUNET_CONTAINER_DLL_remove (handle->monitor_head,
1251 handle->monitor_tail,
1253 env = GNUNET_MQ_msg (m,
1254 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1255 m->type = htonl (mh->type);
1256 m->get = htons (NULL != mh->get_cb);
1257 m->get_resp = htons(NULL != mh->get_resp_cb);
1258 m->put = htons (NULL != mh->put_cb);
1259 if (NULL != mh->key)
1261 m->filter_key = htons (1);
1264 GNUNET_MQ_send (handle->mq,
1266 GNUNET_free_non_null (mh->key);
1271 /* end of dht_api.c */