2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
37 #define DEBUG_DHT_API GNUNET_EXTRA_LOGGING
40 * Entry in our list of messages to be (re-)transmitted.
45 * This is a doubly-linked list.
47 struct PendingMessage *prev;
50 * This is a doubly-linked list.
52 struct PendingMessage *next;
55 * Message that is pending, allocated at the end
58 const struct GNUNET_MessageHeader *msg;
61 * Handle to the DHT API context.
63 struct GNUNET_DHT_Handle *handle;
66 * Continuation to call when the request has been
67 * transmitted (for the first time) to the service; can be NULL.
69 GNUNET_SCHEDULER_Task cont;
77 * Timeout task for this message
79 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82 * Unique ID for this request
87 * Free the saved message once sent, set to GNUNET_YES for messages
88 * that do not receive responses; GNUNET_NO if this pending message
89 * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
95 * GNUNET_YES if this message is in our pending queue right now.
103 * Handle to a GET request
105 struct GNUNET_DHT_GetHandle
109 * Iterator to call on data receipt
111 GNUNET_DHT_GetIterator iter;
114 * Closure for the iterator callback
119 * Main handle to this DHT api
121 struct GNUNET_DHT_Handle *dht_handle;
124 * The actual message sent for this request,
125 * used for retransmitting requests on service
126 * failure/reconnect. Freed on route_stop.
128 struct PendingMessage *message;
131 * Key that this get request is for
136 * Unique identifier for this request (for key collisions).
144 * Connection to the DHT service.
146 struct GNUNET_DHT_Handle
150 * Configuration to use.
152 const struct GNUNET_CONFIGURATION_Handle *cfg;
155 * Socket (if available).
157 struct GNUNET_CLIENT_Connection *client;
160 * Currently pending transmission request (or NULL).
162 struct GNUNET_CLIENT_TransmitHandle *th;
165 * Head of linked list of messages we would like to transmit.
167 struct PendingMessage *pending_head;
170 * Tail of linked list of messages we would like to transmit.
172 struct PendingMessage *pending_tail;
175 * Hash map containing the current outstanding unique requests
176 * (values are of type 'struct GNUNET_DHT_RouteHandle').
178 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
181 * Task for trying to reconnect.
183 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
186 * How quickly should we retry? Used for exponential back-off on
189 struct GNUNET_TIME_Relative retry_time;
192 * Generator for unique ids.
/**
 * Handler for messages received from the DHT service
 * a demultiplexer which handles numerous message types
 *
 * @param cls the 'struct GNUNET_DHT_Handle'
 * @param msg the incoming message, NULL on receive error
 */
static void
service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
209 * Try to (re)connect to the DHT service.
211 * @return GNUNET_YES on success, GNUNET_NO on failure.
214 try_connect (struct GNUNET_DHT_Handle *handle)
216 if (handle->client != NULL)
218 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
219 if (handle->client == NULL)
221 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
222 _("Failed to connect to the DHT service!\n"));
226 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
227 "Starting to process replies from DHT\n");
229 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
230 GNUNET_TIME_UNIT_FOREVER_REL);
236 * Add the request corresponding to the given route handle
237 * to the pending queue (if it is not already in there).
239 * @param cls the 'struct GNUNET_DHT_Handle*'
240 * @param key key for the request (not used)
241 * @param value the 'struct GNUNET_DHT_GetHandle*'
242 * @return GNUNET_YES (always)
245 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
247 struct GNUNET_DHT_Handle *handle = cls;
248 struct GNUNET_DHT_GetHandle *rh = value;
250 if (GNUNET_NO == rh->message->in_pending_queue)
252 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
254 rh->message->in_pending_queue = GNUNET_YES;
/**
 * Try to send messages from list of messages to send
 *
 * @param handle DHT_Handle
 */
static void
process_pending_messages (struct GNUNET_DHT_Handle *handle);
269 * Try reconnecting to the dht service.
271 * @param cls GNUNET_DHT_Handle
272 * @param tc scheduler context
275 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
277 struct GNUNET_DHT_Handle *handle = cls;
279 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
280 if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
281 handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
283 handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
284 if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
285 handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
286 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
287 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
288 if (handle->client == NULL)
290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
293 GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
294 &add_request_to_pending, handle);
295 process_pending_messages (handle);
300 * Try reconnecting to the DHT service.
302 * @param handle handle to dht to (possibly) disconnect and reconnect
305 do_disconnect (struct GNUNET_DHT_Handle *handle)
307 if (handle->client == NULL)
309 GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
310 if (NULL != handle->th)
311 GNUNET_CLIENT_notify_transmit_ready_cancel(handle->th);
313 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
314 handle->client = NULL;
315 handle->reconnect_task =
316 GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
/**
 * Transmit the next pending message, called by notify_transmit_ready
 *
 * @param cls the 'struct GNUNET_DHT_Handle'
 * @param size number of bytes available in buf
 * @param buf where to copy the message, NULL on transmission failure
 * @return number of bytes written to buf
 */
static size_t
transmit_pending (void *cls, size_t size, void *buf);
328 * Try to send messages from list of messages to send
331 process_pending_messages (struct GNUNET_DHT_Handle *handle)
333 struct PendingMessage *head;
335 if (handle->client == NULL)
337 do_disconnect (handle);
340 if (handle->th != NULL)
342 if (NULL == (head = handle->pending_head))
345 GNUNET_CLIENT_notify_transmit_ready (handle->client,
346 ntohs (head->msg->size),
347 GNUNET_TIME_UNIT_FOREVER_REL,
348 GNUNET_YES, &transmit_pending,
350 if (NULL != handle->th)
352 do_disconnect (handle);
357 * Transmit the next pending message, called by notify_transmit_ready
360 transmit_pending (void *cls, size_t size, void *buf)
362 struct GNUNET_DHT_Handle *handle = cls;
363 struct PendingMessage *head;
369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370 "Transmission to DHT service failed! Reconnecting!\n");
371 do_disconnect (handle);
374 if (NULL == (head = handle->pending_head))
377 tsize = ntohs (head->msg->size);
380 process_pending_messages (handle);
383 memcpy (buf, head->msg, tsize);
384 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
386 if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
388 GNUNET_SCHEDULER_cancel (head->timeout_task);
389 head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
391 if (NULL != head->cont)
393 GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
394 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
396 head->cont_cls = NULL;
398 head->in_pending_queue = GNUNET_NO;
399 if (GNUNET_YES == head->free_on_send)
401 process_pending_messages (handle);
403 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404 "Forwarded request of %u bytes to DHT service\n",
405 (unsigned int) tsize);
412 * Process a given reply that might match the given
415 * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
416 * @param key query of the request
417 * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
418 * @return GNUNET_YES to continue to iterate over all results,
419 * GNUNET_NO if the reply is malformed
422 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
424 const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
425 struct GNUNET_DHT_GetHandle *get_handle = value;
426 const struct GNUNET_PeerIdentity *put_path;
427 const struct GNUNET_PeerIdentity *get_path;
428 uint32_t put_path_length;
429 uint32_t get_path_length;
435 if (dht_msg->unique_id != get_handle->unique_id)
438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439 "Ignoring reply (UID mismatch: %llu/%llu)\n",
441 get_handle->unique_id);
444 msize = ntohs (dht_msg->header.size);
445 put_path_length = ntohl (dht_msg->put_path_length);
446 get_path_length = ntohl (dht_msg->get_path_length);
447 meta_length = sizeof (struct GNUNET_DHT_ClientResultMessage) +
448 sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
449 if ( (msize < meta_length) ||
450 (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
451 (put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
456 data_length = msize - meta_length;
457 put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
458 get_path = &put_path[put_path_length];
459 data = &get_path[get_path_length];
460 get_handle->iter (get_handle->iter_cls,
461 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
463 get_path, get_path_length,
464 put_path, put_path_length,
465 ntohl (dht_msg->type),
472 * Handler for messages received from the DHT service
473 * a demultiplexer which handles numerous message types
475 * @param cls the 'struct GNUNET_DHT_Handle'
476 * @param msg the incoming message
479 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
481 struct GNUNET_DHT_Handle *handle = cls;
482 const struct GNUNET_DHT_ClientResultMessage *dht_msg;
486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487 "Error receiving data from DHT service, reconnecting\n");
488 do_disconnect (handle);
491 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
494 do_disconnect (handle);
497 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
500 do_disconnect (handle);
503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504 "Received reply from DHT service\n");
505 dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
506 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
507 &dht_msg->key, &process_reply,
509 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
510 GNUNET_TIME_UNIT_FOREVER_REL);
515 * Initialize the connection with the DHT service.
517 * @param cfg configuration to use
518 * @param ht_len size of the internal hash table to use for
519 * processing multiple GET/FIND requests in parallel
521 * @return handle to the DHT service, or NULL on error
523 struct GNUNET_DHT_Handle *
524 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
527 struct GNUNET_DHT_Handle *handle;
529 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
532 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
533 handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
534 if (GNUNET_NO == try_connect (handle))
536 GNUNET_DHT_disconnect (handle);
544 * Shutdown connection with the DHT service.
546 * @param handle handle of the DHT connection to stop
549 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
551 struct PendingMessage *pm;
553 GNUNET_assert (handle != NULL);
555 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
556 if (handle->th != NULL)
558 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
561 while (NULL != (pm = handle->pending_head))
563 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
565 GNUNET_assert (GNUNET_YES == pm->free_on_send);
566 if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
567 GNUNET_SCHEDULER_cancel (pm->timeout_task);
568 if (NULL != pm->cont)
569 GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
570 GNUNET_SCHEDULER_REASON_TIMEOUT);
571 pm->in_pending_queue = GNUNET_NO;
574 if (handle->client != NULL)
576 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
577 handle->client = NULL;
579 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
580 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
581 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
582 GNUNET_free (handle);
587 * Timeout for the transmission of a fire&forget-request. Clean it up.
589 * @param cls the 'struct PendingMessage'
590 * @param tc scheduler context
593 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
595 struct PendingMessage *pending = cls;
596 struct GNUNET_DHT_Handle *handle;
598 handle = pending->handle;
599 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
601 if (pending->cont != NULL)
602 pending->cont (pending->cont_cls, tc);
603 GNUNET_free (pending);
608 * Perform a PUT operation storing data in the DHT.
610 * @param handle handle to DHT service
611 * @param key the key to store under
612 * @param desired_replication_level estimate of how many
613 * nearest peers this request should reach
614 * @param options routing options for this message
615 * @param type type of the value
616 * @param size number of bytes in data; must be less than 64k
617 * @param data the data to store
618 * @param exp desired expiration time for the value
619 * @param timeout how long to wait for transmission of this request
620 * @param cont continuation to call when done (transmitting request to service)
621 * @param cont_cls closure for cont
624 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
625 uint32_t desired_replication_level,
626 enum GNUNET_DHT_RouteOption options,
627 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
628 struct GNUNET_TIME_Absolute exp,
629 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
632 struct GNUNET_DHT_ClientPutMessage *put_msg;
634 struct PendingMessage *pending;
636 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
637 if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
638 (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
642 cont (cont_cls, NULL);
645 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
646 put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
647 pending->msg = &put_msg->header;
648 pending->handle = handle;
649 pending->cont = cont;
650 pending->cont_cls = cont_cls;
651 pending->free_on_send = GNUNET_YES;
652 pending->timeout_task =
653 GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
654 put_msg->header.size = htons (msize);
655 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
656 put_msg->type = htonl (type);
657 put_msg->options = htonl ((uint32_t) options);
658 put_msg->desired_replication_level = htonl (desired_replication_level);
659 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
661 memcpy (&put_msg[1], data, size);
662 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
664 pending->in_pending_queue = GNUNET_YES;
665 process_pending_messages (handle);
670 * Perform an asynchronous GET operation on the DHT identified. See
671 * also "GNUNET_BLOCK_evaluate".
673 * @param handle handle to the DHT service
674 * @param timeout how long to wait for transmission of this request to the service
675 * @param type expected type of the response object
676 * @param key the key to look up
677 * @param desired_replication_level estimate of how many
678 nearest peers this request should reach
679 * @param options routing options for this message
680 * @param xquery extended query data (can be NULL, depending on type)
681 * @param xquery_size number of bytes in xquery
682 * @param iter function to call on each result
683 * @param iter_cls closure for iter
684 * @return handle to stop the async get
686 struct GNUNET_DHT_GetHandle *
687 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
688 struct GNUNET_TIME_Relative timeout,
689 enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
690 uint32_t desired_replication_level,
691 enum GNUNET_DHT_RouteOption options,
692 const void *xquery, size_t xquery_size,
693 GNUNET_DHT_GetIterator iter, void *iter_cls)
695 struct GNUNET_DHT_ClientGetMessage *get_msg;
696 struct GNUNET_DHT_GetHandle *get_handle;
698 struct PendingMessage *pending;
700 msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
701 if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
702 (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
707 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
708 get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
709 pending->msg = &get_msg->header;
710 pending->handle = handle;
711 pending->free_on_send = GNUNET_NO;
712 get_msg->header.size = htons (msize);
713 get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
714 get_msg->options = htonl ((uint32_t) options);
715 get_msg->desired_replication_level = htonl (desired_replication_level);
716 get_msg->type = htonl (type);
719 get_msg->unique_id = handle->uid_gen;
720 memcpy (&get_msg[1], xquery, xquery_size);
721 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
723 pending->in_pending_queue = GNUNET_YES;
724 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
725 get_handle->iter = iter;
726 get_handle->iter_cls = iter_cls;
727 get_handle->message = pending;
728 get_handle->unique_id = get_msg->unique_id;
729 GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
731 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
732 process_pending_messages (handle);
738 * Stop async DHT-get.
740 * @param get_handle handle to the GET operation to stop
743 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
745 struct GNUNET_DHT_Handle *handle;
746 const struct GNUNET_DHT_ClientGetMessage *get_msg;
747 struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
748 struct PendingMessage *pending;
750 handle = get_handle->message->handle;
751 get_msg = (const struct GNUNET_DHT_ClientGetMessage*) get_handle->message->msg;
754 pending = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct GNUNET_DHT_ClientGetStopMessage));
755 stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
756 pending->msg = &stop_msg->header;
757 pending->handle = handle;
758 pending->free_on_send = GNUNET_YES;
759 stop_msg->header.size = htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
760 stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
761 stop_msg->reserved = htonl (0);
762 stop_msg->unique_id = get_msg->unique_id;
763 stop_msg->key = get_msg->key;
764 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
766 pending->in_pending_queue = GNUNET_YES;
768 /* remove 'GET' from active status */
769 GNUNET_assert (GNUNET_YES ==
770 GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
771 &get_msg->key, get_handle));
772 if (GNUNET_YES == get_handle->message->in_pending_queue)
774 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
775 handle->pending_tail,
776 get_handle->message);
777 get_handle->message->in_pending_queue = GNUNET_NO;
779 GNUNET_free (get_handle->message);
780 GNUNET_free (get_handle);
782 process_pending_messages (handle);
786 /* end of dht_api.c */