2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
 * TODO: retransmission of pending requests may happen now; at least
 * the code is in place to do so.  Still needed: checks in the API
 * calls to detect whether a retransmission is in progress and, if so,
 * to queue the single pending message for transmission once the list
 * of retransmissions has been processed.
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
47 #define DEBUG_DHT_API GNUNET_NO
52 * Message that is pending
54 struct GNUNET_MessageHeader *msg;
57 * Timeout for this message
59 struct GNUNET_TIME_Relative timeout;
62 * Continuation to call on message send
63 * or message receipt confirmation
65 GNUNET_SCHEDULER_Task cont;
68 * Continuation closure
73 * Unique ID for this request
78 * Free the saved message once sent, set
79 * to GNUNET_YES for messages that don't
/**
 * Node of the list of messages awaiting retransmission.
 */
struct PendingMessageList
{
  /**
   * This is a singly linked list.
   */
  struct PendingMessageList *next;

  /**
   * The pending message.
   */
  struct PendingMessage *message;
};
99 struct GNUNET_DHT_GetContext
102 * Iterator to call on data receipt
104 GNUNET_DHT_GetIterator iter;
107 * Closure for the iterator callback
113 struct GNUNET_DHT_FindPeerContext
116 * Iterator to call on data receipt
118 GNUNET_DHT_FindPeerProcessor proc;
121 * Closure for the iterator callback
128 * Handle to a route request
130 struct GNUNET_DHT_RouteHandle
134 * Unique identifier for this request (for key collisions)
139 * Key that this get request is for
144 * Iterator to call on data receipt
146 GNUNET_DHT_ReplyProcessor iter;
149 * Closure for the iterator callback
154 * Main handle to this DHT api
156 struct GNUNET_DHT_Handle *dht_handle;
159 * The actual message sent for this request,
160 * used for retransmitting requests on service
161 * failure/reconnect. Freed on route_stop.
163 struct GNUNET_DHT_RouteMessage *message;
168 * Handle to control a get operation.
170 struct GNUNET_DHT_GetHandle
173 * Handle to the actual route operation for the get
175 struct GNUNET_DHT_RouteHandle *route_handle;
178 * The context of the get request
180 struct GNUNET_DHT_GetContext get_context;
185 * Handle to control a find peer operation.
187 struct GNUNET_DHT_FindPeerHandle
190 * Handle to the actual route operation for the request
192 struct GNUNET_DHT_RouteHandle *route_handle;
195 * The context of the find peer request
197 struct GNUNET_DHT_FindPeerContext find_peer_context;
/**
 * Retransmission state of a DHT handle.
 */
enum DHT_Retransmit_Stage
{
  /**
   * The API is not retransmitting anything at this time.
   */
  DHT_NOT_RETRANSMITTING,

  /**
   * The API is retransmitting, and nothing has been single
   * queued for sending.
   */
  DHT_RETRANSMITTING,

  /**
   * The API is retransmitting, and a single message has been
   * queued for transmission once finished.
   */
  DHT_RETRANSMITTING_MESSAGE_QUEUED
};
223 * Connection to the DHT service.
225 struct GNUNET_DHT_Handle
230 struct GNUNET_SCHEDULER_Handle *sched;
233 * Configuration to use.
235 const struct GNUNET_CONFIGURATION_Handle *cfg;
238 * Socket (if available).
240 struct GNUNET_CLIENT_Connection *client;
243 * Currently pending transmission request.
245 struct GNUNET_CLIENT_TransmitHandle *th;
248 * Message we are currently sending, only allow
249 * a single message to be queued. If not unique
250 * (typically a put request), await a confirmation
251 * from the service that the message was received.
252 * If unique, just fire and forget.
254 struct PendingMessage *current;
257 * Hash map containing the current outstanding unique requests
259 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
262 * Generator for unique ids.
267 * Are we currently retransmitting requests? If so queue a _single_
268 * new request when received.
270 enum DHT_Retransmit_Stage retransmit_stage;
273 * Linked list of retranmissions, to be used in the event
274 * of a dht service disconnect/reconnect.
276 struct PendingMessageList *retransmissions;
279 * A single pending message allowed to be scheduled
280 * during retransmission phase.
282 struct PendingMessage *retransmission_buffer;
287 * Convert unique ID to hash code.
289 * @param uid unique ID to convert
290 * @param hash set to uid (extended with zeros)
293 hash_from_uid (uint64_t uid,
294 GNUNET_HashCode *hash)
296 memset (hash, 0, sizeof(GNUNET_HashCode));
297 *((uint64_t*)hash) = uid;
302 * Iterator callback to retransmit each outstanding request
303 * because the connection to the DHT service went down (and
308 static int retransmit_iterator (void *cls,
309 const GNUNET_HashCode * key,
312 struct GNUNET_DHT_RouteHandle *route_handle = value;
313 struct PendingMessageList *pending_message_list;
315 pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
316 pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
317 pending_message_list->message->msg = &route_handle->message->header;
318 pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
319 pending_message_list->message->cont = NULL;
320 pending_message_list->message->cont_cls = NULL;
321 pending_message_list->message->unique_id = route_handle->uid;
322 /* Add the new pending message to the front of the retransmission list */
323 pending_message_list->next = route_handle->dht_handle->retransmissions;
324 route_handle->dht_handle->retransmissions = pending_message_list;
331 * Try to (re)connect to the dht service.
333 * @return GNUNET_YES on success, GNUNET_NO on failure.
336 try_connect (struct GNUNET_DHT_Handle *handle)
338 if (handle->client != NULL)
340 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
341 if (handle->client != NULL)
344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
345 _("Failed to connect to the dht service!\n"));
351 * Send complete (or failed), call continuation if we have one.
354 finish (struct GNUNET_DHT_Handle *handle, int code)
356 struct PendingMessage *pos = handle->current;
357 GNUNET_HashCode uid_hash;
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
361 GNUNET_assert (pos != NULL);
362 hash_from_uid (pos->unique_id, &uid_hash);
363 if (pos->cont != NULL)
365 if (code == GNUNET_SYSERR)
366 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
368 GNUNET_SCHEDULER_REASON_TIMEOUT);
370 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
372 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
375 GNUNET_assert(handle->th == NULL);
376 if (pos->free_on_send == GNUNET_YES)
377 GNUNET_free(pos->msg);
379 handle->current = NULL;
383 * Transmit the next pending message, called by notify_transmit_ready
386 transmit_pending (void *cls, size_t size, void *buf)
388 struct GNUNET_DHT_Handle *handle = cls;
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "`%s': In transmit_pending\n", "DHT API");
398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399 "`%s': In transmit_pending buf is NULL\n", "DHT API");
401 finish (handle, GNUNET_SYSERR);
407 if (handle->current != NULL)
409 tsize = ntohs (handle->current->msg->size);
413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414 "`%s': Sending message size %d\n", "DHT API", tsize);
416 memcpy (buf, handle->current->msg, tsize);
417 finish (handle, GNUNET_OK);
425 /* Have no pending request */
430 * Try to send messages from list of messages to send
433 process_pending_message (struct GNUNET_DHT_Handle *handle)
436 if (handle->current == NULL)
437 return; /* action already pending */
438 if (GNUNET_YES != try_connect (handle))
440 finish (handle, GNUNET_SYSERR);
445 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
455 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456 "Failed to transmit request to dht service.\n");
458 finish (handle, GNUNET_SYSERR);
462 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463 "`%s': Scheduled sending message of size %d to service\n",
464 "DHT API", ntohs (handle->current->msg->size));
/**
 * Send complete (or failed), call continuation if we have one.
 * Forward declaration.
 */
static void
finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);

/* Forward declaration */
static size_t
transmit_pending_retransmission (void *cls, size_t size, void *buf);
480 * Try to send messages from list of messages to send
483 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
486 if (handle->current == NULL)
487 return; /* action already pending */
488 if (GNUNET_YES != try_connect (handle))
490 finish_retransmission (handle, GNUNET_SYSERR);
495 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
501 &transmit_pending_retransmission,
505 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506 "Failed to transmit request to dht service.\n");
508 finish_retransmission (handle, GNUNET_SYSERR);
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513 "`%s': Scheduled sending message of size %d to service\n",
514 "DHT API", ntohs (handle->current->msg->size));
519 * Send complete (or failed), call continuation if we have one.
522 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
524 struct PendingMessage *pos = handle->current;
525 struct PendingMessageList *pending_list;
527 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
529 GNUNET_assert (pos == handle->retransmissions->message);
530 pending_list = handle->retransmissions;
531 handle->retransmissions = handle->retransmissions->next;
532 GNUNET_free (pending_list);
534 if (handle->retransmissions == NULL)
536 handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
539 if (handle->retransmissions != NULL)
541 handle->current = handle->retransmissions->message;
542 process_pending_retransmissions(handle);
544 else if (handle->retransmission_buffer != NULL)
546 handle->current = handle->retransmission_buffer;
547 process_pending_message(handle);
552 * Handler for messages received from the DHT service
553 * a demultiplexer which handles numerous message types
557 service_message_handler (void *cls,
558 const struct GNUNET_MessageHeader *msg)
560 struct GNUNET_DHT_Handle *handle = cls;
561 struct GNUNET_DHT_RouteResultMessage *dht_msg;
562 struct GNUNET_MessageHeader *enc_msg;
563 struct GNUNET_DHT_RouteHandle *route_handle;
565 GNUNET_HashCode uid_hash;
571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
572 "`%s': Received NULL from server, connection down!\n",
575 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
576 handle->client = GNUNET_CLIENT_connect (handle->sched,
579 if (handle->current != NULL)
582 finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
585 if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
587 handle->retransmit_stage = DHT_RETRANSMITTING;
588 handle->current = handle->retransmissions->message;
589 process_pending_retransmissions(handle);
595 switch (ntohs (msg->type))
597 case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
599 dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
600 uid = GNUNET_ntohll (dht_msg->unique_id);
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "`%s': Received response to message (uid %llu)\n",
607 hash_from_uid (uid, &uid_hash);
609 GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
611 if (route_handle == NULL) /* We have no recollection of this request */
614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615 "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
622 ntohs (dht_msg->header.size) -
623 sizeof (struct GNUNET_DHT_RouteResultMessage);
624 GNUNET_assert (enc_size > 0);
625 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
626 route_handle->iter (route_handle->iter_cls, enc_msg);
633 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
634 "`%s': Received unknown message type %d\n", "DHT API",
638 GNUNET_CLIENT_receive (handle->client,
639 &service_message_handler,
640 handle, GNUNET_TIME_UNIT_FOREVER_REL);
646 * Initialize the connection with the DHT service.
648 * @param sched scheduler to use
649 * @param cfg configuration to use
650 * @param ht_len size of the internal hash table to use for
651 * processing multiple GET/FIND requests in parallel
653 * @return handle to the DHT service, or NULL on error
655 struct GNUNET_DHT_Handle *
656 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
657 const struct GNUNET_CONFIGURATION_Handle *cfg,
660 struct GNUNET_DHT_Handle *handle;
662 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
664 handle->sched = sched;
665 handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
666 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
667 if (handle->client == NULL)
669 GNUNET_free (handle);
672 handle->outstanding_requests =
673 GNUNET_CONTAINER_multihashmap_create (ht_len);
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "`%s': Connection to service in progress\n", "DHT API");
678 GNUNET_CLIENT_receive (handle->client,
679 &service_message_handler,
680 handle, GNUNET_TIME_UNIT_FOREVER_REL);
686 * Shutdown connection with the DHT service.
688 * @param handle handle of the DHT connection to stop
691 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
697 GNUNET_assert (handle != NULL);
698 if (handle->th != NULL) /* We have a live transmit request */
700 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
703 if (handle->current != NULL) /* We are trying to send something now, clean it up */
704 GNUNET_free (handle->current);
706 if (handle->client != NULL) /* Finally, disconnect from the service */
708 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
709 handle->client = NULL;
712 GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
713 GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
714 GNUNET_free (handle);
719 * Transmit the next pending message, called by notify_transmit_ready
722 transmit_pending_retransmission (void *cls, size_t size, void *buf)
724 struct GNUNET_DHT_Handle *handle = cls;
728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729 "`%s': In transmit_pending\n", "DHT API");
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "`%s': In transmit_pending buf is NULL\n", "DHT API");
737 finish_retransmission (handle, GNUNET_SYSERR);
743 if (handle->current != NULL)
745 tsize = ntohs (handle->current->msg->size);
749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750 "`%s': Sending message size %d\n", "DHT API", tsize);
752 memcpy (buf, handle->current->msg, tsize);
753 finish_retransmission (handle, GNUNET_OK);
761 /* Have no pending request */
767 * Iterator called on each result obtained from a generic route
771 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
773 struct GNUNET_DHT_GetHandle *get_handle = cls;
774 struct GNUNET_DHT_GetResultMessage *result;
778 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
781 GNUNET_assert (ntohs (reply->size) >=
782 sizeof (struct GNUNET_DHT_GetResultMessage));
783 result = (struct GNUNET_DHT_GetResultMessage *) reply;
784 data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
786 result_data = (char *) &result[1]; /* Set data pointer to end of message */
788 get_handle->get_context.iter (get_handle->get_context.iter_cls,
789 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
790 ntohs (result->type), data_size, result_data);
795 * Iterator called on each result obtained from a generic route
799 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
801 struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
802 struct GNUNET_MessageHeader *hello;
804 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "Received wrong type of response to a find peer request...\n");
812 GNUNET_assert (ntohs (reply->size) >=
813 sizeof (struct GNUNET_MessageHeader));
814 hello = (struct GNUNET_MessageHeader *)&reply[1];
816 if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819 "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
822 find_peer_handle->find_peer_context.proc (find_peer_handle->
823 find_peer_context.proc_cls,
824 (struct GNUNET_HELLO_Message *)hello);
828 * Send a message to the DHT telling it to start issuing random GET
829 * requests every 'frequency' milliseconds.
831 * @param handle handle to the DHT service
832 * @param frequency delay (in milliseconds) between sending malicious messages
833 * @param cont continuation to call once the message is sent
834 * @param cont_cls closure for continuation
836 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
838 int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
840 struct GNUNET_DHT_ControlMessage *msg;
841 struct PendingMessage *pending;
843 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
846 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
847 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
848 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
849 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
850 msg->variable = htons(frequency);
852 pending = GNUNET_malloc (sizeof (struct PendingMessage));
853 pending->msg = &msg->header;
854 pending->timeout = GNUNET_TIME_relative_get_forever();
855 pending->free_on_send = GNUNET_YES;
856 pending->cont = cont;
857 pending->cont_cls = cont_cls;
858 pending->unique_id = 0;
860 if (handle->current == NULL)
862 handle->current = pending;
863 process_pending_message (handle);
867 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
868 handle->retransmission_buffer = pending;
875 * Send a message to the DHT telling it to start issuing random PUT
876 * requests every 'frequency' milliseconds.
878 * @param handle handle to the DHT service
879 * @param frequency delay (in milliseconds) between sending malicious messages
880 * @param cont continuation to call once the message is sent
881 * @param cont_cls closure for continuation
883 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
885 int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
887 struct GNUNET_DHT_ControlMessage *msg;
888 struct PendingMessage *pending;
890 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
893 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
894 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
895 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
896 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
897 msg->variable = htons(frequency);
899 pending = GNUNET_malloc (sizeof (struct PendingMessage));
900 pending->msg = &msg->header;
901 pending->timeout = GNUNET_TIME_relative_get_forever();
902 pending->free_on_send = GNUNET_YES;
903 pending->cont = cont;
904 pending->cont_cls = cont_cls;
905 pending->unique_id = 0;
907 if (handle->current == NULL)
909 handle->current = pending;
910 process_pending_message (handle);
914 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
915 handle->retransmission_buffer = pending;
922 * Send a message to the DHT telling it to start dropping
923 * all requests received.
925 * @param handle handle to the DHT service
926 * @param cont continuation to call once the message is sent
927 * @param cont_cls closure for continuation
929 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
931 int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
933 struct GNUNET_DHT_ControlMessage *msg;
934 struct PendingMessage *pending;
936 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
939 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
940 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
941 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
942 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
943 msg->variable = htons(0);
945 pending = GNUNET_malloc (sizeof (struct PendingMessage));
946 pending->msg = &msg->header;
947 pending->timeout = GNUNET_TIME_relative_get_forever();
948 pending->free_on_send = GNUNET_YES;
949 pending->cont = cont;
950 pending->cont_cls = cont_cls;
951 pending->unique_id = 0;
953 if (handle->current == NULL)
955 handle->current = pending;
956 process_pending_message (handle);
960 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
961 handle->retransmission_buffer = pending;
969 * Initiate a generic DHT route operation.
971 * @param handle handle to the DHT service
972 * @param key the key to look up
973 * @param desired_replication_level how many peers should ultimately receive
974 * this message (advisory only, target may be too high for the
975 * given DHT or not hit exactly).
976 * @param options options for routing
977 * @param enc send the encapsulated message to a peer close to the key
978 * @param iter function to call on each result, NULL if no replies are expected
979 * @param iter_cls closure for iter
980 * @param timeout when to abort with an error if we fail to get
981 * a confirmation for the request (when necessary) or how long
982 * to wait for tramission to the service
983 * @param cont continuation to call when done;
984 * reason will be TIMEOUT on error,
985 * reason will be PREREQ_DONE on success
986 * @param cont_cls closure for cont
988 * @return handle to stop the request, NULL if the request is "fire and forget"
990 struct GNUNET_DHT_RouteHandle *
991 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
992 const GNUNET_HashCode * key,
993 unsigned int desired_replication_level,
994 enum GNUNET_DHT_RouteOption options,
995 const struct GNUNET_MessageHeader *enc,
996 struct GNUNET_TIME_Relative timeout,
997 GNUNET_DHT_ReplyProcessor iter,
999 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1001 struct GNUNET_DHT_RouteHandle *route_handle;
1002 struct PendingMessage *pending;
1003 struct GNUNET_DHT_RouteMessage *message;
1005 GNUNET_HashCode uid_key;
1007 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1010 if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1016 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
1017 memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
1018 route_handle->iter = iter;
1019 route_handle->iter_cls = iter_cls;
1020 route_handle->dht_handle = handle;
1021 route_handle->uid = handle->uid_gen++;
1024 hash_from_uid (route_handle->uid, &uid_key);
1025 GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
1026 &uid_key, route_handle,
1027 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1031 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032 "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
1035 msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
1036 message = GNUNET_malloc (msize);
1037 message->header.size = htons (msize);
1038 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
1039 memcpy (&message->key, key, sizeof (GNUNET_HashCode));
1040 message->options = htonl (options);
1041 message->desired_replication_level = htonl (options);
1042 message->unique_id = GNUNET_htonll (route_handle->uid);
1043 memcpy (&message[1], enc, ntohs (enc->size));
1044 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1045 pending->msg = &message->header;
1046 pending->timeout = timeout;
1048 pending->free_on_send = GNUNET_YES;
1049 pending->cont = cont;
1050 pending->cont_cls = cont_cls;
1051 pending->unique_id = route_handle->uid;
1052 if (handle->current == NULL)
1054 handle->current = pending;
1055 process_pending_message (handle);
1059 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1060 handle->retransmission_buffer = pending;
1063 route_handle->message = message;
1064 return route_handle;
1069 * Perform an asynchronous GET operation on the DHT identified.
1071 * @param handle handle to the DHT service
1072 * @param timeout how long to wait for transmission of this request to the service
1073 * @param type expected type of the response object
1074 * @param key the key to look up
1075 * @param iter function to call on each result
1076 * @param iter_cls closure for iter
1077 * @param cont continuation to call once message sent
1078 * @param cont_cls closure for continuation
1080 * @return handle to stop the async get
1082 struct GNUNET_DHT_GetHandle *
1083 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1084 struct GNUNET_TIME_Relative timeout,
1086 const GNUNET_HashCode * key,
1087 GNUNET_DHT_GetIterator iter,
1089 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1091 struct GNUNET_DHT_GetHandle *get_handle;
1092 struct GNUNET_DHT_GetMessage get_msg;
1094 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1097 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1098 get_handle->get_context.iter = iter;
1099 get_handle->get_context.iter_cls = iter_cls;
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "`%s': Inserting pending get request with key %s\n", "DHT API",
1107 get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
1108 get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
1109 get_msg.type = htons (type);
1111 get_handle->route_handle =
1112 GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
1113 &get_reply_iterator, get_handle, cont, cont_cls);
1120 * Stop a previously issued routing request
1122 * @param route_handle handle to the request to stop
1123 * @param cont continuation to call once this message is sent to the service or times out
1124 * @param cont_cls closure for the continuation
1127 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
1128 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1130 struct PendingMessage *pending;
1131 struct GNUNET_DHT_StopMessage *message;
1133 GNUNET_HashCode uid_key;
1135 msize = sizeof (struct GNUNET_DHT_StopMessage);
1136 message = GNUNET_malloc (msize);
1137 message->header.size = htons (msize);
1138 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
1140 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1141 "`%s': Remove outstanding request for uid %llu\n", "DHT API",
1144 message->unique_id = GNUNET_htonll (route_handle->uid);
1145 memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
1146 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1147 pending->msg = (struct GNUNET_MessageHeader *) message;
1148 pending->timeout = GNUNET_TIME_relative_get_forever();
1149 pending->cont = cont;
1150 pending->cont_cls = cont_cls;
1151 pending->free_on_send = GNUNET_YES;
1152 pending->unique_id = 0; /* When finished is called, free pending->msg */
1154 if (route_handle->dht_handle->current == NULL)
1156 route_handle->dht_handle->current = pending;
1157 process_pending_message (route_handle->dht_handle);
1159 else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1161 route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1162 route_handle->dht_handle->retransmission_buffer = pending;
1166 GNUNET_free(pending);
1170 hash_from_uid (route_handle->uid, &uid_key);
1171 GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1172 (route_handle->dht_handle->outstanding_requests, &uid_key,
1173 route_handle) == GNUNET_YES);
1175 GNUNET_free(route_handle->message);
1176 GNUNET_free(route_handle);
1181 * Stop async DHT-get.
1183 * @param get_handle handle to the GET operation to stop
1184 * @param cont continuation to call once this message is sent to the service or times out
1185 * @param cont_cls closure for the continuation
1188 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1189 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1191 if ((get_handle->route_handle->dht_handle->current != NULL) &&
1192 (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1196 GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1197 GNUNET_SCHEDULER_REASON_TIMEOUT);
1203 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204 "`%s': Removing pending get request with key %s, uid %llu\n",
1205 "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1206 get_handle->route_handle->uid);
1208 GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1209 GNUNET_free (get_handle);
1214 * Perform an asynchronous FIND PEER operation on the DHT.
1216 * @param handle handle to the DHT service
1217 * @param timeout timeout for this request to be sent to the
1219 * @param options routing options for this message
1220 * @param key the key to look up
1221 * @param proc function to call on each result
1222 * @param proc_cls closure for proc
1223 * @param cont continuation to call once message sent
1224 * @param cont_cls closure for continuation
1226 * @return handle to stop the async get, NULL on error
1228 struct GNUNET_DHT_FindPeerHandle *
1229 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1230 struct GNUNET_TIME_Relative timeout,
1231 enum GNUNET_DHT_RouteOption options,
1232 const GNUNET_HashCode * key,
1233 GNUNET_DHT_FindPeerProcessor proc,
1235 GNUNET_SCHEDULER_Task cont,
1238 struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1239 struct GNUNET_DHT_FindPeerMessage find_peer_msg;
1241 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1245 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1246 find_peer_handle->find_peer_context.proc = proc;
1247 find_peer_handle->find_peer_context.proc_cls = proc_cls;
1250 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1251 "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1252 "FIND PEER", GNUNET_h2s (key));
1255 find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
1256 find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1257 find_peer_handle->route_handle =
1258 GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
1259 timeout, &find_peer_reply_iterator,
1260 find_peer_handle, cont, cont_cls);
1261 return find_peer_handle;
1265 * Stop async find peer. Frees associated resources.
1267 * @param find_peer_handle GET operation to stop.
1268 * @param cont continuation to call once this message is sent to the service or times out
1269 * @param cont_cls closure for the continuation
1272 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1273 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1275 if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1276 (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1280 GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1281 GNUNET_SCHEDULER_REASON_TIMEOUT);
1287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288 "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1289 "DHT API", "FIND PEER",
1290 GNUNET_h2s (&find_peer_handle->route_handle->key),
1291 find_peer_handle->route_handle->uid);
1293 GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1294 GNUNET_free (find_peer_handle);
1300 * Perform a PUT operation storing data in the DHT.
1302 * @param handle handle to DHT service
1303 * @param key the key to store under
1304 * @param type type of the value
1305 * @param size number of bytes in data; must be less than 64k
1306 * @param data the data to store
1307 * @param exp desired expiration time for the value
1308 * @param timeout how long to wait for transmission of this request
1309 * @param cont continuation to call when done;
1310 * reason will be TIMEOUT on error,
1311 * reason will be PREREQ_DONE on success
1312 * @param cont_cls closure for cont
1314 * @return GNUNET_YES if put message is queued for transmission
1317 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1318 const GNUNET_HashCode * key,
1322 struct GNUNET_TIME_Absolute exp,
1323 struct GNUNET_TIME_Relative timeout,
1324 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1326 struct GNUNET_DHT_PutMessage *put_msg;
1327 struct GNUNET_DHT_RouteHandle *put_route;
1330 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1332 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
1335 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1336 GNUNET_SCHEDULER_REASON_TIMEOUT);
1342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1343 "`%s': Inserting pending put request with key %s\n", "DHT API",
1347 msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1348 put_msg = GNUNET_malloc (msize);
1349 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1350 put_msg->header.size = htons (msize);
1351 put_msg->type = htons (type);
1352 put_msg->data_size = htons (size);
1353 put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1354 memcpy (&put_msg[1], data, size);
1356 put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1357 NULL, cont, cont_cls);
1359 if (put_route == NULL) /* Route start failed! */
1361 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1364 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1365 GNUNET_SCHEDULER_REASON_TIMEOUT);
1370 GNUNET_free(put_route);
1373 GNUNET_free (put_msg);