From: Nathan S. Evans Date: Mon, 19 Apr 2010 15:16:38 +0000 (+0000) Subject: dht api fixes, it works again (for me) X-Git-Tag: initial-import-from-subversion-38251~22090 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=c5fee7d95eaa1695999c12d059b0aa4bc566d836;p=oweals%2Fgnunet.git dht api fixes, it works again (for me) --- diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index 1a50800b6..1677cf465 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am @@ -68,6 +68,7 @@ test_dht_api_SOURCES = \ test_dht_api.c test_dht_api_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/hello/libgnunethello.la \ $(top_builddir)/src/dht/libgnunetdht.la EXTRA_DIST = \ diff --git a/src/dht/dht.h b/src/dht/dht.h index 2bafc3694..93ac9fa69 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -34,12 +34,13 @@ typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls, * msg); /** - * FIXME. + * Message which indicates the DHT should cancel outstanding + * requests and discard any state. */ struct GNUNET_DHT_StopMessage { /** - * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE + * Type: GNUNET_MESSAGE_TYPE_DHT_STOP */ struct GNUNET_MessageHeader header; @@ -57,7 +58,8 @@ struct GNUNET_DHT_StopMessage /** - * Generic DHT message, wrapper for other message types + * Generic DHT message, indicates that a route request + * should be issued. */ struct GNUNET_DHT_RouteMessage { @@ -77,7 +79,8 @@ struct GNUNET_DHT_RouteMessage GNUNET_HashCode key; /** - * Unique ID identifying this request + * Unique ID identifying this request, if 0 then + * the client will not expect a response */ uint64_t unique_id GNUNET_PACKED; @@ -86,12 +89,6 @@ struct GNUNET_DHT_RouteMessage */ uint32_t desired_replication_level GNUNET_PACKED; - /** - * Is this message uniquely identified? If so it will - * be fire and forget, if not we will wait for a receipt - * from the service. - */ - uint32_t unique GNUNET_PACKED; /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */ diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 5675cef50..fda836d69 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -24,6 +24,11 @@ * @author Christian Grothoff * @author Nathan Evans * + * TODO: retransmission of pending requests maybe happens now, at least + * the code is in place to do so. Need to add checks when api calls + * happen to check if retransmission is in progress, and if so set + * the single pending message for transmission once the list of + * retries are done. */ #include "platform.h" @@ -67,16 +72,23 @@ struct PendingMessage void *cont_cls; /** - * Whether or not to await verification the message - * was received by the service + * Unique ID for this request */ - size_t is_unique; + uint64_t unique_id; + +}; +struct PendingMessageList +{ /** - * Unique ID for this request + * This is a singly linked list. */ - uint64_t unique_id; + struct PendingMessageList *next; + /** + * The pending message. + */ + struct PendingMessage *message; }; struct GNUNET_DHT_GetContext @@ -108,8 +120,7 @@ struct GNUNET_DHT_FindPeerContext }; /** - * Handle to control a unique operation (one that is - * expected to return results) + * Handle to a route request */ struct GNUNET_DHT_RouteHandle { @@ -138,37 +149,16 @@ struct GNUNET_DHT_RouteHandle * Main handle to this DHT api */ struct GNUNET_DHT_Handle *dht_handle; -}; -/** - * Handle for a non unique request, holds callback - * which needs to be called before we allow other - * messages to be processed and sent to the DHT service - */ -struct GNUNET_DHT_NonUniqueHandle -{ /** - * Key that this get request is for + * The actual message sent for this request, + * used for retransmitting requests on service + * failure/reconnect. Freed on route_stop. */ - GNUNET_HashCode key; - - /** - * Type of data get request was for - */ - uint32_t type; - - /** - * Continuation to call on service - * confirmation of message receipt. - */ - GNUNET_SCHEDULER_Task cont; - - /** - * Send continuation cls - */ - void *cont_cls; + struct GNUNET_DHT_RouteMessage *message; }; + /** * Handle to control a get operation. */ @@ -185,6 +175,7 @@ struct GNUNET_DHT_GetHandle struct GNUNET_DHT_GetContext get_context; }; + /** * Handle to control a find peer operation. */ @@ -202,6 +193,27 @@ struct GNUNET_DHT_FindPeerHandle }; +enum DHT_Retransmit_Stage +{ + /** + * The API is not retransmitting anything at this time. + */ + DHT_NOT_RETRANSMITTING, + + /** + * The API is retransmitting, and nothing has been single + * queued for sending. + */ + DHT_RETRANSMITTING, + + /** + * The API is retransmitting, and a single message has been + * queued for transmission once finished. + */ + DHT_RETRANSMITTING_MESSAGE_QUEUED +}; + + /** * Connection to the DHT service. */ @@ -242,16 +254,27 @@ struct GNUNET_DHT_Handle struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; /** - * Non unique handle. If set don't schedule another non - * unique request. + * Generator for unique ids. */ - struct GNUNET_DHT_NonUniqueHandle *non_unique_request; + uint64_t uid_gen; /** - * Generator for unique ids. + * Are we currently retransmitting requests? If so queue a _single_ + * new request when received. */ - uint64_t uid_gen; + enum DHT_Retransmit_Stage retransmit_stage; + /** + * Linked list of retranmissions, to be used in the event + * of a dht service disconnect/reconnect. + */ + struct PendingMessageList *retransmissions; + + /** + * A single pending message allowed to be scheduled + * during retransmission phase. + */ + struct PendingMessage *retransmission_buffer; }; @@ -269,6 +292,253 @@ hash_from_uid (uint64_t uid, *((uint64_t*)hash) = uid; } +/** + * Iterator callback to retransmit each outstanding request + * because the connection to the DHT service went down (and + * came back). + * + * + */ +static int retransmit_iterator (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct GNUNET_DHT_RouteHandle *route_handle = value; + struct PendingMessageList *pending_message_list; + + pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage)); + pending_message_list->message = (struct PendingMessage *)&pending_message_list[1]; + pending_message_list->message->msg = &route_handle->message->header; + pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever(); + pending_message_list->message->cont = NULL; + pending_message_list->message->cont_cls = NULL; + pending_message_list->message->unique_id = route_handle->uid; + /* Add the new pending message to the front of the retransmission list */ + pending_message_list->next = route_handle->dht_handle->retransmissions; + + return GNUNET_OK; +} + +/** + * Try to (re)connect to the dht service. + * + * @return GNUNET_YES on success, GNUNET_NO on failure. + */ +static int +try_connect (struct GNUNET_DHT_Handle *handle) +{ + if (handle->client != NULL) + return GNUNET_OK; + handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); + if (handle->client != NULL) + return GNUNET_YES; +#if DEBUG_STATISTICS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + _("Failed to connect to the dht service!\n")); +#endif + return GNUNET_NO; +} + +/** + * Send complete (or failed), call continuation if we have one. + */ +static void +finish (struct GNUNET_DHT_Handle *handle, int code) +{ + struct PendingMessage *pos = handle->current; + GNUNET_HashCode uid_hash; + hash_from_uid (pos->unique_id, &uid_hash); +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); +#endif + GNUNET_assert (pos != NULL); + + if (pos->cont != NULL) + { + if (code == GNUNET_SYSERR) + GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, + pos->cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + else + GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, + pos->cont_cls, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + } + + if (pos->unique_id != 0) + GNUNET_free(pos->msg); + GNUNET_free (pos); + handle->current = NULL; +} + +/** + * Transmit the next pending message, called by notify_transmit_ready + */ +static size_t +transmit_pending (void *cls, size_t size, void *buf) +{ + struct GNUNET_DHT_Handle *handle = cls; + size_t tsize; + +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending\n", "DHT API"); +#endif + if (buf == NULL) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending buf is NULL\n", "DHT API"); +#endif + finish (handle, GNUNET_SYSERR); + return 0; + } + + handle->th = NULL; + + if (handle->current != NULL) + { + tsize = ntohs (handle->current->msg->size); + if (size >= tsize) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending message size %d\n", "DHT API", tsize); +#endif + memcpy (buf, handle->current->msg, tsize); + finish (handle, GNUNET_OK); + return tsize; + } + else + { + return 0; + } + } + /* Have no pending request */ + return 0; +} + +/** + * Try to send messages from list of messages to send + */ +static void +process_pending_message (struct GNUNET_DHT_Handle *handle) +{ + + if (handle->current == NULL) + return; /* action already pending */ + if (GNUNET_YES != try_connect (handle)) + { + finish (handle, GNUNET_SYSERR); + return; + } + + if (NULL == + (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, + ntohs (handle-> + current->msg-> + size), + handle->current-> + timeout, GNUNET_YES, + &transmit_pending, + handle))) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit request to dht service.\n"); +#endif + finish (handle, GNUNET_SYSERR); + return; + } +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Scheduled sending message of size %d to service\n", + "DHT API", ntohs (handle->current->msg->size)); +#endif +} + +/** + * Send complete (or failed), call continuation if we have one. + * Forward declaration. + */ +static void +finish_retransmission (struct GNUNET_DHT_Handle *handle, int code); + +/* Forward declaration */ +static size_t +transmit_pending_retransmission (void *cls, size_t size, void *buf); + +/** + * Try to send messages from list of messages to send + */ +static void +process_pending_retransmissions (struct GNUNET_DHT_Handle *handle) +{ + + if (handle->current == NULL) + return; /* action already pending */ + if (GNUNET_YES != try_connect (handle)) + { + finish_retransmission (handle, GNUNET_SYSERR); + return; + } + + if (NULL == + (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, + ntohs (handle-> + current->msg-> + size), + handle->current-> + timeout, GNUNET_YES, + &transmit_pending_retransmission, + handle))) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit request to dht service.\n"); +#endif + finish_retransmission (handle, GNUNET_SYSERR); + return; + } +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Scheduled sending message of size %d to service\n", + "DHT API", ntohs (handle->current->msg->size)); +#endif +} + +/** + * Send complete (or failed), call continuation if we have one. + */ +static void +finish_retransmission (struct GNUNET_DHT_Handle *handle, int code) +{ + struct PendingMessage *pos = handle->current; + struct PendingMessageList *pending_list; +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API"); +#endif + GNUNET_assert (pos == handle->retransmissions->message); + pending_list = handle->retransmissions; + handle->retransmissions = handle->retransmissions->next; + GNUNET_free (pending_list); + + if (handle->retransmissions == NULL) + { + handle->retransmit_stage = DHT_NOT_RETRANSMITTING; + } + + if (handle->retransmissions != NULL) + { + handle->current = handle->retransmissions->message; + process_pending_retransmissions(handle); + } + else if (handle->retransmission_buffer != NULL) + { + handle->current = handle->retransmission_buffer; + process_pending_message(handle); + } +} /** * Handler for messages received from the DHT service @@ -286,8 +556,6 @@ service_message_handler (void *cls, uint64_t uid; GNUNET_HashCode uid_hash; size_t enc_size; - /* TODO: find out message type, handle callbacks for different types of messages. - * Should be a non unique acknowledgment, or unique result. */ if (msg == NULL) { @@ -300,8 +568,11 @@ service_message_handler (void *cls, handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); - /* FIXME: re-transmit *all* of our GET requests AND re-start - receiving responses! */ + + handle->retransmit_stage = DHT_RETRANSMITTING; + GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle); + handle->current = handle->retransmissions->message; + process_pending_retransmissions(handle); return; } @@ -341,37 +612,6 @@ service_message_handler (void *cls, break; } - /* FIXME: we don't want these anymore, call continuation once message is sent. */ - /* - case GNUNET_MESSAGE_TYPE_DHT_STOP: - { - stop_msg = (struct GNUNET_DHT_StopMessage *) msg; - uid = GNUNET_ntohll (stop_msg->unique_id); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), current uid %llu\n", - "DHT API", uid, handle->current->unique_id); -#endif - if (handle->current->unique_id == uid) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Have pending confirmation for this message!\n", - "DHT API", uid); -#endif - if (handle->current->cont != NULL) - GNUNET_SCHEDULER_add_continuation (handle->sched, - handle->current->cont, - handle->current->cont_cls, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - - GNUNET_free (handle->current->msg); - GNUNET_free (handle->current); - handle->current = NULL; - } - break; - } - */ default: { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -407,6 +647,7 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, handle->cfg = cfg; handle->sched = sched; handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); + handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); if (handle->client == NULL) { GNUNET_free (handle); @@ -451,40 +692,10 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); handle->client = NULL; } - /* Either assert that outstanding_requests is empty */ - /* FIXME: handle->outstanding_requests not freed! */ - GNUNET_free (handle); -} - - -/** - * Send complete (or failed), call continuation if we have one. - */ -static void -finish (struct GNUNET_DHT_Handle *handle, int code) -{ - struct PendingMessage *pos = handle->current; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); -#endif - GNUNET_assert (pos != NULL); - - - if (pos->cont != NULL) - { - if (code == GNUNET_SYSERR) - GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, - pos->cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); - else - GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, - pos->cont_cls, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - } - GNUNET_free (pos->msg); - GNUNET_free (pos); - handle->current = NULL; + GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0); + GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests); + GNUNET_free (handle); } @@ -492,7 +703,7 @@ finish (struct GNUNET_DHT_Handle *handle, int code) * Transmit the next pending message, called by notify_transmit_ready */ static size_t -transmit_pending (void *cls, size_t size, void *buf) +transmit_pending_retransmission (void *cls, size_t size, void *buf) { struct GNUNET_DHT_Handle *handle = cls; size_t tsize; @@ -507,8 +718,7 @@ transmit_pending (void *cls, size_t size, void *buf) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': In transmit_pending buf is NULL\n", "DHT API"); #endif - /* FIXME: free associated resources or summat */ - finish (handle, GNUNET_SYSERR); + finish_retransmission (handle, GNUNET_SYSERR); return 0; } @@ -524,7 +734,7 @@ transmit_pending (void *cls, size_t size, void *buf) "`%s': Sending message size %d\n", "DHT API", tsize); #endif memcpy (buf, handle->current->msg, tsize); - finish (handle, GNUNET_OK); + finish_retransmission (handle, GNUNET_OK); return tsize; } else @@ -537,66 +747,6 @@ transmit_pending (void *cls, size_t size, void *buf) } -/** - * Try to (re)connect to the dht service. - * - * @return GNUNET_YES on success, GNUNET_NO on failure. - */ -static int -try_connect (struct GNUNET_DHT_Handle *handle) -{ - if (handle->client != NULL) - return GNUNET_OK; - handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); - if (handle->client != NULL) - return GNUNET_YES; -#if DEBUG_STATISTICS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to connect to the dht service!\n")); -#endif - return GNUNET_NO; -} - - -/** - * Try to send messages from list of messages to send - */ -static void -process_pending_message (struct GNUNET_DHT_Handle *handle) -{ - - if (handle->current == NULL) - return; /* action already pending */ - if (GNUNET_YES != try_connect (handle)) - { - finish (handle, GNUNET_SYSERR); - return; - } - - if (NULL == - (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs (handle-> - current->msg-> - size), - handle->current-> - timeout, GNUNET_YES, - &transmit_pending, - handle))) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dht service.\n"); -#endif - finish (handle, GNUNET_SYSERR); - return; - } -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", - "DHT API", ntohs (handle->current->msg->size)); -#endif -} - /** * Iterator called on each result obtained from a generic route * operation @@ -633,20 +783,31 @@ void find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) { struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; + struct GNUNET_MessageHeader *hello; + size_t hello_size; -#if DEBUG_DHT_API + if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) + { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Find peer iterator called.\n"); -#endif - if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_HELLO) - return; + "Received wrong type of response to a find peer request...\n"); + return; + } + GNUNET_assert (ntohs (reply->size) >= sizeof (struct GNUNET_MessageHeader)); + hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader); + hello = (struct GNUNET_MessageHeader *)&reply[1]; + if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO"); + return; + } find_peer_handle->find_peer_context.proc (find_peer_handle-> find_peer_context.proc_cls, - (struct GNUNET_HELLO_Message *)reply); + (struct GNUNET_HELLO_Message *)hello); } /** @@ -685,36 +846,38 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_DHT_RouteHandle *route_handle; struct PendingMessage *pending; struct GNUNET_DHT_RouteMessage *message; - size_t expects_response; uint16_t msize; GNUNET_HashCode uid_key; - uint64_t uid; if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); return NULL; } - expects_response = GNUNET_YES; - if (iter == NULL) - expects_response = GNUNET_NO; - uid = handle->uid_gen++; - if (expects_response) + + route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); + memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); + route_handle->iter = iter; + route_handle->iter_cls = iter_cls; + route_handle->dht_handle = handle; + if (iter != NULL) { - route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); - memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); - route_handle->iter = iter; - route_handle->iter_cls = iter_cls; - route_handle->dht_handle = handle; - route_handle->uid = uid; -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Unique ID is %llu\n", "DHT API", uid); -#endif + route_handle->uid = handle->uid_gen++; + hash_from_uid (route_handle->uid, &uid_key); GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, &uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } + else + { + route_handle->uid = 0; + } + +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid); +#endif + msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size); message = GNUNET_malloc (msize); message->header.size = htons (msize); @@ -722,18 +885,25 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, memcpy (&message->key, key, sizeof (GNUNET_HashCode)); message->options = htonl (options); message->desired_replication_level = htonl (options); - message->unique = htonl (expects_response); - message->unique_id = GNUNET_htonll (uid); + message->unique_id = GNUNET_htonll (route_handle->uid); memcpy (&message[1], enc, ntohs (enc->size)); pending = GNUNET_malloc (sizeof (struct PendingMessage)); pending->msg = &message->header; pending->timeout = timeout; pending->cont = cont; pending->cont_cls = cont_cls; - pending->unique_id = uid; - GNUNET_assert (handle->current == NULL); - handle->current = pending; - process_pending_message (handle); + pending->unique_id = route_handle->uid; + if (handle->current == NULL) + { + handle->current = pending; + process_pending_message (handle); + } + else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING)) + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + route_handle->message = message; return route_handle; } @@ -762,9 +932,9 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_GetHandle *get_handle; - struct GNUNET_DHT_GetMessage *get_msg; + struct GNUNET_DHT_GetMessage get_msg; - if (handle->current != NULL) /* Can't send right now, we have a pending message... */ + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ return NULL; get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); @@ -777,14 +947,14 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, GNUNET_h2s (key)); #endif - get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage)); - get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); - get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); - get_msg->type = htons (type); + get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); + get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); + get_msg.type = htons (type); get_handle->route_handle = - GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout, + GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout, &get_reply_iterator, get_handle, cont, cont_cls); + return get_handle; } @@ -821,10 +991,23 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, pending->timeout = DEFAULT_DHT_TIMEOUT; pending->cont = cont; pending->cont_cls = cont_cls; - pending->unique_id = route_handle->uid; - GNUNET_assert (route_handle->dht_handle->current == NULL); - route_handle->dht_handle->current = pending; - process_pending_message (route_handle->dht_handle); + pending->unique_id = 0; /* When finished is called, free pending->msg */ + + if (route_handle->dht_handle->current == NULL) + { + route_handle->dht_handle->current = pending; + process_pending_message (route_handle->dht_handle); + } + else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)) + { + route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + route_handle->dht_handle->retransmission_buffer = pending; + } + else + { + GNUNET_break(0); + } + hash_from_uid (route_handle->uid, &uid_key); GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove (route_handle->dht_handle->outstanding_requests, &uid_key, @@ -843,6 +1026,17 @@ void GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) { + if ((get_handle->route_handle->dht_handle->current != NULL) && + (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) + { + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } + return; + } + #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Removing pending get request with key %s, uid %llu\n", @@ -880,9 +1074,9 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, void *cont_cls) { struct GNUNET_DHT_FindPeerHandle *find_peer_handle; - struct GNUNET_MessageHeader *find_peer_msg; + struct GNUNET_MessageHeader find_peer_msg; - if (handle->current != NULL) /* Can't send right now, we have a pending message... */ + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ return NULL; find_peer_handle = @@ -896,11 +1090,10 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, "FIND PEER", GNUNET_h2s (key)); #endif - find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader)); - find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader)); - find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); + find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader)); + find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); find_peer_handle->route_handle = - GNUNET_DHT_route_start (handle, key, 0, options, find_peer_msg, + GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg, timeout, &find_peer_reply_iterator, find_peer_handle, cont, cont_cls); return find_peer_handle; @@ -917,6 +1110,17 @@ void GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) { + if ((find_peer_handle->route_handle->dht_handle->current != NULL) && + (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) + { + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } + return; + } + #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Removing pending `%s' request with key %s, uid %llu\n", @@ -958,12 +1162,16 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_PutMessage *put_msg; + struct GNUNET_DHT_RouteHandle *put_route; size_t msize; - if (handle->current != NULL) + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) { - GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } return; } @@ -982,8 +1190,19 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, put_msg->expiration = GNUNET_TIME_absolute_hton(exp); memcpy (&put_msg[1], data, size); - GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL, - NULL, cont, cont_cls); + put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL, + NULL, cont, cont_cls); + + if (put_route == NULL) /* Route start failed! */ + { + if (cont != NULL) + { + GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + } + } + else + GNUNET_free(put_route); GNUNET_free (put_msg); } diff --git a/src/dht/gnunet-dht-get-peer.c b/src/dht/gnunet-dht-get-peer.c index 3b7f7a4e8..e3f6116cd 100644 --- a/src/dht/gnunet-dht-get-peer.c +++ b/src/dht/gnunet-dht-get-peer.c @@ -101,17 +101,18 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * operation * * @param cls closure (NULL) - * @param peer the peer we learned about - * @param reply the response message, should be a HELLO + * @param hello the response message, a HELLO */ void find_peer_processor (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *reply) + const struct GNUNET_HELLO_Message *hello) { - result_count++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(peer), result_count); - + struct GNUNET_PeerIdentity peer; + if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer)) + { + result_count++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(&peer), result_count); + } } @@ -191,8 +192,14 @@ run (void *cls, if (verbose) fprintf (stderr, "Issuing FIND PEER request for %s!\n", query_key); - find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, timeout, 0, NULL, &key, - &find_peer_processor, NULL, &message_sent_cont, NULL); + find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, + timeout, + 0, + &key, + &find_peer_processor, + NULL, + &message_sent_cont, + NULL); } diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 8907c3642..27c431bd5 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -166,7 +166,7 @@ struct DHT_MessageContext /** * The key this request was about */ - GNUNET_HashCode *key; + const GNUNET_HashCode *key; /** * The unique identifier of this request @@ -240,9 +240,6 @@ send_generic_reply (void *cls, size_t size, void *buf) if (buf == NULL) { /* client disconnected */ -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT"); -#endif return 0; } off = 0; @@ -256,10 +253,6 @@ send_generic_reply (void *cls, size_t size, void *buf) GNUNET_free (reply); off += msize; } -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Copying reply to buffer, REALLY SENT\n", "DHT"); -#endif process_pending_messages (client); return off; } @@ -284,7 +277,7 @@ add_pending_message (struct ClientList *client, /** - * Called when a reply needs to be sent to a client, either as + * Called when a reply needs to be sent to a client, as * a result it found to a GET or FIND PEER request. * * @param client the client to send the reply to @@ -296,7 +289,7 @@ send_reply_to_client (struct ClientList *client, const struct GNUNET_MessageHeader *message, unsigned long long uid) { - struct GNUNET_DHT_Message *reply; + struct GNUNET_DHT_RouteResultMessage *reply; struct PendingMessage *pending_message; uint16_t msize; size_t tsize; @@ -305,21 +298,21 @@ send_reply_to_client (struct ClientList *client, "`%s': Sending reply to client.\n", "DHT"); #endif msize = ntohs (message->size); - tsize = sizeof (struct GNUNET_DHT_Message) + msize; + tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize; if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { - GNUNET_BREAK_op (0); + GNUNET_break_op (0); return; } - reply = GNUNET_malloc (tsize); + + pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize); + pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1]; + reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1]; reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT); reply->header.size = htons (tsize); - if (uid != 0) - reply->unique = htonl (GNUNET_YES); // ???? reply->unique_id = GNUNET_htonll (uid); memcpy (&reply[1], message, msize); - pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); // inline - pending_message->msg = &reply->header; + add_pending_message (client, pending_message); } @@ -354,7 +347,6 @@ datacache_get_iterator (void *cls, get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); get_result->header.size = htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); - get_result->data_size = htons (size); get_result->expiration = exp; memcpy (&get_result->key, key, sizeof (GNUNET_HashCode)); get_result->type = htons (type); @@ -383,12 +375,13 @@ handle_dht_get (void *cls, unsigned int results; struct DatacacheGetContext datacache_get_context; - if (ntohs (msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage)) + get_msg = (const struct GNUNET_DHT_GetMessage *) msg; + if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage)) { GNUNET_break (0); return; } - get_msg = (const struct GNUNET_DHT_GetMessage *) msg; + get_type = ntohs (get_msg->type); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -422,7 +415,7 @@ handle_dht_find_peer (void *cls, const struct GNUNET_MessageHeader *find_msg, struct DHT_MessageContext *message_context) { - struct GNUNET_DHT_FindPeerResultMessage *find_peer_result; + struct GNUNET_MessageHeader *find_peer_result; size_t hello_size; size_t tsize; @@ -430,8 +423,8 @@ handle_dht_find_peer (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", "DHT", "FIND PEER", GNUNET_h2s (message_context->key), - ntohs (find_msg->header.size), - sizeof (struct GNUNET_DHT_FindPeerMessage)); + ntohs (find_msg->size), + sizeof (struct GNUNET_MessageHeader)); #endif if (my_hello == NULL) { @@ -444,13 +437,18 @@ handle_dht_find_peer (void *cls, } /* Simplistic find_peer functionality, always return our hello */ hello_size = ntohs(my_hello->size); - tsize = hello_size + sizeof (struct GNUNET_DHT_FindPeerResultMessage); + tsize = hello_size + sizeof (struct GNUNET_MessageHeader); // check tsize < MAX find_peer_result = GNUNET_malloc (tsize); - find_peer_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT); - find_peer_result->header.size = htons (tsize); - memcpy (&find_peer_result[1], &my_hello, hello_size); - send_reply_to_client(message_context->client, &find_peer_result->header, message_context->unique_id); + find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT); + find_peer_result->size = htons (tsize); + memcpy (&find_peer_result[1], my_hello, hello_size); +#if DEBUG_DHT_HELLO + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending hello size %d to client.\n", + "DHT", hello_size); +#endif + send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id); GNUNET_free(find_peer_result); } @@ -471,16 +469,12 @@ handle_dht_put (void *cls, size_t put_type; size_t data_size; - GNUNET_assert (ntohs (msg->header.size) >= + GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage)); put_msg = (struct GNUNET_DHT_PutMessage *)msg; - put_type = ntohl (put_msg->type); + put_type = ntohs (put_msg->type); data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); #if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': %s msg total size is %d, data size %d, struct size %d\n", - "DHT", "PUT", ntohs (put_msg->header.size), data_size, - sizeof (struct GNUNET_DHT_PutMessage)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s (message_context->key)); @@ -488,7 +482,11 @@ handle_dht_put (void *cls, if (datacache != NULL) GNUNET_DATACACHE_put (datacache, message_context->key, data_size, (char *) &put_msg[1], put_type, - put_msg->expiration); + GNUNET_TIME_absolute_ntoh(put_msg->expiration)); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': %s request received locally, but have no datacache!\n", + "DHT", "PUT"); } @@ -519,41 +517,6 @@ find_active_client (struct GNUNET_SERVER_Client *client) return ret; } -/** - * Construct a message receipt confirmation for a particular uid. - * Receipt confirmations are used for any requests that don't expect - * a reply otherwise (i.e. put requests, stop requests). - * - * @param client the handle for the client - * @param uid the unique identifier of this message - */ -static void -send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client, - uint64_t uid) -{ - struct GNUNET_DHT_StopMessage *confirm_message; - struct ClientList *active_client; - struct PendingMessage *pending_message; - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending receipt confirmation for uid %llu\n", "DHT", - uid); -#endif - confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage)); - confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP); - confirm_message->header.size = - htons (sizeof (struct GNUNET_DHT_StopMessage)); - confirm_message->unique_id = GNUNET_htonll (uid); - - active_client = find_active_client (client); - pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); - pending_message->msg = &confirm_message->header; - - add_pending_message (active_client, pending_message); - -} - /** * Handler for any generic DHT messages, calls the appropriate handler * depending on message type, sends confirmation if responses aren't otherwise @@ -567,9 +530,10 @@ static void handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { - const struct GNUNET_DHT_Message *dht_msg = (const struct GNUNET_DHT_Message *) message; + const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message; const struct GNUNET_MessageHeader *enc_msg; struct DHT_MessageContext *message_context; + int handle_locally; size_t enc_type; enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; @@ -590,28 +554,37 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, message_context->replication = ntohl (dht_msg->desired_replication_level); message_context->msg_options = ntohl (dht_msg->options); + /* TODO: Steps to be added by students */ /* FIXME: Implement *remote* DHT operations here (forward request) */ + /* Implement generic route function and call here. */ /* FIXME: *IF* handling should be local, then do this: */ - switch (enc_type) + /* 1. find if this peer is closest based on whatever metric the DHT uses + * 2. if this peer is closest _OR_ the message options indicate it should + * be processed everywhere _AND_ we want it processed everywhere, then + * handle it locally. + */ + handle_locally = GNUNET_YES; + if (handle_locally == GNUNET_YES) { - case GNUNET_MESSAGE_TYPE_DHT_GET: - handle_dht_get (cls, enc_msg, - message_context); - break; - case GNUNET_MESSAGE_TYPE_DHT_PUT: - handle_dht_put (cls, enc_msg, - message_context); - send_client_receipt_confirmation (client, - GNUNET_ntohll (dht_msg->unique_id)); - break; - case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: - handle_dht_find_peer (cls, - enc_msg, - message_context); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "`%s': Message type (%d) not handled\n", "DHT", enc_type); + switch (enc_type) + { + case GNUNET_MESSAGE_TYPE_DHT_GET: + handle_dht_get (cls, enc_msg, + message_context); + break; + case GNUNET_MESSAGE_TYPE_DHT_PUT: + handle_dht_put (cls, enc_msg, + message_context); + break; + case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: + handle_dht_find_peer (cls, + enc_msg, + message_context); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "`%s': Message type (%d) not handled\n", "DHT", enc_type); + } } GNUNET_free (message_context); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -620,14 +593,14 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, /** * Handler for any generic DHT stop messages, calls the appropriate handler - * depending on message type, sends confirmation by default (stop messages - * do not otherwise expect replies) + * depending on message type (if processed locally) * * @param cls closure for the service * @param client the client we received this message from * @param message the actual message received * - * TODO: add demultiplexing for stop message types. + * TODO: once message are remembered by unique id, add code to + * forget them here */ static void handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, @@ -635,13 +608,17 @@ handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, { const struct GNUNET_DHT_StopMessage *dht_stop_msg = (const struct GNUNET_DHT_StopMessage *) message; - + uint64_t uid; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); #endif - /* TODO: actually stop... */ + + uid = GNUNET_ntohll(dht_stop_msg->unique_id); + /* TODO: actually stop... free associated resources for the request + * lookup request by uid and remove state. */ + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -767,14 +744,14 @@ core_init (void *cls, static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { - {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, + {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0}, {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0}, {NULL, NULL, 0, 0} }; static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE_REQUEST, 0}, + {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0}, {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0}, {NULL, 0, 0} }; @@ -803,7 +780,6 @@ run (void *cls, GNUNET_TIME_UNIT_FOREVER_REL, NULL, /* FIXME: anything we want to pass around? */ &core_init, /* Call core_init once connected */ - NULL, /* Don't care about pre-connects */ NULL, /* Don't care about connects */ NULL, /* Don't care about disconnects */ NULL, /* Don't want notified about all incoming messages */ diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index f99e84269..10db5f45a 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -32,6 +32,7 @@ #include "gnunet_program_lib.h" #include "gnunet_scheduler_lib.h" #include "gnunet_dht_service.h" +#include "gnunet_hello_lib.h" #define VERBOSE GNUNET_NO @@ -90,6 +91,8 @@ struct PeerContext static struct PeerContext p1; +struct RetryContext retry_context; + static struct GNUNET_SCHEDULER_Handle *sched; static int ok; @@ -146,7 +149,10 @@ end_badly () #if VERBOSE fprintf (stderr, "Ending on an unhappy note.\n"); #endif - + if (retry_context.peer_ctx->find_peer_handle != NULL) + GNUNET_DHT_find_peer_stop(retry_context.peer_ctx->find_peer_handle, NULL, NULL); + if (retry_context.retry_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel(sched, retry_context.retry_task); GNUNET_DHT_disconnect (p1.dht_handle); ok = 1; @@ -186,21 +192,41 @@ test_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param reply response */ void test_find_peer_processor (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *reply) + const struct GNUNET_HELLO_Message *hello) { struct RetryContext *retry_ctx = cls; + struct GNUNET_PeerIdentity peer; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "test_find_peer_processor called (peer `%s'), stopping find peer request!\n", GNUNET_i2s(peer)); + if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "test_find_peer_processor called (peer `%s'), stopping find peer request!\n", GNUNET_i2s(&peer)); - if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task); + if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task); + retry_ctx->retry_task = GNUNET_SCHEDULER_NO_TASK; + } + + GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "received find peer request, but hello_get_id failed!\n"); + } - GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); } +/** + * Retry the find_peer task on timeout. (Forward declaration) + * + * @param cls closure + * @param tc context information (why was this task triggered now?) + */ +void +retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); /** * Retry the find_peer task on timeout. @@ -219,9 +245,9 @@ retry_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_find_peer timed out, retrying!\n"); - + retry_ctx->next_timeout = GNUNET_TIME_relative_multiply(retry_ctx->next_timeout, 2); retry_ctx->peer_ctx->find_peer_handle = - GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, retry_ctx->next_timeout, 0, NULL, &hash, + GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, retry_ctx->next_timeout, 0, &hash, &test_find_peer_processor, retry_ctx, NULL, NULL); } else @@ -235,14 +261,14 @@ retry_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (retry_ctx->peer_ctx->find_peer_handle == NULL) GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); else - retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer, retry_ctx); + retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx); } /** * Retry the find_peer task on timeout. * * @param cls closure - * @param tc context information (why was this task triggered now) + * @param tc context information (why was this task triggered now?) */ void retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) @@ -270,24 +296,22 @@ test_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct PeerContext *peer = cls; GNUNET_HashCode hash; memset (&hash, 42, sizeof (GNUNET_HashCode)); - struct RetryContext *retry_ctx; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n"); GNUNET_assert (peer->dht_handle != NULL); - retry_ctx = GNUNET_malloc(sizeof(struct RetryContext)); - retry_ctx->real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT); - retry_ctx->next_timeout = BASE_TIMEOUT; - retry_ctx->peer_ctx = peer; + retry_context.real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT); + retry_context.next_timeout = BASE_TIMEOUT; + retry_context.peer_ctx = peer; peer->find_peer_handle = - GNUNET_DHT_find_peer_start (peer->dht_handle, retry_ctx->next_timeout, 0, NULL, &hash, - &test_find_peer_processor, retry_ctx, NULL, NULL); + GNUNET_DHT_find_peer_start (peer->dht_handle, retry_context.next_timeout, 0, &hash, + &test_find_peer_processor, &retry_context, NULL, NULL); if (peer->find_peer_handle == NULL) GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); else - retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx); + retry_context.retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_context.next_timeout, &retry_find_peer_stop, &retry_context); } /**