2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
27 * TODO: Only allow a single message until confirmed as received by
28 * the service. For put messages call continuation as soon as
29 * receipt acknowledged (then remove), for GET or other messages
30 * only call continuation when data received.
31 * Add unique identifier to message types requesting data to be
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
47 #define DEBUG_DHT_API GNUNET_YES
49 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
54 * Message that is pending
56 struct GNUNET_MessageHeader *msg;
59 * Timeout for this message
61 struct GNUNET_TIME_Relative timeout;
64 * Continuation to call on message send
65 * or message receipt confirmation
67 GNUNET_SCHEDULER_Task cont;
70 * Continuation closure
75 * Whether or not to await verification the message
76 * was received by the service
81 * Unique ID for this request
87 struct GNUNET_DHT_GetContext
92 * Iterator to call on data receipt
94 GNUNET_DHT_GetIterator iter;
97 * Closure for the iterator callback
104 * Handle to control a unique operation (one that is
105 * expected to return results)
107 struct GNUNET_DHT_RouteHandle
111 * Unique identifier for this request (for key collisions)
116 * Key that this get request is for
121 * Iterator to call on data receipt
123 GNUNET_DHT_ReplyProcessor iter;
126 * Closure for the iterator callback
131 * Main handle to this DHT api
133 struct GNUNET_DHT_Handle *dht_handle;
137 * Handle for a non unique request, holds callback
138 * which needs to be called before we allow other
139 * messages to be processed and sent to the DHT service
141 struct GNUNET_DHT_NonUniqueHandle
144 * Key that this get request is for
149 * Type of data get request was for
154 * Continuation to call on service
155 * confirmation of message receipt.
157 GNUNET_SCHEDULER_Task cont;
160 * Send continuation cls
167 * Connection to the DHT service.
169 struct GNUNET_DHT_Handle
174 struct GNUNET_SCHEDULER_Handle *sched;
177 * Configuration to use.
179 const struct GNUNET_CONFIGURATION_Handle *cfg;
182 * Socket (if available).
184 struct GNUNET_CLIENT_Connection *client;
187 * Currently pending transmission request.
189 struct GNUNET_CLIENT_TransmitHandle *th;
192 * Message we are currently sending, only allow
193 * a single message to be queued. If not unique
194 * (typically a put request), await a confirmation
195 * from the service that the message was received.
196 * If unique, just fire and forget.
198 struct PendingMessage *current;
201 * Hash map containing the current outstanding unique requests
203 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
206 * Non unique handle. If set don't schedule another non
209 struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
212 * Kill off the connection and any pending messages.
218 static struct GNUNET_TIME_Relative default_request_timeout;
220 /* Forward declaration */
221 static void process_pending_message(struct GNUNET_DHT_Handle *handle);
223 static GNUNET_HashCode * hash_from_uid(uint64_t uid)
227 GNUNET_HashCode *hash;
228 hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
231 while (count < sizeof(GNUNET_HashCode))
233 remaining = sizeof(GNUNET_HashCode) - count;
234 if (remaining > sizeof(uid))
235 remaining = sizeof(uid);
237 memcpy(hash, &uid, remaining);
245 * Handler for messages received from the DHT service
246 * a demultiplexer which handles numerous message types
249 void service_message_handler (void *cls,
250 const struct GNUNET_MessageHeader *msg)
252 struct GNUNET_DHT_Handle *handle = cls;
253 struct GNUNET_DHT_Message *dht_msg;
254 struct GNUNET_DHT_StopMessage *stop_msg;
255 struct GNUNET_MessageHeader *enc_msg;
256 struct GNUNET_DHT_RouteHandle *route_handle;
258 GNUNET_HashCode *uid_hash;
260 /* TODO: find out message type, handle callbacks for different types of messages.
261 * Should be a non unique acknowledgment, or unique result. */
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267 "`%s': Received NULL from server, connection down?\n", "DHT API");
272 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT)
274 dht_msg = (struct GNUNET_DHT_Message *)msg;
275 uid = GNUNET_ntohll(dht_msg->unique_id);
277 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278 "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
280 if (ntohs(dht_msg->unique))
282 uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
283 route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
284 GNUNET_free(uid_hash);
285 if (route_handle == NULL) /* We have no recollection of this request */
288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289 "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
294 enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
295 GNUNET_assert(enc_size > 0);
296 enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
297 route_handle->iter(route_handle->iter_cls, enc_msg);
301 else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP)
303 stop_msg = (struct GNUNET_DHT_StopMessage *)msg;
304 uid = GNUNET_ntohll(stop_msg->unique_id);
306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307 "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id);
309 if (handle->current->unique_id == uid)
312 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313 "`%s': Have pending confirmation for this message!\n", "DHT API", uid);
315 if (handle->current->cont != NULL)
316 GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
318 GNUNET_free(handle->current->msg);
319 GNUNET_free(handle->current);
320 handle->current = NULL;
326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327 "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type));
331 GNUNET_CLIENT_receive (handle->client,
332 &service_message_handler,
333 handle, GNUNET_TIME_UNIT_FOREVER_REL);
339 * Initialize the connection with the DHT service.
341 * @param sched scheduler to use
342 * @param cfg configuration to use
343 * @param ht_len size of the internal hash table to use for
344 * processing multiple GET/FIND requests in parallel
346 * @return handle to the DHT service, or NULL on error
348 struct GNUNET_DHT_Handle *
349 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
350 const struct GNUNET_CONFIGURATION_Handle *cfg,
353 struct GNUNET_DHT_Handle *handle;
355 handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
357 default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
359 handle->sched = sched;
361 handle->current = NULL;
362 handle->do_destroy = GNUNET_NO;
365 handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
366 handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
368 if (handle->client == NULL)
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375 "`%s': Connection to service in progress\n", "DHT API");
377 GNUNET_CLIENT_receive (handle->client,
378 &service_message_handler,
379 handle, GNUNET_TIME_UNIT_FOREVER_REL);
386 * Shutdown connection with the DHT service.
388 * @param handle handle of the DHT connection to stop
391 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
397 GNUNET_assert(handle != NULL);
399 if (handle->th != NULL) /* We have a live transmit request in the Aether */
401 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
404 if (handle->current != NULL) /* We are trying to send something now, clean it up */
405 GNUNET_free(handle->current);
407 if (handle->client != NULL) /* Finally, disconnect from the service */
409 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
410 handle->client = NULL;
413 GNUNET_free (handle);
418 * Send complete (or failed), schedule next (or don't)
421 finish (struct GNUNET_DHT_Handle *handle, int code)
423 /* TODO: if code is not GNUNET_OK, do something! */
424 struct PendingMessage *pos = handle->current;
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "`%s': Finish called!\n", "DHT API");
429 GNUNET_assert(pos != NULL);
433 if (pos->cont != NULL)
435 if (code == GNUNET_SYSERR)
436 GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
438 GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
441 GNUNET_free(pos->msg);
442 handle->current = NULL;
445 /* Otherwise we need to wait for a response to this message! */
449 * Transmit the next pending message, called by notify_transmit_ready
452 transmit_pending (void *cls, size_t size, void *buf)
454 struct GNUNET_DHT_Handle *handle = cls;
458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459 "`%s': In transmit_pending\n", "DHT API");
464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465 "`%s': In transmit_pending buf is NULL\n", "DHT API");
467 /* FIXME: free associated resources or summat */
468 finish(handle, GNUNET_SYSERR);
474 if (handle->current != NULL)
476 tsize = ntohs(handle->current->msg->size);
480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481 "`%s': Sending message size %d\n", "DHT API", tsize);
483 memcpy(buf, handle->current->msg, tsize);
484 finish(handle, GNUNET_OK);
492 /* Have no pending request */
498 * Try to (re)connect to the dht service.
500 * @return GNUNET_YES on success, GNUNET_NO on failure.
503 try_connect (struct GNUNET_DHT_Handle *handle)
505 if (handle->client != NULL)
507 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
508 if (handle->client != NULL)
511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512 _("Failed to connect to the dht service!\n"));
519 * Try to send messages from list of messages to send
521 static void process_pending_message(struct GNUNET_DHT_Handle *handle)
524 if (handle->current == NULL)
525 return; /* action already pending */
526 if (GNUNET_YES != try_connect (handle))
528 finish (handle, GNUNET_SYSERR);
532 /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
533 if (handle->do_destroy)
535 //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
540 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
541 ntohs(handle->current->msg->size),
542 handle->current->timeout,
544 &transmit_pending, handle)))
547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548 "Failed to transmit request to dht service.\n");
550 finish (handle, GNUNET_SYSERR);
553 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554 "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
559 * Iterator called on each result obtained from a generic route
562 void get_reply_iterator (void *cls,
563 const struct GNUNET_MessageHeader *reply)
569 * Perform an asynchronous FIND_PEER operation on the DHT.
571 * @param handle handle to the DHT service
572 * @param key the key to look up
573 * @param desired_replication_level how many peers should ultimately receive
574 * this message (advisory only, target may be too high for the
575 * given DHT or not hit exactly).
576 * @param options options for routing
577 * @param enc send the encapsulated message to a peer close to the key
578 * @param iter function to call on each result, NULL if no replies are expected
579 * @param iter_cls closure for iter
580 * @param timeout when to abort with an error if we fail to get
581 * a confirmation for the request (when necessary) or how long
582 * to wait for tramission to the service
583 * @param cont continuation to call when done;
584 * reason will be TIMEOUT on error,
585 * reason will be PREREQ_DONE on success
586 * @param cont_cls closure for cont
588 * @return handle to stop the request, NULL if the request is "fire and forget"
590 struct GNUNET_DHT_RouteHandle *
591 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
592 const GNUNET_HashCode *key,
593 unsigned int desired_replication_level,
594 enum GNUNET_DHT_RouteOption options,
595 const struct GNUNET_MessageHeader *enc,
596 struct GNUNET_TIME_Relative timeout,
597 GNUNET_DHT_ReplyProcessor iter,
599 GNUNET_SCHEDULER_Task cont,
602 struct GNUNET_DHT_RouteHandle *route_handle;
603 struct PendingMessage *pending;
604 struct GNUNET_DHT_Message *message;
607 GNUNET_HashCode *uid_key;
610 is_unique = GNUNET_YES;
612 is_unique = GNUNET_NO;
619 GNUNET_free_non_null(uid_key);
620 uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
621 uid_key = hash_from_uid(uid);
622 } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES);
626 route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
627 memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
628 route_handle->iter = iter;
629 route_handle->iter_cls = iter_cls;
630 route_handle->dht_handle = handle;
631 route_handle->uid = uid;
633 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634 "`%s': Unique ID is %llu\n", "DHT API", uid);
637 * Store based on random identifier!
639 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
640 msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
645 msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
648 GNUNET_free(uid_key);
649 message = GNUNET_malloc(msize);
650 message->header.size = htons(msize);
651 message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
652 memcpy(&message->key, key, sizeof(GNUNET_HashCode));
653 message->options = htons(options);
654 message->desired_replication_level = htons(options);
655 message->unique = htons(is_unique);
656 message->unique_id = GNUNET_htonll(uid);
657 memcpy(&message[1], enc, ntohs(enc->size));
659 pending = GNUNET_malloc(sizeof(struct PendingMessage));
660 pending->msg = &message->header;
661 pending->timeout = timeout;
662 pending->cont = cont;
663 pending->cont_cls = cont_cls;
664 pending->is_unique = is_unique;
665 pending->unique_id = uid;
667 GNUNET_assert(handle->current == NULL);
669 handle->current = pending;
671 process_pending_message(handle);
677 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
681 * Perform an asynchronous GET operation on the DHT identified.
683 * @param handle handle to the DHT service
684 * @param timeout how long to wait for transmission of this request to the service
685 * @param type expected type of the response object
686 * @param key the key to look up
687 * @param iter function to call on each result
688 * @param iter_cls closure for iter
689 * @param cont continuation to call once message sent
690 * @param cont_cls closure for continuation
692 * @return handle to stop the async get
694 struct GNUNET_DHT_RouteHandle *
695 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
696 struct GNUNET_TIME_Relative timeout,
698 const GNUNET_HashCode * key,
699 GNUNET_DHT_GetIterator iter,
701 GNUNET_SCHEDULER_Task cont,
704 struct GNUNET_DHT_GetContext *get_context;
705 struct GNUNET_DHT_GetMessage *get_msg;
707 if (handle->current != NULL) /* Can't send right now, we have a pending message... */
710 get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext));
711 get_context->iter = iter;
712 get_context->iter_cls = iter_cls;
715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716 "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key));
719 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
720 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
721 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
722 get_msg->type = htonl(type);
724 return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, cont, cont_cls);
730 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
732 struct PendingMessage *pending;
733 struct GNUNET_DHT_StopMessage *message;
735 GNUNET_HashCode *uid_key;
737 msize = sizeof(struct GNUNET_DHT_StopMessage);
739 message = GNUNET_malloc(msize);
740 message->header.size = htons(msize);
741 message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid);
746 message->unique_id = GNUNET_htonll(route_handle->uid);
748 GNUNET_assert(route_handle->dht_handle->current == NULL);
750 pending = GNUNET_malloc(sizeof(struct PendingMessage));
751 pending->msg = (struct GNUNET_MessageHeader *)message;
752 pending->timeout = DEFAULT_DHT_TIMEOUT;
753 pending->cont = NULL;
754 pending->cont_cls = NULL;
755 pending->is_unique = GNUNET_NO;
756 pending->unique_id = route_handle->uid;
758 GNUNET_assert(route_handle->dht_handle->current == NULL);
760 route_handle->dht_handle->current = pending;
762 process_pending_message(route_handle->dht_handle);
764 uid_key = hash_from_uid(route_handle->uid);
766 if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES)
769 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
770 "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
773 GNUNET_free(uid_key);
779 * Stop async DHT-get.
781 * @param get_handle handle to the GET operation to stop
784 GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *get_handle)
787 struct GNUNET_DHT_GetMessage *get_msg;
788 struct GNUNET_DHT_Handle *handle;
789 GNUNET_HashCode *uid_key;
792 GNUNET_DHT_route_stop(get_handle);
795 uid_key = hash_from_uid(get_handle->uid);
796 GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES);
798 if (handle->do_destroy == GNUNET_NO)
800 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
801 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
802 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809 "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&get_handle->key), get_handle->uid);
815 * Perform a PUT operation storing data in the DHT.
817 * @param h handle to DHT service
818 * @param key the key to store under
819 * @param type type of the value
820 * @param size number of bytes in data; must be less than 64k
821 * @param data the data to store
822 * @param exp desired expiration time for the value
823 * @param cont continuation to call when done;
824 * reason will be TIMEOUT on error,
825 * reason will be PREREQ_DONE on success
826 * @param cont_cls closure for cont
828 * @return GNUNET_YES if put message is queued for transmission
831 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
832 const GNUNET_HashCode * key,
836 struct GNUNET_TIME_Absolute exp,
837 struct GNUNET_TIME_Relative timeout,
838 GNUNET_SCHEDULER_Task cont,
841 struct GNUNET_DHT_PutMessage *put_msg;
844 if (handle->current != NULL)
846 GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
851 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852 "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key));
855 msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
856 put_msg = GNUNET_malloc(msize);
857 put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
858 put_msg->header.size = htons(msize);
859 put_msg->type = htonl(type);
860 put_msg->data_size = htons(size);
861 put_msg->expiration = exp;
862 memcpy(&put_msg[1], data, size);
864 GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls);
866 GNUNET_free(put_msg);