2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
 * TODO: retransmission of pending requests may already work; at least
 *       the code for it is in place.  The API calls still need to check
 *       whether a retransmission is in progress and, if so, queue their
 *       single pending message for transmission once the list of
 *       retransmissions has been processed.
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
/* Debug switch for this file; defined to GNUNET_NO here. */
#define DEBUG_DHT_API GNUNET_NO

/* Five seconds; used in this file as the timeout of STOP messages. */
#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
54 * Message that is pending
56 struct GNUNET_MessageHeader *msg;
59 * Timeout for this message
61 struct GNUNET_TIME_Relative timeout;
64 * Continuation to call on message send
65 * or message receipt confirmation
67 GNUNET_SCHEDULER_Task cont;
70 * Continuation closure
75 * Unique ID for this request
/**
 * Node of the list of messages that must be retransmitted.
 */
struct PendingMessageList
{
  /**
   * Next node; this is a singly linked list.
   */
  struct PendingMessageList *next;

  /**
   * The pending message carried by this node.
   */
  struct PendingMessage *message;
};
94 struct GNUNET_DHT_GetContext
97 * Iterator to call on data receipt
99 GNUNET_DHT_GetIterator iter;
102 * Closure for the iterator callback
108 struct GNUNET_DHT_FindPeerContext
111 * Iterator to call on data receipt
113 GNUNET_DHT_FindPeerProcessor proc;
116 * Closure for the iterator callback
123 * Handle to a route request
125 struct GNUNET_DHT_RouteHandle
129 * Unique identifier for this request (for key collisions)
134 * Key that this get request is for
139 * Iterator to call on data receipt
141 GNUNET_DHT_ReplyProcessor iter;
144 * Closure for the iterator callback
149 * Main handle to this DHT api
151 struct GNUNET_DHT_Handle *dht_handle;
154 * The actual message sent for this request,
155 * used for retransmitting requests on service
156 * failure/reconnect. Freed on route_stop.
158 struct GNUNET_DHT_RouteMessage *message;
163 * Handle to control a get operation.
165 struct GNUNET_DHT_GetHandle
168 * Handle to the actual route operation for the get
170 struct GNUNET_DHT_RouteHandle *route_handle;
173 * The context of the get request
175 struct GNUNET_DHT_GetContext get_context;
180 * Handle to control a find peer operation.
182 struct GNUNET_DHT_FindPeerHandle
185 * Handle to the actual route operation for the request
187 struct GNUNET_DHT_RouteHandle *route_handle;
190 * The context of the find peer request
192 struct GNUNET_DHT_FindPeerContext find_peer_context;
/**
 * Retransmission state of the API.
 */
enum DHT_Retransmit_Stage
{
  /**
   * Nothing is being retransmitted at this time.
   */
  DHT_NOT_RETRANSMITTING,

  /**
   * A retransmission is in progress and no additional message has
   * been queued behind it.
   */
  DHT_RETRANSMITTING,

  /**
   * A retransmission is in progress and a single message has been
   * queued for transmission once it finishes.
   */
  DHT_RETRANSMITTING_MESSAGE_QUEUED
};
218 * Connection to the DHT service.
220 struct GNUNET_DHT_Handle
225 struct GNUNET_SCHEDULER_Handle *sched;
228 * Configuration to use.
230 const struct GNUNET_CONFIGURATION_Handle *cfg;
233 * Socket (if available).
235 struct GNUNET_CLIENT_Connection *client;
238 * Currently pending transmission request.
240 struct GNUNET_CLIENT_TransmitHandle *th;
243 * Message we are currently sending, only allow
244 * a single message to be queued. If not unique
245 * (typically a put request), await a confirmation
246 * from the service that the message was received.
247 * If unique, just fire and forget.
249 struct PendingMessage *current;
252 * Hash map containing the current outstanding unique requests
254 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
257 * Generator for unique ids.
262 * Are we currently retransmitting requests? If so queue a _single_
263 * new request when received.
265 enum DHT_Retransmit_Stage retransmit_stage;
268 * Linked list of retranmissions, to be used in the event
269 * of a dht service disconnect/reconnect.
271 struct PendingMessageList *retransmissions;
274 * A single pending message allowed to be scheduled
275 * during retransmission phase.
277 struct PendingMessage *retransmission_buffer;
282 * Convert unique ID to hash code.
284 * @param uid unique ID to convert
285 * @param hash set to uid (extended with zeros)
288 hash_from_uid (uint64_t uid,
289 GNUNET_HashCode *hash)
291 memset (hash, 0, sizeof(GNUNET_HashCode));
292 *((uint64_t*)hash) = uid;
296 * Iterator callback to retransmit each outstanding request
297 * because the connection to the DHT service went down (and
302 static int retransmit_iterator (void *cls,
303 const GNUNET_HashCode * key,
306 struct GNUNET_DHT_RouteHandle *route_handle = value;
307 struct PendingMessageList *pending_message_list;
309 pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
310 pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
311 pending_message_list->message->msg = &route_handle->message->header;
312 pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
313 pending_message_list->message->cont = NULL;
314 pending_message_list->message->cont_cls = NULL;
315 pending_message_list->message->unique_id = route_handle->uid;
316 /* Add the new pending message to the front of the retransmission list */
317 pending_message_list->next = route_handle->dht_handle->retransmissions;
323 * Try to (re)connect to the dht service.
325 * @return GNUNET_YES on success, GNUNET_NO on failure.
328 try_connect (struct GNUNET_DHT_Handle *handle)
330 if (handle->client != NULL)
332 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
333 if (handle->client != NULL)
336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337 _("Failed to connect to the dht service!\n"));
343 * Send complete (or failed), call continuation if we have one.
346 finish (struct GNUNET_DHT_Handle *handle, int code)
348 struct PendingMessage *pos = handle->current;
349 GNUNET_HashCode uid_hash;
350 hash_from_uid (pos->unique_id, &uid_hash);
352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
354 GNUNET_assert (pos != NULL);
356 if (pos->cont != NULL)
358 if (code == GNUNET_SYSERR)
359 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
361 GNUNET_SCHEDULER_REASON_TIMEOUT);
363 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
365 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
368 if (pos->unique_id == 0)
369 GNUNET_free(pos->msg);
371 handle->current = NULL;
375 * Transmit the next pending message, called by notify_transmit_ready
378 transmit_pending (void *cls, size_t size, void *buf)
380 struct GNUNET_DHT_Handle *handle = cls;
384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385 "`%s': In transmit_pending\n", "DHT API");
390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391 "`%s': In transmit_pending buf is NULL\n", "DHT API");
393 finish (handle, GNUNET_SYSERR);
399 if (handle->current != NULL)
401 tsize = ntohs (handle->current->msg->size);
405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406 "`%s': Sending message size %d\n", "DHT API", tsize);
408 memcpy (buf, handle->current->msg, tsize);
409 finish (handle, GNUNET_OK);
417 /* Have no pending request */
422 * Try to send messages from list of messages to send
425 process_pending_message (struct GNUNET_DHT_Handle *handle)
428 if (handle->current == NULL)
429 return; /* action already pending */
430 if (GNUNET_YES != try_connect (handle))
432 finish (handle, GNUNET_SYSERR);
437 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448 "Failed to transmit request to dht service.\n");
450 finish (handle, GNUNET_SYSERR);
454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455 "`%s': Scheduled sending message of size %d to service\n",
456 "DHT API", ntohs (handle->current->msg->size));
/**
 * Forward declaration: a retransmission was sent (or failed).
 */
static void
finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);

/**
 * Forward declaration: transmit callback used for retransmissions.
 */
static size_t
transmit_pending_retransmission (void *cls, size_t size, void *buf);
472 * Try to send messages from list of messages to send
475 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
478 if (handle->current == NULL)
479 return; /* action already pending */
480 if (GNUNET_YES != try_connect (handle))
482 finish_retransmission (handle, GNUNET_SYSERR);
487 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
493 &transmit_pending_retransmission,
497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498 "Failed to transmit request to dht service.\n");
500 finish_retransmission (handle, GNUNET_SYSERR);
504 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
505 "`%s': Scheduled sending message of size %d to service\n",
506 "DHT API", ntohs (handle->current->msg->size));
511 * Send complete (or failed), call continuation if we have one.
514 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
516 struct PendingMessage *pos = handle->current;
517 struct PendingMessageList *pending_list;
519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
521 GNUNET_assert (pos == handle->retransmissions->message);
522 pending_list = handle->retransmissions;
523 handle->retransmissions = handle->retransmissions->next;
524 GNUNET_free (pending_list);
526 if (handle->retransmissions == NULL)
528 handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
531 if (handle->retransmissions != NULL)
533 handle->current = handle->retransmissions->message;
534 process_pending_retransmissions(handle);
536 else if (handle->retransmission_buffer != NULL)
538 handle->current = handle->retransmission_buffer;
539 process_pending_message(handle);
544 * Handler for messages received from the DHT service
545 * a demultiplexer which handles numerous message types
549 service_message_handler (void *cls,
550 const struct GNUNET_MessageHeader *msg)
552 struct GNUNET_DHT_Handle *handle = cls;
553 struct GNUNET_DHT_RouteResultMessage *dht_msg;
554 struct GNUNET_MessageHeader *enc_msg;
555 struct GNUNET_DHT_RouteHandle *route_handle;
557 GNUNET_HashCode uid_hash;
563 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564 "`%s': Received NULL from server, connection down!\n",
567 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
568 handle->client = GNUNET_CLIENT_connect (handle->sched,
572 handle->retransmit_stage = DHT_RETRANSMITTING;
573 GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle);
574 handle->current = handle->retransmissions->message;
575 process_pending_retransmissions(handle);
579 switch (ntohs (msg->type))
581 case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT:
583 dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
584 uid = GNUNET_ntohll (dht_msg->unique_id);
586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587 "`%s': Received response to message (uid %llu)\n",
591 hash_from_uid (uid, &uid_hash);
593 GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
595 if (route_handle == NULL) /* We have no recollection of this request */
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
606 ntohs (dht_msg->header.size) -
607 sizeof (struct GNUNET_DHT_RouteResultMessage);
608 GNUNET_assert (enc_size > 0);
609 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
610 route_handle->iter (route_handle->iter_cls, enc_msg);
617 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
618 "`%s': Received unknown message type %d\n", "DHT API",
622 GNUNET_CLIENT_receive (handle->client,
623 &service_message_handler,
624 handle, GNUNET_TIME_UNIT_FOREVER_REL);
630 * Initialize the connection with the DHT service.
632 * @param sched scheduler to use
633 * @param cfg configuration to use
634 * @param ht_len size of the internal hash table to use for
635 * processing multiple GET/FIND requests in parallel
637 * @return handle to the DHT service, or NULL on error
639 struct GNUNET_DHT_Handle *
640 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
641 const struct GNUNET_CONFIGURATION_Handle *cfg,
644 struct GNUNET_DHT_Handle *handle;
646 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
648 handle->sched = sched;
649 handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
650 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
651 if (handle->client == NULL)
653 GNUNET_free (handle);
656 handle->outstanding_requests =
657 GNUNET_CONTAINER_multihashmap_create (ht_len);
659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
660 "`%s': Connection to service in progress\n", "DHT API");
662 GNUNET_CLIENT_receive (handle->client,
663 &service_message_handler,
664 handle, GNUNET_TIME_UNIT_FOREVER_REL);
670 * Shutdown connection with the DHT service.
672 * @param handle handle of the DHT connection to stop
675 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
678 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
681 GNUNET_assert (handle != NULL);
682 if (handle->th != NULL) /* We have a live transmit request in the Aether */
684 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
687 if (handle->current != NULL) /* We are trying to send something now, clean it up */
688 GNUNET_free (handle->current);
690 if (handle->client != NULL) /* Finally, disconnect from the service */
692 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
693 handle->client = NULL;
696 GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
697 GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
698 GNUNET_free (handle);
703 * Transmit the next pending message, called by notify_transmit_ready
706 transmit_pending_retransmission (void *cls, size_t size, void *buf)
708 struct GNUNET_DHT_Handle *handle = cls;
712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713 "`%s': In transmit_pending\n", "DHT API");
718 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719 "`%s': In transmit_pending buf is NULL\n", "DHT API");
721 finish_retransmission (handle, GNUNET_SYSERR);
727 if (handle->current != NULL)
729 tsize = ntohs (handle->current->msg->size);
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "`%s': Sending message size %d\n", "DHT API", tsize);
736 memcpy (buf, handle->current->msg, tsize);
737 finish_retransmission (handle, GNUNET_OK);
745 /* Have no pending request */
751 * Iterator called on each result obtained from a generic route
755 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
757 struct GNUNET_DHT_GetHandle *get_handle = cls;
758 struct GNUNET_DHT_GetResultMessage *result;
762 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
765 GNUNET_assert (ntohs (reply->size) >=
766 sizeof (struct GNUNET_DHT_GetResultMessage));
767 result = (struct GNUNET_DHT_GetResultMessage *) reply;
768 data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
770 result_data = (char *) &result[1]; /* Set data pointer to end of message */
772 get_handle->get_context.iter (get_handle->get_context.iter_cls,
773 result->expiration, &result->key,
774 ntohs (result->type), data_size, result_data);
779 * Iterator called on each result obtained from a generic route
783 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
785 struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
786 struct GNUNET_MessageHeader *hello;
789 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "Received wrong type of response to a find peer request...\n");
797 GNUNET_assert (ntohs (reply->size) >=
798 sizeof (struct GNUNET_MessageHeader));
799 hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader);
800 hello = (struct GNUNET_MessageHeader *)&reply[1];
802 if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
804 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805 "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
808 find_peer_handle->find_peer_context.proc (find_peer_handle->
809 find_peer_context.proc_cls,
810 (struct GNUNET_HELLO_Message *)hello);
814 * Perform an asynchronous FIND_PEER operation on the DHT.
816 * @param handle handle to the DHT service
817 * @param key the key to look up
818 * @param desired_replication_level how many peers should ultimately receive
819 * this message (advisory only, target may be too high for the
820 * given DHT or not hit exactly).
821 * @param options options for routing
822 * @param enc send the encapsulated message to a peer close to the key
823 * @param iter function to call on each result, NULL if no replies are expected
824 * @param iter_cls closure for iter
825 * @param timeout when to abort with an error if we fail to get
826 * a confirmation for the request (when necessary) or how long
827 * to wait for tramission to the service
828 * @param cont continuation to call when done;
829 * reason will be TIMEOUT on error,
830 * reason will be PREREQ_DONE on success
831 * @param cont_cls closure for cont
833 * @return handle to stop the request, NULL if the request is "fire and forget"
835 struct GNUNET_DHT_RouteHandle *
836 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
837 const GNUNET_HashCode * key,
838 unsigned int desired_replication_level,
839 enum GNUNET_DHT_RouteOption options,
840 const struct GNUNET_MessageHeader *enc,
841 struct GNUNET_TIME_Relative timeout,
842 GNUNET_DHT_ReplyProcessor iter,
844 GNUNET_SCHEDULER_Task cont, void *cont_cls)
846 struct GNUNET_DHT_RouteHandle *route_handle;
847 struct PendingMessage *pending;
848 struct GNUNET_DHT_RouteMessage *message;
850 GNUNET_HashCode uid_key;
852 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
855 if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
861 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
862 memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
863 route_handle->iter = iter;
864 route_handle->iter_cls = iter_cls;
865 route_handle->dht_handle = handle;
868 route_handle->uid = handle->uid_gen++;
869 hash_from_uid (route_handle->uid, &uid_key);
870 GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
871 &uid_key, route_handle,
872 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
876 route_handle->uid = 0;
880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881 "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
884 msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
885 message = GNUNET_malloc (msize);
886 message->header.size = htons (msize);
887 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE);
888 memcpy (&message->key, key, sizeof (GNUNET_HashCode));
889 message->options = htonl (options);
890 message->desired_replication_level = htonl (options);
891 message->unique_id = GNUNET_htonll (route_handle->uid);
892 memcpy (&message[1], enc, ntohs (enc->size));
893 pending = GNUNET_malloc (sizeof (struct PendingMessage));
894 pending->msg = &message->header;
895 pending->timeout = timeout;
896 pending->cont = cont;
897 pending->cont_cls = cont_cls;
898 pending->unique_id = route_handle->uid;
899 if (handle->current == NULL)
901 handle->current = pending;
902 process_pending_message (handle);
904 else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING))
906 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
907 handle->retransmission_buffer = pending;
910 route_handle->message = message;
916 * Perform an asynchronous GET operation on the DHT identified.
918 * @param handle handle to the DHT service
919 * @param timeout how long to wait for transmission of this request to the service
920 * @param type expected type of the response object
921 * @param key the key to look up
922 * @param iter function to call on each result
923 * @param iter_cls closure for iter
924 * @param cont continuation to call once message sent
925 * @param cont_cls closure for continuation
927 * @return handle to stop the async get
929 struct GNUNET_DHT_GetHandle *
930 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
931 struct GNUNET_TIME_Relative timeout,
933 const GNUNET_HashCode * key,
934 GNUNET_DHT_GetIterator iter,
936 GNUNET_SCHEDULER_Task cont, void *cont_cls)
938 struct GNUNET_DHT_GetHandle *get_handle;
939 struct GNUNET_DHT_GetMessage get_msg;
941 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
944 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
945 get_handle->get_context.iter = iter;
946 get_handle->get_context.iter_cls = iter_cls;
949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950 "`%s': Inserting pending get request with key %s\n", "DHT API",
954 get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
955 get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
956 get_msg.type = htons (type);
958 get_handle->route_handle =
959 GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
960 &get_reply_iterator, get_handle, cont, cont_cls);
967 * Stop a previously issued routing request
969 * @param route_handle handle to the request to stop
970 * @param cont continuation to call once this message is sent to the service or times out
971 * @param cont_cls closure for the continuation
974 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
975 GNUNET_SCHEDULER_Task cont, void *cont_cls)
977 struct PendingMessage *pending;
978 struct GNUNET_DHT_StopMessage *message;
980 GNUNET_HashCode uid_key;
982 msize = sizeof (struct GNUNET_DHT_StopMessage);
983 message = GNUNET_malloc (msize);
984 message->header.size = htons (msize);
985 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
987 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988 "`%s': Remove outstanding request for uid %llu\n", "DHT API",
991 message->unique_id = GNUNET_htonll (route_handle->uid);
992 GNUNET_assert (route_handle->dht_handle->current == NULL);
993 pending = GNUNET_malloc (sizeof (struct PendingMessage));
994 pending->msg = (struct GNUNET_MessageHeader *) message;
995 pending->timeout = DEFAULT_DHT_TIMEOUT;
996 pending->cont = cont;
997 pending->cont_cls = cont_cls;
998 pending->unique_id = 0; /* When finished is called, free pending->msg */
1000 if (route_handle->dht_handle->current == NULL)
1002 route_handle->dht_handle->current = pending;
1003 process_pending_message (route_handle->dht_handle);
1005 else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING))
1007 route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1008 route_handle->dht_handle->retransmission_buffer = pending;
1015 hash_from_uid (route_handle->uid, &uid_key);
1016 GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1017 (route_handle->dht_handle->outstanding_requests, &uid_key,
1018 route_handle) == GNUNET_YES);
1020 GNUNET_free(route_handle->message);
1021 GNUNET_free(route_handle);
1026 * Stop async DHT-get.
1028 * @param get_handle handle to the GET operation to stop
1029 * @param cont continuation to call once this message is sent to the service or times out
1030 * @param cont_cls closure for the continuation
1033 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1034 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1036 if ((get_handle->route_handle->dht_handle->current != NULL) &&
1037 (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1041 GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1042 GNUNET_SCHEDULER_REASON_TIMEOUT);
1048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049 "`%s': Removing pending get request with key %s, uid %llu\n",
1050 "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1051 get_handle->route_handle->uid);
1053 GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1054 GNUNET_free (get_handle);
1059 * Perform an asynchronous FIND PEER operation on the DHT.
1061 * @param handle handle to the DHT service
1062 * @param timeout timeout for this request to be sent to the
1064 * @param options routing options for this message
1065 * @param key the key to look up
1066 * @param proc function to call on each result
1067 * @param proc_cls closure for proc
1068 * @param cont continuation to call once message sent
1069 * @param cont_cls closure for continuation
1071 * @return handle to stop the async get, NULL on error
1073 struct GNUNET_DHT_FindPeerHandle *
1074 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1075 struct GNUNET_TIME_Relative timeout,
1076 enum GNUNET_DHT_RouteOption options,
1077 const GNUNET_HashCode * key,
1078 GNUNET_DHT_FindPeerProcessor proc,
1080 GNUNET_SCHEDULER_Task cont,
1083 struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1084 struct GNUNET_MessageHeader find_peer_msg;
1086 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1090 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1091 find_peer_handle->find_peer_context.proc = proc;
1092 find_peer_handle->find_peer_context.proc_cls = proc_cls;
1095 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096 "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1097 "FIND PEER", GNUNET_h2s (key));
1100 find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
1101 find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1102 find_peer_handle->route_handle =
1103 GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
1104 timeout, &find_peer_reply_iterator,
1105 find_peer_handle, cont, cont_cls);
1106 return find_peer_handle;
1110 * Stop async find peer. Frees associated resources.
1112 * @param find_peer_handle GET operation to stop.
1113 * @param cont continuation to call once this message is sent to the service or times out
1114 * @param cont_cls closure for the continuation
1117 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1118 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1120 if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1121 (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1125 GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1126 GNUNET_SCHEDULER_REASON_TIMEOUT);
1132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133 "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1134 "DHT API", "FIND PEER",
1135 GNUNET_h2s (&find_peer_handle->route_handle->key),
1136 find_peer_handle->route_handle->uid);
1138 GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1139 GNUNET_free (find_peer_handle);
1145 * Perform a PUT operation storing data in the DHT.
1147 * @param handle handle to DHT service
1148 * @param key the key to store under
1149 * @param type type of the value
1150 * @param size number of bytes in data; must be less than 64k
1151 * @param data the data to store
1152 * @param exp desired expiration time for the value
1153 * @param timeout how long to wait for transmission of this request
1154 * @param cont continuation to call when done;
1155 * reason will be TIMEOUT on error,
1156 * reason will be PREREQ_DONE on success
1157 * @param cont_cls closure for cont
1159 * @return GNUNET_YES if put message is queued for transmission
1162 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1163 const GNUNET_HashCode * key,
1167 struct GNUNET_TIME_Absolute exp,
1168 struct GNUNET_TIME_Relative timeout,
1169 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1171 struct GNUNET_DHT_PutMessage *put_msg;
1172 struct GNUNET_DHT_RouteHandle *put_route;
1175 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1179 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1180 GNUNET_SCHEDULER_REASON_TIMEOUT);
1186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187 "`%s': Inserting pending put request with key %s\n", "DHT API",
1191 msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1192 put_msg = GNUNET_malloc (msize);
1193 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1194 put_msg->header.size = htons (msize);
1195 put_msg->type = htons (type);
1196 put_msg->data_size = htons (size);
1197 put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1198 memcpy (&put_msg[1], data, size);
1200 put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1201 NULL, cont, cont_cls);
1203 if (put_route == NULL) /* Route start failed! */
1207 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1208 GNUNET_SCHEDULER_REASON_TIMEOUT);
1212 GNUNET_free(put_route);
1214 GNUNET_free (put_msg);