From 7b80cc15aaf3f61fb05635577fdd5258d5b8bb17 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 2 Oct 2010 13:58:05 +0000 Subject: [PATCH] new DHT API, tests do not yet compile --- src/dht/Makefile.am | 8 +- src/dht/dht.h | 5 - src/dht/dht_api.c | 1468 +++++++++--------------------- src/dht/gnunet-dht-get-peer.c | 85 +- src/dht/gnunet-dht-get.c | 69 +- src/dht/gnunet-dht-put.c | 20 +- src/include/gnunet_dht_service.h | 207 +++-- 7 files changed, 644 insertions(+), 1218 deletions(-) diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index ccfc5a041..c5ae96f02 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am @@ -52,7 +52,7 @@ libgnunet_plugin_dhtlog_mysql_dump_la_LIBADD = \ $(XLIB) libgnunet_plugin_dhtlog_mysql_dump_la_LDFLAGS = \ $(GN_PLUGIN_LDFLAGS) - + libgnunet_plugin_dhtlog_mysql_dump_load_la_SOURCES = \ plugin_dhtlog_mysql_dump_load.c libgnunet_plugin_dhtlog_mysql_dump_load_la_LIBADD = \ @@ -70,7 +70,9 @@ libgnunetdhtlog_la_LDFLAGS = \ -version-info 0:0:0 libgnunetdht_la_SOURCES = \ - dht_api.c dht.h + dht_api.c dht.h \ + dht_api_get_put.c \ + dht_api_find_peer.c libgnunetdht_la_LIBADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(XLIB) @@ -92,8 +94,10 @@ bin_PROGRAMS = $(STUD_PROGS) \ gnunet-dht-get-peer \ gnunet-dht-put +if HAVE_MALICIOUS noinst_PROGRAMS = \ gnunet-dht-driver +endif gnunet_service_dht_SOURCES = \ gnunet-service-dht.c diff --git a/src/dht/dht.h b/src/dht/dht.h index 56bb29934..237b91f64 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -320,11 +320,6 @@ struct GNUNET_DHT_PutMessage */ struct GNUNET_TIME_AbsoluteNBO expiration; - /** - * The size of the data, appended to the end of this message. - */ - size_t data_size GNUNET_PACKED; - }; diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 15faba6c9..c89a92dd5 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -23,12 +23,6 @@ * @brief library to access the DHT service * @author Christian Grothoff * @author Nathan Evans - * - * TODO: retransmission of pending requests maybe happens now, at least - * the code is in place to do so. Need to add checks when api calls - * happen to check if retransmission is in progress, and if so set - * the single pending message for transmission once the list of - * retries are done. */ #include "platform.h" @@ -46,83 +40,68 @@ #define DEBUG_DHT_API GNUNET_NO +/** + * Entry in our list of messages to be (re-)transmitted. + */ struct PendingMessage { /** - * Message that is pending + * This is a doubly-linked list. */ - struct GNUNET_MessageHeader *msg; + struct PendingMessage *prev; /** - * Timeout for this message + * This is a doubly-linked list. */ - struct GNUNET_TIME_Relative timeout; + struct PendingMessage *next; /** - * Continuation to call on message send - * or message receipt confirmation + * Message that is pending, allocated at the end + * of this struct. */ - GNUNET_SCHEDULER_Task cont; - + const struct GNUNET_MessageHeader *msg; + /** - * Continuation closure + * Handle to the DHT API context. */ - void *cont_cls; - + struct GNUNET_DHT_Handle *handle; + /** - * Unique ID for this request + * Continuation to call when the request has been + * transmitted (for the first time) to the service; can be NULL. */ - uint64_t unique_id; + GNUNET_SCHEDULER_Task cont; /** - * Free the saved message once sent, set - * to GNUNET_YES for messages that don't - * receive responses! + * Closure for 'cont'. */ - int free_on_send; - -}; + void *cont_cls; -struct PendingMessageList -{ /** - * This is a singly linked list. + * Timeout task for this message */ - struct PendingMessageList *next; + GNUNET_SCHEDULER_TaskIdentifier timeout_task; /** - * The pending message. + * Unique ID for this request */ - struct PendingMessage *message; -}; + uint64_t unique_id; -struct GNUNET_DHT_GetContext -{ /** - * Iterator to call on data receipt + * Free the saved message once sent, set to GNUNET_YES for messages + * that do not receive responses; GNUNET_NO if this pending message + * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed + * from there. */ - GNUNET_DHT_GetIterator iter; + int free_on_send; /** - * Closure for the iterator callback + * GNUNET_YES if this message is in our pending queue right now. */ - void *iter_cls; + int in_pending_queue; }; -struct GNUNET_DHT_FindPeerContext -{ - /** - * Iterator to call on data receipt - */ - GNUNET_DHT_FindPeerProcessor proc; - - /** - * Closure for the iterator callback - */ - void *proc_cls; - -}; /** * Handle to a route request @@ -130,16 +109,6 @@ struct GNUNET_DHT_FindPeerContext struct GNUNET_DHT_RouteHandle { - /** - * Unique identifier for this request (for key collisions) - */ - uint64_t uid; - - /** - * Key that this get request is for - */ - GNUNET_HashCode key; - /** * Iterator to call on data receipt */ @@ -160,62 +129,18 @@ struct GNUNET_DHT_RouteHandle * used for retransmitting requests on service * failure/reconnect. Freed on route_stop. */ - struct GNUNET_DHT_RouteMessage *message; -}; - - -/** - * Handle to control a get operation. - */ -struct GNUNET_DHT_GetHandle -{ - /** - * Handle to the actual route operation for the get - */ - struct GNUNET_DHT_RouteHandle *route_handle; - - /** - * The context of the get request - */ - struct GNUNET_DHT_GetContext get_context; -}; - - -/** - * Handle to control a find peer operation. - */ -struct GNUNET_DHT_FindPeerHandle -{ - /** - * Handle to the actual route operation for the request - */ - struct GNUNET_DHT_RouteHandle *route_handle; - - /** - * The context of the find peer request - */ - struct GNUNET_DHT_FindPeerContext find_peer_context; -}; - + struct PendingMessage *message; -enum DHT_Retransmit_Stage -{ /** - * The API is not retransmitting anything at this time. + * Key that this get request is for */ - DHT_NOT_RETRANSMITTING, + GNUNET_HashCode key; /** - * The API is retransmitting, and nothing has been single - * queued for sending. + * Unique identifier for this request (for key collisions) */ - DHT_RETRANSMITTING, + uint64_t uid; - /** - * The API is retransmitting, and a single message has been - * queued for transmission once finished. - */ - DHT_RETRANSMITTING_MESSAGE_QUEUED }; @@ -240,95 +165,57 @@ struct GNUNET_DHT_Handle struct GNUNET_CLIENT_Connection *client; /** - * Currently pending transmission request. + * Currently pending transmission request (or NULL). */ struct GNUNET_CLIENT_TransmitHandle *th; /** - * Message we are currently sending, only allow - * a single message to be queued. If not unique - * (typically a put request), await a confirmation - * from the service that the message was received. - * If unique, just fire and forget. + * Head of linked list of messages we would like to transmit. */ - struct PendingMessage *current; + struct PendingMessage *pending_head; /** - * Hash map containing the current outstanding unique requests - */ - struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; - - /** - * Generator for unique ids. + * Tail of linked list of messages we would like to transmit. */ - uint64_t uid_gen; + struct PendingMessage *pending_tail; /** - * Are we currently retransmitting requests? If so queue a _single_ - * new request when received. + * Hash map containing the current outstanding unique requests + * (values are of type 'struct GNUNET_DHT_RouteHandle'). */ - enum DHT_Retransmit_Stage retransmit_stage; + struct GNUNET_CONTAINER_MultiHashMap *active_requests; /** - * Linked list of retranmissions, to be used in the event - * of a dht service disconnect/reconnect. + * Generator for unique ids. */ - struct PendingMessageList *retransmissions; + uint64_t uid_gen; - /** - * A single pending message allowed to be scheduled - * during retransmission phase. - */ - struct PendingMessage *retransmission_buffer; }; /** - * Convert unique ID to hash code. - * - * @param uid unique ID to convert - * @param hash set to uid (extended with zeros) + * Transmit the next pending message, called by notify_transmit_ready */ -static void -hash_from_uid (uint64_t uid, - GNUNET_HashCode *hash) -{ - memset (hash, 0, sizeof(GNUNET_HashCode)); - *((uint64_t*)hash) = uid; -} +static size_t +transmit_pending (void *cls, + size_t size, + void *buf); + -#if RETRANSMIT /** - * Iterator callback to retransmit each outstanding request - * because the connection to the DHT service went down (and - * came back). - * + * Handler for messages received from the DHT service + * a demultiplexer which handles numerous message types * */ -static int retransmit_iterator (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct GNUNET_DHT_RouteHandle *route_handle = value; - struct PendingMessageList *pending_message_list; - - pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage)); - pending_message_list->message = (struct PendingMessage *)&pending_message_list[1]; - pending_message_list->message->msg = &route_handle->message->header; - pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever(); - pending_message_list->message->cont = NULL; - pending_message_list->message->cont_cls = NULL; - pending_message_list->message->unique_id = route_handle->uid; - /* Add the new pending message to the front of the retransmission list */ - pending_message_list->next = route_handle->dht_handle->retransmissions; - route_handle->dht_handle->retransmissions = pending_message_list; - - return GNUNET_OK; -} -#endif +static void +service_message_handler (void *cls, + const struct GNUNET_MessageHeader *msg); + + + /** - * Try to (re)connect to the dht service. + * Try to (re)connect to the DHT service. * * @return GNUNET_YES on success, GNUNET_NO on failure. */ @@ -338,304 +225,233 @@ try_connect (struct GNUNET_DHT_Handle *handle) if (handle->client != NULL) return GNUNET_OK; handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); - if (handle->client != NULL) - return GNUNET_YES; -#if DEBUG_STATISTICS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to connect to the dht service!\n")); -#endif - return GNUNET_NO; + if (handle->client == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to connect to the DHT service!\n")); + return GNUNET_NO; + } + GNUNET_CLIENT_receive (handle->client, + &service_message_handler, + handle, + GNUNET_TIME_UNIT_FOREVER_REL); + return GNUNET_YES; } + /** - * Send complete (or failed), call continuation if we have one. + * Add the request corresponding to the given route handle + * to the pending queue (if it is not already in there). + * + * @param cls the 'struct GNUNET_DHT_Handle*' + * @param key key for the request (not used) + * @param value the 'struct GNUNET_DHT_RouteHandle*' + * @return GNUNET_YES (always) */ -static void -finish (struct GNUNET_DHT_Handle *handle, int code) +static int +add_request_to_pending (void *cls, + const GNUNET_HashCode *key, + void *value) { - struct PendingMessage *pos = handle->current; - GNUNET_HashCode uid_hash; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); -#endif - GNUNET_assert (pos != NULL); - hash_from_uid (pos->unique_id, &uid_hash); - if (pos->cont != NULL) + struct GNUNET_DHT_Handle *handle = cls; + struct GNUNET_DHT_RouteHandle *rh = value; + + if (GNUNET_NO == rh->message->in_pending_queue) { - if (code == GNUNET_SYSERR) - GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, - pos->cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - else - GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, - pos->cont_cls, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); + GNUNET_CONTAINER_DLL_insert (handle->pending_head, + handle->pending_tail, + rh->message); + rh->message->in_pending_queue = GNUNET_YES; } - - GNUNET_assert(handle->th == NULL); - if (pos->free_on_send == GNUNET_YES) - GNUNET_free(pos->msg); - GNUNET_free (pos); - handle->current = NULL; + return GNUNET_YES; } + /** - * Transmit the next pending message, called by notify_transmit_ready + * Re-connect to the DHT, re-issue all pending requests if needed. */ -static size_t -transmit_pending (void *cls, size_t size, void *buf) +static void +reconnect (struct GNUNET_DHT_Handle *handle) { - struct GNUNET_DHT_Handle *handle = cls; - size_t tsize; - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending\n", "DHT API"); -#endif - handle->th = NULL; - - if (buf == NULL) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending buf is NULL\n", "DHT API"); -#endif - finish (handle, GNUNET_SYSERR); - return 0; - } - - if (handle->current != NULL) + if (handle->client != NULL) { - tsize = ntohs (handle->current->msg->size); - if (size >= tsize) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending message size %d\n", "DHT API", tsize); -#endif - memcpy (buf, handle->current->msg, tsize); - finish (handle, GNUNET_OK); - return tsize; - } - else - { - return 0; - } + GNUNET_CLIENT_disconnect (handle->client, + GNUNET_NO); + handle->client = NULL; } - /* Have no pending request */ - return 0; + if (GNUNET_YES != try_connect (handle)) + return; + GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests, + &add_request_to_pending, + handle); + if (handle->pending_head == NULL) + return; + GNUNET_CLIENT_notify_transmit_ready (handle->client, + ntohs(handle->pending_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, + &transmit_pending, + handle); + } + /** * Try to send messages from list of messages to send */ static void -process_pending_message (struct GNUNET_DHT_Handle *handle) +process_pending_messages (struct GNUNET_DHT_Handle *handle) { + struct PendingMessage *head; - if (handle->current == NULL) - return; /* action already pending */ if (GNUNET_YES != try_connect (handle)) - { - handle->th = NULL; - finish (handle, GNUNET_SYSERR); - return; - } - - if (NULL == - (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs (handle-> - current->msg-> - size), - handle->current-> - timeout, GNUNET_YES, - &transmit_pending, - handle))) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dht service.\n"); -#endif - finish (handle, GNUNET_SYSERR); + return; + if (handle->th != NULL) + return; + if (NULL == (head = handle->pending_head)) + return; + handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, + ntohs (head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, + &transmit_pending, + handle); + if (NULL == handle->th) + { + reconnect (handle); return; } -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", - "DHT API", ntohs (handle->current->msg->size)); -#endif } -/** - * Send complete (or failed), call continuation if we have one. - * Forward declaration. - */ -static void -finish_retransmission (struct GNUNET_DHT_Handle *handle, int code); - -/* Forward declaration */ -static size_t -transmit_pending_retransmission (void *cls, size_t size, void *buf); /** - * Try to send messages from list of messages to send + * Transmit the next pending message, called by notify_transmit_ready */ -static void -process_pending_retransmissions (struct GNUNET_DHT_Handle *handle) +static size_t +transmit_pending (void *cls, + size_t size, + void *buf) { + struct GNUNET_DHT_Handle *handle = cls; + struct PendingMessage *head; + size_t tsize; - if (handle->current == NULL) - return; /* action already pending */ - if (GNUNET_YES != try_connect (handle)) + handle->th = NULL; + if (buf == NULL) { - finish_retransmission (handle, GNUNET_SYSERR); - return; + reconnect (handle); + return 0; } - - if (NULL == - (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs (handle-> - current->msg-> - size), - handle->current-> - timeout, GNUNET_YES, - &transmit_pending_retransmission, - handle))) + if (NULL == (head = handle->pending_head)) + return 0; + + tsize = ntohs (head->msg->size); + if (size < tsize) { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dht service.\n"); -#endif - finish_retransmission (handle, GNUNET_SYSERR); - return; + process_pending_messages (handle); + return 0; } -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", - "DHT API", ntohs (handle->current->msg->size)); -#endif + memcpy (buf, head->msg, tsize); + GNUNET_CONTAINER_DLL_remove (handle->pending_head, + handle->pending_tail, + head); + if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (handle->sched, + head->timeout_task); + head->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != head->cont) + { + GNUNET_SCHEDULER_add_now (handle->sched, + head->cont, + head->cont_cls); + head->cont = NULL; + head->cont_cls = NULL; + } + head->in_pending_queue = GNUNET_NO; + if (GNUNET_YES == head->free_on_send) + GNUNET_free (head); + process_pending_messages (handle); + return tsize; } + + + /** - * Send complete (or failed), call continuation if we have one. + * Process a given reply that might match the given + * request. */ -static void -finish_retransmission (struct GNUNET_DHT_Handle *handle, int code) +static int +process_reply (void *cls, + const GNUNET_HashCode *key, + void *value) { - struct PendingMessage *pos = handle->current; - struct PendingMessageList *pending_list; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API"); -#endif - GNUNET_assert (pos == handle->retransmissions->message); - pending_list = handle->retransmissions; - handle->retransmissions = handle->retransmissions->next; - GNUNET_free (pending_list); - - if (handle->retransmissions == NULL) - { - handle->retransmit_stage = DHT_NOT_RETRANSMITTING; - } + const struct GNUNET_DHT_RouteResultMessage *dht_msg = cls; + struct GNUNET_DHT_RouteHandle *rh = value; + const struct GNUNET_MessageHeader *enc_msg; + size_t enc_size; + uint64_t uid; - if (handle->retransmissions != NULL) + uid = GNUNET_ntohll (dht_msg->unique_id); + if (uid != rh->uid) + return GNUNET_YES; + enc_size = ntohs (dht_msg->header.size) - sizeof (struct GNUNET_DHT_RouteResultMessage); + if (enc_size < sizeof (struct GNUNET_MessageHeader)) { - handle->current = handle->retransmissions->message; - process_pending_retransmissions(handle); + GNUNET_break (0); + return GNUNET_NO; } - else if (handle->retransmission_buffer != NULL) + enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; + if (enc_size != ntohs (enc_msg->size)) { - handle->current = handle->retransmission_buffer; - process_pending_message(handle); + GNUNET_break (0); + return GNUNET_NO; } + rh->iter (rh->iter_cls, + &rh->key, + enc_msg); + return GNUNET_YES; } + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types * + * @param cls the 'struct GNUNET_DHT_Handle' + * @param msg the incoming message */ -void +static void service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_RouteResultMessage *dht_msg; - struct GNUNET_MessageHeader *enc_msg; - struct GNUNET_DHT_RouteHandle *route_handle; - uint64_t uid; - GNUNET_HashCode uid_hash; - size_t enc_size; - + const struct GNUNET_DHT_RouteResultMessage *dht_msg; + if (msg == NULL) { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received NULL from server, connection down!\n", - "DHT API"); -#endif - GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); - handle->client = GNUNET_CLIENT_connect (handle->sched, - "dht", - handle->cfg); - if (handle->current != NULL) - { - handle->th = NULL; - finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */ - } -#if RETRANSMIT - if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0)) - { - handle->retransmit_stage = DHT_RETRANSMITTING; - handle->current = handle->retransmissions->message; - process_pending_retransmissions(handle); - } -#endif + reconnect (handle); return; } - - switch (ntohs (msg->type)) + if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT) { - case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT: - { - dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg; - uid = GNUNET_ntohll (dht_msg->unique_id); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu)\n", - "DHT API", uid); -#endif - - hash_from_uid (uid, &uid_hash); - route_handle = - GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests, - &uid_hash); - if (route_handle == NULL) /* We have no recollection of this request */ - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), but have no recollection of it!\n", - "DHT API", uid); -#endif - } - else - { - enc_size = - ntohs (dht_msg->header.size) - - sizeof (struct GNUNET_DHT_RouteResultMessage); - GNUNET_assert (enc_size > 0); - enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; - route_handle->iter (route_handle->iter_cls, enc_msg); - } - - break; - } - default: - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "`%s': Received unknown message type %d\n", "DHT API", - ntohs (msg->type)); - } + GNUNET_break (0); + reconnect (handle); + return; } + if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_RouteResultMessage)) + { + GNUNET_break (0); + reconnect (handle); + return; + } + dht_msg = (const struct GNUNET_DHT_RouteResultMessage *) msg; + GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, + &dht_msg->key, + &process_reply, + (void*) dht_msg); GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); @@ -663,22 +479,13 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle)); handle->cfg = cfg; handle->sched = sched; - handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); - handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); - if (handle->client == NULL) + handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); + handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len); + if (GNUNET_NO == try_connect (handle)) { - GNUNET_free (handle); + GNUNET_DHT_disconnect (handle); return NULL; } - handle->outstanding_requests = - GNUNET_CONTAINER_multihashmap_create (ht_len); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Connection to service in progress\n", "DHT API"); -#endif - GNUNET_CLIENT_receive (handle->client, - &service_message_handler, - handle, GNUNET_TIME_UNIT_FOREVER_REL); return handle; } @@ -691,327 +498,73 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, void GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); -#endif - GNUNET_assert (handle != NULL); - if (handle->th != NULL) /* We have a live transmit request */ + struct PendingMessage *pm; + + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size(handle->active_requests)); + if (handle->th != NULL) { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; } - if (handle->current != NULL) /* We are trying to send something now, clean it up */ - GNUNET_free (handle->current); - - if (handle->client != NULL) /* Finally, disconnect from the service */ + while (NULL != (pm = handle->pending_head)) + { + GNUNET_CONTAINER_DLL_remove (handle->pending_head, + handle->pending_tail, + pm); + GNUNET_assert (GNUNET_YES == pm->free_on_send); + if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task) + GNUNET_SCHEDULER_cancel (handle->sched, + pm->timeout_task); + if (NULL != pm->cont) + GNUNET_SCHEDULER_add_continuation (handle->sched, + pm->cont, + pm->cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + pm->in_pending_queue = GNUNET_NO; + GNUNET_free (pm); + } + if (handle->client != NULL) { - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); handle->client = NULL; - } - - GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0); - GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests); + } + GNUNET_CONTAINER_multihashmap_destroy(handle->active_requests); GNUNET_free (handle); } -/** - * Transmit the next pending message, called by notify_transmit_ready - */ -static size_t -transmit_pending_retransmission (void *cls, size_t size, void *buf) -{ - struct GNUNET_DHT_Handle *handle = cls; - size_t tsize; - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending\n", "DHT API"); -#endif - if (buf == NULL) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending buf is NULL\n", "DHT API"); -#endif - finish_retransmission (handle, GNUNET_SYSERR); - return 0; - } - - handle->th = NULL; - - if (handle->current != NULL) - { - tsize = ntohs (handle->current->msg->size); - if (size >= tsize) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending message size %d\n", "DHT API", tsize); -#endif - memcpy (buf, handle->current->msg, tsize); - finish_retransmission (handle, GNUNET_OK); - return tsize; - } - else - { - return 0; - } - } - /* Have no pending request */ - return 0; -} - - -/** - * Iterator called on each result obtained from a generic route - * operation - */ -void -get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) -{ - struct GNUNET_DHT_GetHandle *get_handle = cls; - struct GNUNET_DHT_GetResultMessage *result; - size_t data_size; - char *result_data; - - if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT) - return; - - GNUNET_assert (ntohs (reply->size) >= - sizeof (struct GNUNET_DHT_GetResultMessage)); - result = (struct GNUNET_DHT_GetResultMessage *) reply; - data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage); - - result_data = (char *) &result[1]; /* Set data pointer to end of message */ - - get_handle->get_context.iter (get_handle->get_context.iter_cls, - GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key, - ntohs (result->type), data_size, result_data); -} - - -/** - * Iterator called on each result obtained from a generic route - * operation - */ -void -find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) -{ - struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; - struct GNUNET_MessageHeader *hello; - - if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received wrong type of response to a find peer request...\n"); - return; - } - - - GNUNET_assert (ntohs (reply->size) >= - sizeof (struct GNUNET_MessageHeader)); - hello = (struct GNUNET_MessageHeader *)&reply[1]; - - if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO"); - return; - } - find_peer_handle->find_peer_context.proc (find_peer_handle-> - find_peer_context.proc_cls, - (struct GNUNET_HELLO_Message *)hello); -} - -/** - * Send a message to the DHT telling it to start issuing random GET - * requests every 'frequency' milliseconds. - * - * @param handle handle to the DHT service - * @param frequency delay (in milliseconds) between sending malicious messages - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation - * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not - */ -int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls) -{ - struct GNUNET_DHT_ControlMessage *msg; - struct PendingMessage *pending; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - return GNUNET_NO; - - msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); - msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET); - msg->variable = htons(frequency); - - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = &msg->header; - pending->timeout = GNUNET_TIME_relative_get_forever(); - pending->free_on_send = GNUNET_YES; - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->unique_id = 0; - - if (handle->current == NULL) - { - handle->current = pending; - process_pending_message (handle); - } - else - { - handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - handle->retransmission_buffer = pending; - } - - return GNUNET_YES; -} - -/** - * Send a message to the DHT telling it to issue a single find - * peer request using the peers unique identifier as key. This - * is used to fill the routing table, and is normally controlled - * by the DHT itself. However, for testing and perhaps more - * close control over the DHT, this can be explicitly managed. - * - * @param handle handle to the DHT service - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation - * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not - */ -int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) -{ - struct GNUNET_DHT_ControlMessage *msg; - struct PendingMessage *pending; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - return GNUNET_NO; - - msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); - msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); - - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = &msg->header; - pending->timeout = GNUNET_TIME_relative_get_forever(); - pending->free_on_send = GNUNET_YES; - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->unique_id = 0; - - if (handle->current == NULL) - { - handle->current = pending; - process_pending_message (handle); - } - else - { - handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - handle->retransmission_buffer = pending; - } - - return GNUNET_YES; -} - -/** - * Send a message to the DHT telling it to start issuing random PUT - * requests every 'frequency' milliseconds. - * - * @param handle handle to the DHT service - * @param frequency delay (in milliseconds) between sending malicious messages - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation - * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not - */ -int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls) -{ - struct GNUNET_DHT_ControlMessage *msg; - struct PendingMessage *pending; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - return GNUNET_NO; - msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); - msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT); - msg->variable = htons(frequency); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = &msg->header; - pending->timeout = GNUNET_TIME_relative_get_forever(); - pending->free_on_send = GNUNET_YES; - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->unique_id = 0; +/* ***** Special low-level API providing generic routeing abstraction ***** */ - if (handle->current == NULL) - { - handle->current = pending; - process_pending_message (handle); - } - else - { - handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - handle->retransmission_buffer = pending; - } - - return GNUNET_YES; -} /** - * Send a message to the DHT telling it to start dropping - * all requests received. - * - * @param handle handle to the DHT service - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation + * Timeout for the transmission of a fire&forget-request. Clean it up. * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not + * @param cls the 'struct PendingMessage' + * @param tc scheduler context */ -int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) +static void +timeout_route_request (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_DHT_ControlMessage *msg; - struct PendingMessage *pending; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - return GNUNET_NO; - - msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); - msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); - msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP); - msg->variable = htons(0); - - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = &msg->header; - pending->timeout = GNUNET_TIME_relative_get_forever(); - pending->free_on_send = GNUNET_YES; - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->unique_id = 0; + struct PendingMessage *pending = cls; + struct GNUNET_DHT_Handle *handle; - if (handle->current == NULL) + if (pending->free_on_send != GNUNET_YES) { - handle->current = pending; - process_pending_message (handle); + /* timeouts should only apply to fire & forget requests! */ + GNUNET_break (0); + return; } - else - { - handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - handle->retransmission_buffer = pending; - } - - return GNUNET_YES; + handle = pending->handle; + GNUNET_CONTAINER_DLL_remove (handle->pending_head, + handle->pending_tail, + pending); + if (pending->cont != NULL) + pending->cont (pending->cont_cls, + tc); + GNUNET_free (pending); } @@ -1029,398 +582,259 @@ int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_S * @param iter_cls closure for iter * @param timeout when to abort with an error if we fail to get * a confirmation for the request (when necessary) or how long - * to wait for tramission to the service - * @param cont continuation to call when done; - * reason will be TIMEOUT on error, - * reason will be PREREQ_DONE on success + * to wait for tramission to the service; only applies + * if 'iter' is NULL + * @param cont continuation to call when the request has been transmitted + * the first time to the service * @param cont_cls closure for cont - * * @return handle to stop the request, NULL if the request is "fire and forget" */ struct GNUNET_DHT_RouteHandle * GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, - unsigned int desired_replication_level, + uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const struct GNUNET_MessageHeader *enc, struct GNUNET_TIME_Relative timeout, GNUNET_DHT_ReplyProcessor iter, void *iter_cls, - GNUNET_SCHEDULER_Task cont, void *cont_cls) + GNUNET_SCHEDULER_Task cont, + void *cont_cls) { - struct GNUNET_DHT_RouteHandle *route_handle; struct PendingMessage *pending; struct GNUNET_DHT_RouteMessage *message; + struct GNUNET_DHT_RouteHandle *route_handle; uint16_t msize; - GNUNET_HashCode uid_key; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - return NULL; + uint16_t esize; - if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + esize = ntohs (enc->size); + if (sizeof (struct GNUNET_DHT_RouteMessage) + esize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); return NULL; } - - route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); - memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); - route_handle->iter = iter; - route_handle->iter_cls = iter_cls; - route_handle->dht_handle = handle; - route_handle->uid = handle->uid_gen++; - if (iter != NULL) - { - hash_from_uid (route_handle->uid, &uid_key); - GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, - &uid_key, route_handle, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - } - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid); -#endif - - msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size); - message = GNUNET_malloc (msize); - message->header.size = htons (msize); - message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE); - memcpy (&message->key, key, sizeof (GNUNET_HashCode)); - message->options = htonl (options); - message->desired_replication_level = htonl (desired_replication_level); - message->unique_id = GNUNET_htonll (route_handle->uid); - memcpy (&message[1], enc, ntohs (enc->size)); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); + msize = sizeof (struct GNUNET_DHT_RouteMessage) + esize; + pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + message = (struct GNUNET_DHT_RouteMessage*) &pending[1]; pending->msg = &message->header; - pending->timeout = timeout; - if (iter == NULL) - pending->free_on_send = GNUNET_YES; + pending->handle = handle; pending->cont = cont; pending->cont_cls = cont_cls; - pending->unique_id = route_handle->uid; - if (handle->current == NULL) + + message->header.size = htons (msize); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE); + message->key = *key; + message->options = htonl ((uint32_t) options); + message->desired_replication_level = htonl (desired_replication_level); + memcpy (&message[1], enc, esize); + if (iter != NULL) { - handle->current = pending; - process_pending_message (handle); + route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); + route_handle->key = *key; + route_handle->iter = iter; + route_handle->iter_cls = iter_cls; + route_handle->dht_handle = handle; + route_handle->uid = handle->uid_gen++; + route_handle->message = pending; + message->unique_id = GNUNET_htonll (route_handle->uid); + GNUNET_CONTAINER_multihashmap_put (handle->active_requests, + key, + route_handle, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } else - { - handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - handle->retransmission_buffer = pending; - } - - route_handle->message = message; + { + route_handle = NULL; + pending->free_on_send = GNUNET_YES; + pending->timeout_task = GNUNET_SCHEDULER_add_delayed (handle->sched, + timeout, + &timeout_route_request, + pending); + } + GNUNET_CONTAINER_DLL_insert (handle->pending_head, + handle->pending_tail, + pending); + pending->in_pending_queue = GNUNET_YES; + process_pending_messages (handle); return route_handle; } -/** - * Perform an asynchronous GET operation on the DHT identified. - * - * @param handle handle to the DHT service - * @param timeout how long to wait for transmission of this request to the service - * @param type expected type of the response object - * @param key the key to look up - * @param iter function to call on each result - * @param iter_cls closure for iter - * @param cont continuation to call once message sent - * @param cont_cls closure for continuation - * - * @return handle to stop the async get - */ -struct GNUNET_DHT_GetHandle * -GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_BLOCK_Type type, - const GNUNET_HashCode * key, - GNUNET_DHT_GetIterator iter, - void *iter_cls, - GNUNET_SCHEDULER_Task cont, void *cont_cls) -{ - struct GNUNET_DHT_GetHandle *get_handle; - struct GNUNET_DHT_GetMessage get_msg; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ - return NULL; - - get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); - get_handle->get_context.iter = iter; - get_handle->get_context.iter_cls = iter_cls; - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending get request with key %s\n", "DHT API", - GNUNET_h2s (key)); -#endif - - get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); - get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); - get_msg.type = htons (type); - - get_handle->route_handle = - GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout, - &get_reply_iterator, get_handle, cont, cont_cls); - - return get_handle; -} - - /** * Stop a previously issued routing request * * @param route_handle handle to the request to stop - * @param cont continuation to call once this message is sent to the service or times out - * @param cont_cls closure for the continuation */ void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle) { + struct GNUNET_DHT_Handle *handle; struct PendingMessage *pending; struct GNUNET_DHT_StopMessage *message; size_t msize; - GNUNET_HashCode uid_key; - - msize = sizeof (struct GNUNET_DHT_StopMessage); - message = GNUNET_malloc (msize); - message->header.size = htons (msize); - message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request for uid %llu\n", "DHT API", - route_handle->uid); -#endif - message->unique_id = GNUNET_htonll (route_handle->uid); - memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode)); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = (struct GNUNET_MessageHeader *) message; - pending->timeout = GNUNET_TIME_relative_get_forever(); - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->free_on_send = GNUNET_YES; - pending->unique_id = 0; /* When finished is called, free pending->msg */ - if (route_handle->dht_handle->current == NULL) - { - route_handle->dht_handle->current = pending; - process_pending_message (route_handle->dht_handle); - } - else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING) - { - route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - route_handle->dht_handle->retransmission_buffer = pending; + handle = route_handle->dht_handle; + if (GNUNET_NO == route_handle->message->in_pending_queue) + { + /* need to send stop message */ + msize = sizeof (struct GNUNET_DHT_StopMessage); + pending = GNUNET_malloc (sizeof (struct PendingMessage) + + msize); + message = (struct GNUNET_DHT_StopMessage*) &pending[1]; + pending->msg = &message->header; + message->header.size = htons (msize); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP); + message->unique_id = GNUNET_htonll (route_handle->uid); + message->key = route_handle->key; + pending->handle = handle; + pending->free_on_send = GNUNET_YES; + pending->in_pending_queue = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert (handle->pending_head, + handle->pending_tail, + pending); + process_pending_messages (handle); } else { - GNUNET_free(pending); - GNUNET_break(0); + /* simply remove pending request from message queue before + transmission, no need to transmit STOP request! */ + GNUNET_CONTAINER_DLL_remove (handle->pending_head, + handle->pending_tail, + route_handle->message); } - - hash_from_uid (route_handle->uid, &uid_key); - GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove - (route_handle->dht_handle->outstanding_requests, &uid_key, - route_handle) == GNUNET_YES); - + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (route_handle->dht_handle->active_requests, + &route_handle->key, + route_handle)); GNUNET_free(route_handle->message); GNUNET_free(route_handle); } + +/* ***** Special API for controlling DHT routing maintenance ******* */ + + /** - * Stop async DHT-get. + * Send a control message to the DHT. * - * @param get_handle handle to the GET operation to stop - * @param cont continuation to call once this message is sent to the service or times out - * @param cont_cls closure for the continuation + * @param handle handle to the DHT service + * @param command command + * @param variable variable to the command + * @param cont continuation to call when done (transmitting request to service) + * @param cont_cls closure for cont */ -void -GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +static void +send_control_message (struct GNUNET_DHT_Handle *handle, + uint16_t command, + uint16_t variable, + GNUNET_SCHEDULER_Task cont, + void *cont_cls) { - if ((get_handle->route_handle->dht_handle->current != NULL) && - (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) - { - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } - return; - } + struct GNUNET_DHT_ControlMessage *msg; + struct PendingMessage *pending; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending get request with key %s, uid %llu\n", - "DHT API", GNUNET_h2s (&get_handle->route_handle->key), - get_handle->route_handle->uid); -#endif - GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls); - GNUNET_free (get_handle); + pending = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof(struct GNUNET_DHT_ControlMessage)); + msg = (struct GNUNET_DHT_ControlMessage*) &pending[1]; + pending->msg = &msg->header; + msg->header.size = htons (sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CONTROL); + msg->command = htons (command); + msg->variable = htons (variable); + pending->free_on_send = GNUNET_YES; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->in_pending_queue = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert (handle->pending_head, + handle->pending_tail, + pending); + process_pending_messages (handle); } /** - * Perform an asynchronous FIND PEER operation on the DHT. + * Send a message to the DHT telling it to issue a single find + * peer request using the peers unique identifier as key. This + * is used to fill the routing table, and is normally controlled + * by the DHT itself. However, for testing and perhaps more + * close control over the DHT, this can be explicitly managed. * * @param handle handle to the DHT service - * @param timeout timeout for this request to be sent to the - * service - * @param options routing options for this message - * @param key the key to look up - * @param proc function to call on each result - * @param proc_cls closure for proc - * @param cont continuation to call once message sent - * @param cont_cls closure for continuation - * - * @return handle to stop the async get, NULL on error + * @param cont continuation to call when done (transmitting request to service) + * @param cont_cls closure for cont */ -struct GNUNET_DHT_FindPeerHandle * -GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_DHT_RouteOption options, - const GNUNET_HashCode * key, - GNUNET_DHT_FindPeerProcessor proc, - void *proc_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) +void +GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle, + GNUNET_SCHEDULER_Task cont, + void *cont_cls) { - struct GNUNET_DHT_FindPeerHandle *find_peer_handle; - struct GNUNET_DHT_FindPeerMessage find_peer_msg; + send_control_message (handle, + GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0, + cont, cont_cls); +} - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ - return NULL; - find_peer_handle = - GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle)); - find_peer_handle->find_peer_context.proc = proc; - find_peer_handle->find_peer_context.proc_cls = proc_cls; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending `%s' request with key %s\n", "DHT API", - "FIND PEER", GNUNET_h2s (key)); -#endif - - find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage)); - find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); - find_peer_handle->route_handle = - GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header, - timeout, &find_peer_reply_iterator, - find_peer_handle, cont, cont_cls); - return find_peer_handle; -} +#if HAVE_MALICIOUS /** - * Stop async find peer. Frees associated resources. + * Send a message to the DHT telling it to start issuing random GET + * requests every 'frequency' milliseconds. * - * @param find_peer_handle GET operation to stop. - * @param cont continuation to call once this message is sent to the service or times out - * @param cont_cls closure for the continuation + * @param handle handle to the DHT service + * @param frequency delay between sending malicious messages */ void -GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, + struct GNUNET_TIME_Relative frequency) { - if ((find_peer_handle->route_handle->dht_handle->current != NULL) && - (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) + if (frequency.value > UINT16_MAX) { - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } + GNUNET_break (0); return; } - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending `%s' request with key %s, uid %llu\n", - "DHT API", "FIND PEER", - GNUNET_h2s (&find_peer_handle->route_handle->key), - find_peer_handle->route_handle->uid); -#endif - GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls); - GNUNET_free (find_peer_handle); - + send_control_message (handle, + GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET, frequency.value, + NULL, NULL); } - /** - * Perform a PUT operation storing data in the DHT. - * - * @param handle handle to DHT service - * @param key the key to store under - * @param type type of the value - * @param size number of bytes in data; must be less than 64k - * @param data the data to store - * @param exp desired expiration time for the value - * @param timeout how long to wait for transmission of this request - * @param cont continuation to call when done; - * reason will be TIMEOUT on error, - * reason will be PREREQ_DONE on success - * @param cont_cls closure for cont + * Send a message to the DHT telling it to start issuing random PUT + * requests every 'frequency' milliseconds. * - * @return GNUNET_YES if put message is queued for transmission + * @param handle handle to the DHT service + * @param frequency delay between sending malicious messages */ -void -GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - uint32_t size, - const char *data, - struct GNUNET_TIME_Absolute exp, - struct GNUNET_TIME_Relative timeout, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +void +GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, + struct GNUNET_TIME_Relative frequency) { - struct GNUNET_DHT_PutMessage *put_msg; - struct GNUNET_DHT_RouteHandle *put_route; - size_t msize; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + if (frequency.value > UINT16_MAX) { - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n"); - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } + GNUNET_break (0); return; } + send_control_message (handle, + GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT, frequency.value, + NULL, NULL); +} -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending put request with key %s\n", "DHT API", - GNUNET_h2s (key)); -#endif - - msize = sizeof (struct GNUNET_DHT_PutMessage) + size; - put_msg = GNUNET_malloc (msize); - put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT); - put_msg->header.size = htons (msize); - put_msg->type = htons (type); - put_msg->data_size = htons (size); - put_msg->expiration = GNUNET_TIME_absolute_hton(exp); - memcpy (&put_msg[1], data, size); - - put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL, - NULL, cont, cont_cls); - - if (put_route == NULL) /* Route start failed! */ - { - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n"); - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } - } - else - { - GNUNET_free(put_route); - } - GNUNET_free (put_msg); +/** + * Send a message to the DHT telling it to start dropping + * all requests received. + * + * @param handle handle to the DHT service + */ +void +GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle) +{ + send_control_message (handle, + GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP, 0, + NULL, NULL); } +#endif + /* end of dht_api.c */ diff --git a/src/dht/gnunet-dht-get-peer.c b/src/dht/gnunet-dht-get-peer.c index 04959792b..f6974cddb 100644 --- a/src/dht/gnunet-dht-get-peer.c +++ b/src/dht/gnunet-dht-get-peer.c @@ -78,22 +78,30 @@ static unsigned int result_count; static int ret; static void -shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - if (dht_handle != NULL) - GNUNET_DHT_disconnect (dht_handle); - - dht_handle = NULL; + { + GNUNET_DHT_disconnect (dht_handle); + dht_handle = NULL; + } + fprintf (stderr, + _("Found %u peers\n"), + result_count); } + static void -cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +cleanup_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { if (find_peer_handle != NULL) - GNUNET_DHT_find_peer_stop (find_peer_handle, &shutdown_task, NULL); - else - GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); + { + GNUNET_DHT_find_peer_stop (find_peer_handle); + find_peer_handle = NULL; + } + GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); } /** @@ -103,46 +111,23 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param cls closure (NULL) * @param hello the response message, a HELLO */ -void find_peer_processor (void *cls, - const struct GNUNET_HELLO_Message *hello) +static void +find_peer_processor (void *cls, + const struct GNUNET_HELLO_Message *hello) { struct GNUNET_PeerIdentity peer; + if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer)) { result_count++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(&peer), result_count); - } -} - - -/** - * Signature of the main function of a task. - * - * @param cls closure - * @param tc context information (why was this task triggered now) - */ -void -message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) - { - if (verbose) - fprintf (stderr, - "Failed to send FIND PEER request to service, quitting.\n"); - ret = 1; - GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); - } - else - { if (verbose) - fprintf (stderr, "FIND PEER request sent, awaiting results!\n"); - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining - (absolute_timeout), &cleanup_task, NULL); + fprintf (stderr, + _("Found peer `%s'\n"), + GNUNET_i2s (&peer)); } } + /** * Main function that will be run by the scheduler. * @@ -194,13 +179,19 @@ run (void *cls, find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, timeout, - 0, &key, + GNUNET_DHT_RO_NONE, &find_peer_processor, - NULL, - &message_sent_cont, NULL); - + if (NULL == find_peer_handle) + { + GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); + return; + } + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_TIME_absolute_get_remaining + (absolute_timeout), + &cleanup_task, NULL); } @@ -234,8 +225,10 @@ main (int argc, char *const *argv) return (GNUNET_OK == GNUNET_PROGRAM_run (argc, argv, - "gnunet-dht-get", + "gnunet-dht-get-peer", gettext_noop - ("Issue a GET request to the GNUnet DHT, prints results."), + ("Issue a GET PEER request to the GNUnet DHT, print results."), options, &run, NULL)) ? ret : 1; } + +/* end of gnunet-dht-get-peer */ diff --git a/src/dht/gnunet-dht-get.c b/src/dht/gnunet-dht-get.c index 023188f35..fabe4aeb9 100644 --- a/src/dht/gnunet-dht-get.c +++ b/src/dht/gnunet-dht-get.c @@ -81,23 +81,27 @@ static unsigned int result_count; */ static int ret; + static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - if (dht_handle != NULL) - GNUNET_DHT_disconnect (dht_handle); - - dht_handle = NULL; + { + GNUNET_DHT_disconnect (dht_handle); + dht_handle = NULL; + } } + static void cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { if (get_handle != NULL) - GNUNET_DHT_get_stop (get_handle, &shutdown_task, NULL); - else - GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); + { + GNUNET_DHT_get_stop (get_handle); + get_handle = NULL; + } + GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); } @@ -108,6 +112,10 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param cls closure * @param exp when will this value expire * @param key key of the result + * @param get_path NULL-terminated array of pointers + * to the peers on reverse GET path (or NULL if not recorded) + * @param put_path NULL-terminated array of pointers + * to the peers on the PUT path (or NULL if not recorded) * @param type type of the result * @param size number of bytes in data * @param data pointer to the result data @@ -116,39 +124,17 @@ void get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp, const GNUNET_HashCode * key, - uint32_t type, uint32_t size, const void *data) + const struct GNUNET_PeerIdentity * const *get_path, + const struct GNUNET_PeerIdentity * const *put_path, + enum GNUNET_BLOCK_Type type, + size_t size, + const void *data) { fprintf (stdout, "Result %d, type %d:\n%.*s\n", result_count, type, size, (char *) data); result_count++; } -/** - * Signature of the main function of a task. - * - * @param cls closure - * @param tc context information (why was this task triggered now) - */ -void -message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) - { - if (verbose) - fprintf (stderr, - "Failed to send GET request to service, quitting.\n"); - ret = 1; - GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); - } - else - { - if (verbose) - fprintf (stderr, "GET request sent, awaiting results!\n"); - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining - (absolute_timeout), &cleanup_task, NULL); - } -} /** * Main function that will be run by the scheduler. @@ -198,8 +184,17 @@ run (void *cls, if (verbose) fprintf (stderr, "Issuing GET request for %s!\n", query_key); - get_handle = GNUNET_DHT_get_start (dht_handle, timeout, query_type, &key, - &get_result_iterator, NULL, &message_sent_cont, NULL); + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_TIME_absolute_get_remaining + (absolute_timeout), &cleanup_task, NULL); + get_handle = GNUNET_DHT_get_start (dht_handle, + timeout, + query_type, + &key, + GNUNET_DHT_RO_NONE, + NULL, 0, + NULL, 0, + &get_result_iterator, NULL); } @@ -242,3 +237,5 @@ main (int argc, char *const *argv) ("Issue a GET request to the GNUnet DHT, prints results."), options, &run, NULL)) ? ret : 1; } + +/* end of gnunet-dht-get.c */ diff --git a/src/dht/gnunet-dht-put.c b/src/dht/gnunet-dht-put.c index 9d3b2707b..9f7091b84 100644 --- a/src/dht/gnunet-dht-put.c +++ b/src/dht/gnunet-dht-put.c @@ -79,11 +79,11 @@ static char *data; static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - if (dht_handle != NULL) - GNUNET_DHT_disconnect (dht_handle); - - dht_handle = NULL; + { + GNUNET_DHT_disconnect (dht_handle); + dht_handle = NULL; + } } /** @@ -165,8 +165,14 @@ run (void *cls, fprintf (stderr, "Issuing put request for `%s' with data `%s'!\n", query_key, data); - GNUNET_DHT_put (dht_handle, &key, query_type, strlen (data), data, - expiration, timeout, &message_sent_cont, NULL); + GNUNET_DHT_put (dht_handle, + &key, + GNUNET_DHT_RO_NONE, + query_type, + strlen (data), data, + expiration, + timeout, + &message_sent_cont, NULL); } @@ -215,3 +221,5 @@ main (int argc, char *const *argv) ("Issue a PUT request to the GNUnet DHT insert DATA under KEY."), options, &run, NULL)) ? ret : 1; } + +/* end of gnunet-dht-put.c */ diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h index e4eed20d8..00d9851b9 100644 --- a/src/include/gnunet_dht_service.h +++ b/src/include/gnunet_dht_service.h @@ -62,11 +62,28 @@ struct GNUNET_DHT_FindPeerHandle; /** - * Iterator called on each result obtained from a generic route - * operation + * Options for routing. */ -typedef void (*GNUNET_DHT_MessageCallback)(void *cls, - int code); +enum GNUNET_DHT_RouteOption + { + /** + * Default. Do nothing special. + */ + GNUNET_DHT_RO_NONE = 0, + + /** + * Each peer along the way should look at 'enc' (otherwise + * only the k-peers closest to the key should look at it). + */ + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE = 1, + + /** + * We should keep track of the route that the message + * took in the P2P network. + */ + GNUNET_DHT_RO_RECORD_ROUTE = 2 + }; + /** * Initialize the connection with the DHT service. @@ -92,34 +109,36 @@ void GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle); +/* *************** Standard API: get and put ******************* */ + /** * Perform a PUT operation on the DHT identified by 'table' storing * a binding of 'key' to 'value'. The peer does not have to be part * of the table (if so, we will attempt to locate a peer that is!) * * @param handle handle to DHT service - * @param key the key to store under + * @param key the key to store data under + * @param options routing options for this message * @param type type of the value * @param size number of bytes in data; must be less than 64k * @param data the data to store * @param exp desired expiration time for the data - * @param timeout when to abort with an error if we fail to get - * a confirmation for the PUT from the local DHT service - * @param cont continuation to call when done; - * reason will be TIMEOUT on error, - * reason will be PREREQ_DONE on success + * @param timeout when to abort if we fail to transmit the request + * for the PUT to the local DHT service + * @param cont continuation to call when done (transmitting request to service) * @param cont_cls closure for cont */ void GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, + enum GNUNET_DHT_RouteOption options, enum GNUNET_BLOCK_Type type, - uint32_t size, + size_t size, const char *data, struct GNUNET_TIME_Absolute exp, struct GNUNET_TIME_Relative timeout, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); + GNUNET_SCHEDULER_Task cont, + void *cont_cls); /** @@ -129,6 +148,10 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, * @param cls closure * @param exp when will this value expire * @param key key of the result + * @param get_path NULL-terminated array of pointers + * to the peers on reverse GET path (or NULL if not recorded) + * @param put_path NULL-terminated array of pointers + * to the peers on the PUT path (or NULL if not recorded) * @param type type of the result * @param size number of bytes in data * @param data pointer to the result data @@ -136,70 +159,56 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, typedef void (*GNUNET_DHT_GetIterator)(void *cls, struct GNUNET_TIME_Absolute exp, const GNUNET_HashCode * key, + const struct GNUNET_PeerIdentity * const *get_path, + const struct GNUNET_PeerIdentity * const *put_path, enum GNUNET_BLOCK_Type type, - uint32_t size, + size_t size, const void *data); /** - * Perform an asynchronous GET operation on the DHT. + * Perform an asynchronous GET operation on the DHT. See + * also "GNUNET_BLOCK_evaluate". * * @param handle handle to the DHT service * @param timeout timeout for this request to be sent to the * service (this is NOT a timeout for receiving responses) * @param type expected type of the response object (GNUNET_BLOCK_TYPE_FS_*) * @param key the key to look up + * @param options routing options for this message + * @param bf bloom filter associated with query (can be NULL) + * @param bf_mutator mutation value for bf + * @param xquery extrended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in xquery * @param iter function to call on each result * @param iter_cls closure for iter - * @param cont continuation to call once message sent (and it is now - * safe to do another operation on the DHT) - * @param cont_cls closure for continuation - * @return handle to stop the async get, NULL on error (two - * concurrent operations scheduled) + * @return handle to stop the async get, NULL on error */ struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, - // bf, bf_mutator, xquery, xquery_size + enum GNUNET_DHT_RouteOption options, + const struct GNUNET_CONTAINER_BloomFilter *bf, + int32_t bf_mutator, + const void *xquery, + size_t xquery_size, GNUNET_DHT_GetIterator iter, - void *iter_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); + void *iter_cls); /** * Stop async DHT-get. Frees associated resources. * * @param get_handle GET operation to stop. - * @param cont continuation to call once this message is sent to the service - * @param cont_cls closure for the continuation */ void -GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); - - -/** - * Options for routing. - */ -enum GNUNET_DHT_RouteOption - { - /** - * Default. Do nothing special. - */ - GNUNET_DHT_RO_NONE = 0, +GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle); - /** - * Each peer along the way should look at 'enc' (otherwise - * only the k-peers closest to the key should look at it). - */ - GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE = 1 - }; +/* ******** Special high-level API for finding peers *********** */ /** * Iterator called on each result obtained from a find peer @@ -218,46 +227,43 @@ typedef void (*GNUNET_DHT_FindPeerProcessor)(void *cls, * @param handle handle to the DHT service * @param timeout timeout for this request to be sent to the * service - * @param options routing options for this message * @param key the key to look up + * @param options routing options for this message * @param proc function to call on each result * @param proc_cls closure for proc - * @param cont continuation to call once message sent - * @param cont_cls closure for continuation * @return handle to stop the async get, NULL on error */ struct GNUNET_DHT_FindPeerHandle * GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_TIME_Relative timeout, + const GNUNET_HashCode *key, enum GNUNET_DHT_RouteOption options, - const GNUNET_HashCode * key, GNUNET_DHT_FindPeerProcessor proc, - void *proc_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); + void *proc_cls); /** * Stop async find peer. Frees associated resources. * * @param find_peer_handle GET operation to stop. - * @param cont continuation to call once this message is sent to the service - * @param cont_cls closure for the continuation */ void -GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); +GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle); + +/* ***** Special low-level API providing generic routeing abstraction ***** */ + /** * Iterator called on each result obtained from a generic route * operation * * @param cls closure + * @param key key that was used * @param reply response */ typedef void (*GNUNET_DHT_ReplyProcessor)(void *cls, + const GNUNET_HashCode *key, const struct GNUNET_MessageHeader *reply); @@ -273,18 +279,19 @@ typedef void (*GNUNET_DHT_ReplyProcessor)(void *cls, * @param enc send the encapsulated message to a peer close to the key * @param timeout when to abort with an error if we fail to get * a confirmation for the request (when necessary) or how long - * to wait for transmission to the service + * to wait for transmission to the service; only applies + * if 'iter' is NULL * @param iter function to call on each result, NULL if no replies are expected * @param iter_cls closure for iter - * @param cont continuation to call when done, GNUNET_SYSERR if failed - * GNUNET_OK otherwise + * @param cont continuation to call when the request has been transmitted + * the first time to the service * @param cont_cls closure for cont - * @return handle to stop the request + * @return handle to stop the request, NULL if the request is "fire and forget" */ struct GNUNET_DHT_RouteHandle * GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode *key, - unsigned int desired_replication_level, + uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const struct GNUNET_MessageHeader *enc, struct GNUNET_TIME_Relative timeout, @@ -294,30 +301,18 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, void *cont_cls); + /** - * Stop async route stop. Frees associated resources. + * Stop async route operation. Frees associated resources. * * @param route_handle operation to stop. - * @param cont continuation to call once this message is sent to the service - * @param cont_cls closure for the continuation */ void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle); + + +/* ***** Special API for controlling DHT routing maintenance ******* */ -/** - * Send a message to the DHT telling it to start issuing random GET - * requests every 'frequency' milliseconds. - * - * @param handle handle to the DHT service - * @param frequency delay (in milliseconds) between sending malicious messages - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation - * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not - */ -int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls); /** * Send a message to the DHT telling it to issue a single find @@ -326,26 +321,33 @@ int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequ * by the DHT itself. However, for testing and perhaps more * close control over the DHT, this can be explicitly managed. * + * @param cont continuation to call when done (transmitting request to service) + * @param cont_cls closure for cont * @param handle handle to the DHT service - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation - * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not */ -int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls); +void +GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle, + GNUNET_SCHEDULER_Task cont, + void *cont_cls); + +/* ***** Special API for testing robustness with malicious peers ******* */ + +#if HAVE_MALICIOUS +/* Note that these functions are NOT considered to be part of the + "official" API and hence are NOT subjected to library versioning; + only developers testing GNUnet's robustness should have any use for + them, applications should never use them. Applications must NOT + define "HAVE_MALICIOUS" before including this header. */ /** * Send a message to the DHT telling it to start dropping * all requests received. * * @param handle handle to the DHT service - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation - * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not */ -int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); +void +GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle); + /** * Send a message to the DHT telling it to start issuing random PUT @@ -353,12 +355,25 @@ int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_S * * @param handle handle to the DHT service * @param frequency delay (in milliseconds) between sending malicious messages - * @param cont continuation to call once the message is sent - * @param cont_cls closure for continuation + */ +void +GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, + struct GNUNET_TIME_Relative frequency); + + +/** + * Send a message to the DHT telling it to start issuing random GET + * requests every 'frequency' milliseconds. * - * @return GNUNET_YES if the control message was sent, GNUNET_NO if not + * @param handle handle to the DHT service + * @param frequency delay between sending malicious messages */ -int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls); +void +GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, + struct GNUNET_TIME_Relative frequency); + + +#endif #if 0 /* keep Emacsens' auto-indent happy */ { -- 2.25.1