From 7831efe09b1e8d30c7361e4b6c17b6966ad0fafa Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 9 Jun 2012 15:12:03 +0000 Subject: [PATCH] -fixing #2400 --- src/core/core.h | 4 +- src/core/core_api.c | 289 +++++++++++++++----------------------------- 2 files changed, 98 insertions(+), 195 deletions(-) diff --git a/src/core/core.h b/src/core/core.h index 03e328ca8..9b1802fbc 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -257,9 +257,9 @@ struct SendMessageRequest struct GNUNET_PeerIdentity peer; /** - * How large is the client's message queue for this peer? + * Always zero. */ - uint32_t queue_size GNUNET_PACKED; + uint32_t reserved GNUNET_PACKED; /** * How large is the message? diff --git a/src/core/core_api.c b/src/core/core_api.c index 2b6407b6e..5c16adeae 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -31,6 +31,65 @@ #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__) + +/** + * Handle for a transmission request. + */ +struct GNUNET_CORE_TransmitHandle +{ + + /** + * Corresponding peer record. + */ + struct PeerRecord *peer; + + /** + * Corresponding SEND_REQUEST message. Only non-NULL + * while SEND_REQUEST message is pending. + */ + struct ControlMessage *cm; + + /** + * Function that will be called to get the actual request + * (once we are ready to transmit this request to the core). + * The function will be called with a NULL buffer to signal + * timeout. + */ + GNUNET_CONNECTION_TransmitReadyNotify get_message; + + /** + * Closure for get_message. + */ + void *get_message_cls; + + /** + * Timeout for this handle. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * How important is this message? + */ + uint32_t priority; + + /** + * Size of this request. + */ + uint16_t msize; + + /** + * Send message request ID for this request. + */ + uint16_t smr_id; + + /** + * Is corking allowed? + */ + int cork; + +}; + + /** * Information we track for each peer. */ @@ -62,16 +121,10 @@ struct PeerRecord struct GNUNET_CORE_Handle *ch; /** - * Head of doubly-linked list of pending requests. - * Requests are sorted by deadline *except* for HEAD, - * which is only modified upon transmission to core. + * Pending request, if any. 'th->peer' is set to NULL if the + * request is not active. */ - struct GNUNET_CORE_TransmitHandle *pending_head; - - /** - * Tail of doubly-linked list of pending requests. - */ - struct GNUNET_CORE_TransmitHandle *pending_tail; + struct GNUNET_CORE_TransmitHandle th; /** * ID of timeout task for the 'pending_head' handle @@ -84,11 +137,6 @@ struct PeerRecord */ GNUNET_SCHEDULER_TaskIdentifier ntr_task; - /** - * Current size of the queue of pending requests. - */ - unsigned int queue_size; - /** * SendMessageRequest ID generator for this peer. */ @@ -246,11 +294,6 @@ struct GNUNET_CORE_Handle */ struct GNUNET_TIME_Relative retry_backoff; - /** - * Number of messages we are allowed to queue per target. - */ - unsigned int queue_size; - /** * Number of entries in the handlers array. */ @@ -277,74 +320,6 @@ struct GNUNET_CORE_Handle }; -/** - * Handle for a transmission request. - */ -struct GNUNET_CORE_TransmitHandle -{ - - /** - * We keep active transmit handles in a doubly-linked list. - */ - struct GNUNET_CORE_TransmitHandle *next; - - /** - * We keep active transmit handles in a doubly-linked list. - */ - struct GNUNET_CORE_TransmitHandle *prev; - - /** - * Corresponding peer record. - */ - struct PeerRecord *peer; - - /** - * Corresponding SEND_REQUEST message. Only non-NULL - * while SEND_REQUEST message is pending. - */ - struct ControlMessage *cm; - - /** - * Function that will be called to get the actual request - * (once we are ready to transmit this request to the core). - * The function will be called with a NULL buffer to signal - * timeout. - */ - GNUNET_CONNECTION_TransmitReadyNotify get_message; - - /** - * Closure for get_message. - */ - void *get_message_cls; - - /** - * Timeout for this handle. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * How important is this message? - */ - uint32_t priority; - - /** - * Size of this request. - */ - uint16_t msize; - - /** - * Send message request ID for this request. - */ - uint16_t smr_id; - - /** - * Is corking allowed? - */ - int cork; - -}; - - /** * Our current client connection went down. Clean it up * and try to reconnect! @@ -404,23 +379,18 @@ disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key, if (h->disconnects != NULL) h->disconnects (h->cls, &pr->peer); /* all requests should have been cancelled, clean up anyway, just in case */ - GNUNET_break (pr->queue_size == 0); - while (NULL != (th = pr->pending_head)) + th = &pr->th; + if (NULL != th->peer) { GNUNET_break (0); - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; + th->peer = NULL; if (th->cm != NULL) th->cm->th = NULL; - GNUNET_free (th); } /* done with 'voluntary' cleanups, now on to normal freeing */ GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr)); - GNUNET_assert (pr->pending_head == NULL); - GNUNET_assert (pr->pending_tail == NULL); GNUNET_assert (pr->ch == h); - GNUNET_assert (pr->queue_size == 0); GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK); GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK); GNUNET_free (pr); @@ -517,7 +487,8 @@ request_next_transmission (struct PeerRecord *pr) GNUNET_SCHEDULER_cancel (pr->timeout_task); pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; } - if (NULL == (th = pr->pending_head)) + th = &pr->th; + if (NULL == th->peer) { trigger_next_request (h, GNUNET_NO); return; @@ -539,7 +510,7 @@ request_next_transmission (struct PeerRecord *pr) smr->priority = htonl (th->priority); smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); smr->peer = pr->peer; - smr->queue_size = htonl (pr->queue_size); + smr->reserved = htonl (0); smr->size = htons (th->msize); smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, @@ -566,9 +537,8 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_CORE_TransmitHandle *th; pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; - th = pr->pending_head; - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; + th = &pr->th; + th->peer = NULL; if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head)) { /* the request that was 'approved' by core was @@ -587,7 +557,6 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) "Signalling timeout of request for transmission to CORE service\n"); request_next_transmission (pr); GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); - GNUNET_free (th); } @@ -647,16 +616,15 @@ transmit_message (void *cls, size_t size, void *buf) /* now check for 'ready' P2P messages */ if (NULL != (pr = h->ready_peer_head)) { - GNUNET_assert (pr->pending_head != NULL); - th = pr->pending_head; + GNUNET_assert (NULL != pr->th.peer); + th = &pr->th; if (size < th->msize + sizeof (struct SendMessage)) { trigger_next_request (h, GNUNET_NO); return 0; } GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; + th->peer = NULL; if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (pr->timeout_task); @@ -679,7 +647,6 @@ transmit_message (void *cls, size_t size, void *buf) LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting SEND request to `%s' yielded %u bytes.\n", GNUNET_i2s (&pr->peer), ret); - GNUNET_free (th); if (0 == ret) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -738,7 +705,7 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) control_pending_head[1])->size); else if (h->ready_peer_head != NULL) msize = - h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage); + h->ready_peer_head->th.msize + sizeof (struct SendMessage); else { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -782,8 +749,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) if (NULL == msg) { LOG (GNUNET_ERROR_TYPE_INFO, - _ - ("Client was disconnected from core service, trying to reconnect.\n")); + _("Client was disconnected from core service, trying to reconnect.\n")); reconnect_later (h); return; } @@ -1032,14 +998,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about transmission readiness to `%s'.\n", GNUNET_i2s (&smr->peer)); - if (NULL == pr->pending_head) + if (NULL == pr->th.peer) { /* request must have been cancelled between the original request * and the response from core, ignore core's readiness */ break; } - th = pr->pending_head; + th = &pr->th; if (ntohs (smr->smr_id) != th->smr_id) { /* READY message is for expired or cancelled message, @@ -1191,7 +1157,6 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle)); h->cfg = cfg; - h->queue_size = 1; // FIXME: remove entirely... h->cls = cls; h->init = init; h->connects = connects; @@ -1318,89 +1283,34 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, { struct PeerRecord *pr; struct GNUNET_CORE_TransmitHandle *th; - struct GNUNET_CORE_TransmitHandle *pos; - struct GNUNET_CORE_TransmitHandle *prev; - struct GNUNET_CORE_TransmitHandle *minp; + GNUNET_assert (NULL != notify); pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey); if (NULL == pr) { /* attempt to send to peer that is not connected */ - LOG (GNUNET_ERROR_TYPE_WARNING, - "Attempting to send to peer `%s' from peer `%s', but not connected!\n", - GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey)); + GNUNET_break (0); + return NULL; + } + if (NULL != pr->th.peer) + { + /* attempting to queue a second request for the same destination */ GNUNET_break (0); return NULL; } GNUNET_assert (notify_size + sizeof (struct SendMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE); - th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle)); + th = &pr->th; th->peer = pr; - GNUNET_assert (NULL != notify); th->get_message = notify; th->get_message_cls = notify_cls; th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); th->priority = priority; th->msize = notify_size; th->cork = cork; - /* bound queue size */ - if (pr->queue_size == handle->queue_size) - { - /* find lowest-priority entry, but skip the head of the list */ - minp = pr->pending_head->next; - prev = minp; - while (prev != NULL) - { - if (prev->priority < minp->priority) - minp = prev; - prev = prev->next; - } - if (minp == NULL) - { - GNUNET_break (handle->queue_size != 0); - GNUNET_break (pr->queue_size == 1); - GNUNET_free (th); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Dropping transmission request: cannot drop queue head and limit is one\n"); - return NULL; - } - if (priority <= minp->priority) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Dropping transmission request: priority too low\n"); - GNUNET_free (th); - return NULL; /* priority too low */ - } - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp); - pr->queue_size--; - GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL)); - GNUNET_free (minp); - } - - /* Order entries by deadline, but SKIP 'HEAD' (as we may have transmitted - * that request already or might even already be approved to transmit that - * message to core) */ - pos = pr->pending_head; - if (pos != NULL) - pos = pos->next; /* skip head */ - - /* insertion sort */ - prev = pos; - while ((NULL != pos) && (pos->timeout.abs_value < th->timeout.abs_value)) - { - prev = pos; - pos = pos->next; - } - GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev, - th); - pr->queue_size++; - /* was the request queue previously empty? */ + pr->ntr_task = + GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n"); - if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) && - (pr->next == NULL) && (pr->prev == NULL) && - (handle->ready_peer_head != pr)) - pr->ntr_task = - GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); return th; } @@ -1414,12 +1324,11 @@ void GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) { struct PeerRecord *pr = th->peer; - struct GNUNET_CORE_Handle *h = pr->ch; - int was_head; + struct GNUNET_CORE_Handle *h; - was_head = (pr->pending_head == th); - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; + GNUNET_assert (NULL != pr); + th->peer = NULL; + h = pr->ch; if (NULL != th->cm) { /* we're currently in the control queue, remove */ @@ -1427,18 +1336,12 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) h->control_pending_tail, th->cm); GNUNET_free (th->cm); } - GNUNET_free (th); - if (was_head) + if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) { - if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) - { - /* the request that was 'approved' by core was - * canceled before it could be transmitted; remove - * us from the 'ready' list */ - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); - } - if (NULL != h->client) - request_next_transmission (pr); + /* the request that was 'approved' by core was + * canceled before it could be transmitted; remove + * us from the 'ready' list */ + GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); } } -- 2.25.1