2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
27 * TODO: retransmission of pending requests maybe happens now, at least
28 * the code is in place to do so. Need to add checks when api calls
29 * happen to check if retransmission is in progress, and if so set
30 * the single pending message for transmission once the list of
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
47 #define DEBUG_DHT_API GNUNET_NO
52 * Message that is pending
54 struct GNUNET_MessageHeader *msg;
57 * Timeout for this message
59 struct GNUNET_TIME_Relative timeout;
62 * Continuation to call on message send
63 * or message receipt confirmation
65 GNUNET_SCHEDULER_Task cont;
68 * Continuation closure
73 * Unique ID for this request
78 * Free the saved message once sent, set
79 * to GNUNET_YES for messages that don't
86 struct PendingMessageList
89 * This is a singly linked list.
91 struct PendingMessageList *next;
94 * The pending message.
96 struct PendingMessage *message;
99 struct GNUNET_DHT_GetContext
102 * Iterator to call on data receipt
104 GNUNET_DHT_GetIterator iter;
107 * Closure for the iterator callback
113 struct GNUNET_DHT_FindPeerContext
116 * Iterator to call on data receipt
118 GNUNET_DHT_FindPeerProcessor proc;
121 * Closure for the iterator callback
128 * Handle to a route request
130 struct GNUNET_DHT_RouteHandle
134 * Unique identifier for this request (for key collisions)
139 * Key that this get request is for
144 * Iterator to call on data receipt
146 GNUNET_DHT_ReplyProcessor iter;
149 * Closure for the iterator callback
154 * Main handle to this DHT api
156 struct GNUNET_DHT_Handle *dht_handle;
159 * The actual message sent for this request,
160 * used for retransmitting requests on service
161 * failure/reconnect. Freed on route_stop.
163 struct GNUNET_DHT_RouteMessage *message;
168 * Handle to control a get operation.
170 struct GNUNET_DHT_GetHandle
173 * Handle to the actual route operation for the get
175 struct GNUNET_DHT_RouteHandle *route_handle;
178 * The context of the get request
180 struct GNUNET_DHT_GetContext get_context;
185 * Handle to control a find peer operation.
187 struct GNUNET_DHT_FindPeerHandle
190 * Handle to the actual route operation for the request
192 struct GNUNET_DHT_RouteHandle *route_handle;
195 * The context of the find peer request
197 struct GNUNET_DHT_FindPeerContext find_peer_context;
201 enum DHT_Retransmit_Stage
204 * The API is not retransmitting anything at this time.
206 DHT_NOT_RETRANSMITTING,
209 * The API is retransmitting, and nothing has been single
210 * queued for sending.
215 * The API is retransmitting, and a single message has been
216 * queued for transmission once finished.
218 DHT_RETRANSMITTING_MESSAGE_QUEUED
223 * Connection to the DHT service.
225 struct GNUNET_DHT_Handle
230 struct GNUNET_SCHEDULER_Handle *sched;
233 * Configuration to use.
235 const struct GNUNET_CONFIGURATION_Handle *cfg;
238 * Socket (if available).
240 struct GNUNET_CLIENT_Connection *client;
243 * Currently pending transmission request.
245 struct GNUNET_CLIENT_TransmitHandle *th;
248 * Message we are currently sending, only allow
249 * a single message to be queued. If not unique
250 * (typically a put request), await a confirmation
251 * from the service that the message was received.
252 * If unique, just fire and forget.
254 struct PendingMessage *current;
257 * Hash map containing the current outstanding unique requests
259 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
262 * Generator for unique ids.
267 * Are we currently retransmitting requests? If so queue a _single_
268 * new request when received.
270 enum DHT_Retransmit_Stage retransmit_stage;
273 * Linked list of retranmissions, to be used in the event
274 * of a dht service disconnect/reconnect.
276 struct PendingMessageList *retransmissions;
279 * A single pending message allowed to be scheduled
280 * during retransmission phase.
282 struct PendingMessage *retransmission_buffer;
287 * Convert unique ID to hash code.
289 * @param uid unique ID to convert
290 * @param hash set to uid (extended with zeros)
293 hash_from_uid (uint64_t uid,
294 GNUNET_HashCode *hash)
296 memset (hash, 0, sizeof(GNUNET_HashCode));
297 *((uint64_t*)hash) = uid;
302 * Iterator callback to retransmit each outstanding request
303 * because the connection to the DHT service went down (and
308 static int retransmit_iterator (void *cls,
309 const GNUNET_HashCode * key,
312 struct GNUNET_DHT_RouteHandle *route_handle = value;
313 struct PendingMessageList *pending_message_list;
315 pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
316 pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
317 pending_message_list->message->msg = &route_handle->message->header;
318 pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
319 pending_message_list->message->cont = NULL;
320 pending_message_list->message->cont_cls = NULL;
321 pending_message_list->message->unique_id = route_handle->uid;
322 /* Add the new pending message to the front of the retransmission list */
323 pending_message_list->next = route_handle->dht_handle->retransmissions;
324 route_handle->dht_handle->retransmissions = pending_message_list;
331 * Try to (re)connect to the dht service.
333 * @return GNUNET_YES on success, GNUNET_NO on failure.
336 try_connect (struct GNUNET_DHT_Handle *handle)
338 if (handle->client != NULL)
340 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
341 if (handle->client != NULL)
344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
345 _("Failed to connect to the dht service!\n"));
351 * Send complete (or failed), call continuation if we have one.
354 finish (struct GNUNET_DHT_Handle *handle, int code)
356 struct PendingMessage *pos = handle->current;
357 GNUNET_HashCode uid_hash;
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
361 GNUNET_assert (pos != NULL);
362 hash_from_uid (pos->unique_id, &uid_hash);
363 if (pos->cont != NULL)
365 if (code == GNUNET_SYSERR)
366 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
368 GNUNET_SCHEDULER_REASON_TIMEOUT);
370 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
372 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
375 GNUNET_assert(handle->th == NULL);
376 if (pos->free_on_send == GNUNET_YES)
377 GNUNET_free(pos->msg);
379 handle->current = NULL;
383 * Transmit the next pending message, called by notify_transmit_ready
386 transmit_pending (void *cls, size_t size, void *buf)
388 struct GNUNET_DHT_Handle *handle = cls;
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "`%s': In transmit_pending\n", "DHT API");
400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401 "`%s': In transmit_pending buf is NULL\n", "DHT API");
403 finish (handle, GNUNET_SYSERR);
407 if (handle->current != NULL)
409 tsize = ntohs (handle->current->msg->size);
413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414 "`%s': Sending message size %d\n", "DHT API", tsize);
416 memcpy (buf, handle->current->msg, tsize);
417 finish (handle, GNUNET_OK);
425 /* Have no pending request */
430 * Try to send messages from list of messages to send
433 process_pending_message (struct GNUNET_DHT_Handle *handle)
436 if (handle->current == NULL)
437 return; /* action already pending */
438 if (GNUNET_YES != try_connect (handle))
441 finish (handle, GNUNET_SYSERR);
446 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
456 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457 "Failed to transmit request to dht service.\n");
459 finish (handle, GNUNET_SYSERR);
463 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464 "`%s': Scheduled sending message of size %d to service\n",
465 "DHT API", ntohs (handle->current->msg->size));
470 * Send complete (or failed), call continuation if we have one.
471 * Forward declaration.
474 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
476 /* Forward declaration */
478 transmit_pending_retransmission (void *cls, size_t size, void *buf);
481 * Try to send messages from list of messages to send
484 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
487 if (handle->current == NULL)
488 return; /* action already pending */
489 if (GNUNET_YES != try_connect (handle))
491 finish_retransmission (handle, GNUNET_SYSERR);
496 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
502 &transmit_pending_retransmission,
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "Failed to transmit request to dht service.\n");
509 finish_retransmission (handle, GNUNET_SYSERR);
513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514 "`%s': Scheduled sending message of size %d to service\n",
515 "DHT API", ntohs (handle->current->msg->size));
520 * Send complete (or failed), call continuation if we have one.
523 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
525 struct PendingMessage *pos = handle->current;
526 struct PendingMessageList *pending_list;
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
530 GNUNET_assert (pos == handle->retransmissions->message);
531 pending_list = handle->retransmissions;
532 handle->retransmissions = handle->retransmissions->next;
533 GNUNET_free (pending_list);
535 if (handle->retransmissions == NULL)
537 handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
540 if (handle->retransmissions != NULL)
542 handle->current = handle->retransmissions->message;
543 process_pending_retransmissions(handle);
545 else if (handle->retransmission_buffer != NULL)
547 handle->current = handle->retransmission_buffer;
548 process_pending_message(handle);
553 * Handler for messages received from the DHT service
554 * a demultiplexer which handles numerous message types
558 service_message_handler (void *cls,
559 const struct GNUNET_MessageHeader *msg)
561 struct GNUNET_DHT_Handle *handle = cls;
562 struct GNUNET_DHT_RouteResultMessage *dht_msg;
563 struct GNUNET_MessageHeader *enc_msg;
564 struct GNUNET_DHT_RouteHandle *route_handle;
566 GNUNET_HashCode uid_hash;
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573 "`%s': Received NULL from server, connection down!\n",
576 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
577 handle->client = GNUNET_CLIENT_connect (handle->sched,
580 if (handle->current != NULL)
583 finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
586 if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
588 handle->retransmit_stage = DHT_RETRANSMITTING;
589 handle->current = handle->retransmissions->message;
590 process_pending_retransmissions(handle);
596 switch (ntohs (msg->type))
598 case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
600 dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
601 uid = GNUNET_ntohll (dht_msg->unique_id);
603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604 "`%s': Received response to message (uid %llu)\n",
608 hash_from_uid (uid, &uid_hash);
610 GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
612 if (route_handle == NULL) /* We have no recollection of this request */
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616 "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
623 ntohs (dht_msg->header.size) -
624 sizeof (struct GNUNET_DHT_RouteResultMessage);
625 GNUNET_assert (enc_size > 0);
626 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
627 route_handle->iter (route_handle->iter_cls, enc_msg);
634 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
635 "`%s': Received unknown message type %d\n", "DHT API",
639 GNUNET_CLIENT_receive (handle->client,
640 &service_message_handler,
641 handle, GNUNET_TIME_UNIT_FOREVER_REL);
647 * Initialize the connection with the DHT service.
649 * @param sched scheduler to use
650 * @param cfg configuration to use
651 * @param ht_len size of the internal hash table to use for
652 * processing multiple GET/FIND requests in parallel
654 * @return handle to the DHT service, or NULL on error
656 struct GNUNET_DHT_Handle *
657 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
658 const struct GNUNET_CONFIGURATION_Handle *cfg,
661 struct GNUNET_DHT_Handle *handle;
663 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
665 handle->sched = sched;
666 handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
667 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
668 if (handle->client == NULL)
670 GNUNET_free (handle);
673 handle->outstanding_requests =
674 GNUNET_CONTAINER_multihashmap_create (ht_len);
676 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
677 "`%s': Connection to service in progress\n", "DHT API");
679 GNUNET_CLIENT_receive (handle->client,
680 &service_message_handler,
681 handle, GNUNET_TIME_UNIT_FOREVER_REL);
687 * Shutdown connection with the DHT service.
689 * @param handle handle of the DHT connection to stop
692 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
696 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
698 GNUNET_assert (handle != NULL);
699 if (handle->th != NULL) /* We have a live transmit request */
701 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
704 if (handle->current != NULL) /* We are trying to send something now, clean it up */
705 GNUNET_free (handle->current);
707 if (handle->client != NULL) /* Finally, disconnect from the service */
709 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
710 handle->client = NULL;
713 GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
714 GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
715 GNUNET_free (handle);
720 * Transmit the next pending message, called by notify_transmit_ready
723 transmit_pending_retransmission (void *cls, size_t size, void *buf)
725 struct GNUNET_DHT_Handle *handle = cls;
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "`%s': In transmit_pending\n", "DHT API");
735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736 "`%s': In transmit_pending buf is NULL\n", "DHT API");
738 finish_retransmission (handle, GNUNET_SYSERR);
744 if (handle->current != NULL)
746 tsize = ntohs (handle->current->msg->size);
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "`%s': Sending message size %d\n", "DHT API", tsize);
753 memcpy (buf, handle->current->msg, tsize);
754 finish_retransmission (handle, GNUNET_OK);
762 /* Have no pending request */
768 * Iterator called on each result obtained from a generic route
772 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
774 struct GNUNET_DHT_GetHandle *get_handle = cls;
775 struct GNUNET_DHT_GetResultMessage *result;
779 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
782 GNUNET_assert (ntohs (reply->size) >=
783 sizeof (struct GNUNET_DHT_GetResultMessage));
784 result = (struct GNUNET_DHT_GetResultMessage *) reply;
785 data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
787 result_data = (char *) &result[1]; /* Set data pointer to end of message */
789 get_handle->get_context.iter (get_handle->get_context.iter_cls,
790 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
791 ntohs (result->type), data_size, result_data);
796 * Iterator called on each result obtained from a generic route
800 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
802 struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
803 struct GNUNET_MessageHeader *hello;
805 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
807 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808 "Received wrong type of response to a find peer request...\n");
813 GNUNET_assert (ntohs (reply->size) >=
814 sizeof (struct GNUNET_MessageHeader));
815 hello = (struct GNUNET_MessageHeader *)&reply[1];
817 if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820 "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
823 find_peer_handle->find_peer_context.proc (find_peer_handle->
824 find_peer_context.proc_cls,
825 (struct GNUNET_HELLO_Message *)hello);
829 * Send a message to the DHT telling it to start issuing random GET
830 * requests every 'frequency' milliseconds.
832 * @param handle handle to the DHT service
833 * @param frequency delay (in milliseconds) between sending malicious messages
834 * @param cont continuation to call once the message is sent
835 * @param cont_cls closure for continuation
837 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
839 int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
841 struct GNUNET_DHT_ControlMessage *msg;
842 struct PendingMessage *pending;
844 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
847 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
848 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
849 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
850 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
851 msg->variable = htons(frequency);
853 pending = GNUNET_malloc (sizeof (struct PendingMessage));
854 pending->msg = &msg->header;
855 pending->timeout = GNUNET_TIME_relative_get_forever();
856 pending->free_on_send = GNUNET_YES;
857 pending->cont = cont;
858 pending->cont_cls = cont_cls;
859 pending->unique_id = 0;
861 if (handle->current == NULL)
863 handle->current = pending;
864 process_pending_message (handle);
868 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
869 handle->retransmission_buffer = pending;
876 * Send a message to the DHT telling it to issue a single find
877 * peer request using the peers unique identifier as key. This
878 * is used to fill the routing table, and is normally controlled
879 * by the DHT itself. However, for testing and perhaps more
880 * close control over the DHT, this can be explicitly managed.
882 * @param handle handle to the DHT service
883 * @param cont continuation to call once the message is sent
884 * @param cont_cls closure for continuation
886 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
888 int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
889 GNUNET_SCHEDULER_Task cont, void *cont_cls)
891 struct GNUNET_DHT_ControlMessage *msg;
892 struct PendingMessage *pending;
894 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
897 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
898 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
899 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
900 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
902 pending = GNUNET_malloc (sizeof (struct PendingMessage));
903 pending->msg = &msg->header;
904 pending->timeout = GNUNET_TIME_relative_get_forever();
905 pending->free_on_send = GNUNET_YES;
906 pending->cont = cont;
907 pending->cont_cls = cont_cls;
908 pending->unique_id = 0;
910 if (handle->current == NULL)
912 handle->current = pending;
913 process_pending_message (handle);
917 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
918 handle->retransmission_buffer = pending;
925 * Send a message to the DHT telling it to start issuing random PUT
926 * requests every 'frequency' milliseconds.
928 * @param handle handle to the DHT service
929 * @param frequency delay (in milliseconds) between sending malicious messages
930 * @param cont continuation to call once the message is sent
931 * @param cont_cls closure for continuation
933 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
935 int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
937 struct GNUNET_DHT_ControlMessage *msg;
938 struct PendingMessage *pending;
940 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
943 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
944 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
945 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
946 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
947 msg->variable = htons(frequency);
949 pending = GNUNET_malloc (sizeof (struct PendingMessage));
950 pending->msg = &msg->header;
951 pending->timeout = GNUNET_TIME_relative_get_forever();
952 pending->free_on_send = GNUNET_YES;
953 pending->cont = cont;
954 pending->cont_cls = cont_cls;
955 pending->unique_id = 0;
957 if (handle->current == NULL)
959 handle->current = pending;
960 process_pending_message (handle);
964 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
965 handle->retransmission_buffer = pending;
972 * Send a message to the DHT telling it to start dropping
973 * all requests received.
975 * @param handle handle to the DHT service
976 * @param cont continuation to call once the message is sent
977 * @param cont_cls closure for continuation
979 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
981 int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
983 struct GNUNET_DHT_ControlMessage *msg;
984 struct PendingMessage *pending;
986 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
989 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
990 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
991 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
992 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
993 msg->variable = htons(0);
995 pending = GNUNET_malloc (sizeof (struct PendingMessage));
996 pending->msg = &msg->header;
997 pending->timeout = GNUNET_TIME_relative_get_forever();
998 pending->free_on_send = GNUNET_YES;
999 pending->cont = cont;
1000 pending->cont_cls = cont_cls;
1001 pending->unique_id = 0;
1003 if (handle->current == NULL)
1005 handle->current = pending;
1006 process_pending_message (handle);
1010 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1011 handle->retransmission_buffer = pending;
1019 * Initiate a generic DHT route operation.
1021 * @param handle handle to the DHT service
1022 * @param key the key to look up
1023 * @param desired_replication_level how many peers should ultimately receive
1024 * this message (advisory only, target may be too high for the
1025 * given DHT or not hit exactly).
1026 * @param options options for routing
1027 * @param enc send the encapsulated message to a peer close to the key
1028 * @param iter function to call on each result, NULL if no replies are expected
1029 * @param iter_cls closure for iter
1030 * @param timeout when to abort with an error if we fail to get
1031 * a confirmation for the request (when necessary) or how long
1032 * to wait for tramission to the service
1033 * @param cont continuation to call when done;
1034 * reason will be TIMEOUT on error,
1035 * reason will be PREREQ_DONE on success
1036 * @param cont_cls closure for cont
1038 * @return handle to stop the request, NULL if the request is "fire and forget"
1040 struct GNUNET_DHT_RouteHandle *
1041 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
1042 const GNUNET_HashCode * key,
1043 unsigned int desired_replication_level,
1044 enum GNUNET_DHT_RouteOption options,
1045 const struct GNUNET_MessageHeader *enc,
1046 struct GNUNET_TIME_Relative timeout,
1047 GNUNET_DHT_ReplyProcessor iter,
1049 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1051 struct GNUNET_DHT_RouteHandle *route_handle;
1052 struct PendingMessage *pending;
1053 struct GNUNET_DHT_RouteMessage *message;
1055 GNUNET_HashCode uid_key;
1057 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1060 if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1066 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
1067 memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
1068 route_handle->iter = iter;
1069 route_handle->iter_cls = iter_cls;
1070 route_handle->dht_handle = handle;
1071 route_handle->uid = handle->uid_gen++;
1074 hash_from_uid (route_handle->uid, &uid_key);
1075 GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
1076 &uid_key, route_handle,
1077 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082 "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
1085 msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
1086 message = GNUNET_malloc (msize);
1087 message->header.size = htons (msize);
1088 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
1089 memcpy (&message->key, key, sizeof (GNUNET_HashCode));
1090 message->options = htonl (options);
1091 message->desired_replication_level = htonl (desired_replication_level);
1092 message->unique_id = GNUNET_htonll (route_handle->uid);
1093 memcpy (&message[1], enc, ntohs (enc->size));
1094 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1095 pending->msg = &message->header;
1096 pending->timeout = timeout;
1098 pending->free_on_send = GNUNET_YES;
1099 pending->cont = cont;
1100 pending->cont_cls = cont_cls;
1101 pending->unique_id = route_handle->uid;
1102 if (handle->current == NULL)
1104 handle->current = pending;
1105 process_pending_message (handle);
1109 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1110 handle->retransmission_buffer = pending;
1113 route_handle->message = message;
1114 return route_handle;
1119 * Perform an asynchronous GET operation on the DHT identified.
1121 * @param handle handle to the DHT service
1122 * @param timeout how long to wait for transmission of this request to the service
1123 * @param type expected type of the response object
1124 * @param key the key to look up
1125 * @param iter function to call on each result
1126 * @param iter_cls closure for iter
1127 * @param cont continuation to call once message sent
1128 * @param cont_cls closure for continuation
1130 * @return handle to stop the async get
1132 struct GNUNET_DHT_GetHandle *
1133 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1134 struct GNUNET_TIME_Relative timeout,
1135 enum GNUNET_BLOCK_Type type,
1136 const GNUNET_HashCode * key,
1137 GNUNET_DHT_GetIterator iter,
1139 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1141 struct GNUNET_DHT_GetHandle *get_handle;
1142 struct GNUNET_DHT_GetMessage get_msg;
1144 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1147 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1148 get_handle->get_context.iter = iter;
1149 get_handle->get_context.iter_cls = iter_cls;
1152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153 "`%s': Inserting pending get request with key %s\n", "DHT API",
1157 get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
1158 get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
1159 get_msg.type = htons (type);
1161 get_handle->route_handle =
1162 GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout,
1163 &get_reply_iterator, get_handle, cont, cont_cls);
1170 * Stop a previously issued routing request
1172 * @param route_handle handle to the request to stop
1173 * @param cont continuation to call once this message is sent to the service or times out
1174 * @param cont_cls closure for the continuation
1177 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
1178 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1180 struct PendingMessage *pending;
1181 struct GNUNET_DHT_StopMessage *message;
1183 GNUNET_HashCode uid_key;
1185 msize = sizeof (struct GNUNET_DHT_StopMessage);
1186 message = GNUNET_malloc (msize);
1187 message->header.size = htons (msize);
1188 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
1190 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191 "`%s': Remove outstanding request for uid %llu\n", "DHT API",
1194 message->unique_id = GNUNET_htonll (route_handle->uid);
1195 memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
1196 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1197 pending->msg = (struct GNUNET_MessageHeader *) message;
1198 pending->timeout = GNUNET_TIME_relative_get_forever();
1199 pending->cont = cont;
1200 pending->cont_cls = cont_cls;
1201 pending->free_on_send = GNUNET_YES;
1202 pending->unique_id = 0; /* When finished is called, free pending->msg */
1204 if (route_handle->dht_handle->current == NULL)
1206 route_handle->dht_handle->current = pending;
1207 process_pending_message (route_handle->dht_handle);
1209 else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1211 route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1212 route_handle->dht_handle->retransmission_buffer = pending;
1216 GNUNET_free(pending);
1220 hash_from_uid (route_handle->uid, &uid_key);
1221 GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1222 (route_handle->dht_handle->outstanding_requests, &uid_key,
1223 route_handle) == GNUNET_YES);
1225 GNUNET_free(route_handle->message);
1226 GNUNET_free(route_handle);
1231 * Stop async DHT-get.
1233 * @param get_handle handle to the GET operation to stop
1234 * @param cont continuation to call once this message is sent to the service or times out
1235 * @param cont_cls closure for the continuation
1238 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1239 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1241 if ((get_handle->route_handle->dht_handle->current != NULL) &&
1242 (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1246 GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1247 GNUNET_SCHEDULER_REASON_TIMEOUT);
1253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1254 "`%s': Removing pending get request with key %s, uid %llu\n",
1255 "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1256 get_handle->route_handle->uid);
1258 GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1259 GNUNET_free (get_handle);
1264 * Perform an asynchronous FIND PEER operation on the DHT.
1266 * @param handle handle to the DHT service
1267 * @param timeout timeout for this request to be sent to the
1269 * @param options routing options for this message
1270 * @param key the key to look up
1271 * @param proc function to call on each result
1272 * @param proc_cls closure for proc
1273 * @param cont continuation to call once message sent
1274 * @param cont_cls closure for continuation
1276 * @return handle to stop the async get, NULL on error
1278 struct GNUNET_DHT_FindPeerHandle *
1279 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1280 struct GNUNET_TIME_Relative timeout,
1281 enum GNUNET_DHT_RouteOption options,
1282 const GNUNET_HashCode * key,
1283 GNUNET_DHT_FindPeerProcessor proc,
1285 GNUNET_SCHEDULER_Task cont,
1288 struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1289 struct GNUNET_DHT_FindPeerMessage find_peer_msg;
1291 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1295 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1296 find_peer_handle->find_peer_context.proc = proc;
1297 find_peer_handle->find_peer_context.proc_cls = proc_cls;
1300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301 "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1302 "FIND PEER", GNUNET_h2s (key));
1305 find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
1306 find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1307 find_peer_handle->route_handle =
1308 GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
1309 timeout, &find_peer_reply_iterator,
1310 find_peer_handle, cont, cont_cls);
1311 return find_peer_handle;
1315 * Stop async find peer. Frees associated resources.
1317 * @param find_peer_handle GET operation to stop.
1318 * @param cont continuation to call once this message is sent to the service or times out
1319 * @param cont_cls closure for the continuation
1322 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1323 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1325 if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1326 (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1330 GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1331 GNUNET_SCHEDULER_REASON_TIMEOUT);
1337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1338 "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1339 "DHT API", "FIND PEER",
1340 GNUNET_h2s (&find_peer_handle->route_handle->key),
1341 find_peer_handle->route_handle->uid);
1343 GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1344 GNUNET_free (find_peer_handle);
1350 * Perform a PUT operation storing data in the DHT.
1352 * @param handle handle to DHT service
1353 * @param key the key to store under
1354 * @param type type of the value
1355 * @param size number of bytes in data; must be less than 64k
1356 * @param data the data to store
1357 * @param exp desired expiration time for the value
1358 * @param timeout how long to wait for transmission of this request
1359 * @param cont continuation to call when done;
1360 * reason will be TIMEOUT on error,
1361 * reason will be PREREQ_DONE on success
1362 * @param cont_cls closure for cont
1364 * @return GNUNET_YES if put message is queued for transmission
1367 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1368 const GNUNET_HashCode * key,
1369 enum GNUNET_BLOCK_Type type,
1372 struct GNUNET_TIME_Absolute exp,
1373 struct GNUNET_TIME_Relative timeout,
1374 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1376 struct GNUNET_DHT_PutMessage *put_msg;
1377 struct GNUNET_DHT_RouteHandle *put_route;
1380 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1382 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
1385 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1386 GNUNET_SCHEDULER_REASON_TIMEOUT);
1392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393 "`%s': Inserting pending put request with key %s\n", "DHT API",
1397 msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1398 put_msg = GNUNET_malloc (msize);
1399 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1400 put_msg->header.size = htons (msize);
1401 put_msg->type = htons (type);
1402 put_msg->data_size = htons (size);
1403 put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1404 memcpy (&put_msg[1], data, size);
1406 put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL,
1407 NULL, cont, cont_cls);
1409 if (put_route == NULL) /* Route start failed! */
1411 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1414 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1415 GNUNET_SCHEDULER_REASON_TIMEOUT);
1420 GNUNET_free(put_route);
1423 GNUNET_free (put_msg);
1426 /* end of dht_api.c */