2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
27 * TODO: retransmission of pending requests maybe happens now, at least
28 * the code is in place to do so. Need to add checks when api calls
29 * happen to check if retransmission is in progress, and if so set
30 * the single pending message for transmission once the list of
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
47 #define DEBUG_DHT_API GNUNET_NO
52 * Message that is pending
54 struct GNUNET_MessageHeader *msg;
57 * Timeout for this message
59 struct GNUNET_TIME_Relative timeout;
62 * Continuation to call on message send
63 * or message receipt confirmation
65 GNUNET_SCHEDULER_Task cont;
68 * Continuation closure
73 * Unique ID for this request
79 struct PendingMessageList
82 * This is a singly linked list.
84 struct PendingMessageList *next;
87 * The pending message.
89 struct PendingMessage *message;
92 struct GNUNET_DHT_GetContext
95 * Iterator to call on data receipt
97 GNUNET_DHT_GetIterator iter;
100 * Closure for the iterator callback
106 struct GNUNET_DHT_FindPeerContext
109 * Iterator to call on data receipt
111 GNUNET_DHT_FindPeerProcessor proc;
114 * Closure for the iterator callback
121 * Handle to a route request
123 struct GNUNET_DHT_RouteHandle
127 * Unique identifier for this request (for key collisions)
132 * Key that this get request is for
137 * Iterator to call on data receipt
139 GNUNET_DHT_ReplyProcessor iter;
142 * Closure for the iterator callback
147 * Main handle to this DHT api
149 struct GNUNET_DHT_Handle *dht_handle;
152 * The actual message sent for this request,
153 * used for retransmitting requests on service
154 * failure/reconnect. Freed on route_stop.
156 struct GNUNET_DHT_RouteMessage *message;
161 * Handle to control a get operation.
163 struct GNUNET_DHT_GetHandle
166 * Handle to the actual route operation for the get
168 struct GNUNET_DHT_RouteHandle *route_handle;
171 * The context of the get request
173 struct GNUNET_DHT_GetContext get_context;
178 * Handle to control a find peer operation.
180 struct GNUNET_DHT_FindPeerHandle
183 * Handle to the actual route operation for the request
185 struct GNUNET_DHT_RouteHandle *route_handle;
188 * The context of the find peer request
190 struct GNUNET_DHT_FindPeerContext find_peer_context;
194 enum DHT_Retransmit_Stage
197 * The API is not retransmitting anything at this time.
199 DHT_NOT_RETRANSMITTING,
202 * The API is retransmitting, and nothing has been single
203 * queued for sending.
208 * The API is retransmitting, and a single message has been
209 * queued for transmission once finished.
211 DHT_RETRANSMITTING_MESSAGE_QUEUED
216 * Connection to the DHT service.
218 struct GNUNET_DHT_Handle
223 struct GNUNET_SCHEDULER_Handle *sched;
226 * Configuration to use.
228 const struct GNUNET_CONFIGURATION_Handle *cfg;
231 * Socket (if available).
233 struct GNUNET_CLIENT_Connection *client;
236 * Currently pending transmission request.
238 struct GNUNET_CLIENT_TransmitHandle *th;
241 * Message we are currently sending, only allow
242 * a single message to be queued. If not unique
243 * (typically a put request), await a confirmation
244 * from the service that the message was received.
245 * If unique, just fire and forget.
247 struct PendingMessage *current;
250 * Hash map containing the current outstanding unique requests
252 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
255 * Generator for unique ids.
260 * Are we currently retransmitting requests? If so queue a _single_
261 * new request when received.
263 enum DHT_Retransmit_Stage retransmit_stage;
266 * Linked list of retranmissions, to be used in the event
267 * of a dht service disconnect/reconnect.
269 struct PendingMessageList *retransmissions;
272 * A single pending message allowed to be scheduled
273 * during retransmission phase.
275 struct PendingMessage *retransmission_buffer;
280 * Convert unique ID to hash code.
282 * @param uid unique ID to convert
283 * @param hash set to uid (extended with zeros)
286 hash_from_uid (uint64_t uid,
287 GNUNET_HashCode *hash)
289 memset (hash, 0, sizeof(GNUNET_HashCode));
290 *((uint64_t*)hash) = uid;
295 * Iterator callback to retransmit each outstanding request
296 * because the connection to the DHT service went down (and
301 static int retransmit_iterator (void *cls,
302 const GNUNET_HashCode * key,
305 struct GNUNET_DHT_RouteHandle *route_handle = value;
306 struct PendingMessageList *pending_message_list;
308 pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
309 pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
310 pending_message_list->message->msg = &route_handle->message->header;
311 pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
312 pending_message_list->message->cont = NULL;
313 pending_message_list->message->cont_cls = NULL;
314 pending_message_list->message->unique_id = route_handle->uid;
315 /* Add the new pending message to the front of the retransmission list */
316 pending_message_list->next = route_handle->dht_handle->retransmissions;
317 route_handle->dht_handle->retransmissions = pending_message_list;
324 * Try to (re)connect to the dht service.
326 * @return GNUNET_YES on success, GNUNET_NO on failure.
329 try_connect (struct GNUNET_DHT_Handle *handle)
331 if (handle->client != NULL)
333 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
334 if (handle->client != NULL)
337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338 _("Failed to connect to the dht service!\n"));
344 * Send complete (or failed), call continuation if we have one.
347 finish (struct GNUNET_DHT_Handle *handle, int code)
349 struct PendingMessage *pos = handle->current;
350 GNUNET_HashCode uid_hash;
352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
354 GNUNET_assert (pos != NULL);
355 hash_from_uid (pos->unique_id, &uid_hash);
356 if (pos->cont != NULL)
358 if (code == GNUNET_SYSERR)
359 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
361 GNUNET_SCHEDULER_REASON_TIMEOUT);
363 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
365 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
368 GNUNET_assert(handle->th == NULL);
369 if (pos->unique_id == 0)
370 GNUNET_free(pos->msg);
372 handle->current = NULL;
376 * Transmit the next pending message, called by notify_transmit_ready
379 transmit_pending (void *cls, size_t size, void *buf)
381 struct GNUNET_DHT_Handle *handle = cls;
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386 "`%s': In transmit_pending\n", "DHT API");
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "`%s': In transmit_pending buf is NULL\n", "DHT API");
394 finish (handle, GNUNET_SYSERR);
400 if (handle->current != NULL)
402 tsize = ntohs (handle->current->msg->size);
406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407 "`%s': Sending message size %d\n", "DHT API", tsize);
409 memcpy (buf, handle->current->msg, tsize);
410 finish (handle, GNUNET_OK);
418 /* Have no pending request */
423 * Try to send messages from list of messages to send
426 process_pending_message (struct GNUNET_DHT_Handle *handle)
429 if (handle->current == NULL)
430 return; /* action already pending */
431 if (GNUNET_YES != try_connect (handle))
433 finish (handle, GNUNET_SYSERR);
438 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
448 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449 "Failed to transmit request to dht service.\n");
451 finish (handle, GNUNET_SYSERR);
455 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456 "`%s': Scheduled sending message of size %d to service\n",
457 "DHT API", ntohs (handle->current->msg->size));
462 * Send complete (or failed), call continuation if we have one.
463 * Forward declaration.
466 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
468 /* Forward declaration */
470 transmit_pending_retransmission (void *cls, size_t size, void *buf);
473 * Try to send messages from list of messages to send
476 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
479 if (handle->current == NULL)
480 return; /* action already pending */
481 if (GNUNET_YES != try_connect (handle))
483 finish_retransmission (handle, GNUNET_SYSERR);
488 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
494 &transmit_pending_retransmission,
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499 "Failed to transmit request to dht service.\n");
501 finish_retransmission (handle, GNUNET_SYSERR);
505 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506 "`%s': Scheduled sending message of size %d to service\n",
507 "DHT API", ntohs (handle->current->msg->size));
512 * Send complete (or failed), call continuation if we have one.
515 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
517 struct PendingMessage *pos = handle->current;
518 struct PendingMessageList *pending_list;
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
522 GNUNET_assert (pos == handle->retransmissions->message);
523 pending_list = handle->retransmissions;
524 handle->retransmissions = handle->retransmissions->next;
525 GNUNET_free (pending_list);
527 if (handle->retransmissions == NULL)
529 handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
532 if (handle->retransmissions != NULL)
534 handle->current = handle->retransmissions->message;
535 process_pending_retransmissions(handle);
537 else if (handle->retransmission_buffer != NULL)
539 handle->current = handle->retransmission_buffer;
540 process_pending_message(handle);
545 * Handler for messages received from the DHT service
546 * a demultiplexer which handles numerous message types
550 service_message_handler (void *cls,
551 const struct GNUNET_MessageHeader *msg)
553 struct GNUNET_DHT_Handle *handle = cls;
554 struct GNUNET_DHT_RouteResultMessage *dht_msg;
555 struct GNUNET_MessageHeader *enc_msg;
556 struct GNUNET_DHT_RouteHandle *route_handle;
558 GNUNET_HashCode uid_hash;
564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565 "`%s': Received NULL from server, connection down!\n",
568 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
569 handle->client = GNUNET_CLIENT_connect (handle->sched,
572 if (handle->current != NULL)
575 finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
578 if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
580 handle->retransmit_stage = DHT_RETRANSMITTING;
581 handle->current = handle->retransmissions->message;
582 process_pending_retransmissions(handle);
588 switch (ntohs (msg->type))
590 case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
592 dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
593 uid = GNUNET_ntohll (dht_msg->unique_id);
595 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596 "`%s': Received response to message (uid %llu)\n",
600 hash_from_uid (uid, &uid_hash);
602 GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
604 if (route_handle == NULL) /* We have no recollection of this request */
607 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
608 "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
615 ntohs (dht_msg->header.size) -
616 sizeof (struct GNUNET_DHT_RouteResultMessage);
617 GNUNET_assert (enc_size > 0);
618 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
619 route_handle->iter (route_handle->iter_cls, enc_msg);
626 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
627 "`%s': Received unknown message type %d\n", "DHT API",
631 GNUNET_CLIENT_receive (handle->client,
632 &service_message_handler,
633 handle, GNUNET_TIME_UNIT_FOREVER_REL);
639 * Initialize the connection with the DHT service.
641 * @param sched scheduler to use
642 * @param cfg configuration to use
643 * @param ht_len size of the internal hash table to use for
644 * processing multiple GET/FIND requests in parallel
646 * @return handle to the DHT service, or NULL on error
648 struct GNUNET_DHT_Handle *
649 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
650 const struct GNUNET_CONFIGURATION_Handle *cfg,
653 struct GNUNET_DHT_Handle *handle;
655 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
657 handle->sched = sched;
658 handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
659 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
660 if (handle->client == NULL)
662 GNUNET_free (handle);
665 handle->outstanding_requests =
666 GNUNET_CONTAINER_multihashmap_create (ht_len);
668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669 "`%s': Connection to service in progress\n", "DHT API");
671 GNUNET_CLIENT_receive (handle->client,
672 &service_message_handler,
673 handle, GNUNET_TIME_UNIT_FOREVER_REL);
679 * Shutdown connection with the DHT service.
681 * @param handle handle of the DHT connection to stop
684 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
690 GNUNET_assert (handle != NULL);
691 if (handle->th != NULL) /* We have a live transmit request */
693 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
696 if (handle->current != NULL) /* We are trying to send something now, clean it up */
697 GNUNET_free (handle->current);
699 if (handle->client != NULL) /* Finally, disconnect from the service */
701 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
702 handle->client = NULL;
705 GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
706 GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
707 GNUNET_free (handle);
712 * Transmit the next pending message, called by notify_transmit_ready
715 transmit_pending_retransmission (void *cls, size_t size, void *buf)
717 struct GNUNET_DHT_Handle *handle = cls;
721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722 "`%s': In transmit_pending\n", "DHT API");
727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728 "`%s': In transmit_pending buf is NULL\n", "DHT API");
730 finish_retransmission (handle, GNUNET_SYSERR);
736 if (handle->current != NULL)
738 tsize = ntohs (handle->current->msg->size);
742 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743 "`%s': Sending message size %d\n", "DHT API", tsize);
745 memcpy (buf, handle->current->msg, tsize);
746 finish_retransmission (handle, GNUNET_OK);
754 /* Have no pending request */
760 * Iterator called on each result obtained from a generic route
764 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
766 struct GNUNET_DHT_GetHandle *get_handle = cls;
767 struct GNUNET_DHT_GetResultMessage *result;
771 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
774 GNUNET_assert (ntohs (reply->size) >=
775 sizeof (struct GNUNET_DHT_GetResultMessage));
776 result = (struct GNUNET_DHT_GetResultMessage *) reply;
777 data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
779 result_data = (char *) &result[1]; /* Set data pointer to end of message */
781 get_handle->get_context.iter (get_handle->get_context.iter_cls,
782 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
783 ntohs (result->type), data_size, result_data);
788 * Iterator called on each result obtained from a generic route
792 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
794 struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
795 struct GNUNET_MessageHeader *hello;
797 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800 "Received wrong type of response to a find peer request...\n");
805 GNUNET_assert (ntohs (reply->size) >=
806 sizeof (struct GNUNET_MessageHeader));
807 hello = (struct GNUNET_MessageHeader *)&reply[1];
809 if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
815 find_peer_handle->find_peer_context.proc (find_peer_handle->
816 find_peer_context.proc_cls,
817 (struct GNUNET_HELLO_Message *)hello);
821 * Perform an asynchronous FIND_PEER operation on the DHT.
823 * @param handle handle to the DHT service
824 * @param key the key to look up
825 * @param desired_replication_level how many peers should ultimately receive
826 * this message (advisory only, target may be too high for the
827 * given DHT or not hit exactly).
828 * @param options options for routing
829 * @param enc send the encapsulated message to a peer close to the key
830 * @param iter function to call on each result, NULL if no replies are expected
831 * @param iter_cls closure for iter
832 * @param timeout when to abort with an error if we fail to get
833 * a confirmation for the request (when necessary) or how long
834 * to wait for tramission to the service
835 * @param cont continuation to call when done;
836 * reason will be TIMEOUT on error,
837 * reason will be PREREQ_DONE on success
838 * @param cont_cls closure for cont
840 * @return handle to stop the request, NULL if the request is "fire and forget"
842 struct GNUNET_DHT_RouteHandle *
843 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
844 const GNUNET_HashCode * key,
845 unsigned int desired_replication_level,
846 enum GNUNET_DHT_RouteOption options,
847 const struct GNUNET_MessageHeader *enc,
848 struct GNUNET_TIME_Relative timeout,
849 GNUNET_DHT_ReplyProcessor iter,
851 GNUNET_SCHEDULER_Task cont, void *cont_cls)
853 struct GNUNET_DHT_RouteHandle *route_handle;
854 struct PendingMessage *pending;
855 struct GNUNET_DHT_RouteMessage *message;
857 GNUNET_HashCode uid_key;
859 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
862 if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
868 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
869 memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
870 route_handle->iter = iter;
871 route_handle->iter_cls = iter_cls;
872 route_handle->dht_handle = handle;
873 route_handle->uid = handle->uid_gen++;
876 hash_from_uid (route_handle->uid, &uid_key);
877 GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
878 &uid_key, route_handle,
879 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884 "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
887 msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
888 message = GNUNET_malloc (msize);
889 message->header.size = htons (msize);
890 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
891 memcpy (&message->key, key, sizeof (GNUNET_HashCode));
892 message->options = htonl (options);
893 message->desired_replication_level = htonl (options);
894 message->unique_id = GNUNET_htonll (route_handle->uid);
895 memcpy (&message[1], enc, ntohs (enc->size));
896 pending = GNUNET_malloc (sizeof (struct PendingMessage));
897 pending->msg = &message->header;
898 pending->timeout = timeout;
899 pending->cont = cont;
900 pending->cont_cls = cont_cls;
901 pending->unique_id = route_handle->uid;
902 if (handle->current == NULL)
904 handle->current = pending;
905 process_pending_message (handle);
909 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
910 handle->retransmission_buffer = pending;
913 route_handle->message = message;
919 * Perform an asynchronous GET operation on the DHT identified.
921 * @param handle handle to the DHT service
922 * @param timeout how long to wait for transmission of this request to the service
923 * @param type expected type of the response object
924 * @param key the key to look up
925 * @param iter function to call on each result
926 * @param iter_cls closure for iter
927 * @param cont continuation to call once message sent
928 * @param cont_cls closure for continuation
930 * @return handle to stop the async get
932 struct GNUNET_DHT_GetHandle *
933 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
934 struct GNUNET_TIME_Relative timeout,
936 const GNUNET_HashCode * key,
937 GNUNET_DHT_GetIterator iter,
939 GNUNET_SCHEDULER_Task cont, void *cont_cls)
941 struct GNUNET_DHT_GetHandle *get_handle;
942 struct GNUNET_DHT_GetMessage get_msg;
944 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
947 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
948 get_handle->get_context.iter = iter;
949 get_handle->get_context.iter_cls = iter_cls;
952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953 "`%s': Inserting pending get request with key %s\n", "DHT API",
957 get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
958 get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
959 get_msg.type = htons (type);
961 get_handle->route_handle =
962 GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
963 &get_reply_iterator, get_handle, cont, cont_cls);
970 * Stop a previously issued routing request
972 * @param route_handle handle to the request to stop
973 * @param cont continuation to call once this message is sent to the service or times out
974 * @param cont_cls closure for the continuation
977 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
978 GNUNET_SCHEDULER_Task cont, void *cont_cls)
980 struct PendingMessage *pending;
981 struct GNUNET_DHT_StopMessage *message;
983 GNUNET_HashCode uid_key;
985 msize = sizeof (struct GNUNET_DHT_StopMessage);
986 message = GNUNET_malloc (msize);
987 message->header.size = htons (msize);
988 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991 "`%s': Remove outstanding request for uid %llu\n", "DHT API",
994 message->unique_id = GNUNET_htonll (route_handle->uid);
995 memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
996 pending = GNUNET_malloc (sizeof (struct PendingMessage));
997 pending->msg = (struct GNUNET_MessageHeader *) message;
998 pending->timeout = GNUNET_TIME_relative_get_forever();
999 pending->cont = cont;
1000 pending->cont_cls = cont_cls;
1001 pending->unique_id = 0; /* When finished is called, free pending->msg */
1003 if (route_handle->dht_handle->current == NULL)
1005 route_handle->dht_handle->current = pending;
1006 process_pending_message (route_handle->dht_handle);
1008 else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1010 route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1011 route_handle->dht_handle->retransmission_buffer = pending;
1015 GNUNET_free(pending);
1019 hash_from_uid (route_handle->uid, &uid_key);
1020 GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1021 (route_handle->dht_handle->outstanding_requests, &uid_key,
1022 route_handle) == GNUNET_YES);
1024 GNUNET_free(route_handle->message);
1025 GNUNET_free(route_handle);
1030 * Stop async DHT-get.
1032 * @param get_handle handle to the GET operation to stop
1033 * @param cont continuation to call once this message is sent to the service or times out
1034 * @param cont_cls closure for the continuation
1037 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1038 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1040 if ((get_handle->route_handle->dht_handle->current != NULL) &&
1041 (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1045 GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1046 GNUNET_SCHEDULER_REASON_TIMEOUT);
1052 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1053 "`%s': Removing pending get request with key %s, uid %llu\n",
1054 "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1055 get_handle->route_handle->uid);
1057 GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1058 GNUNET_free (get_handle);
1063 * Perform an asynchronous FIND PEER operation on the DHT.
1065 * @param handle handle to the DHT service
1066 * @param timeout timeout for this request to be sent to the
1068 * @param options routing options for this message
1069 * @param key the key to look up
1070 * @param proc function to call on each result
1071 * @param proc_cls closure for proc
1072 * @param cont continuation to call once message sent
1073 * @param cont_cls closure for continuation
1075 * @return handle to stop the async get, NULL on error
1077 struct GNUNET_DHT_FindPeerHandle *
1078 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1079 struct GNUNET_TIME_Relative timeout,
1080 enum GNUNET_DHT_RouteOption options,
1081 const GNUNET_HashCode * key,
1082 GNUNET_DHT_FindPeerProcessor proc,
1084 GNUNET_SCHEDULER_Task cont,
1087 struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1088 struct GNUNET_MessageHeader find_peer_msg;
1090 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1094 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1095 find_peer_handle->find_peer_context.proc = proc;
1096 find_peer_handle->find_peer_context.proc_cls = proc_cls;
1099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1100 "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1101 "FIND PEER", GNUNET_h2s (key));
1104 find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
1105 find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1106 find_peer_handle->route_handle =
1107 GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
1108 timeout, &find_peer_reply_iterator,
1109 find_peer_handle, cont, cont_cls);
1110 return find_peer_handle;
1114 * Stop async find peer. Frees associated resources.
1116 * @param find_peer_handle GET operation to stop.
1117 * @param cont continuation to call once this message is sent to the service or times out
1118 * @param cont_cls closure for the continuation
1121 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1122 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1124 if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1125 (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1129 GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1130 GNUNET_SCHEDULER_REASON_TIMEOUT);
1136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1138 "DHT API", "FIND PEER",
1139 GNUNET_h2s (&find_peer_handle->route_handle->key),
1140 find_peer_handle->route_handle->uid);
1142 GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1143 GNUNET_free (find_peer_handle);
1149 * Perform a PUT operation storing data in the DHT.
1151 * @param handle handle to DHT service
1152 * @param key the key to store under
1153 * @param type type of the value
1154 * @param size number of bytes in data; must be less than 64k
1155 * @param data the data to store
1156 * @param exp desired expiration time for the value
1157 * @param timeout how long to wait for transmission of this request
1158 * @param cont continuation to call when done;
1159 * reason will be TIMEOUT on error,
1160 * reason will be PREREQ_DONE on success
1161 * @param cont_cls closure for cont
1163 * @return GNUNET_YES if put message is queued for transmission
1166 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1167 const GNUNET_HashCode * key,
1171 struct GNUNET_TIME_Absolute exp,
1172 struct GNUNET_TIME_Relative timeout,
1173 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1175 struct GNUNET_DHT_PutMessage *put_msg;
1176 struct GNUNET_DHT_RouteHandle *put_route;
1179 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1181 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
1184 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1185 GNUNET_SCHEDULER_REASON_TIMEOUT);
1191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192 "`%s': Inserting pending put request with key %s\n", "DHT API",
1196 msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1197 put_msg = GNUNET_malloc (msize);
1198 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1199 put_msg->header.size = htons (msize);
1200 put_msg->type = htons (type);
1201 put_msg->data_size = htons (size);
1202 put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1203 memcpy (&put_msg[1], data, size);
1205 put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1206 NULL, cont, cont_cls);
1208 if (put_route == NULL) /* Route start failed! */
1210 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1213 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1214 GNUNET_SCHEDULER_REASON_TIMEOUT);
1218 GNUNET_free(put_route);
1220 GNUNET_free (put_msg);