X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdht%2Fdht_api.c;h=15faba6c97dec4631e3f80a53d8ca89538b94fbe;hb=09f4e2ed03d6861b78033328670256b430bafbbd;hp=9fb77d5d4fab0c014e0314a24d34288949df1f9c;hpb=e96b5f1bb1269cdde73e8a178344069f61080804;p=oweals%2Fgnunet.git diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 9fb77d5d4..15faba6c9 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -4,7 +4,7 @@ GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -24,13 +24,13 @@ * @author Christian Grothoff * @author Nathan Evans * - * TODO: Only allow a single message until confirmed as received by - * the service. For put messages call continuation as soon as - * receipt acknowledged (then remove), for GET or other messages - * only call continuation when data received. - * Add unique identifier to message types requesting data to be - * returned. + * TODO: retransmission of pending requests maybe happens now, at least + * the code is in place to do so. Need to add checks when api calls + * happen to check if retransmission is in progress, and if so set + * the single pending message for transmission once the list of + * retries are done. */ + #include "platform.h" #include "gnunet_bandwidth_lib.h" #include "gnunet_client_lib.h" @@ -44,9 +44,7 @@ #include "gnunet_dht_service.h" #include "dht.h" -#define DEBUG_DHT_API GNUNET_YES - -#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +#define DEBUG_DHT_API GNUNET_NO struct PendingMessage { @@ -72,22 +70,34 @@ struct PendingMessage void *cont_cls; /** - * Whether or not to await verification the message - * was received by the service + * Unique ID for this request */ - size_t is_unique; + uint64_t unique_id; /** - * Unique ID for this request + * Free the saved message once sent, set + * to GNUNET_YES for messages that don't + * receive responses! */ - uint64_t unique_id; + int free_on_send; }; -struct GNUNET_DHT_GetContext +struct PendingMessageList { + /** + * This is a singly linked list. + */ + struct PendingMessageList *next; + /** + * The pending message. + */ + struct PendingMessage *message; +}; +struct GNUNET_DHT_GetContext +{ /** * Iterator to call on data receipt */ @@ -100,9 +110,22 @@ struct GNUNET_DHT_GetContext }; +struct GNUNET_DHT_FindPeerContext +{ + /** + * Iterator to call on data receipt + */ + GNUNET_DHT_FindPeerProcessor proc; + + /** + * Closure for the iterator callback + */ + void *proc_cls; + +}; + /** - * Handle to control a unique operation (one that is - * expected to return results) + * Handle to a route request */ struct GNUNET_DHT_RouteHandle { @@ -131,35 +154,68 @@ struct GNUNET_DHT_RouteHandle * Main handle to this DHT api */ struct GNUNET_DHT_Handle *dht_handle; + + /** + * The actual message sent for this request, + * used for retransmitting requests on service + * failure/reconnect. Freed on route_stop. + */ + struct GNUNET_DHT_RouteMessage *message; }; + /** - * Handle for a non unique request, holds callback - * which needs to be called before we allow other - * messages to be processed and sent to the DHT service + * Handle to control a get operation. */ -struct GNUNET_DHT_NonUniqueHandle +struct GNUNET_DHT_GetHandle { /** - * Key that this get request is for + * Handle to the actual route operation for the get */ - GNUNET_HashCode key; + struct GNUNET_DHT_RouteHandle *route_handle; /** - * Type of data get request was for + * The context of the get request */ - uint32_t type; + struct GNUNET_DHT_GetContext get_context; +}; + + +/** + * Handle to control a find peer operation. + */ +struct GNUNET_DHT_FindPeerHandle +{ + /** + * Handle to the actual route operation for the request + */ + struct GNUNET_DHT_RouteHandle *route_handle; + /** + * The context of the find peer request + */ + struct GNUNET_DHT_FindPeerContext find_peer_context; +}; + + +enum DHT_Retransmit_Stage +{ /** - * Continuation to call on service - * confirmation of message receipt. + * The API is not retransmitting anything at this time. */ - GNUNET_SCHEDULER_Task cont; + DHT_NOT_RETRANSMITTING, /** - * Send continuation cls + * The API is retransmitting, and nothing has been single + * queued for sending. */ - void *cont_cls; + DHT_RETRANSMITTING, + + /** + * The API is retransmitting, and a single message has been + * queued for transmission once finished. + */ + DHT_RETRANSMITTING_MESSAGE_QUEUED }; @@ -203,42 +259,294 @@ struct GNUNET_DHT_Handle struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; /** - * Non unique handle. If set don't schedule another non - * unique request. + * Generator for unique ids. + */ + uint64_t uid_gen; + + /** + * Are we currently retransmitting requests? If so queue a _single_ + * new request when received. */ - struct GNUNET_DHT_NonUniqueHandle *non_unique_request; + enum DHT_Retransmit_Stage retransmit_stage; /** - * Kill off the connection and any pending messages. + * Linked list of retranmissions, to be used in the event + * of a dht service disconnect/reconnect. */ - int do_destroy; + struct PendingMessageList *retransmissions; + /** + * A single pending message allowed to be scheduled + * during retransmission phase. + */ + struct PendingMessage *retransmission_buffer; }; -static struct GNUNET_TIME_Relative default_request_timeout; + +/** + * Convert unique ID to hash code. + * + * @param uid unique ID to convert + * @param hash set to uid (extended with zeros) + */ +static void +hash_from_uid (uint64_t uid, + GNUNET_HashCode *hash) +{ + memset (hash, 0, sizeof(GNUNET_HashCode)); + *((uint64_t*)hash) = uid; +} + +#if RETRANSMIT +/** + * Iterator callback to retransmit each outstanding request + * because the connection to the DHT service went down (and + * came back). + * + * + */ +static int retransmit_iterator (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct GNUNET_DHT_RouteHandle *route_handle = value; + struct PendingMessageList *pending_message_list; + + pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage)); + pending_message_list->message = (struct PendingMessage *)&pending_message_list[1]; + pending_message_list->message->msg = &route_handle->message->header; + pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever(); + pending_message_list->message->cont = NULL; + pending_message_list->message->cont_cls = NULL; + pending_message_list->message->unique_id = route_handle->uid; + /* Add the new pending message to the front of the retransmission list */ + pending_message_list->next = route_handle->dht_handle->retransmissions; + route_handle->dht_handle->retransmissions = pending_message_list; + + return GNUNET_OK; +} +#endif + +/** + * Try to (re)connect to the dht service. + * + * @return GNUNET_YES on success, GNUNET_NO on failure. + */ +static int +try_connect (struct GNUNET_DHT_Handle *handle) +{ + if (handle->client != NULL) + return GNUNET_OK; + handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); + if (handle->client != NULL) + return GNUNET_YES; +#if DEBUG_STATISTICS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + _("Failed to connect to the dht service!\n")); +#endif + return GNUNET_NO; +} + +/** + * Send complete (or failed), call continuation if we have one. + */ +static void +finish (struct GNUNET_DHT_Handle *handle, int code) +{ + struct PendingMessage *pos = handle->current; + GNUNET_HashCode uid_hash; +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); +#endif + GNUNET_assert (pos != NULL); + hash_from_uid (pos->unique_id, &uid_hash); + if (pos->cont != NULL) + { + if (code == GNUNET_SYSERR) + GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, + pos->cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + else + GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, + pos->cont_cls, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + } + + GNUNET_assert(handle->th == NULL); + if (pos->free_on_send == GNUNET_YES) + GNUNET_free(pos->msg); + GNUNET_free (pos); + handle->current = NULL; +} + +/** + * Transmit the next pending message, called by notify_transmit_ready + */ +static size_t +transmit_pending (void *cls, size_t size, void *buf) +{ + struct GNUNET_DHT_Handle *handle = cls; + size_t tsize; + +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending\n", "DHT API"); +#endif + handle->th = NULL; + + if (buf == NULL) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending buf is NULL\n", "DHT API"); +#endif + finish (handle, GNUNET_SYSERR); + return 0; + } + + if (handle->current != NULL) + { + tsize = ntohs (handle->current->msg->size); + if (size >= tsize) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending message size %d\n", "DHT API", tsize); +#endif + memcpy (buf, handle->current->msg, tsize); + finish (handle, GNUNET_OK); + return tsize; + } + else + { + return 0; + } + } + /* Have no pending request */ + return 0; +} + +/** + * Try to send messages from list of messages to send + */ +static void +process_pending_message (struct GNUNET_DHT_Handle *handle) +{ + + if (handle->current == NULL) + return; /* action already pending */ + if (GNUNET_YES != try_connect (handle)) + { + handle->th = NULL; + finish (handle, GNUNET_SYSERR); + return; + } + + if (NULL == + (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, + ntohs (handle-> + current->msg-> + size), + handle->current-> + timeout, GNUNET_YES, + &transmit_pending, + handle))) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit request to dht service.\n"); +#endif + finish (handle, GNUNET_SYSERR); + return; + } +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Scheduled sending message of size %d to service\n", + "DHT API", ntohs (handle->current->msg->size)); +#endif +} + +/** + * Send complete (or failed), call continuation if we have one. + * Forward declaration. + */ +static void +finish_retransmission (struct GNUNET_DHT_Handle *handle, int code); /* Forward declaration */ -static void process_pending_message(struct GNUNET_DHT_Handle *handle); +static size_t +transmit_pending_retransmission (void *cls, size_t size, void *buf); -static GNUNET_HashCode * hash_from_uid(uint64_t uid) +/** + * Try to send messages from list of messages to send + */ +static void +process_pending_retransmissions (struct GNUNET_DHT_Handle *handle) { - int count; - int remaining; - GNUNET_HashCode *hash; - hash = GNUNET_malloc(sizeof(GNUNET_HashCode)); - count = 0; - while (count < sizeof(GNUNET_HashCode)) + if (handle->current == NULL) + return; /* action already pending */ + if (GNUNET_YES != try_connect (handle)) { - remaining = sizeof(GNUNET_HashCode) - count; - if (remaining > sizeof(uid)) - remaining = sizeof(uid); + finish_retransmission (handle, GNUNET_SYSERR); + return; + } + + if (NULL == + (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, + ntohs (handle-> + current->msg-> + size), + handle->current-> + timeout, GNUNET_YES, + &transmit_pending_retransmission, + handle))) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit request to dht service.\n"); +#endif + finish_retransmission (handle, GNUNET_SYSERR); + return; + } +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Scheduled sending message of size %d to service\n", + "DHT API", ntohs (handle->current->msg->size)); +#endif +} + +/** + * Send complete (or failed), call continuation if we have one. + */ +static void +finish_retransmission (struct GNUNET_DHT_Handle *handle, int code) +{ + struct PendingMessage *pos = handle->current; + struct PendingMessageList *pending_list; +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API"); +#endif + GNUNET_assert (pos == handle->retransmissions->message); + pending_list = handle->retransmissions; + handle->retransmissions = handle->retransmissions->next; + GNUNET_free (pending_list); - memcpy(hash, &uid, remaining); - count += remaining; + if (handle->retransmissions == NULL) + { + handle->retransmit_stage = DHT_NOT_RETRANSMITTING; } - return hash; + if (handle->retransmissions != NULL) + { + handle->current = handle->retransmissions->message; + process_pending_retransmissions(handle); + } + else if (handle->retransmission_buffer != NULL) + { + handle->current = handle->retransmission_buffer; + process_pending_message(handle); + } } /** @@ -246,88 +554,88 @@ static GNUNET_HashCode * hash_from_uid(uint64_t uid) * a demultiplexer which handles numerous message types * */ -void service_message_handler (void *cls, - const struct GNUNET_MessageHeader *msg) +void +service_message_handler (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_Message *dht_msg; - struct GNUNET_DHT_StopMessage *stop_msg; + struct GNUNET_DHT_RouteResultMessage *dht_msg; struct GNUNET_MessageHeader *enc_msg; struct GNUNET_DHT_RouteHandle *route_handle; uint64_t uid; - GNUNET_HashCode *uid_hash; + GNUNET_HashCode uid_hash; size_t enc_size; - /* TODO: find out message type, handle callbacks for different types of messages. - * Should be a non unique acknowledgment, or unique result. */ if (msg == NULL) - { + { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received NULL from server, connection down?\n", "DHT API"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received NULL from server, connection down!\n", + "DHT API"); #endif - return; - } + GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); + handle->client = GNUNET_CLIENT_connect (handle->sched, + "dht", + handle->cfg); + if (handle->current != NULL) + { + handle->th = NULL; + finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */ + } +#if RETRANSMIT + if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0)) + { + handle->retransmit_stage = DHT_RETRANSMITTING; + handle->current = handle->retransmissions->message; + process_pending_retransmissions(handle); + } +#endif + return; + } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT) - { - dht_msg = (struct GNUNET_DHT_Message *)msg; - uid = GNUNET_ntohll(dht_msg->unique_id); + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT: + { + dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg; + uid = GNUNET_ntohll (dht_msg->unique_id); #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu)\n", "DHT API", uid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu)\n", + "DHT API", uid); #endif - if (ntohs(dht_msg->unique)) - { - uid_hash = hash_from_uid(ntohl(dht_msg->unique_id)); - route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash); - GNUNET_free(uid_hash); - if (route_handle == NULL) /* We have no recollection of this request */ + + hash_from_uid (uid, &uid_hash); + route_handle = + GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests, + &uid_hash); + if (route_handle == NULL) /* We have no recollection of this request */ { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu), but have no recollection of it!\n", + "DHT API", uid); #endif } else { - enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message); - GNUNET_assert(enc_size > 0); - enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; - route_handle->iter(route_handle->iter_cls, enc_msg); + enc_size = + ntohs (dht_msg->header.size) - + sizeof (struct GNUNET_DHT_RouteResultMessage); + GNUNET_assert (enc_size > 0); + enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; + route_handle->iter (route_handle->iter_cls, enc_msg); } + + break; } - } - else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP) - { - stop_msg = (struct GNUNET_DHT_StopMessage *)msg; - uid = GNUNET_ntohll(stop_msg->unique_id); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id); -#endif - if (handle->current->unique_id == uid) + default: { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Have pending confirmation for this message!\n", "DHT API", uid); -#endif - if (handle->current->cont != NULL) - GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE); - - GNUNET_free(handle->current->msg); - GNUNET_free(handle->current); - handle->current = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "`%s': Received unknown message type %d\n", "DHT API", + ntohs (msg->type)); } - } - else - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type)); -#endif - } - + } GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); @@ -352,24 +660,18 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, { struct GNUNET_DHT_Handle *handle; - handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle)); - - default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); + handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle)); handle->cfg = cfg; handle->sched = sched; - - handle->current = NULL; - handle->do_destroy = GNUNET_NO; - handle->th = NULL; - - handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg); - handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len); - + handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); + handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); if (handle->client == NULL) { - GNUNET_free(handle); + GNUNET_free (handle); return NULL; } + handle->outstanding_requests = + GNUNET_CONTAINER_multihashmap_create (ht_len); #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Connection to service in progress\n", "DHT API"); @@ -377,7 +679,6 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); - return handle; } @@ -394,69 +695,39 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); #endif - GNUNET_assert(handle != NULL); - - if (handle->th != NULL) /* We have a live transmit request in the Aether */ + GNUNET_assert (handle != NULL); + if (handle->th != NULL) /* We have a live transmit request */ { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; } - if (handle->current != NULL) /* We are trying to send something now, clean it up */ - GNUNET_free(handle->current); + if (handle->current != NULL) /* We are trying to send something now, clean it up */ + GNUNET_free (handle->current); - if (handle->client != NULL) /* Finally, disconnect from the service */ + if (handle->client != NULL) /* Finally, disconnect from the service */ { GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); handle->client = NULL; } + GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0); + GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests); GNUNET_free (handle); } -/** - * Send complete (or failed), schedule next (or don't) - */ -static void -finish (struct GNUNET_DHT_Handle *handle, int code) -{ - /* TODO: if code is not GNUNET_OK, do something! */ - struct PendingMessage *pos = handle->current; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Finish called!\n", "DHT API"); -#endif - GNUNET_assert(pos != NULL); - - if (pos->is_unique) - { - if (pos->cont != NULL) - { - if (code == GNUNET_SYSERR) - GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT); - else - GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE); - } - - GNUNET_free(pos->msg); - handle->current = NULL; - GNUNET_free(pos); - } - /* Otherwise we need to wait for a response to this message! */ -} - /** * Transmit the next pending message, called by notify_transmit_ready */ static size_t -transmit_pending (void *cls, size_t size, void *buf) +transmit_pending_retransmission (void *cls, size_t size, void *buf) { struct GNUNET_DHT_Handle *handle = cls; size_t tsize; #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending\n", "DHT API"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending\n", "DHT API"); #endif if (buf == NULL) { @@ -464,109 +735,288 @@ transmit_pending (void *cls, size_t size, void *buf) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': In transmit_pending buf is NULL\n", "DHT API"); #endif - /* FIXME: free associated resources or summat */ - finish(handle, GNUNET_SYSERR); + finish_retransmission (handle, GNUNET_SYSERR); return 0; } handle->th = NULL; if (handle->current != NULL) - { - tsize = ntohs(handle->current->msg->size); - if (size >= tsize) { + tsize = ntohs (handle->current->msg->size); + if (size >= tsize) + { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending message size %d\n", "DHT API", tsize); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending message size %d\n", "DHT API", tsize); #endif - memcpy(buf, handle->current->msg, tsize); - finish(handle, GNUNET_OK); - return tsize; - } - else - { - return 0; + memcpy (buf, handle->current->msg, tsize); + finish_retransmission (handle, GNUNET_OK); + return tsize; + } + else + { + return 0; + } } - } /* Have no pending request */ return 0; } /** - * Try to (re)connect to the dht service. - * - * @return GNUNET_YES on success, GNUNET_NO on failure. + * Iterator called on each result obtained from a generic route + * operation */ -static int -try_connect (struct GNUNET_DHT_Handle *handle) +void +get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) { - if (handle->client != NULL) - return GNUNET_OK; - handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); - if (handle->client != NULL) - return GNUNET_YES; -#if DEBUG_STATISTICS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to connect to the dht service!\n")); -#endif - return GNUNET_NO; + struct GNUNET_DHT_GetHandle *get_handle = cls; + struct GNUNET_DHT_GetResultMessage *result; + size_t data_size; + char *result_data; + + if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT) + return; + + GNUNET_assert (ntohs (reply->size) >= + sizeof (struct GNUNET_DHT_GetResultMessage)); + result = (struct GNUNET_DHT_GetResultMessage *) reply; + data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage); + + result_data = (char *) &result[1]; /* Set data pointer to end of message */ + + get_handle->get_context.iter (get_handle->get_context.iter_cls, + GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key, + ntohs (result->type), data_size, result_data); } /** - * Try to send messages from list of messages to send + * Iterator called on each result obtained from a generic route + * operation */ -static void process_pending_message(struct GNUNET_DHT_Handle *handle) +void +find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) { + struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; + struct GNUNET_MessageHeader *hello; - if (handle->current == NULL) - return; /* action already pending */ - if (GNUNET_YES != try_connect (handle)) + if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) { - finish (handle, GNUNET_SYSERR); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received wrong type of response to a find peer request...\n"); return; } - /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */ - if (handle->do_destroy) + + GNUNET_assert (ntohs (reply->size) >= + sizeof (struct GNUNET_MessageHeader)); + hello = (struct GNUNET_MessageHeader *)&reply[1]; + + if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO) { - //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO"); + return; } + find_peer_handle->find_peer_context.proc (find_peer_handle-> + find_peer_context.proc_cls, + (struct GNUNET_HELLO_Message *)hello); +} +/** + * Send a message to the DHT telling it to start issuing random GET + * requests every 'frequency' milliseconds. + * + * @param handle handle to the DHT service + * @param frequency delay (in milliseconds) between sending malicious messages + * @param cont continuation to call once the message is sent + * @param cont_cls closure for continuation + * + * @return GNUNET_YES if the control message was sent, GNUNET_NO if not + */ +int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls) +{ + struct GNUNET_DHT_ControlMessage *msg; + struct PendingMessage *pending; - if (NULL == - (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs(handle->current->msg->size), - handle->current->timeout, - GNUNET_YES, - &transmit_pending, handle))) + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + return GNUNET_NO; + + msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); + msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET); + msg->variable = htons(frequency); + + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = &msg->header; + pending->timeout = GNUNET_TIME_relative_get_forever(); + pending->free_on_send = GNUNET_YES; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->unique_id = 0; + + if (handle->current == NULL) { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dht service.\n"); -#endif - finish (handle, GNUNET_SYSERR); + handle->current = pending; + process_pending_message (handle); } -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size)); -#endif + else + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + + return GNUNET_YES; } /** - * Iterator called on each result obtained from a generic route - * operation + * Send a message to the DHT telling it to issue a single find + * peer request using the peers unique identifier as key. This + * is used to fill the routing table, and is normally controlled + * by the DHT itself. However, for testing and perhaps more + * close control over the DHT, this can be explicitly managed. + * + * @param handle handle to the DHT service + * @param cont continuation to call once the message is sent + * @param cont_cls closure for continuation + * + * @return GNUNET_YES if the control message was sent, GNUNET_NO if not */ -void get_reply_iterator (void *cls, - const struct GNUNET_MessageHeader *reply) +int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { + struct GNUNET_DHT_ControlMessage *msg; + struct PendingMessage *pending; + + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + return GNUNET_NO; + + msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); + msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); + + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = &msg->header; + pending->timeout = GNUNET_TIME_relative_get_forever(); + pending->free_on_send = GNUNET_YES; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->unique_id = 0; + + if (handle->current == NULL) + { + handle->current = pending; + process_pending_message (handle); + } + else + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + return GNUNET_YES; } /** - * Perform an asynchronous FIND_PEER operation on the DHT. + * Send a message to the DHT telling it to start issuing random PUT + * requests every 'frequency' milliseconds. + * + * @param handle handle to the DHT service + * @param frequency delay (in milliseconds) between sending malicious messages + * @param cont continuation to call once the message is sent + * @param cont_cls closure for continuation + * + * @return GNUNET_YES if the control message was sent, GNUNET_NO if not + */ +int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls) +{ + struct GNUNET_DHT_ControlMessage *msg; + struct PendingMessage *pending; + + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + return GNUNET_NO; + + msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); + msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT); + msg->variable = htons(frequency); + + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = &msg->header; + pending->timeout = GNUNET_TIME_relative_get_forever(); + pending->free_on_send = GNUNET_YES; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->unique_id = 0; + + if (handle->current == NULL) + { + handle->current = pending; + process_pending_message (handle); + } + else + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + + return GNUNET_YES; +} + +/** + * Send a message to the DHT telling it to start dropping + * all requests received. + * + * @param handle handle to the DHT service + * @param cont continuation to call once the message is sent + * @param cont_cls closure for continuation + * + * @return GNUNET_YES if the control message was sent, GNUNET_NO if not + */ +int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) +{ + struct GNUNET_DHT_ControlMessage *msg; + struct PendingMessage *pending; + + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + return GNUNET_NO; + + msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); + msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP); + msg->variable = htons(0); + + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = &msg->header; + pending->timeout = GNUNET_TIME_relative_get_forever(); + pending->free_on_send = GNUNET_YES; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->unique_id = 0; + + if (handle->current == NULL) + { + handle->current = pending; + process_pending_message (handle); + } + else + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + + return GNUNET_YES; +} + + +/** + * Initiate a generic DHT route operation. * * @param handle handle to the DHT service * @param key the key to look up @@ -589,93 +1039,81 @@ void get_reply_iterator (void *cls, */ struct GNUNET_DHT_RouteHandle * GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, - const GNUNET_HashCode *key, + const GNUNET_HashCode * key, unsigned int desired_replication_level, enum GNUNET_DHT_RouteOption options, const struct GNUNET_MessageHeader *enc, struct GNUNET_TIME_Relative timeout, GNUNET_DHT_ReplyProcessor iter, void *iter_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_RouteHandle *route_handle; struct PendingMessage *pending; - struct GNUNET_DHT_Message *message; - size_t is_unique; - size_t msize; - GNUNET_HashCode *uid_key; - uint64_t uid; + struct GNUNET_DHT_RouteMessage *message; + uint16_t msize; + GNUNET_HashCode uid_key; - is_unique = GNUNET_YES; - if (iter == NULL) - is_unique = GNUNET_NO; - - route_handle = NULL; - uid_key = NULL; - - do - { - GNUNET_free_non_null(uid_key); - uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); - uid_key = hash_from_uid(uid); - } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES); - - if (is_unique) - { - route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle)); - memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode)); - route_handle->iter = iter; - route_handle->iter_cls = iter_cls; - route_handle->dht_handle = handle; - route_handle->uid = uid; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Unique ID is %llu\n", "DHT API", uid); -#endif - /** - * Store based on random identifier! - */ - GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + return NULL; + if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return NULL; } - else + + route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); + memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); + route_handle->iter = iter; + route_handle->iter_cls = iter_cls; + route_handle->dht_handle = handle; + route_handle->uid = handle->uid_gen++; + if (iter != NULL) { - msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); + hash_from_uid (route_handle->uid, &uid_key); + GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, + &uid_key, route_handle, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } - GNUNET_free(uid_key); - message = GNUNET_malloc(msize); - message->header.size = htons(msize); - message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT); - memcpy(&message->key, key, sizeof(GNUNET_HashCode)); - message->options = htons(options); - message->desired_replication_level = htons(options); - message->unique = htons(is_unique); - message->unique_id = GNUNET_htonll(uid); - memcpy(&message[1], enc, ntohs(enc->size)); +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid); +#endif - pending = GNUNET_malloc(sizeof(struct PendingMessage)); + msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size); + message = GNUNET_malloc (msize); + message->header.size = htons (msize); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE); + memcpy (&message->key, key, sizeof (GNUNET_HashCode)); + message->options = htonl (options); + message->desired_replication_level = htonl (desired_replication_level); + message->unique_id = GNUNET_htonll (route_handle->uid); + memcpy (&message[1], enc, ntohs (enc->size)); + pending = GNUNET_malloc (sizeof (struct PendingMessage)); pending->msg = &message->header; pending->timeout = timeout; + if (iter == NULL) + pending->free_on_send = GNUNET_YES; pending->cont = cont; pending->cont_cls = cont_cls; - pending->is_unique = is_unique; - pending->unique_id = uid; - - GNUNET_assert(handle->current == NULL); - - handle->current = pending; - - process_pending_message(handle); + pending->unique_id = route_handle->uid; + if (handle->current == NULL) + { + handle->current = pending; + process_pending_message (handle); + } + else + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + route_handle->message = message; return route_handle; } -void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph); - /** * Perform an asynchronous GET operation on the DHT identified. @@ -691,135 +1129,233 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph); * * @return handle to stop the async get */ -struct GNUNET_DHT_RouteHandle * +struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_TIME_Relative timeout, - uint32_t type, + enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, GNUNET_DHT_GetIterator iter, void *iter_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + GNUNET_SCHEDULER_Task cont, void *cont_cls) { - struct GNUNET_DHT_GetContext *get_context; - struct GNUNET_DHT_GetMessage *get_msg; + struct GNUNET_DHT_GetHandle *get_handle; + struct GNUNET_DHT_GetMessage get_msg; - if (handle->current != NULL) /* Can't send right now, we have a pending message... */ + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ return NULL; - get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext)); - get_context->iter = iter; - get_context->iter_cls = iter_cls; + get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); + get_handle->get_context.iter = iter; + get_handle->get_context.iter_cls = iter_cls; #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key)); + "`%s': Inserting pending get request with key %s\n", "DHT API", + GNUNET_h2s (key)); #endif - get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); - get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET); - get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); - get_msg->type = htonl(type); + get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); + get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); + get_msg.type = htons (type); - return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, cont, cont_cls); + get_handle->route_handle = + GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout, + &get_reply_iterator, get_handle, cont, cont_cls); + return get_handle; } +/** + * Stop a previously issued routing request + * + * @param route_handle handle to the request to stop + * @param cont continuation to call once this message is sent to the service or times out + * @param cont_cls closure for the continuation + */ void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle) +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct PendingMessage *pending; struct GNUNET_DHT_StopMessage *message; size_t msize; - GNUNET_HashCode *uid_key; + GNUNET_HashCode uid_key; - msize = sizeof(struct GNUNET_DHT_StopMessage); - - message = GNUNET_malloc(msize); - message->header.size = htons(msize); - message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); + msize = sizeof (struct GNUNET_DHT_StopMessage); + message = GNUNET_malloc (msize); + message->header.size = htons (msize); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP); #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Remove outstanding request for uid %llu\n", "DHT API", + route_handle->uid); #endif - message->unique_id = GNUNET_htonll(route_handle->uid); - - GNUNET_assert(route_handle->dht_handle->current == NULL); - - pending = GNUNET_malloc(sizeof(struct PendingMessage)); - pending->msg = (struct GNUNET_MessageHeader *)message; - pending->timeout = DEFAULT_DHT_TIMEOUT; - pending->cont = NULL; - pending->cont_cls = NULL; - pending->is_unique = GNUNET_NO; - pending->unique_id = route_handle->uid; + message->unique_id = GNUNET_htonll (route_handle->uid); + memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode)); + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = (struct GNUNET_MessageHeader *) message; + pending->timeout = GNUNET_TIME_relative_get_forever(); + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->free_on_send = GNUNET_YES; + pending->unique_id = 0; /* When finished is called, free pending->msg */ - GNUNET_assert(route_handle->dht_handle->current == NULL); + if (route_handle->dht_handle->current == NULL) + { + route_handle->dht_handle->current = pending; + process_pending_message (route_handle->dht_handle); + } + else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING) + { + route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + route_handle->dht_handle->retransmission_buffer = pending; + } + else + { + GNUNET_free(pending); + GNUNET_break(0); + } - route_handle->dht_handle->current = pending; + hash_from_uid (route_handle->uid, &uid_key); + GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove + (route_handle->dht_handle->outstanding_requests, &uid_key, + route_handle) == GNUNET_YES); - process_pending_message(route_handle->dht_handle); + GNUNET_free(route_handle->message); + GNUNET_free(route_handle); +} - uid_key = hash_from_uid(route_handle->uid); - if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES) +/** + * Stop async DHT-get. + * + * @param get_handle handle to the GET operation to stop + * @param cont continuation to call once this message is sent to the service or times out + * @param cont_cls closure for the continuation + */ +void +GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) +{ + if ((get_handle->route_handle->dht_handle->current != NULL) && + (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) { + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } + return; + } + #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Removing pending get request with key %s, uid %llu\n", + "DHT API", GNUNET_h2s (&get_handle->route_handle->key), + get_handle->route_handle->uid); #endif - } - GNUNET_free(uid_key); - return; + GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls); + GNUNET_free (get_handle); } /** - * Stop async DHT-get. + * Perform an asynchronous FIND PEER operation on the DHT. * - * @param get_handle handle to the GET operation to stop + * @param handle handle to the DHT service + * @param timeout timeout for this request to be sent to the + * service + * @param options routing options for this message + * @param key the key to look up + * @param proc function to call on each result + * @param proc_cls closure for proc + * @param cont continuation to call once message sent + * @param cont_cls closure for continuation + * + * @return handle to stop the async get, NULL on error */ -void -GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *get_handle) +struct GNUNET_DHT_FindPeerHandle * +GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_DHT_RouteOption options, + const GNUNET_HashCode * key, + GNUNET_DHT_FindPeerProcessor proc, + void *proc_cls, + GNUNET_SCHEDULER_Task cont, + void *cont_cls) { -#if OLDREMOVE - struct GNUNET_DHT_GetMessage *get_msg; - struct GNUNET_DHT_Handle *handle; - GNUNET_HashCode *uid_key; -#endif + struct GNUNET_DHT_FindPeerHandle *find_peer_handle; + struct GNUNET_DHT_FindPeerMessage find_peer_msg; - GNUNET_DHT_route_stop(get_handle); + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ + return NULL; -#if OLDREMOVE - uid_key = hash_from_uid(get_handle->uid); - GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES); + find_peer_handle = + GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle)); + find_peer_handle->find_peer_context.proc = proc; + find_peer_handle->find_peer_context.proc_cls = proc_cls; - if (handle->do_destroy == GNUNET_NO) - { - get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); - get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP); - get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Inserting pending `%s' request with key %s\n", "DHT API", + "FIND PEER", GNUNET_h2s (key)); +#endif + find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage)); + find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); + find_peer_handle->route_handle = + GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header, + timeout, &find_peer_reply_iterator, + find_peer_handle, cont, cont_cls); + return find_peer_handle; +} +/** + * Stop async find peer. Frees associated resources. + * + * @param find_peer_handle GET operation to stop. + * @param cont continuation to call once this message is sent to the service or times out + * @param cont_cls closure for the continuation + */ +void +GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) +{ + if ((find_peer_handle->route_handle->dht_handle->current != NULL) && + (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) + { + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } + return; } -#endif + #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&get_handle->key), get_handle->uid); + "`%s': Removing pending `%s' request with key %s, uid %llu\n", + "DHT API", "FIND PEER", + GNUNET_h2s (&find_peer_handle->route_handle->key), + find_peer_handle->route_handle->uid); #endif + GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls); + GNUNET_free (find_peer_handle); + } /** * Perform a PUT operation storing data in the DHT. * - * @param h handle to DHT service + * @param handle handle to DHT service * @param key the key to store under * @param type type of the value * @param size number of bytes in data; must be less than 64k * @param data the data to store * @param exp desired expiration time for the value + * @param timeout how long to wait for transmission of this request * @param cont continuation to call when done; * reason will be TIMEOUT on error, * reason will be PREREQ_DONE on success @@ -830,38 +1366,61 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *get_handle) void GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, - uint32_t type, + enum GNUNET_BLOCK_Type type, uint32_t size, const char *data, struct GNUNET_TIME_Absolute exp, struct GNUNET_TIME_Relative timeout, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_PutMessage *put_msg; + struct GNUNET_DHT_RouteHandle *put_route; size_t msize; - if (handle->current != NULL) + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) { - GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT); + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n"); + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } return; } #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key)); + "`%s': Inserting pending put request with key %s\n", "DHT API", + GNUNET_h2s (key)); #endif - msize = sizeof(struct GNUNET_DHT_PutMessage) + size; - put_msg = GNUNET_malloc(msize); - put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); - put_msg->header.size = htons(msize); - put_msg->type = htonl(type); - put_msg->data_size = htons(size); - put_msg->expiration = exp; - memcpy(&put_msg[1], data, size); + msize = sizeof (struct GNUNET_DHT_PutMessage) + size; + put_msg = GNUNET_malloc (msize); + put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT); + put_msg->header.size = htons (msize); + put_msg->type = htons (type); + put_msg->data_size = htons (size); + put_msg->expiration = GNUNET_TIME_absolute_hton(exp); + memcpy (&put_msg[1], data, size); - GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls); + put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL, + NULL, cont, cont_cls); - GNUNET_free(put_msg); + if (put_route == NULL) /* Route start failed! */ + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n"); + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } + } + else + { + GNUNET_free(put_route); + } + + GNUNET_free (put_msg); } + +/* end of dht_api.c */