2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2012, 2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
41 * Handle to a PUT request.
43 struct GNUNET_DHT_PutHandle
48 struct GNUNET_DHT_PutHandle *next;
53 struct GNUNET_DHT_PutHandle *prev;
56 * Continuation to call when done.
58 GNUNET_DHT_PutContinuation cont;
61 * Main handle to this DHT api
63 struct GNUNET_DHT_Handle *dht_handle;
66 * Closure for @e cont.
71 * Unique ID for the PUT operation.
78 * Handle to a GET request
80 struct GNUNET_DHT_GetHandle
84 * Iterator to call on data receipt
86 GNUNET_DHT_GetIterator iter;
89 * Closure for @a iter.
94 * Main handle to this DHT api
96 struct GNUNET_DHT_Handle *dht_handle;
99 * Array of hash codes over the results that we have already
102 struct GNUNET_HashCode *seen_results;
105 * Key that this get request is for
107 struct GNUNET_HashCode key;
110 * Unique identifier for this request (for key collisions).
115 * Size of the extended query, allocated at the end of this struct.
120 * Desired replication level.
122 uint32_t desired_replication_level;
125 * Type of the block we are looking for.
127 enum GNUNET_BLOCK_Type type;
132 enum GNUNET_DHT_RouteOption options;
135 * Size of the @e seen_results array. Note that not
136 * all positions might be used (as we over-allocate).
138 unsigned int seen_results_size;
141 * Offset into the @e seen_results array marking the
142 * end of the positions that are actually used.
144 unsigned int seen_results_end;
150 * Handle to a monitoring request.
152 struct GNUNET_DHT_MonitorHandle
157 struct GNUNET_DHT_MonitorHandle *next;
162 struct GNUNET_DHT_MonitorHandle *prev;
165 * Main handle to this DHT api.
167 struct GNUNET_DHT_Handle *dht_handle;
170 * Type of block looked for.
172 enum GNUNET_BLOCK_Type type;
175 * Key being looked for, NULL == all.
177 struct GNUNET_HashCode *key;
180 * Callback for each received message of type get.
182 GNUNET_DHT_MonitorGetCB get_cb;
185 * Callback for each received message of type get response.
187 GNUNET_DHT_MonitorGetRespCB get_resp_cb;
190 * Callback for each received message of type put.
192 GNUNET_DHT_MonitorPutCB put_cb;
195 * Closure for @e get_cb, @e put_cb and @e get_resp_cb.
203 * Connection to the DHT service.
205 struct GNUNET_DHT_Handle
209 * Configuration to use.
211 const struct GNUNET_CONFIGURATION_Handle *cfg;
214 * Connection to DHT service.
216 struct GNUNET_MQ_Handle *mq;
219 * Head of linked list of messages we would like to monitor.
221 struct GNUNET_DHT_MonitorHandle *monitor_head;
224 * Tail of linked list of messages we would like to monitor.
226 struct GNUNET_DHT_MonitorHandle *monitor_tail;
229 * Head of active PUT requests.
231 struct GNUNET_DHT_PutHandle *put_head;
234 * Tail of active PUT requests.
236 struct GNUNET_DHT_PutHandle *put_tail;
239 * Hash map containing the current outstanding unique GET requests
240 * (values are of type `struct GNUNET_DHT_GetHandle`).
242 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
245 * Task for trying to reconnect.
247 struct GNUNET_SCHEDULER_Task *reconnect_task;
250 * How quickly should we retry? Used for exponential back-off on
253 struct GNUNET_TIME_Relative retry_time;
256 * Generator for unique ids.
265 * Try to (re)connect to the DHT service.
267 * @param h DHT handle to reconnect
268 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
271 try_connect (struct GNUNET_DHT_Handle *h);
275 * Send GET message for a @a get_handle to DHT.
277 * @param gh GET to generate messages for.
280 send_get (struct GNUNET_DHT_GetHandle *gh)
282 struct GNUNET_DHT_Handle *h = gh->dht_handle;
283 struct GNUNET_MQ_Envelope *env;
284 struct GNUNET_DHT_ClientGetMessage *get_msg;
286 env = GNUNET_MQ_msg_extra (get_msg,
288 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
289 get_msg->options = htonl ((uint32_t) gh->options);
290 get_msg->desired_replication_level = htonl (gh->desired_replication_level);
291 get_msg->type = htonl (gh->type);
292 get_msg->key = gh->key;
293 get_msg->unique_id = gh->unique_id;
294 GNUNET_memcpy (&get_msg[1],
297 GNUNET_MQ_send (h->mq,
303 * Send GET message(s) for indicating which results are already known
304 * for a @a get_handle to DHT. Complex as we need to send the list of
305 * known results, which means we may need mulitple messages to block
306 * known results from the result set.
308 * @param gh GET to generate messages for
309 * @param transmission_offset_start at which offset should we start?
312 send_get_known_results (struct GNUNET_DHT_GetHandle *gh,
313 unsigned int transmission_offset_start)
315 struct GNUNET_DHT_Handle *h = gh->dht_handle;
316 struct GNUNET_MQ_Envelope *env;
317 struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
320 unsigned int transmission_offset;
322 max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*msg))
323 / sizeof (struct GNUNET_HashCode);
324 transmission_offset = transmission_offset_start;
325 while (transmission_offset < gh->seen_results_end)
327 delta = gh->seen_results_end - transmission_offset;
330 env = GNUNET_MQ_msg_extra (msg,
331 delta * sizeof (struct GNUNET_HashCode),
332 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
334 msg->unique_id = gh->unique_id;
335 GNUNET_memcpy (&msg[1],
336 &gh->seen_results[transmission_offset],
337 sizeof (struct GNUNET_HashCode) * delta);
338 GNUNET_MQ_send (h->mq,
340 transmission_offset += delta;
346 * Add the GET request corresponding to the given route handle
347 * to the pending queue (if it is not already in there).
349 * @param cls the `struct GNUNET_DHT_Handle *`
350 * @param key key for the request (not used)
351 * @param value the `struct GNUNET_DHT_GetHandle *`
352 * @return #GNUNET_YES (always)
355 add_get_request_to_pending (void *cls,
356 const struct GNUNET_HashCode *key,
359 struct GNUNET_DHT_Handle *handle = cls;
360 struct GNUNET_DHT_GetHandle *gh = value;
362 LOG (GNUNET_ERROR_TYPE_DEBUG,
363 "Retransmitting request related to %s to DHT %p\n",
367 send_get_known_results (gh, 0);
373 * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message.
375 * @param mh monitor handle to generate start message for
378 send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh)
380 struct GNUNET_DHT_Handle *h = mh->dht_handle;
381 struct GNUNET_MQ_Envelope *env;
382 struct GNUNET_DHT_MonitorStartStopMessage *m;
384 env = GNUNET_MQ_msg (m,
385 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
386 m->type = htonl (mh->type);
387 m->get = htons (NULL != mh->get_cb);
388 m->get_resp = htons (NULL != mh->get_resp_cb);
389 m->put = htons (NULL != mh->put_cb);
392 m->filter_key = htons(1);
395 GNUNET_MQ_send (h->mq,
401 * Try reconnecting to the dht service.
403 * @param cls a `struct GNUNET_DHT_Handle`
406 try_reconnect (void *cls)
408 struct GNUNET_DHT_Handle *h = cls;
409 struct GNUNET_DHT_MonitorHandle *mh;
411 LOG (GNUNET_ERROR_TYPE_DEBUG,
412 "Reconnecting with DHT %p\n",
414 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
415 h->reconnect_task = NULL;
416 if (GNUNET_YES != try_connect (h))
418 LOG (GNUNET_ERROR_TYPE_WARNING,
419 "DHT reconnect failed!\n");
421 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
426 GNUNET_CONTAINER_multihashmap_iterate (h->active_requests,
427 &add_get_request_to_pending,
429 for (mh = h->monitor_head; NULL != mh; mh = mh->next)
430 send_monitor_start (mh);
435 * Try reconnecting to the DHT service.
437 * @param h handle to dht to (possibly) disconnect and reconnect
440 do_disconnect (struct GNUNET_DHT_Handle *h)
442 struct GNUNET_DHT_PutHandle *ph;
443 GNUNET_DHT_PutContinuation cont;
448 GNUNET_MQ_destroy (h->mq);
450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451 "Disconnecting from DHT service, will try to reconnect in %s\n",
452 GNUNET_STRINGS_relative_time_to_string (h->retry_time,
454 /* notify client about all PUTs that (may) have failed due to disconnect */
455 while (NULL != (ph = h->put_head))
458 cont_cls = ph->cont_cls;
459 GNUNET_DHT_put_cancel (ph);
464 GNUNET_assert (NULL == h->reconnect_task);
466 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
473 * Generic error handler, called with the appropriate error code and
474 * the same closure specified at the creation of the message queue.
475 * Not every message queue implementation supports an error handler.
477 * @param cls closure with the `struct GNUNET_DHT_Handle *`
478 * @param error error code
481 mq_error_handler (void *cls,
482 enum GNUNET_MQ_Error error)
484 struct GNUNET_DHT_Handle *h = cls;
491 * Verify integrity of a get monitor message from the service.
493 * @param cls The DHT handle.
494 * @param msg Monitor get message from the service.
495 * @return #GNUNET_OK if everything went fine,
496 * #GNUNET_SYSERR if the message is malformed.
499 check_monitor_get (void *cls,
500 const struct GNUNET_DHT_MonitorGetMessage *msg)
502 uint32_t plen = ntohl (msg->get_path_length);
503 uint16_t msize = ntohs (msg->header.size) - sizeof (*msg);
505 if ( (plen > UINT16_MAX) ||
506 (plen * sizeof (struct GNUNET_PeerIdentity) != msize) )
509 return GNUNET_SYSERR;
516 * Process a get monitor message from the service.
518 * @param cls The DHT handle.
519 * @param msg Monitor get message from the service.
522 handle_monitor_get (void *cls,
523 const struct GNUNET_DHT_MonitorGetMessage *msg)
525 struct GNUNET_DHT_Handle *handle = cls;
526 struct GNUNET_DHT_MonitorHandle *mh;
528 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
530 if (NULL == mh->get_cb)
532 if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
533 (mh->type == ntohl (msg->type)) ) &&
534 ( (NULL == mh->key) ||
535 (0 == memcmp (mh->key,
537 sizeof (struct GNUNET_HashCode))) ) )
538 mh->get_cb (mh->cb_cls,
539 ntohl (msg->options),
540 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
541 ntohl (msg->hop_count),
542 ntohl (msg->desired_replication_level),
543 ntohl (msg->get_path_length),
544 (struct GNUNET_PeerIdentity *) &msg[1],
551 * Validate a get response monitor message from the service.
553 * @param cls The DHT handle.
554 * @param msg monitor get response message from the service
555 * @return #GNUNET_OK if everything went fine,
556 * #GNUNET_SYSERR if the message is malformed.
559 check_monitor_get_resp (void *cls,
560 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
562 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
563 uint32_t getl = ntohl (msg->get_path_length);
564 uint32_t putl = ntohl (msg->put_path_length);
566 if ( (getl + putl < getl) ||
567 ( (msize / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
570 return GNUNET_SYSERR;
577 * Process a get response monitor message from the service.
579 * @param cls The DHT handle.
580 * @param msg monitor get response message from the service
583 handle_monitor_get_resp (void *cls,
584 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
586 struct GNUNET_DHT_Handle *handle = cls;
587 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
588 const struct GNUNET_PeerIdentity *path;
589 uint32_t getl = ntohl (msg->get_path_length);
590 uint32_t putl = ntohl (msg->put_path_length);
591 struct GNUNET_DHT_MonitorHandle *mh;
593 path = (const struct GNUNET_PeerIdentity *) &msg[1];
594 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
596 if (NULL == mh->get_resp_cb)
598 if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
599 (mh->type == ntohl(msg->type)) ) &&
600 ( (NULL == mh->key) ||
601 (0 == memcmp (mh->key,
603 sizeof (struct GNUNET_HashCode))) ) )
604 mh->get_resp_cb (mh->cb_cls,
605 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
610 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
612 (const void *) &path[getl + putl],
613 msize - sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
619 * Check validity of a put monitor message from the service.
621 * @param cls The DHT handle.
622 * @param msg Monitor put message from the service.
623 * @return #GNUNET_OK if everything went fine,
624 * #GNUNET_SYSERR if the message is malformed.
627 check_monitor_put (void *cls,
628 const struct GNUNET_DHT_MonitorPutMessage *msg)
633 msize = ntohs (msg->header.size) - sizeof (*msg);
634 putl = ntohl (msg->put_path_length);
635 if ((msize / sizeof (struct GNUNET_PeerIdentity)) < putl)
638 return GNUNET_SYSERR;
645 * Process a put monitor message from the service.
647 * @param cls The DHT handle.
648 * @param msg Monitor put message from the service.
651 handle_monitor_put (void *cls,
652 const struct GNUNET_DHT_MonitorPutMessage *msg)
654 struct GNUNET_DHT_Handle *handle = cls;
655 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
656 uint32_t putl = ntohl (msg->put_path_length);
657 const struct GNUNET_PeerIdentity *path;
658 struct GNUNET_DHT_MonitorHandle *mh;
660 path = (const struct GNUNET_PeerIdentity *) &msg[1];
661 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
663 if (NULL == mh->put_cb)
665 if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) ||
666 (mh->type == ntohl(msg->type)) ) &&
667 ( (NULL == mh->key) ||
668 (0 == memcmp (mh->key,
670 sizeof (struct GNUNET_HashCode))) ) )
671 mh->put_cb (mh->cb_cls,
672 ntohl (msg->options),
673 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
674 ntohl (msg->hop_count),
675 ntohl (msg->desired_replication_level),
678 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
680 (const void *) &path[putl],
681 msize - sizeof (struct GNUNET_PeerIdentity) * putl);
687 * Verify that client result message received from the service is well-formed.
689 * @param cls The DHT handle.
690 * @param msg Monitor put message from the service.
691 * @return #GNUNET_OK if everything went fine,
692 * #GNUNET_SYSERR if the message is malformed.
695 check_client_result (void *cls,
696 const struct GNUNET_DHT_ClientResultMessage *msg)
698 size_t msize = ntohs (msg->header.size) - sizeof (*msg);
699 uint32_t put_path_length = ntohl (msg->put_path_length);
700 uint32_t get_path_length = ntohl (msg->get_path_length);
704 sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
705 if ( (msize < meta_length) ||
707 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
709 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
712 return GNUNET_SYSERR;
719 * Process a given reply that might match the given request.
721 * @param cls the `struct GNUNET_DHT_ClientResultMessage`
722 * @param key query of the request
723 * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key
724 * @return #GNUNET_YES to continue to iterate over all results
727 process_client_result (void *cls,
728 const struct GNUNET_HashCode *key,
731 const struct GNUNET_DHT_ClientResultMessage *crm = cls;
732 struct GNUNET_DHT_GetHandle *get_handle = value;
733 size_t msize = ntohs (crm->header.size) - sizeof (*crm);
734 uint32_t put_path_length = ntohl (crm->put_path_length);
735 uint32_t get_path_length = ntohl (crm->get_path_length);
736 const struct GNUNET_PeerIdentity *put_path;
737 const struct GNUNET_PeerIdentity *get_path;
738 struct GNUNET_HashCode hc;
743 if (crm->unique_id != get_handle->unique_id)
746 LOG (GNUNET_ERROR_TYPE_DEBUG,
747 "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
750 get_handle->unique_id);
753 /* FIXME: might want to check that type matches */
755 sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
756 data_length = msize - meta_length;
757 LOG (GNUNET_ERROR_TYPE_DEBUG,
758 "Giving %u byte reply for %s to application\n",
759 (unsigned int) data_length,
761 put_path = (const struct GNUNET_PeerIdentity *) &crm[1];
762 get_path = &put_path[put_path_length];
763 data = &get_path[get_path_length];
764 /* remember that we've seen this result */
765 GNUNET_CRYPTO_hash (data,
768 if (get_handle->seen_results_size == get_handle->seen_results_end)
769 GNUNET_array_grow (get_handle->seen_results,
770 get_handle->seen_results_size,
771 get_handle->seen_results_size * 2 + 1);
772 get_handle->seen_results[get_handle->seen_results_end++] = hc;
773 /* no need to block it explicitly, service already knows about it! */
774 get_handle->iter (get_handle->iter_cls,
775 GNUNET_TIME_absolute_ntoh (crm->expiration),
789 * Process a client result message received from the service.
791 * @param cls The DHT handle.
792 * @param msg Monitor put message from the service.
795 handle_client_result (void *cls,
796 const struct GNUNET_DHT_ClientResultMessage *msg)
798 struct GNUNET_DHT_Handle *handle = cls;
800 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
802 &process_client_result,
808 * Process a put confirmation message from the service.
810 * @param cls The DHT handle.
811 * @param msg confirmation message from the service.
814 handle_put_confirmation (void *cls,
815 const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
817 struct GNUNET_DHT_Handle *handle = cls;
818 struct GNUNET_DHT_PutHandle *ph;
819 GNUNET_DHT_PutContinuation cont;
822 for (ph = handle->put_head; NULL != ph; ph = ph->next)
823 if (ph->unique_id == msg->unique_id)
828 cont_cls = ph->cont_cls;
829 GNUNET_DHT_put_cancel (ph);
837 * Try to (re)connect to the DHT service.
839 * @param h DHT handle to reconnect
840 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
843 try_connect (struct GNUNET_DHT_Handle *h)
845 struct GNUNET_MQ_MessageHandler handlers[] = {
846 GNUNET_MQ_hd_var_size (monitor_get,
847 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
848 struct GNUNET_DHT_MonitorGetMessage,
850 GNUNET_MQ_hd_var_size (monitor_get_resp,
851 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
852 struct GNUNET_DHT_MonitorGetRespMessage,
854 GNUNET_MQ_hd_var_size (monitor_put,
855 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
856 struct GNUNET_DHT_MonitorPutMessage,
858 GNUNET_MQ_hd_var_size (client_result,
859 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
860 struct GNUNET_DHT_ClientResultMessage,
862 GNUNET_MQ_hd_fixed_size (put_confirmation,
863 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK,
864 struct GNUNET_DHT_ClientPutConfirmationMessage,
866 GNUNET_MQ_handler_end ()
870 h->mq = GNUNET_CLIENT_connect (h->cfg,
877 LOG (GNUNET_ERROR_TYPE_WARNING,
878 "Failed to connect to the DHT service!\n");
886 * Initialize the connection with the DHT service.
888 * @param cfg configuration to use
889 * @param ht_len size of the internal hash table to use for
890 * processing multiple GET/FIND requests in parallel
891 * @return handle to the DHT service, or NULL on error
893 struct GNUNET_DHT_Handle *
894 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
897 struct GNUNET_DHT_Handle *handle;
899 handle = GNUNET_new (struct GNUNET_DHT_Handle);
902 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
904 handle->active_requests
905 = GNUNET_CONTAINER_multihashmap_create (ht_len,
907 if (GNUNET_NO == try_connect (handle))
909 GNUNET_DHT_disconnect (handle);
917 * Shutdown connection with the DHT service.
919 * @param handle handle of the DHT connection to stop
922 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
924 struct GNUNET_DHT_PutHandle *ph;
927 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
928 while (NULL != (ph = handle->put_head))
930 if (NULL != ph->cont)
931 ph->cont (ph->cont_cls,
933 GNUNET_DHT_put_cancel (ph);
935 if (NULL != handle->mq)
937 GNUNET_MQ_destroy (handle->mq);
940 if (NULL != handle->reconnect_task)
942 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
943 handle->reconnect_task = NULL;
945 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
946 GNUNET_free (handle);
951 * Perform a PUT operation storing data in the DHT. FIXME: we should
952 * change the protocol to get a confirmation for the PUT from the DHT
953 * and call 'cont' only after getting the confirmation; otherwise, the
954 * client has no good way of telling if the 'PUT' message actually got
955 * to the DHT service!
957 * @param handle handle to DHT service
958 * @param key the key to store under
959 * @param desired_replication_level estimate of how many
960 * nearest peers this request should reach
961 * @param options routing options for this message
962 * @param type type of the value
963 * @param size number of bytes in data; must be less than 64k
964 * @param data the data to store
965 * @param exp desired expiration time for the value
966 * @param cont continuation to call when done (transmitting request to service)
967 * You must not call #GNUNET_DHT_disconnect in this continuation
968 * @param cont_cls closure for @a cont
970 struct GNUNET_DHT_PutHandle *
971 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
972 const struct GNUNET_HashCode *key,
973 uint32_t desired_replication_level,
974 enum GNUNET_DHT_RouteOption options,
975 enum GNUNET_BLOCK_Type type,
978 struct GNUNET_TIME_Absolute exp,
979 GNUNET_DHT_PutContinuation cont,
982 struct GNUNET_MQ_Envelope *env;
983 struct GNUNET_DHT_ClientPutMessage *put_msg;
985 struct GNUNET_DHT_PutHandle *ph;
987 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
988 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
989 (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
994 if (NULL == handle->mq)
996 LOG (GNUNET_ERROR_TYPE_DEBUG,
997 "Sending PUT for %s to DHT via %p\n",
1000 ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
1001 ph->dht_handle = handle;
1003 ph->cont_cls = cont_cls;
1004 ph->unique_id = ++handle->uid_gen;
1005 GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1008 env = GNUNET_MQ_msg_extra (put_msg,
1010 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1011 put_msg->type = htonl ((uint32_t) type);
1012 put_msg->options = htonl ((uint32_t) options);
1013 put_msg->desired_replication_level = htonl (desired_replication_level);
1014 put_msg->unique_id = ph->unique_id;
1015 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1016 put_msg->key = *key;
1017 GNUNET_memcpy (&put_msg[1],
1020 GNUNET_MQ_send (handle->mq,
1027 * Cancels a DHT PUT operation. Note that the PUT request may still
1028 * go out over the network (we can't stop that); However, if the PUT
1029 * has not yet been sent to the service, cancelling the PUT will stop
1030 * this from happening (but there is no way for the user of this API
1031 * to tell if that is the case). The only use for this API is to
1032 * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1033 * the system is shutting down).
1035 * @param ph put operation to cancel ('cont' will no longer be called)
1038 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1040 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1042 GNUNET_CONTAINER_DLL_remove (handle->put_head,
1050 * Perform an asynchronous GET operation on the DHT identified. See
1051 * also #GNUNET_BLOCK_evaluate.
1053 * @param handle handle to the DHT service
1054 * @param type expected type of the response object
1055 * @param key the key to look up
1056 * @param desired_replication_level estimate of how many
1057 nearest peers this request should reach
1058 * @param options routing options for this message
1059 * @param xquery extended query data (can be NULL, depending on type)
1060 * @param xquery_size number of bytes in @a xquery
1061 * @param iter function to call on each result
1062 * @param iter_cls closure for @a iter
1063 * @return handle to stop the async get
1065 struct GNUNET_DHT_GetHandle *
1066 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1067 enum GNUNET_BLOCK_Type type,
1068 const struct GNUNET_HashCode *key,
1069 uint32_t desired_replication_level,
1070 enum GNUNET_DHT_RouteOption options,
1073 GNUNET_DHT_GetIterator iter,
1076 struct GNUNET_DHT_GetHandle *gh;
1079 msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1080 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1081 (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1086 LOG (GNUNET_ERROR_TYPE_DEBUG,
1087 "Sending query for %s to DHT %p\n",
1090 gh = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle) +
1093 gh->iter_cls = iter_cls;
1094 gh->dht_handle = handle;
1096 gh->unique_id = ++handle->uid_gen;
1097 gh->xquery_size = xquery_size;
1098 gh->desired_replication_level = desired_replication_level;
1100 gh->options = options;
1101 GNUNET_memcpy (&gh[1],
1104 GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1107 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1108 if (NULL != handle->mq)
1115 * Tell the DHT not to return any of the following known results
1118 * @param get_handle get operation for which results should be filtered
1119 * @param num_results number of results to be blocked that are
1120 * provided in this call (size of the @a results array)
1121 * @param results array of hash codes over the 'data' of the results
1125 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1126 unsigned int num_results,
1127 const struct GNUNET_HashCode *results)
1129 unsigned int needed;
1132 had = get_handle->seen_results_end;
1133 needed = had + num_results;
1134 if (needed > get_handle->seen_results_size)
1135 GNUNET_array_grow (get_handle->seen_results,
1136 get_handle->seen_results_size,
1138 GNUNET_memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1140 num_results * sizeof (struct GNUNET_HashCode));
1141 get_handle->seen_results_end += num_results;
1142 if (NULL != get_handle->dht_handle->mq)
1143 send_get_known_results (get_handle,
1149 * Stop async DHT-get.
1151 * @param get_handle handle to the GET operation to stop
1154 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1156 struct GNUNET_DHT_Handle *handle = get_handle->dht_handle;
1158 LOG (GNUNET_ERROR_TYPE_DEBUG,
1159 "Sending STOP for %s to DHT via %p\n",
1160 GNUNET_h2s (&get_handle->key),
1162 if (NULL != handle->mq)
1164 struct GNUNET_MQ_Envelope *env;
1165 struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1167 env = GNUNET_MQ_msg (stop_msg,
1168 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1169 stop_msg->reserved = htonl (0);
1170 stop_msg->unique_id = get_handle->unique_id;
1171 stop_msg->key = get_handle->key;
1172 GNUNET_MQ_send (handle->mq,
1175 GNUNET_assert (GNUNET_YES ==
1176 GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1179 GNUNET_array_grow (get_handle->seen_results,
1180 get_handle->seen_results_end,
1182 GNUNET_free (get_handle);
1187 * Start monitoring the local DHT service.
1189 * @param handle Handle to the DHT service.
1190 * @param type Type of blocks that are of interest.
1191 * @param key Key of data of interest, NULL for all.
1192 * @param get_cb Callback to process monitored get messages.
1193 * @param get_resp_cb Callback to process monitored get response messages.
1194 * @param put_cb Callback to process monitored put messages.
1195 * @param cb_cls Closure for callbacks.
1196 * @return Handle to stop monitoring.
1198 struct GNUNET_DHT_MonitorHandle *
1199 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1200 enum GNUNET_BLOCK_Type type,
1201 const struct GNUNET_HashCode *key,
1202 GNUNET_DHT_MonitorGetCB get_cb,
1203 GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1204 GNUNET_DHT_MonitorPutCB put_cb,
1207 struct GNUNET_DHT_MonitorHandle *mh;
1209 mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1210 mh->get_cb = get_cb;
1211 mh->get_resp_cb = get_resp_cb;
1212 mh->put_cb = put_cb;
1213 mh->cb_cls = cb_cls;
1215 mh->dht_handle = handle;
1218 mh->key = GNUNET_new (struct GNUNET_HashCode);
1221 GNUNET_CONTAINER_DLL_insert (handle->monitor_head,
1222 handle->monitor_tail,
1224 if (NULL != handle->mq)
1225 send_monitor_start (mh);
1233 * @param mh The handle to the monitor request returned by monitor_start.
1235 * On return get_handle will no longer be valid, caller must not use again!!!
1238 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh)
1240 struct GNUNET_DHT_Handle *handle = mh->dht_handle;
1241 struct GNUNET_DHT_MonitorStartStopMessage *m;
1242 struct GNUNET_MQ_Envelope *env;
1244 GNUNET_CONTAINER_DLL_remove (handle->monitor_head,
1245 handle->monitor_tail,
1247 env = GNUNET_MQ_msg (m,
1248 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1249 m->type = htonl (mh->type);
1250 m->get = htons (NULL != mh->get_cb);
1251 m->get_resp = htons(NULL != mh->get_resp_cb);
1252 m->put = htons (NULL != mh->put_cb);
1253 if (NULL != mh->key)
1255 m->filter_key = htons (1);
1258 GNUNET_MQ_send (handle->mq,
1260 GNUNET_free_non_null (mh->key);
1265 /* end of dht_api.c */