2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
27 * TODO: retransmission of pending requests maybe happens now, at least
28 * the code is in place to do so. Need to add checks when api calls
29 * happen to check if retransmission is in progress, and if so set
30 * the single pending message for transmission once the list of
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
47 #define DEBUG_DHT_API GNUNET_NO
52 * Message that is pending
54 struct GNUNET_MessageHeader *msg;
57 * Timeout for this message
59 struct GNUNET_TIME_Relative timeout;
62 * Continuation to call on message send
63 * or message receipt confirmation
65 GNUNET_SCHEDULER_Task cont;
68 * Continuation closure
73 * Unique ID for this request
78 * Free the saved message once sent, set
79 * to GNUNET_YES for messages that don't
86 struct PendingMessageList
89 * This is a singly linked list.
91 struct PendingMessageList *next;
94 * The pending message.
96 struct PendingMessage *message;
99 struct GNUNET_DHT_GetContext
102 * Iterator to call on data receipt
104 GNUNET_DHT_GetIterator iter;
107 * Closure for the iterator callback
113 struct GNUNET_DHT_FindPeerContext
116 * Iterator to call on data receipt
118 GNUNET_DHT_FindPeerProcessor proc;
121 * Closure for the iterator callback
128 * Handle to a route request
130 struct GNUNET_DHT_RouteHandle
134 * Unique identifier for this request (for key collisions)
139 * Key that this get request is for
144 * Iterator to call on data receipt
146 GNUNET_DHT_ReplyProcessor iter;
149 * Closure for the iterator callback
154 * Main handle to this DHT api
156 struct GNUNET_DHT_Handle *dht_handle;
159 * The actual message sent for this request,
160 * used for retransmitting requests on service
161 * failure/reconnect. Freed on route_stop.
163 struct GNUNET_DHT_RouteMessage *message;
168 * Handle to control a get operation.
170 struct GNUNET_DHT_GetHandle
173 * Handle to the actual route operation for the get
175 struct GNUNET_DHT_RouteHandle *route_handle;
178 * The context of the get request
180 struct GNUNET_DHT_GetContext get_context;
185 * Handle to control a find peer operation.
187 struct GNUNET_DHT_FindPeerHandle
190 * Handle to the actual route operation for the request
192 struct GNUNET_DHT_RouteHandle *route_handle;
195 * The context of the find peer request
197 struct GNUNET_DHT_FindPeerContext find_peer_context;
201 enum DHT_Retransmit_Stage
204 * The API is not retransmitting anything at this time.
206 DHT_NOT_RETRANSMITTING,
* The API is retransmitting, and no single message has been
* queued for sending yet.
215 * The API is retransmitting, and a single message has been
216 * queued for transmission once finished.
218 DHT_RETRANSMITTING_MESSAGE_QUEUED
223 * Connection to the DHT service.
225 struct GNUNET_DHT_Handle
230 struct GNUNET_SCHEDULER_Handle *sched;
233 * Configuration to use.
235 const struct GNUNET_CONFIGURATION_Handle *cfg;
238 * Socket (if available).
240 struct GNUNET_CLIENT_Connection *client;
243 * Currently pending transmission request.
245 struct GNUNET_CLIENT_TransmitHandle *th;
248 * Message we are currently sending, only allow
249 * a single message to be queued. If not unique
250 * (typically a put request), await a confirmation
251 * from the service that the message was received.
252 * If unique, just fire and forget.
254 struct PendingMessage *current;
257 * Hash map containing the current outstanding unique requests
259 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
262 * Generator for unique ids.
267 * Are we currently retransmitting requests? If so queue a _single_
268 * new request when received.
270 enum DHT_Retransmit_Stage retransmit_stage;
* Linked list of retransmissions, to be used in the event
274 * of a dht service disconnect/reconnect.
276 struct PendingMessageList *retransmissions;
279 * A single pending message allowed to be scheduled
280 * during retransmission phase.
282 struct PendingMessage *retransmission_buffer;
287 * Convert unique ID to hash code.
289 * @param uid unique ID to convert
290 * @param hash set to uid (extended with zeros)
293 hash_from_uid (uint64_t uid,
294 GNUNET_HashCode *hash)
296 memset (hash, 0, sizeof(GNUNET_HashCode));
297 *((uint64_t*)hash) = uid;
302 * Iterator callback to retransmit each outstanding request
303 * because the connection to the DHT service went down (and
308 static int retransmit_iterator (void *cls,
309 const GNUNET_HashCode * key,
312 struct GNUNET_DHT_RouteHandle *route_handle = value;
313 struct PendingMessageList *pending_message_list;
315 pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
316 pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
317 pending_message_list->message->msg = &route_handle->message->header;
318 pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
319 pending_message_list->message->cont = NULL;
320 pending_message_list->message->cont_cls = NULL;
321 pending_message_list->message->unique_id = route_handle->uid;
322 /* Add the new pending message to the front of the retransmission list */
323 pending_message_list->next = route_handle->dht_handle->retransmissions;
324 route_handle->dht_handle->retransmissions = pending_message_list;
331 * Try to (re)connect to the dht service.
333 * @return GNUNET_YES on success, GNUNET_NO on failure.
336 try_connect (struct GNUNET_DHT_Handle *handle)
338 if (handle->client != NULL)
340 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
341 if (handle->client != NULL)
344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
345 _("Failed to connect to the dht service!\n"));
351 * Send complete (or failed), call continuation if we have one.
354 finish (struct GNUNET_DHT_Handle *handle, int code)
356 struct PendingMessage *pos = handle->current;
357 GNUNET_HashCode uid_hash;
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
361 GNUNET_assert (pos != NULL);
362 hash_from_uid (pos->unique_id, &uid_hash);
363 if (pos->cont != NULL)
365 if (code == GNUNET_SYSERR)
366 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
368 GNUNET_SCHEDULER_REASON_TIMEOUT);
370 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
372 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
375 GNUNET_assert(handle->th == NULL);
376 if (pos->free_on_send == GNUNET_YES)
377 GNUNET_free(pos->msg);
379 handle->current = NULL;
383 * Transmit the next pending message, called by notify_transmit_ready
386 transmit_pending (void *cls, size_t size, void *buf)
388 struct GNUNET_DHT_Handle *handle = cls;
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "`%s': In transmit_pending\n", "DHT API");
398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399 "`%s': In transmit_pending buf is NULL\n", "DHT API");
401 finish (handle, GNUNET_SYSERR);
407 if (handle->current != NULL)
409 tsize = ntohs (handle->current->msg->size);
413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414 "`%s': Sending message size %d\n", "DHT API", tsize);
416 memcpy (buf, handle->current->msg, tsize);
417 finish (handle, GNUNET_OK);
425 /* Have no pending request */
430 * Try to send messages from list of messages to send
433 process_pending_message (struct GNUNET_DHT_Handle *handle)
436 if (handle->current == NULL)
437 return; /* action already pending */
438 if (GNUNET_YES != try_connect (handle))
440 finish (handle, GNUNET_SYSERR);
445 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
455 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456 "Failed to transmit request to dht service.\n");
458 finish (handle, GNUNET_SYSERR);
462 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463 "`%s': Scheduled sending message of size %d to service\n",
464 "DHT API", ntohs (handle->current->msg->size));
469 * Send complete (or failed), call continuation if we have one.
470 * Forward declaration.
473 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
475 /* Forward declaration */
477 transmit_pending_retransmission (void *cls, size_t size, void *buf);
480 * Try to send messages from list of messages to send
483 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
486 if (handle->current == NULL)
487 return; /* action already pending */
488 if (GNUNET_YES != try_connect (handle))
490 finish_retransmission (handle, GNUNET_SYSERR);
495 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
501 &transmit_pending_retransmission,
505 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506 "Failed to transmit request to dht service.\n");
508 finish_retransmission (handle, GNUNET_SYSERR);
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513 "`%s': Scheduled sending message of size %d to service\n",
514 "DHT API", ntohs (handle->current->msg->size));
519 * Send complete (or failed), call continuation if we have one.
522 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
524 struct PendingMessage *pos = handle->current;
525 struct PendingMessageList *pending_list;
527 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
529 GNUNET_assert (pos == handle->retransmissions->message);
530 pending_list = handle->retransmissions;
531 handle->retransmissions = handle->retransmissions->next;
532 GNUNET_free (pending_list);
534 if (handle->retransmissions == NULL)
536 handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
539 if (handle->retransmissions != NULL)
541 handle->current = handle->retransmissions->message;
542 process_pending_retransmissions(handle);
544 else if (handle->retransmission_buffer != NULL)
546 handle->current = handle->retransmission_buffer;
547 process_pending_message(handle);
* Handler for messages received from the DHT service;
* a demultiplexer which handles numerous message types.
557 service_message_handler (void *cls,
558 const struct GNUNET_MessageHeader *msg)
560 struct GNUNET_DHT_Handle *handle = cls;
561 struct GNUNET_DHT_RouteResultMessage *dht_msg;
562 struct GNUNET_MessageHeader *enc_msg;
563 struct GNUNET_DHT_RouteHandle *route_handle;
565 GNUNET_HashCode uid_hash;
571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
572 "`%s': Received NULL from server, connection down!\n",
575 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
576 handle->client = GNUNET_CLIENT_connect (handle->sched,
579 if (handle->current != NULL)
582 finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
585 if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
587 handle->retransmit_stage = DHT_RETRANSMITTING;
588 handle->current = handle->retransmissions->message;
589 process_pending_retransmissions(handle);
595 switch (ntohs (msg->type))
597 case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
599 dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
600 uid = GNUNET_ntohll (dht_msg->unique_id);
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "`%s': Received response to message (uid %llu)\n",
607 hash_from_uid (uid, &uid_hash);
609 GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
611 if (route_handle == NULL) /* We have no recollection of this request */
614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615 "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
622 ntohs (dht_msg->header.size) -
623 sizeof (struct GNUNET_DHT_RouteResultMessage);
624 GNUNET_assert (enc_size > 0);
625 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
626 route_handle->iter (route_handle->iter_cls, enc_msg);
633 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
634 "`%s': Received unknown message type %d\n", "DHT API",
638 GNUNET_CLIENT_receive (handle->client,
639 &service_message_handler,
640 handle, GNUNET_TIME_UNIT_FOREVER_REL);
646 * Initialize the connection with the DHT service.
648 * @param sched scheduler to use
649 * @param cfg configuration to use
650 * @param ht_len size of the internal hash table to use for
651 * processing multiple GET/FIND requests in parallel
653 * @return handle to the DHT service, or NULL on error
655 struct GNUNET_DHT_Handle *
656 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
657 const struct GNUNET_CONFIGURATION_Handle *cfg,
660 struct GNUNET_DHT_Handle *handle;
662 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
664 handle->sched = sched;
665 handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
666 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
667 if (handle->client == NULL)
669 GNUNET_free (handle);
672 handle->outstanding_requests =
673 GNUNET_CONTAINER_multihashmap_create (ht_len);
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "`%s': Connection to service in progress\n", "DHT API");
678 GNUNET_CLIENT_receive (handle->client,
679 &service_message_handler,
680 handle, GNUNET_TIME_UNIT_FOREVER_REL);
686 * Shutdown connection with the DHT service.
688 * @param handle handle of the DHT connection to stop
691 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
697 GNUNET_assert (handle != NULL);
698 if (handle->th != NULL) /* We have a live transmit request */
700 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
703 if (handle->current != NULL) /* We are trying to send something now, clean it up */
704 GNUNET_free (handle->current);
706 if (handle->client != NULL) /* Finally, disconnect from the service */
708 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
709 handle->client = NULL;
712 GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
713 GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
714 GNUNET_free (handle);
719 * Transmit the next pending message, called by notify_transmit_ready
722 transmit_pending_retransmission (void *cls, size_t size, void *buf)
724 struct GNUNET_DHT_Handle *handle = cls;
728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729 "`%s': In transmit_pending\n", "DHT API");
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "`%s': In transmit_pending buf is NULL\n", "DHT API");
737 finish_retransmission (handle, GNUNET_SYSERR);
743 if (handle->current != NULL)
745 tsize = ntohs (handle->current->msg->size);
749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750 "`%s': Sending message size %d\n", "DHT API", tsize);
752 memcpy (buf, handle->current->msg, tsize);
753 finish_retransmission (handle, GNUNET_OK);
761 /* Have no pending request */
767 * Iterator called on each result obtained from a generic route
771 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
773 struct GNUNET_DHT_GetHandle *get_handle = cls;
774 struct GNUNET_DHT_GetResultMessage *result;
778 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
781 GNUNET_assert (ntohs (reply->size) >=
782 sizeof (struct GNUNET_DHT_GetResultMessage));
783 result = (struct GNUNET_DHT_GetResultMessage *) reply;
784 data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
786 result_data = (char *) &result[1]; /* Set data pointer to end of message */
788 get_handle->get_context.iter (get_handle->get_context.iter_cls,
789 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
790 ntohs (result->type), data_size, result_data);
795 * Iterator called on each result obtained from a generic route
799 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
801 struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
802 struct GNUNET_MessageHeader *hello;
804 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "Received wrong type of response to a find peer request...\n");
812 GNUNET_assert (ntohs (reply->size) >=
813 sizeof (struct GNUNET_MessageHeader));
814 hello = (struct GNUNET_MessageHeader *)&reply[1];
816 if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819 "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
822 find_peer_handle->find_peer_context.proc (find_peer_handle->
823 find_peer_context.proc_cls,
824 (struct GNUNET_HELLO_Message *)hello);
828 * Send a message to the DHT telling it to start issuing random GET
829 * requests every 'frequency' milliseconds.
831 * @param handle handle to the DHT service
832 * @param frequency delay (in milliseconds) between sending malicious messages
833 * @param cont continuation to call once the message is sent
834 * @param cont_cls closure for continuation
836 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
838 int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
840 struct GNUNET_DHT_ControlMessage *msg;
841 struct PendingMessage *pending;
843 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
846 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
847 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
848 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
849 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
850 msg->variable = htons(frequency);
852 pending = GNUNET_malloc (sizeof (struct PendingMessage));
853 pending->msg = &msg->header;
854 pending->timeout = GNUNET_TIME_relative_get_forever();
855 pending->free_on_send = GNUNET_YES;
856 pending->cont = cont;
857 pending->cont_cls = cont_cls;
858 pending->unique_id = 0;
860 if (handle->current == NULL)
862 handle->current = pending;
863 process_pending_message (handle);
867 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
868 handle->retransmission_buffer = pending;
875 * Send a message to the DHT telling it to issue a single find
876 * peer request using the peers unique identifier as key. This
877 * is used to fill the routing table, and is normally controlled
878 * by the DHT itself. However, for testing and perhaps more
879 * close control over the DHT, this can be explicitly managed.
881 * @param handle handle to the DHT service
882 * @param cont continuation to call once the message is sent
883 * @param cont_cls closure for continuation
885 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
887 int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
888 GNUNET_SCHEDULER_Task cont, void *cont_cls)
890 struct GNUNET_DHT_ControlMessage *msg;
891 struct PendingMessage *pending;
893 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
896 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
897 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
898 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
899 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
901 pending = GNUNET_malloc (sizeof (struct PendingMessage));
902 pending->msg = &msg->header;
903 pending->timeout = GNUNET_TIME_relative_get_forever();
904 pending->free_on_send = GNUNET_YES;
905 pending->cont = cont;
906 pending->cont_cls = cont_cls;
907 pending->unique_id = 0;
909 if (handle->current == NULL)
911 handle->current = pending;
912 process_pending_message (handle);
916 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
917 handle->retransmission_buffer = pending;
924 * Send a message to the DHT telling it to start issuing random PUT
925 * requests every 'frequency' milliseconds.
927 * @param handle handle to the DHT service
928 * @param frequency delay (in milliseconds) between sending malicious messages
929 * @param cont continuation to call once the message is sent
930 * @param cont_cls closure for continuation
932 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
934 int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
936 struct GNUNET_DHT_ControlMessage *msg;
937 struct PendingMessage *pending;
939 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
942 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
943 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
944 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
945 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
946 msg->variable = htons(frequency);
948 pending = GNUNET_malloc (sizeof (struct PendingMessage));
949 pending->msg = &msg->header;
950 pending->timeout = GNUNET_TIME_relative_get_forever();
951 pending->free_on_send = GNUNET_YES;
952 pending->cont = cont;
953 pending->cont_cls = cont_cls;
954 pending->unique_id = 0;
956 if (handle->current == NULL)
958 handle->current = pending;
959 process_pending_message (handle);
963 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
964 handle->retransmission_buffer = pending;
971 * Send a message to the DHT telling it to start dropping
972 * all requests received.
974 * @param handle handle to the DHT service
975 * @param cont continuation to call once the message is sent
976 * @param cont_cls closure for continuation
978 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
980 int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
982 struct GNUNET_DHT_ControlMessage *msg;
983 struct PendingMessage *pending;
985 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
988 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
989 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
990 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
991 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
992 msg->variable = htons(0);
994 pending = GNUNET_malloc (sizeof (struct PendingMessage));
995 pending->msg = &msg->header;
996 pending->timeout = GNUNET_TIME_relative_get_forever();
997 pending->free_on_send = GNUNET_YES;
998 pending->cont = cont;
999 pending->cont_cls = cont_cls;
1000 pending->unique_id = 0;
1002 if (handle->current == NULL)
1004 handle->current = pending;
1005 process_pending_message (handle);
1009 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1010 handle->retransmission_buffer = pending;
1018 * Initiate a generic DHT route operation.
1020 * @param handle handle to the DHT service
1021 * @param key the key to look up
1022 * @param desired_replication_level how many peers should ultimately receive
1023 * this message (advisory only, target may be too high for the
1024 * given DHT or not hit exactly).
1025 * @param options options for routing
1026 * @param enc send the encapsulated message to a peer close to the key
1027 * @param iter function to call on each result, NULL if no replies are expected
1028 * @param iter_cls closure for iter
1029 * @param timeout when to abort with an error if we fail to get
1030 * a confirmation for the request (when necessary) or how long
* to wait for transmission to the service
1032 * @param cont continuation to call when done;
1033 * reason will be TIMEOUT on error,
1034 * reason will be PREREQ_DONE on success
1035 * @param cont_cls closure for cont
1037 * @return handle to stop the request, NULL if the request is "fire and forget"
1039 struct GNUNET_DHT_RouteHandle *
1040 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
1041 const GNUNET_HashCode * key,
1042 unsigned int desired_replication_level,
1043 enum GNUNET_DHT_RouteOption options,
1044 const struct GNUNET_MessageHeader *enc,
1045 struct GNUNET_TIME_Relative timeout,
1046 GNUNET_DHT_ReplyProcessor iter,
1048 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1050 struct GNUNET_DHT_RouteHandle *route_handle;
1051 struct PendingMessage *pending;
1052 struct GNUNET_DHT_RouteMessage *message;
1054 GNUNET_HashCode uid_key;
1056 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1059 if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1065 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
1066 memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
1067 route_handle->iter = iter;
1068 route_handle->iter_cls = iter_cls;
1069 route_handle->dht_handle = handle;
1070 route_handle->uid = handle->uid_gen++;
1073 hash_from_uid (route_handle->uid, &uid_key);
1074 GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
1075 &uid_key, route_handle,
1076 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081 "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
1084 msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
1085 message = GNUNET_malloc (msize);
1086 message->header.size = htons (msize);
1087 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
1088 memcpy (&message->key, key, sizeof (GNUNET_HashCode));
1089 message->options = htonl (options);
1090 message->desired_replication_level = htonl (options);
1091 message->unique_id = GNUNET_htonll (route_handle->uid);
1092 memcpy (&message[1], enc, ntohs (enc->size));
1093 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1094 pending->msg = &message->header;
1095 pending->timeout = timeout;
1097 pending->free_on_send = GNUNET_YES;
1098 pending->cont = cont;
1099 pending->cont_cls = cont_cls;
1100 pending->unique_id = route_handle->uid;
1101 if (handle->current == NULL)
1103 handle->current = pending;
1104 process_pending_message (handle);
1108 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1109 handle->retransmission_buffer = pending;
1112 route_handle->message = message;
1113 return route_handle;
1118 * Perform an asynchronous GET operation on the DHT identified.
1120 * @param handle handle to the DHT service
1121 * @param timeout how long to wait for transmission of this request to the service
1122 * @param type expected type of the response object
1123 * @param key the key to look up
1124 * @param iter function to call on each result
1125 * @param iter_cls closure for iter
1126 * @param cont continuation to call once message sent
1127 * @param cont_cls closure for continuation
1129 * @return handle to stop the async get
1131 struct GNUNET_DHT_GetHandle *
1132 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1133 struct GNUNET_TIME_Relative timeout,
1135 const GNUNET_HashCode * key,
1136 GNUNET_DHT_GetIterator iter,
1138 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1140 struct GNUNET_DHT_GetHandle *get_handle;
1141 struct GNUNET_DHT_GetMessage get_msg;
1143 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1146 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1147 get_handle->get_context.iter = iter;
1148 get_handle->get_context.iter_cls = iter_cls;
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152 "`%s': Inserting pending get request with key %s\n", "DHT API",
1156 get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
1157 get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
1158 get_msg.type = htons (type);
1160 get_handle->route_handle =
1161 GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
1162 &get_reply_iterator, get_handle, cont, cont_cls);
1169 * Stop a previously issued routing request
1171 * @param route_handle handle to the request to stop
1172 * @param cont continuation to call once this message is sent to the service or times out
1173 * @param cont_cls closure for the continuation
1176 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
1177 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1179 struct PendingMessage *pending;
1180 struct GNUNET_DHT_StopMessage *message;
1182 GNUNET_HashCode uid_key;
1184 msize = sizeof (struct GNUNET_DHT_StopMessage);
1185 message = GNUNET_malloc (msize);
1186 message->header.size = htons (msize);
1187 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
1189 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190 "`%s': Remove outstanding request for uid %llu\n", "DHT API",
1193 message->unique_id = GNUNET_htonll (route_handle->uid);
1194 memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
1195 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1196 pending->msg = (struct GNUNET_MessageHeader *) message;
1197 pending->timeout = GNUNET_TIME_relative_get_forever();
1198 pending->cont = cont;
1199 pending->cont_cls = cont_cls;
1200 pending->free_on_send = GNUNET_YES;
1201 pending->unique_id = 0; /* When finished is called, free pending->msg */
1203 if (route_handle->dht_handle->current == NULL)
1205 route_handle->dht_handle->current = pending;
1206 process_pending_message (route_handle->dht_handle);
1208 else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1210 route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1211 route_handle->dht_handle->retransmission_buffer = pending;
1215 GNUNET_free(pending);
1219 hash_from_uid (route_handle->uid, &uid_key);
1220 GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1221 (route_handle->dht_handle->outstanding_requests, &uid_key,
1222 route_handle) == GNUNET_YES);
1224 GNUNET_free(route_handle->message);
1225 GNUNET_free(route_handle);
1230 * Stop async DHT-get.
1232 * @param get_handle handle to the GET operation to stop
1233 * @param cont continuation to call once this message is sent to the service or times out
1234 * @param cont_cls closure for the continuation
1237 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1238 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1240 if ((get_handle->route_handle->dht_handle->current != NULL) &&
1241 (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1245 GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1246 GNUNET_SCHEDULER_REASON_TIMEOUT);
1252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1253 "`%s': Removing pending get request with key %s, uid %llu\n",
1254 "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1255 get_handle->route_handle->uid);
1257 GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1258 GNUNET_free (get_handle);
1263 * Perform an asynchronous FIND PEER operation on the DHT.
1265 * @param handle handle to the DHT service
1266 * @param timeout timeout for this request to be sent to the
1268 * @param options routing options for this message
1269 * @param key the key to look up
1270 * @param proc function to call on each result
1271 * @param proc_cls closure for proc
1272 * @param cont continuation to call once message sent
1273 * @param cont_cls closure for continuation
1275 * @return handle to stop the async get, NULL on error
1277 struct GNUNET_DHT_FindPeerHandle *
1278 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1279 struct GNUNET_TIME_Relative timeout,
1280 enum GNUNET_DHT_RouteOption options,
1281 const GNUNET_HashCode * key,
1282 GNUNET_DHT_FindPeerProcessor proc,
1284 GNUNET_SCHEDULER_Task cont,
1287 struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1288 struct GNUNET_DHT_FindPeerMessage find_peer_msg;
1290 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1294 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1295 find_peer_handle->find_peer_context.proc = proc;
1296 find_peer_handle->find_peer_context.proc_cls = proc_cls;
1299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300 "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1301 "FIND PEER", GNUNET_h2s (key));
1304 find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
1305 find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1306 find_peer_handle->route_handle =
1307 GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
1308 timeout, &find_peer_reply_iterator,
1309 find_peer_handle, cont, cont_cls);
1310 return find_peer_handle;
1314 * Stop async find peer. Frees associated resources.
1316 * @param find_peer_handle GET operation to stop.
1317 * @param cont continuation to call once this message is sent to the service or times out
1318 * @param cont_cls closure for the continuation
1321 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1322 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1324 if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1325 (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1329 GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1330 GNUNET_SCHEDULER_REASON_TIMEOUT);
1336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337 "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1338 "DHT API", "FIND PEER",
1339 GNUNET_h2s (&find_peer_handle->route_handle->key),
1340 find_peer_handle->route_handle->uid);
1342 GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1343 GNUNET_free (find_peer_handle);
1349 * Perform a PUT operation storing data in the DHT.
1351 * @param handle handle to DHT service
1352 * @param key the key to store under
1353 * @param type type of the value
1354 * @param size number of bytes in data; must be less than 64k
1355 * @param data the data to store
1356 * @param exp desired expiration time for the value
1357 * @param timeout how long to wait for transmission of this request
1358 * @param cont continuation to call when done;
1359 * reason will be TIMEOUT on error,
1360 * reason will be PREREQ_DONE on success
1361 * @param cont_cls closure for cont
1363 * @return GNUNET_YES if put message is queued for transmission
1366 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1367 const GNUNET_HashCode * key,
1371 struct GNUNET_TIME_Absolute exp,
1372 struct GNUNET_TIME_Relative timeout,
1373 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1375 struct GNUNET_DHT_PutMessage *put_msg;
1376 struct GNUNET_DHT_RouteHandle *put_route;
1379 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1381 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
1384 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1385 GNUNET_SCHEDULER_REASON_TIMEOUT);
1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392 "`%s': Inserting pending put request with key %s\n", "DHT API",
1396 msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1397 put_msg = GNUNET_malloc (msize);
1398 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1399 put_msg->header.size = htons (msize);
1400 put_msg->type = htons (type);
1401 put_msg->data_size = htons (size);
1402 put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1403 memcpy (&put_msg[1], data, size);
1405 put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1406 NULL, cont, cont_cls);
1408 if (put_route == NULL) /* Route start failed! */
1410 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1413 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1414 GNUNET_SCHEDULER_REASON_TIMEOUT);
1419 GNUNET_free(put_route);
1422 GNUNET_free (put_msg);