X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdht%2Fdht_api.c;h=66eaf10647dad7e6a424e3440593f030603d4c46;hb=5b32752cd7b02adcb8e6fec7798637638c6f63a0;hp=4a19422a88b04f79a5ae61b78f34a884d0dd0e75;hpb=71f9f79c47dfacead7cc09961b0671a0eaed3e8c;p=oweals%2Fgnunet.git diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 4a19422a8..66eaf1064 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + Copyright (C) 2009, 2010, 2011, 2012, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -23,192 +23,179 @@ * @brief library to access the DHT service * @author Christian Grothoff * @author Nathan Evans - * - * TODO: retransmission of pending requests maybe happens now, at least - * the code is in place to do so. Need to add checks when api calls - * happen to check if retransmission is in progress, and if so set - * the single pending message for transmission once the list of - * retries are done. */ #include "platform.h" -#include "gnunet_bandwidth_lib.h" -#include "gnunet_client_lib.h" +#include "gnunet_util_lib.h" #include "gnunet_constants.h" -#include "gnunet_container_lib.h" #include "gnunet_arm_service.h" #include "gnunet_hello_lib.h" #include "gnunet_protocols.h" -#include "gnunet_server_lib.h" -#include "gnunet_time_lib.h" #include "gnunet_dht_service.h" #include "dht.h" -#define DEBUG_DHT_API GNUNET_NO +#define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__) + -struct PendingMessage +/** + * Handle to a PUT request. + */ +struct GNUNET_DHT_PutHandle { /** - * Message that is pending + * Kept in a DLL. */ - struct GNUNET_MessageHeader *msg; + struct GNUNET_DHT_PutHandle *next; /** - * Timeout for this message + * Kept in a DLL. */ - struct GNUNET_TIME_Relative timeout; + struct GNUNET_DHT_PutHandle *prev; /** - * Continuation to call on message send - * or message receipt confirmation + * Continuation to call when done. */ - GNUNET_SCHEDULER_Task cont; + GNUNET_DHT_PutContinuation cont; /** - * Continuation closure + * Main handle to this DHT api */ - void *cont_cls; + struct GNUNET_DHT_Handle *dht_handle; /** - * Unique ID for this request + * Closure for @e cont. */ - uint64_t unique_id; - -}; + void *cont_cls; -struct PendingMessageList -{ /** - * This is a singly linked list. + * Unique ID for the PUT operation. */ - struct PendingMessageList *next; + uint64_t unique_id; - /** - * The pending message. - */ - struct PendingMessage *message; }; -struct GNUNET_DHT_GetContext +/** + * Handle to a GET request + */ +struct GNUNET_DHT_GetHandle { + /** * Iterator to call on data receipt */ GNUNET_DHT_GetIterator iter; /** - * Closure for the iterator callback + * Closure for @a iter. */ void *iter_cls; -}; - -struct GNUNET_DHT_FindPeerContext -{ /** - * Iterator to call on data receipt + * Main handle to this DHT api */ - GNUNET_DHT_FindPeerProcessor proc; + struct GNUNET_DHT_Handle *dht_handle; /** - * Closure for the iterator callback + * Array of hash codes over the results that we have already + * seen. */ - void *proc_cls; + struct GNUNET_HashCode *seen_results; -}; + /** + * Key that this get request is for + */ + struct GNUNET_HashCode key; -/** - * Handle to a route request - */ -struct GNUNET_DHT_RouteHandle -{ + /** + * Unique identifier for this request (for key collisions). + */ + uint64_t unique_id; /** - * Unique identifier for this request (for key collisions) + * Size of the extended query, allocated at the end of this struct. */ - uint64_t uid; + size_t xquery_size; /** - * Key that this get request is for + * Desired replication level. */ - GNUNET_HashCode key; + uint32_t desired_replication_level; /** - * Iterator to call on data receipt + * Type of the block we are looking for. */ - GNUNET_DHT_ReplyProcessor iter; + enum GNUNET_BLOCK_Type type; /** - * Closure for the iterator callback + * Routing options. */ - void *iter_cls; + enum GNUNET_DHT_RouteOption options; /** - * Main handle to this DHT api + * Size of the @e seen_results array. Note that not + * all positions might be used (as we over-allocate). */ - struct GNUNET_DHT_Handle *dht_handle; + unsigned int seen_results_size; /** - * The actual message sent for this request, - * used for retransmitting requests on service - * failure/reconnect. Freed on route_stop. + * Offset into the @e seen_results array marking the + * end of the positions that are actually used. */ - struct GNUNET_DHT_RouteMessage *message; + unsigned int seen_results_end; + }; /** - * Handle to control a get operation. + * Handle to a monitoring request. */ -struct GNUNET_DHT_GetHandle +struct GNUNET_DHT_MonitorHandle { /** - * Handle to the actual route operation for the get + * DLL. */ - struct GNUNET_DHT_RouteHandle *route_handle; + struct GNUNET_DHT_MonitorHandle *next; /** - * The context of the get request + * DLL. */ - struct GNUNET_DHT_GetContext get_context; -}; + struct GNUNET_DHT_MonitorHandle *prev; + /** + * Main handle to this DHT api. + */ + struct GNUNET_DHT_Handle *dht_handle; -/** - * Handle to control a find peer operation. - */ -struct GNUNET_DHT_FindPeerHandle -{ /** - * Handle to the actual route operation for the request - */ - struct GNUNET_DHT_RouteHandle *route_handle; - - /** - * The context of the find peer request - */ - struct GNUNET_DHT_FindPeerContext find_peer_context; -}; + * Type of block looked for. + */ + enum GNUNET_BLOCK_Type type; + /** + * Key being looked for, NULL == all. + */ + struct GNUNET_HashCode *key; + + /** + * Callback for each received message of type get. + */ + GNUNET_DHT_MonitorGetCB get_cb; -enum DHT_Retransmit_Stage -{ /** - * The API is not retransmitting anything at this time. + * Callback for each received message of type get response. */ - DHT_NOT_RETRANSMITTING, + GNUNET_DHT_MonitorGetRespCB get_resp_cb; /** - * The API is retransmitting, and nothing has been single - * queued for sending. + * Callback for each received message of type put. */ - DHT_RETRANSMITTING, + GNUNET_DHT_MonitorPutCB put_cb; /** - * The API is retransmitting, and a single message has been - * queued for transmission once finished. + * Closure for @e get_cb, @e put_cb and @e get_resp_cb. */ - DHT_RETRANSMITTING_MESSAGE_QUEUED + void *cb_cls; + }; @@ -217,10 +204,6 @@ enum DHT_Retransmit_Stage */ struct GNUNET_DHT_Handle { - /** - * Our scheduler. - */ - struct GNUNET_SCHEDULER_Handle *sched; /** * Configuration to use. @@ -228,995 +211,1055 @@ struct GNUNET_DHT_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Connection to DHT service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** - * Currently pending transmission request. + * Head of linked list of messages we would like to monitor. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_DHT_MonitorHandle *monitor_head; /** - * Message we are currently sending, only allow - * a single message to be queued. If not unique - * (typically a put request), await a confirmation - * from the service that the message was received. - * If unique, just fire and forget. + * Tail of linked list of messages we would like to monitor. */ - struct PendingMessage *current; + struct GNUNET_DHT_MonitorHandle *monitor_tail; /** - * Hash map containing the current outstanding unique requests + * Head of active PUT requests. */ - struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; + struct GNUNET_DHT_PutHandle *put_head; /** - * Generator for unique ids. + * Tail of active PUT requests. */ - uint64_t uid_gen; + struct GNUNET_DHT_PutHandle *put_tail; + + /** + * Hash map containing the current outstanding unique GET requests + * (values are of type `struct GNUNET_DHT_GetHandle`). + */ + struct GNUNET_CONTAINER_MultiHashMap *active_requests; /** - * Are we currently retransmitting requests? If so queue a _single_ - * new request when received. + * Task for trying to reconnect. */ - enum DHT_Retransmit_Stage retransmit_stage; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** - * Linked list of retranmissions, to be used in the event - * of a dht service disconnect/reconnect. + * How quickly should we retry? Used for exponential back-off on + * connect-errors. */ - struct PendingMessageList *retransmissions; + struct GNUNET_TIME_Relative retry_time; /** - * A single pending message allowed to be scheduled - * during retransmission phase. + * Generator for unique ids. */ - struct PendingMessage *retransmission_buffer; + uint64_t uid_gen; + + }; /** - * Convert unique ID to hash code. + * Try to (re)connect to the DHT service. * - * @param uid unique ID to convert - * @param hash set to uid (extended with zeros) + * @param h DHT handle to reconnect + * @return #GNUNET_YES on success, #GNUNET_NO on failure. + */ +static int +try_connect (struct GNUNET_DHT_Handle *h); + + +/** + * Send GET message for a @a get_handle to DHT. + * + * @param gh GET to generate messages for. */ static void -hash_from_uid (uint64_t uid, - GNUNET_HashCode *hash) +send_get (struct GNUNET_DHT_GetHandle *gh) { - memset (hash, 0, sizeof(GNUNET_HashCode)); - *((uint64_t*)hash) = uid; + struct GNUNET_DHT_Handle *h = gh->dht_handle; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_ClientGetMessage *get_msg; + + env = GNUNET_MQ_msg_extra (get_msg, + gh->xquery_size, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET); + get_msg->options = htonl ((uint32_t) gh->options); + get_msg->desired_replication_level = htonl (gh->desired_replication_level); + get_msg->type = htonl (gh->type); + get_msg->key = gh->key; + get_msg->unique_id = gh->unique_id; + GNUNET_memcpy (&get_msg[1], + &gh[1], + gh->xquery_size); + GNUNET_MQ_send (h->mq, + env); } -#if RETRANSMIT + /** - * Iterator callback to retransmit each outstanding request - * because the connection to the DHT service went down (and - * came back). - * + * Send GET message(s) for indicating which results are already known + * for a @a get_handle to DHT. Complex as we need to send the list of + * known results, which means we may need mulitple messages to block + * known results from the result set. * + * @param gh GET to generate messages for + * @param transmission_offset_start at which offset should we start? */ -static int retransmit_iterator (void *cls, - const GNUNET_HashCode * key, - void *value) +static void +send_get_known_results (struct GNUNET_DHT_GetHandle *gh, + unsigned int transmission_offset_start) { - struct GNUNET_DHT_RouteHandle *route_handle = value; - struct PendingMessageList *pending_message_list; - - pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage)); - pending_message_list->message = (struct PendingMessage *)&pending_message_list[1]; - pending_message_list->message->msg = &route_handle->message->header; - pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever(); - pending_message_list->message->cont = NULL; - pending_message_list->message->cont_cls = NULL; - pending_message_list->message->unique_id = route_handle->uid; - /* Add the new pending message to the front of the retransmission list */ - pending_message_list->next = route_handle->dht_handle->retransmissions; - route_handle->dht_handle->retransmissions = pending_message_list; - - return GNUNET_OK; + struct GNUNET_DHT_Handle *h = gh->dht_handle; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_ClientGetResultSeenMessage *msg; + unsigned int delta; + unsigned int max; + unsigned int transmission_offset; + + max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*msg)) + / sizeof (struct GNUNET_HashCode); + transmission_offset = transmission_offset_start; + while (transmission_offset < gh->seen_results_end) + { + delta = gh->seen_results_end - transmission_offset; + if (delta > max) + delta = max; + env = GNUNET_MQ_msg_extra (msg, + delta * sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN); + msg->key = gh->key; + msg->unique_id = gh->unique_id; + GNUNET_memcpy (&msg[1], + &gh->seen_results[transmission_offset], + sizeof (struct GNUNET_HashCode) * delta); + GNUNET_MQ_send (h->mq, + env); + transmission_offset += delta; + } } -#endif + /** - * Try to (re)connect to the dht service. + * Add the GET request corresponding to the given route handle + * to the pending queue (if it is not already in there). * - * @return GNUNET_YES on success, GNUNET_NO on failure. + * @param cls the `struct GNUNET_DHT_Handle *` + * @param key key for the request (not used) + * @param value the `struct GNUNET_DHT_GetHandle *` + * @return #GNUNET_YES (always) */ static int -try_connect (struct GNUNET_DHT_Handle *handle) +add_get_request_to_pending (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - if (handle->client != NULL) - return GNUNET_OK; - handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); - if (handle->client != NULL) - return GNUNET_YES; -#if DEBUG_STATISTICS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to connect to the dht service!\n")); -#endif - return GNUNET_NO; + struct GNUNET_DHT_Handle *handle = cls; + struct GNUNET_DHT_GetHandle *gh = value; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Retransmitting request related to %s to DHT %p\n", + GNUNET_h2s (key), + handle); + send_get (gh); + send_get_known_results (gh, 0); + return GNUNET_YES; } + /** - * Send complete (or failed), call continuation if we have one. + * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message. + * + * @param mh monitor handle to generate start message for */ static void -finish (struct GNUNET_DHT_Handle *handle, int code) +send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh) { - struct PendingMessage *pos = handle->current; - GNUNET_HashCode uid_hash; - hash_from_uid (pos->unique_id, &uid_hash); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); -#endif - GNUNET_assert (pos != NULL); - - if (pos->cont != NULL) - { - if (code == GNUNET_SYSERR) - GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, - pos->cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - else - GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, - pos->cont_cls, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - } - - if (pos->unique_id == 0) - GNUNET_free(pos->msg); - GNUNET_free (pos); - handle->current = NULL; + struct GNUNET_DHT_Handle *h = mh->dht_handle; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_MonitorStartStopMessage *m; + + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_START); + m->type = htonl (mh->type); + m->get = htons (NULL != mh->get_cb); + m->get_resp = htons (NULL != mh->get_resp_cb); + m->put = htons (NULL != mh->put_cb); + if (NULL != mh->key) + { + m->filter_key = htons(1); + m->key = *mh->key; + } + GNUNET_MQ_send (h->mq, + env); } + /** - * Transmit the next pending message, called by notify_transmit_ready + * Try reconnecting to the dht service. + * + * @param cls a `struct GNUNET_DHT_Handle` */ -static size_t -transmit_pending (void *cls, size_t size, void *buf) +static void +try_reconnect (void *cls) { - struct GNUNET_DHT_Handle *handle = cls; - size_t tsize; - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending\n", "DHT API"); -#endif - if (buf == NULL) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending buf is NULL\n", "DHT API"); -#endif - finish (handle, GNUNET_SYSERR); - return 0; - } - - handle->th = NULL; - - if (handle->current != NULL) - { - tsize = ntohs (handle->current->msg->size); - if (size >= tsize) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending message size %d\n", "DHT API", tsize); -#endif - memcpy (buf, handle->current->msg, tsize); - finish (handle, GNUNET_OK); - return tsize; - } - else - { - return 0; - } - } - /* Have no pending request */ - return 0; + struct GNUNET_DHT_Handle *h = cls; + struct GNUNET_DHT_MonitorHandle *mh; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Reconnecting with DHT %p\n", + h); + h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); + h->reconnect_task = NULL; + if (GNUNET_YES != try_connect (h)) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "DHT reconnect failed!\n"); + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); + return; + } + GNUNET_CONTAINER_multihashmap_iterate (h->active_requests, + &add_get_request_to_pending, + h); + for (mh = h->monitor_head; NULL != mh; mh = mh->next) + send_monitor_start (mh); } + /** - * Try to send messages from list of messages to send + * Try reconnecting to the DHT service. + * + * @param h handle to dht to (possibly) disconnect and reconnect */ static void -process_pending_message (struct GNUNET_DHT_Handle *handle) +do_disconnect (struct GNUNET_DHT_Handle *h) { + struct GNUNET_DHT_PutHandle *ph; + GNUNET_DHT_PutContinuation cont; + void *cont_cls; - if (handle->current == NULL) - return; /* action already pending */ - if (GNUNET_YES != try_connect (handle)) - { - finish (handle, GNUNET_SYSERR); - return; - } - - if (NULL == - (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs (handle-> - current->msg-> - size), - handle->current-> - timeout, GNUNET_YES, - &transmit_pending, - handle))) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dht service.\n"); -#endif - finish (handle, GNUNET_SYSERR); - return; - } -#if DEBUG_DHT_API + if (NULL == h->mq) + return; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", - "DHT API", ntohs (handle->current->msg->size)); -#endif + "Disconnecting from DHT service, will try to reconnect in %s\n", + GNUNET_STRINGS_relative_time_to_string (h->retry_time, + GNUNET_YES)); + /* notify client about all PUTs that (may) have failed due to disconnect */ + while (NULL != (ph = h->put_head)) + { + cont = ph->cont; + cont_cls = ph->cont_cls; + GNUNET_DHT_put_cancel (ph); + if (NULL != cont) + cont (cont_cls, + GNUNET_SYSERR); + } + GNUNET_assert (NULL == h->reconnect_task); + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); } + /** - * Send complete (or failed), call continuation if we have one. - * Forward declaration. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_DHT_Handle *` + * @param error error code */ static void -finish_retransmission (struct GNUNET_DHT_Handle *handle, int code); +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DHT_Handle *h = cls; + + do_disconnect (h); +} -/* Forward declaration */ -static size_t -transmit_pending_retransmission (void *cls, size_t size, void *buf); /** - * Try to send messages from list of messages to send + * Verify integrity of a get monitor message from the service. + * + * @param cls The DHT handle. + * @param msg Monitor get message from the service. + * @return #GNUNET_OK if everything went fine, + * #GNUNET_SYSERR if the message is malformed. */ -static void -process_pending_retransmissions (struct GNUNET_DHT_Handle *handle) +static int +check_monitor_get (void *cls, + const struct GNUNET_DHT_MonitorGetMessage *msg) { + uint32_t plen = ntohl (msg->get_path_length); + uint16_t msize = ntohs (msg->header.size) - sizeof (*msg); - if (handle->current == NULL) - return; /* action already pending */ - if (GNUNET_YES != try_connect (handle)) - { - finish_retransmission (handle, GNUNET_SYSERR); - return; - } - - if (NULL == - (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs (handle-> - current->msg-> - size), - handle->current-> - timeout, GNUNET_YES, - &transmit_pending_retransmission, - handle))) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dht service.\n"); -#endif - finish_retransmission (handle, GNUNET_SYSERR); - return; - } -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", - "DHT API", ntohs (handle->current->msg->size)); -#endif + if ( (plen > UINT16_MAX) || + (plen * sizeof (struct GNUNET_HashCode) != msize) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } + /** - * Send complete (or failed), call continuation if we have one. + * Process a get monitor message from the service. + * + * @param cls The DHT handle. + * @param msg Monitor get message from the service. */ static void -finish_retransmission (struct GNUNET_DHT_Handle *handle, int code) +handle_monitor_get (void *cls, + const struct GNUNET_DHT_MonitorGetMessage *msg) { - struct PendingMessage *pos = handle->current; - struct PendingMessageList *pending_list; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API"); -#endif - GNUNET_assert (pos == handle->retransmissions->message); - pending_list = handle->retransmissions; - handle->retransmissions = handle->retransmissions->next; - GNUNET_free (pending_list); - - if (handle->retransmissions == NULL) - { - handle->retransmit_stage = DHT_NOT_RETRANSMITTING; - } - - if (handle->retransmissions != NULL) - { - handle->current = handle->retransmissions->message; - process_pending_retransmissions(handle); - } - else if (handle->retransmission_buffer != NULL) - { - handle->current = handle->retransmission_buffer; - process_pending_message(handle); - } + struct GNUNET_DHT_Handle *handle = cls; + struct GNUNET_DHT_MonitorHandle *mh; + + for (mh = handle->monitor_head; NULL != mh; mh = mh->next) + { + if (NULL == mh->get_cb) + continue; + if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) || + (mh->type == ntohl (msg->type)) ) && + ( (NULL == mh->key) || + (0 == memcmp (mh->key, + &msg->key, + sizeof (struct GNUNET_HashCode))) ) ) + mh->get_cb (mh->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + ntohl (msg->get_path_length), + (struct GNUNET_PeerIdentity *) &msg[1], + &msg->key); + } } + /** - * Handler for messages received from the DHT service - * a demultiplexer which handles numerous message types + * Validate a get response monitor message from the service. * + * @param cls The DHT handle. + * @param msg monitor get response message from the service + * @return #GNUNET_OK if everything went fine, + * #GNUNET_SYSERR if the message is malformed. */ -void -service_message_handler (void *cls, - const struct GNUNET_MessageHeader *msg) +static int +check_monitor_get_resp (void *cls, + const struct GNUNET_DHT_MonitorGetRespMessage *msg) { - struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_RouteResultMessage *dht_msg; - struct GNUNET_MessageHeader *enc_msg; - struct GNUNET_DHT_RouteHandle *route_handle; - uint64_t uid; - GNUNET_HashCode uid_hash; - size_t enc_size; - - if (msg == NULL) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received NULL from server, connection down!\n", - "DHT API"); -#endif - GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); - handle->client = GNUNET_CLIENT_connect (handle->sched, - "dht", - handle->cfg); - if (handle->current != NULL) - { - finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */ - } -#if RETRANSMIT - if (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0) - { - handle->retransmit_stage = DHT_RETRANSMITTING; - handle->current = handle->retransmissions->message; - process_pending_retransmissions(handle); - } -#endif - return; - } - - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT: - { - dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg; - uid = GNUNET_ntohll (dht_msg->unique_id); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu)\n", - "DHT API", uid); -#endif - - hash_from_uid (uid, &uid_hash); - route_handle = - GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests, - &uid_hash); - if (route_handle == NULL) /* We have no recollection of this request */ - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), but have no recollection of it!\n", - "DHT API", uid); -#endif - } - else - { - enc_size = - ntohs (dht_msg->header.size) - - sizeof (struct GNUNET_DHT_RouteResultMessage); - GNUNET_assert (enc_size > 0); - enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; - route_handle->iter (route_handle->iter_cls, enc_msg); - } - - break; - } - default: - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "`%s': Received unknown message type %d\n", "DHT API", - ntohs (msg->type)); - } - } - GNUNET_CLIENT_receive (handle->client, - &service_message_handler, - handle, GNUNET_TIME_UNIT_FOREVER_REL); + size_t msize = ntohs (msg->header.size) - sizeof (*msg); + uint32_t getl = ntohl (msg->get_path_length); + uint32_t putl = ntohl (msg->put_path_length); + if ( (getl + putl < getl) || + ( (msize / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Initialize the connection with the DHT service. - * - * @param sched scheduler to use - * @param cfg configuration to use - * @param ht_len size of the internal hash table to use for - * processing multiple GET/FIND requests in parallel + * Process a get response monitor message from the service. * - * @return handle to the DHT service, or NULL on error + * @param cls The DHT handle. + * @param msg monitor get response message from the service */ -struct GNUNET_DHT_Handle * -GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, - const struct GNUNET_CONFIGURATION_Handle *cfg, - unsigned int ht_len) +static void +handle_monitor_get_resp (void *cls, + const struct GNUNET_DHT_MonitorGetRespMessage *msg) { - struct GNUNET_DHT_Handle *handle; - - handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle)); - handle->cfg = cfg; - handle->sched = sched; - handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); - handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); - if (handle->client == NULL) - { - GNUNET_free (handle); - return NULL; - } - handle->outstanding_requests = - GNUNET_CONTAINER_multihashmap_create (ht_len); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Connection to service in progress\n", "DHT API"); -#endif - GNUNET_CLIENT_receive (handle->client, - &service_message_handler, - handle, GNUNET_TIME_UNIT_FOREVER_REL); - return handle; + struct GNUNET_DHT_Handle *handle = cls; + size_t msize = ntohs (msg->header.size) - sizeof (*msg); + const struct GNUNET_PeerIdentity *path; + uint32_t getl = ntohl (msg->get_path_length); + uint32_t putl = ntohl (msg->put_path_length); + struct GNUNET_DHT_MonitorHandle *mh; + + path = (const struct GNUNET_PeerIdentity *) &msg[1]; + for (mh = handle->monitor_head; NULL != mh; mh = mh->next) + { + if (NULL == mh->get_resp_cb) + continue; + if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) || + (mh->type == ntohl(msg->type)) ) && + ( (NULL == mh->key) || + (0 == memcmp (mh->key, + &msg->key, + sizeof (struct GNUNET_HashCode))) ) ) + mh->get_resp_cb (mh->cb_cls, + (enum GNUNET_BLOCK_Type) ntohl (msg->type), + path, + getl, + &path[getl], + putl, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (const void *) &path[getl + putl], + msize - sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); + } } /** - * Shutdown connection with the DHT service. + * Check validity of a put monitor message from the service. * - * @param handle handle of the DHT connection to stop + * @param cls The DHT handle. + * @param msg Monitor put message from the service. + * @return #GNUNET_OK if everything went fine, + * #GNUNET_SYSERR if the message is malformed. */ -void -GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) +static int +check_monitor_put (void *cls, + const struct GNUNET_DHT_MonitorPutMessage *msg) { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); -#endif - GNUNET_assert (handle != NULL); - if (handle->th != NULL) /* We have a live transmit request in the Aether */ - { - GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); - handle->th = NULL; - } - if (handle->current != NULL) /* We are trying to send something now, clean it up */ - GNUNET_free (handle->current); - - if (handle->client != NULL) /* Finally, disconnect from the service */ - { - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); - handle->client = NULL; - } - - GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0); - GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests); - GNUNET_free (handle); + size_t msize; + uint32_t putl; + + msize = ntohs (msg->header.size) - sizeof (*msg); + putl = ntohl (msg->put_path_length); + if ((msize / sizeof (struct GNUNET_PeerIdentity)) < putl) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Transmit the next pending message, called by notify_transmit_ready + * Process a put monitor message from the service. + * + * @param cls The DHT handle. + * @param msg Monitor put message from the service. */ -static size_t -transmit_pending_retransmission (void *cls, size_t size, void *buf) +static void +handle_monitor_put (void *cls, + const struct GNUNET_DHT_MonitorPutMessage *msg) { struct GNUNET_DHT_Handle *handle = cls; - size_t tsize; + size_t msize = ntohs (msg->header.size) - sizeof (*msg); + uint32_t putl = ntohl (msg->put_path_length); + const struct GNUNET_PeerIdentity *path; + struct GNUNET_DHT_MonitorHandle *mh; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending\n", "DHT API"); -#endif - if (buf == NULL) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending buf is NULL\n", "DHT API"); -#endif - finish_retransmission (handle, GNUNET_SYSERR); - return 0; - } - - handle->th = NULL; - - if (handle->current != NULL) - { - tsize = ntohs (handle->current->msg->size); - if (size >= tsize) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending message size %d\n", "DHT API", tsize); -#endif - memcpy (buf, handle->current->msg, tsize); - finish_retransmission (handle, GNUNET_OK); - return tsize; - } - else - { - return 0; - } - } - /* Have no pending request */ - return 0; + path = (const struct GNUNET_PeerIdentity *) &msg[1]; + for (mh = handle->monitor_head; NULL != mh; mh = mh->next) + { + if (NULL == mh->put_cb) + continue; + if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) || + (mh->type == ntohl(msg->type)) ) && + ( (NULL == mh->key) || + (0 == memcmp (mh->key, + &msg->key, + sizeof (struct GNUNET_HashCode))) ) ) + mh->put_cb (mh->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + putl, + path, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (const void *) &path[putl], + msize - sizeof (struct GNUNET_PeerIdentity) * putl); + } } /** - * Iterator called on each result obtained from a generic route - * operation + * Verify that client result message received from the service is well-formed. + * + * @param cls The DHT handle. + * @param msg Monitor put message from the service. + * @return #GNUNET_OK if everything went fine, + * #GNUNET_SYSERR if the message is malformed. */ -void -get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) +static int +check_client_result (void *cls, + const struct GNUNET_DHT_ClientResultMessage *msg) { - struct GNUNET_DHT_GetHandle *get_handle = cls; - struct GNUNET_DHT_GetResultMessage *result; - size_t data_size; - char *result_data; - - if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT) - return; - - GNUNET_assert (ntohs (reply->size) >= - sizeof (struct GNUNET_DHT_GetResultMessage)); - result = (struct GNUNET_DHT_GetResultMessage *) reply; - data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage); + size_t msize = ntohs (msg->header.size) - sizeof (*msg); + uint32_t put_path_length = ntohl (msg->put_path_length); + uint32_t get_path_length = ntohl (msg->get_path_length); + size_t meta_length; + + meta_length = + sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length); + if ( (msize < meta_length) || + (get_path_length > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || + (put_path_length > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} - result_data = (char *) &result[1]; /* Set data pointer to end of message */ - get_handle->get_context.iter (get_handle->get_context.iter_cls, - result->expiration, &result->key, - ntohs (result->type), data_size, result_data); +/** + * Process a given reply that might match the given request. + * + * @param cls the `struct GNUNET_DHT_ClientResultMessage` + * @param key query of the request + * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key + * @return #GNUNET_YES to continue to iterate over all results + */ +static int +process_client_result (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + const struct GNUNET_DHT_ClientResultMessage *crm = cls; + struct GNUNET_DHT_GetHandle *get_handle = value; + size_t msize = ntohs (crm->header.size) - sizeof (*crm); + uint32_t put_path_length = ntohl (crm->put_path_length); + uint32_t get_path_length = ntohl (crm->get_path_length); + const struct GNUNET_PeerIdentity *put_path; + const struct GNUNET_PeerIdentity *get_path; + struct GNUNET_HashCode hc; + size_t data_length; + size_t meta_length; + const void *data; + + if (crm->unique_id != get_handle->unique_id) + { + /* UID mismatch */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring reply for %s: UID mismatch: %llu/%llu\n", + GNUNET_h2s (key), + crm->unique_id, + get_handle->unique_id); + return GNUNET_YES; + } + /* FIXME: might want to check that type matches */ + meta_length = + sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length); + data_length = msize - meta_length; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Giving %u byte reply for %s to application\n", + (unsigned int) data_length, + GNUNET_h2s (key)); + put_path = (const struct GNUNET_PeerIdentity *) &crm[1]; + get_path = &put_path[put_path_length]; + data = &get_path[get_path_length]; + /* remember that we've seen this result */ + GNUNET_CRYPTO_hash (data, + data_length, + &hc); + if (get_handle->seen_results_size == get_handle->seen_results_end) + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_size, + get_handle->seen_results_size * 2 + 1); + get_handle->seen_results[get_handle->seen_results_end++] = hc; + /* no need to block it explicitly, service already knows about it! */ + get_handle->iter (get_handle->iter_cls, + GNUNET_TIME_absolute_ntoh (crm->expiration), + key, + get_path, + get_path_length, + put_path, + put_path_length, + ntohl (crm->type), + data_length, + data); + return GNUNET_YES; } /** - * Iterator called on each result obtained from a generic route - * operation + * Process a client result message received from the service. + * + * @param cls The DHT handle. + * @param msg Monitor put message from the service. */ -void -find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) +static void +handle_client_result (void *cls, + const struct GNUNET_DHT_ClientResultMessage *msg) { - struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; - struct GNUNET_MessageHeader *hello; - size_t hello_size; - - if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received wrong type of response to a find peer request...\n"); - return; - } - - - GNUNET_assert (ntohs (reply->size) >= - sizeof (struct GNUNET_MessageHeader)); - hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader); - hello = (struct GNUNET_MessageHeader *)&reply[1]; - - if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO"); - return; - } - find_peer_handle->find_peer_context.proc (find_peer_handle-> - find_peer_context.proc_cls, - (struct GNUNET_HELLO_Message *)hello); + struct GNUNET_DHT_Handle *handle = cls; + + GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, + &msg->key, + &process_client_result, + (void *) msg); } + /** - * Perform an asynchronous FIND_PEER operation on the DHT. + * Process a put confirmation message from the service. * - * @param handle handle to the DHT service - * @param key the key to look up - * @param desired_replication_level how many peers should ultimately receive - * this message (advisory only, target may be too high for the - * given DHT or not hit exactly). - * @param options options for routing - * @param enc send the encapsulated message to a peer close to the key - * @param iter function to call on each result, NULL if no replies are expected - * @param iter_cls closure for iter - * @param timeout when to abort with an error if we fail to get - * a confirmation for the request (when necessary) or how long - * to wait for tramission to the service - * @param cont continuation to call when done; - * reason will be TIMEOUT on error, - * reason will be PREREQ_DONE on success - * @param cont_cls closure for cont - * - * @return handle to stop the request, NULL if the request is "fire and forget" + * @param cls The DHT handle. + * @param msg confirmation message from the service. */ -struct GNUNET_DHT_RouteHandle * -GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, - const GNUNET_HashCode * key, - unsigned int desired_replication_level, - enum GNUNET_DHT_RouteOption options, - const struct GNUNET_MessageHeader *enc, - struct GNUNET_TIME_Relative timeout, - GNUNET_DHT_ReplyProcessor iter, - void *iter_cls, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +static void +handle_put_confirmation (void *cls, + const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) { - struct GNUNET_DHT_RouteHandle *route_handle; - struct PendingMessage *pending; - struct GNUNET_DHT_RouteMessage *message; - uint16_t msize; - GNUNET_HashCode uid_key; + struct GNUNET_DHT_Handle *handle = cls; + struct GNUNET_DHT_PutHandle *ph; + GNUNET_DHT_PutContinuation cont; + void *cont_cls; - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - return NULL; + for (ph = handle->put_head; NULL != ph; ph = ph->next) + if (ph->unique_id == msg->unique_id) + break; + if (NULL == ph) + return; + cont = ph->cont; + cont_cls = ph->cont_cls; + GNUNET_DHT_put_cancel (ph); + if (NULL != cont) + cont (cont_cls, + GNUNET_OK); +} - if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - return NULL; - } - - route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); - memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); - route_handle->iter = iter; - route_handle->iter_cls = iter_cls; - route_handle->dht_handle = handle; - if (iter != NULL) - { - route_handle->uid = handle->uid_gen++; - hash_from_uid (route_handle->uid, &uid_key); - GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, - &uid_key, route_handle, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - } - else - { - route_handle->uid = 0; - } - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid); -#endif - - msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size); - message = GNUNET_malloc (msize); - message->header.size = htons (msize); - message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE); - memcpy (&message->key, key, sizeof (GNUNET_HashCode)); - message->options = htonl (options); - message->desired_replication_level = htonl (options); - message->unique_id = GNUNET_htonll (route_handle->uid); - memcpy (&message[1], enc, ntohs (enc->size)); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = &message->header; - pending->timeout = timeout; - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->unique_id = route_handle->uid; - if (handle->current == NULL) - { - handle->current = pending; - process_pending_message (handle); - } - else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING)) + +/** + * Try to (re)connect to the DHT service. + * + * @param h DHT handle to reconnect + * @return #GNUNET_YES on success, #GNUNET_NO on failure. + */ +static int +try_connect (struct GNUNET_DHT_Handle *h) +{ + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (monitor_get, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, + struct GNUNET_DHT_MonitorGetMessage, + h), + GNUNET_MQ_hd_var_size (monitor_get_resp, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP, + struct GNUNET_DHT_MonitorGetRespMessage, + h), + GNUNET_MQ_hd_var_size (monitor_put, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT, + struct GNUNET_DHT_MonitorPutMessage, + h), + GNUNET_MQ_hd_var_size (client_result, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT, + struct GNUNET_DHT_ClientResultMessage, + h), + GNUNET_MQ_hd_fixed_size (put_confirmation, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK, + struct GNUNET_DHT_ClientPutConfirmationMessage, + h), + GNUNET_MQ_handler_end () + }; + if (NULL != h->mq) + return GNUNET_OK; + h->mq = GNUNET_CLIENT_connect (h->cfg, + "dht", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) { - handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - handle->retransmission_buffer = pending; + LOG (GNUNET_ERROR_TYPE_WARNING, + "Failed to connect to the DHT service!\n"); + return GNUNET_NO; } - - route_handle->message = message; - return route_handle; + return GNUNET_YES; } /** - * Perform an asynchronous GET operation on the DHT identified. - * - * @param handle handle to the DHT service - * @param timeout how long to wait for transmission of this request to the service - * @param type expected type of the response object - * @param key the key to look up - * @param iter function to call on each result - * @param iter_cls closure for iter - * @param cont continuation to call once message sent - * @param cont_cls closure for continuation + * Initialize the connection with the DHT service. * - * @return handle to stop the async get + * @param cfg configuration to use + * @param ht_len size of the internal hash table to use for + * processing multiple GET/FIND requests in parallel + * @return handle to the DHT service, or NULL on error */ -struct GNUNET_DHT_GetHandle * -GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, - uint32_t type, - const GNUNET_HashCode * key, - GNUNET_DHT_GetIterator iter, - void *iter_cls, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +struct GNUNET_DHT_Handle * +GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, + unsigned int ht_len) { - struct GNUNET_DHT_GetHandle *get_handle; - struct GNUNET_DHT_GetMessage get_msg; + struct GNUNET_DHT_Handle *handle; - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ + handle = GNUNET_new (struct GNUNET_DHT_Handle); + handle->cfg = cfg; + handle->uid_gen + = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT64_MAX); + handle->active_requests + = GNUNET_CONTAINER_multihashmap_create (ht_len, + GNUNET_YES); + if (GNUNET_NO == try_connect (handle)) + { + GNUNET_DHT_disconnect (handle); return NULL; + } + return handle; +} - get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); - get_handle->get_context.iter = iter; - get_handle->get_context.iter_cls = iter_cls; - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending get request with key %s\n", "DHT API", - GNUNET_h2s (key)); -#endif - - get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); - get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); - get_msg.type = htons (type); - get_handle->route_handle = - GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout, - &get_reply_iterator, get_handle, cont, cont_cls); +/** + * Shutdown connection with the DHT service. + * + * @param handle handle of the DHT connection to stop + */ +void +GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) +{ + struct GNUNET_DHT_PutHandle *ph; - return get_handle; + GNUNET_assert (0 == + GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); + while (NULL != (ph = handle->put_head)) + { + if (NULL != ph->cont) + ph->cont (ph->cont_cls, + GNUNET_SYSERR); + GNUNET_DHT_put_cancel (ph); + } + if (NULL != handle->mq) + { + GNUNET_MQ_destroy (handle->mq); + handle->mq = NULL; + } + if (NULL != handle->reconnect_task) + { + GNUNET_SCHEDULER_cancel (handle->reconnect_task); + handle->reconnect_task = NULL; + } + GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); + GNUNET_free (handle); } /** - * Stop a previously issued routing request + * Perform a PUT operation storing data in the DHT. FIXME: we should + * change the protocol to get a confirmation for the PUT from the DHT + * and call 'cont' only after getting the confirmation; otherwise, the + * client has no good way of telling if the 'PUT' message actually got + * to the DHT service! * - * @param route_handle handle to the request to stop - * @param cont continuation to call once this message is sent to the service or times out - * @param cont_cls closure for the continuation + * @param handle handle to DHT service + * @param key the key to store under + * @param desired_replication_level estimate of how many + * nearest peers this request should reach + * @param options routing options for this message + * @param type type of the value + * @param size number of bytes in data; must be less than 64k + * @param data the data to store + * @param exp desired expiration time for the value + * @param cont continuation to call when done (transmitting request to service) + * You must not call #GNUNET_DHT_disconnect in this continuation + * @param cont_cls closure for @a cont */ -void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +struct GNUNET_DHT_PutHandle * +GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_HashCode *key, + uint32_t desired_replication_level, + enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + size_t size, + const void *data, + struct GNUNET_TIME_Absolute exp, + GNUNET_DHT_PutContinuation cont, + void *cont_cls) { - struct PendingMessage *pending; - struct GNUNET_DHT_StopMessage *message; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_ClientPutMessage *put_msg; size_t msize; - GNUNET_HashCode uid_key; + struct GNUNET_DHT_PutHandle *ph; - msize = sizeof (struct GNUNET_DHT_StopMessage); - message = GNUNET_malloc (msize); - message->header.size = htons (msize); - message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request for uid %llu\n", "DHT API", - route_handle->uid); -#endif - message->unique_id = GNUNET_htonll (route_handle->uid); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); - pending->msg = (struct GNUNET_MessageHeader *) message; - pending->timeout = GNUNET_TIME_relative_get_forever(); - pending->cont = cont; - pending->cont_cls = cont_cls; - pending->unique_id = 0; /* When finished is called, free pending->msg */ - - if (route_handle->dht_handle->current == NULL) - { - route_handle->dht_handle->current = pending; - process_pending_message (route_handle->dht_handle); - } - else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)) - { - route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; - route_handle->dht_handle->retransmission_buffer = pending; - } - else - { - GNUNET_break(0); - } - - hash_from_uid (route_handle->uid, &uid_key); - GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove - (route_handle->dht_handle->outstanding_requests, &uid_key, - route_handle) == GNUNET_YES); - - GNUNET_free(route_handle->message); - GNUNET_free(route_handle); + msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; + if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) + { + GNUNET_break (0); + return NULL; + } + if (NULL == handle->mq) + return NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending PUT for %s to DHT via %p\n", + GNUNET_h2s (key), + handle); + ph = GNUNET_new (struct GNUNET_DHT_PutHandle); + ph->dht_handle = handle; + ph->cont = cont; + ph->cont_cls = cont_cls; + ph->unique_id = ++handle->uid_gen; + GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, + handle->put_tail, + ph); + env = GNUNET_MQ_msg_extra (put_msg, + size, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); + put_msg->type = htonl ((uint32_t) type); + put_msg->options = htonl ((uint32_t) options); + put_msg->desired_replication_level = htonl (desired_replication_level); + put_msg->unique_id = ph->unique_id; + put_msg->expiration = GNUNET_TIME_absolute_hton (exp); + put_msg->key = *key; + GNUNET_memcpy (&put_msg[1], + data, + size); + GNUNET_MQ_send (handle->mq, + env); + return ph; } /** - * Stop async DHT-get. + * Cancels a DHT PUT operation. Note that the PUT request may still + * go out over the network (we can't stop that); However, if the PUT + * has not yet been sent to the service, cancelling the PUT will stop + * this from happening (but there is no way for the user of this API + * to tell if that is the case). The only use for this API is to + * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because + * the system is shutting down). * - * @param get_handle handle to the GET operation to stop - * @param cont continuation to call once this message is sent to the service or times out - * @param cont_cls closure for the continuation + * @param ph put operation to cancel ('cont' will no longer be called) */ void -GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) { - if ((get_handle->route_handle->dht_handle->current != NULL) && - (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) - { - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } - return; - } - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending get request with key %s, uid %llu\n", - "DHT API", GNUNET_h2s (&get_handle->route_handle->key), - get_handle->route_handle->uid); -#endif - GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls); - GNUNET_free (get_handle); + struct GNUNET_DHT_Handle *handle = ph->dht_handle; + + GNUNET_CONTAINER_DLL_remove (handle->put_head, + handle->put_tail, + ph); + GNUNET_free (ph); } /** - * Perform an asynchronous FIND PEER operation on the DHT. + * Perform an asynchronous GET operation on the DHT identified. See + * also #GNUNET_BLOCK_evaluate. * * @param handle handle to the DHT service - * @param timeout timeout for this request to be sent to the - * service - * @param options routing options for this message + * @param type expected type of the response object * @param key the key to look up - * @param proc function to call on each result - * @param proc_cls closure for proc - * @param cont continuation to call once message sent - * @param cont_cls closure for continuation - * - * @return handle to stop the async get, NULL on error + * @param desired_replication_level estimate of how many + nearest peers this request should reach + * @param options routing options for this message + * @param xquery extended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in @a xquery + * @param iter function to call on each result + * @param iter_cls closure for @a iter + * @return handle to stop the async get */ -struct GNUNET_DHT_FindPeerHandle * -GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_DHT_RouteOption options, - const GNUNET_HashCode * key, - GNUNET_DHT_FindPeerProcessor proc, - void *proc_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) +struct GNUNET_DHT_GetHandle * +GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_HashCode *key, + uint32_t desired_replication_level, + enum GNUNET_DHT_RouteOption options, + const void *xquery, + size_t xquery_size, + GNUNET_DHT_GetIterator iter, + void *iter_cls) { - struct GNUNET_DHT_FindPeerHandle *find_peer_handle; - struct GNUNET_MessageHeader find_peer_msg; + struct GNUNET_DHT_GetHandle *gh; + size_t msize; - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ + msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size; + if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) + { + GNUNET_break (0); return NULL; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending query for %s to DHT %p\n", + GNUNET_h2s (key), + handle); + gh = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle) + + xquery_size); + gh->iter = iter; + gh->iter_cls = iter_cls; + gh->dht_handle = handle; + gh->key = *key; + gh->unique_id = ++handle->uid_gen; + gh->xquery_size = xquery_size; + gh->desired_replication_level = desired_replication_level; + gh->type = type; + gh->options = options; + GNUNET_memcpy (&gh[1], + xquery, + xquery_size); + GNUNET_CONTAINER_multihashmap_put (handle->active_requests, + &gh->key, + gh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + if (NULL != handle->mq) + send_get (gh); + return gh; +} - find_peer_handle = - GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle)); - find_peer_handle->find_peer_context.proc = proc; - find_peer_handle->find_peer_context.proc_cls = proc_cls; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending `%s' request with key %s\n", "DHT API", - "FIND PEER", GNUNET_h2s (key)); -#endif - - find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader)); - find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); - find_peer_handle->route_handle = - GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg, - timeout, &find_peer_reply_iterator, - find_peer_handle, cont, cont_cls); - return find_peer_handle; +/** + * Tell the DHT not to return any of the following known results + * to this client. + * + * @param get_handle get operation for which results should be filtered + * @param num_results number of results to be blocked that are + * provided in this call (size of the @a results array) + * @param results array of hash codes over the 'data' of the results + * to be blocked + */ +void +GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, + unsigned int num_results, + const struct GNUNET_HashCode *results) +{ + unsigned int needed; + unsigned int had; + + had = get_handle->seen_results_end; + needed = had + num_results; + if (needed > get_handle->seen_results_size) + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_size, + needed); + GNUNET_memcpy (&get_handle->seen_results[get_handle->seen_results_end], + results, + num_results * sizeof (struct GNUNET_HashCode)); + get_handle->seen_results_end += num_results; + if (NULL != get_handle->dht_handle->mq) + send_get_known_results (get_handle, + had); } + /** - * Stop async find peer. Frees associated resources. + * Stop async DHT-get. * - * @param find_peer_handle GET operation to stop. - * @param cont continuation to call once this message is sent to the service or times out - * @param cont_cls closure for the continuation + * @param get_handle handle to the GET operation to stop */ void -GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) { - if ((find_peer_handle->route_handle->dht_handle->current != NULL) && - (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) - { - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } - return; - } - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending `%s' request with key %s, uid %llu\n", - "DHT API", "FIND PEER", - GNUNET_h2s (&find_peer_handle->route_handle->key), - find_peer_handle->route_handle->uid); -#endif - GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls); - GNUNET_free (find_peer_handle); + struct GNUNET_DHT_Handle *handle = get_handle->dht_handle; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending STOP for %s to DHT via %p\n", + GNUNET_h2s (&get_handle->key), + handle); + if (NULL != handle->mq) + { + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_ClientGetStopMessage *stop_msg; + + env = GNUNET_MQ_msg (stop_msg, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP); + stop_msg->reserved = htonl (0); + stop_msg->unique_id = get_handle->unique_id; + stop_msg->key = get_handle->key; + GNUNET_MQ_send (handle->mq, + env); + } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (handle->active_requests, + &get_handle->key, + get_handle)); + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_end, + 0); + GNUNET_free (get_handle); } /** - * Perform a PUT operation storing data in the DHT. + * Start monitoring the local DHT service. * - * @param handle handle to DHT service - * @param key the key to store under - * @param type type of the value - * @param size number of bytes in data; must be less than 64k - * @param data the data to store - * @param exp desired expiration time for the value - * @param timeout how long to wait for transmission of this request - * @param cont continuation to call when done; - * reason will be TIMEOUT on error, - * reason will be PREREQ_DONE on success - * @param cont_cls closure for cont + * @param handle Handle to the DHT service. + * @param type Type of blocks that are of interest. + * @param key Key of data of interest, NULL for all. + * @param get_cb Callback to process monitored get messages. + * @param get_resp_cb Callback to process monitored get response messages. + * @param put_cb Callback to process monitored put messages. + * @param cb_cls Closure for callbacks. + * @return Handle to stop monitoring. + */ +struct GNUNET_DHT_MonitorHandle * +GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_HashCode *key, + GNUNET_DHT_MonitorGetCB get_cb, + GNUNET_DHT_MonitorGetRespCB get_resp_cb, + GNUNET_DHT_MonitorPutCB put_cb, + void *cb_cls) +{ + struct GNUNET_DHT_MonitorHandle *mh; + + mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle); + mh->get_cb = get_cb; + mh->get_resp_cb = get_resp_cb; + mh->put_cb = put_cb; + mh->cb_cls = cb_cls; + mh->type = type; + mh->dht_handle = handle; + if (NULL != key) + { + mh->key = GNUNET_new (struct GNUNET_HashCode); + *mh->key = *key; + } + GNUNET_CONTAINER_DLL_insert (handle->monitor_head, + handle->monitor_tail, + mh); + if (NULL != handle->mq) + send_monitor_start (mh); + return mh; +} + + +/** + * Stop monitoring. + * + * @param mh The handle to the monitor request returned by monitor_start. * - * @return GNUNET_YES if put message is queued for transmission + * On return get_handle will no longer be valid, caller must not use again!!! */ void -GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, - const GNUNET_HashCode * key, - uint32_t type, - uint32_t size, - const char *data, - struct GNUNET_TIME_Absolute exp, - struct GNUNET_TIME_Relative timeout, - GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh) { - struct GNUNET_DHT_PutMessage *put_msg; - struct GNUNET_DHT_RouteHandle *put_route; - size_t msize; - - if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) - { - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } - return; - } - -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending put request with key %s\n", "DHT API", - GNUNET_h2s (key)); -#endif - - msize = sizeof (struct GNUNET_DHT_PutMessage) + size; - put_msg = GNUNET_malloc (msize); - put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT); - put_msg->header.size = htons (msize); - put_msg->type = htons (type); - put_msg->data_size = htons (size); - put_msg->expiration = GNUNET_TIME_absolute_hton(exp); - memcpy (&put_msg[1], data, size); - - put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL, - NULL, cont, cont_cls); - - if (put_route == NULL) /* Route start failed! */ - { - if (cont != NULL) - { - GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - } - } - else - GNUNET_free(put_route); - - GNUNET_free (put_msg); + struct GNUNET_DHT_Handle *handle = mh->dht_handle; + struct GNUNET_DHT_MonitorStartStopMessage *m; + struct GNUNET_MQ_Envelope *env; + + GNUNET_CONTAINER_DLL_remove (handle->monitor_head, + handle->monitor_tail, + mh); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP); + m->type = htonl (mh->type); + m->get = htons (NULL != mh->get_cb); + m->get_resp = htons(NULL != mh->get_resp_cb); + m->put = htons (NULL != mh->put_cb); + if (NULL != mh->key) + { + m->filter_key = htons (1); + m->key = *mh->key; + } + GNUNET_MQ_send (handle->mq, + env); + GNUNET_free_non_null (mh->key); + GNUNET_free (mh); } + + +/* end of dht_api.c */