X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fmq.c;h=a4691ff2c2eccef9d447a971c448090f8509b18c;hb=007c852f2e43e658cb2581a62c03cd0ccbfd08f4;hp=289ac1ade4e39f69ee83b3d840c542f6c56519be;hpb=6db7b23f0be85c8fc263cb8380cb9a3d495be75a;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index 289ac1ade..a4691ff2c 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -23,9 +23,7 @@ * @file util/mq.c * @brief general purpose request queue */ - #include "platform.h" -#include "gnunet_common.h" #include "gnunet_util_lib.h" #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) @@ -123,15 +121,14 @@ struct GNUNET_MQ_Handle struct GNUNET_MQ_Envelope *current_envelope; /** - * Has the current envelope been commited? - * Either GNUNET_YES or GNUNET_NO. + * Map of associations, lazily allocated */ - int commited; + struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; /** - * Map of associations, lazily allocated + * Task scheduled during #GNUNET_MQ_impl_send_continue. */ - struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; + GNUNET_SCHEDULER_TaskIdentifier continue_task; /** * Next id that should be used for the assoc_map, @@ -194,7 +191,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_Messa handled = GNUNET_YES; } } - + if (GNUNET_NO == handled) LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); } @@ -235,7 +232,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) /** * Send a message with the give message queue. * May only be called once per message. - * + * * @param mq message queue * @param ev the envelope with the message to send. */ @@ -244,7 +241,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) { GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); - + /* is the implementation busy? queue it! */ if (NULL != mq->current_envelope) { @@ -257,36 +254,60 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) /** - * Call the send implementation for the next queued message, - * if any. - * Only useful for implementing message queues, + * Task run to call the send implementation for the next queued + * message, if any. Only useful for implementing message queues, * results in undefined behavior if not used carefully. * - * @param mq message queue to send the next message with + * @param cls message queue to send the next message with + * @param tc scheduler context */ -void -GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) +static void +impl_send_continue (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct GNUNET_MQ_Handle *mq = cls; + struct GNUNET_MQ_Envelope *current_envelope; + + if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + return; + + mq->continue_task = GNUNET_SCHEDULER_NO_TASK; /* call is only valid if we're actually currently sending * a message */ - GNUNET_assert (NULL != mq); - GNUNET_assert (NULL != mq->current_envelope); - GNUNET_assert (GNUNET_YES == mq->commited); - mq->commited = GNUNET_NO; - GNUNET_free (mq->current_envelope); + current_envelope = mq->current_envelope; + GNUNET_assert (NULL != current_envelope); if (NULL == mq->envelope_head) { mq->current_envelope = NULL; - return; } + else + { + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); + } + if (NULL != current_envelope->sent_cb) + current_envelope->sent_cb (current_envelope->sent_cls); + GNUNET_free (current_envelope); +} - GNUNET_assert (NULL != mq->envelope_tail); - GNUNET_assert (NULL != mq->envelope_head); - mq->current_envelope = mq->envelope_head; - GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, - mq->current_envelope); - mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); +/** + * Call the send implementation for the next queued message, + * if any. + * Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param mq message queue to send the next message with + */ +void +GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) +{ + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task); + mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, + mq); } @@ -365,25 +386,6 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) } - -/** - * Mark the current message as irrevocably sent, but do not - * proceed with sending the next message. - * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. - * - * @param mq message queue - */ -void -GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq) -{ - GNUNET_assert (NULL != mq->current_envelope); - GNUNET_assert (GNUNET_NO == mq->commited); - mq->commited = GNUNET_YES; - if (NULL != mq->current_envelope->sent_cb) - mq->current_envelope->sent_cb (mq->current_envelope->sent_cls); -} - - struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) { @@ -461,19 +463,25 @@ transmit_queued (void *cls, size_t size, } - static void server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct ServerClientSocketState *state = impl_state; - + + if (NULL != state->th) + { + GNUNET_SERVER_notify_transmit_ready_cancel (state->th); + state->th = NULL; + } + GNUNET_assert (NULL != mq); GNUNET_assert (NULL != state); GNUNET_SERVER_client_drop (state->client); GNUNET_free (state); } + static void server_client_send_impl (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state) @@ -482,10 +490,7 @@ server_client_send_impl (struct GNUNET_MQ_Handle *mq, GNUNET_assert (NULL != mq); GNUNET_assert (NULL != state); - - GNUNET_MQ_impl_send_commit (mq); - - state->th = + state->th = GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, &transmit_queued, mq); @@ -524,7 +529,7 @@ handle_client_message (void *cls, struct ClientConnectionState *state; state = mq->impl_state; - + if (NULL == msg) { GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); @@ -542,13 +547,14 @@ handle_client_message (void *cls, * Transmit a queued message to the session's client. * * @param cls consensus session - * @param size number of bytes available in buf + * @param size number of bytes available in @a buf * @param buf where the callee should write the message * @return number of bytes written to buf */ static size_t -connection_client_transmit_queued (void *cls, size_t size, - void *buf) +connection_client_transmit_queued (void *cls, + size_t size, + void *buf) { struct GNUNET_MQ_Handle *mq = cls; const struct GNUNET_MessageHeader *msg; @@ -584,13 +590,13 @@ connection_client_transmit_queued (void *cls, size_t size, } - static void connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { GNUNET_free (impl_state); } + static void connection_client_send_impl (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state) @@ -599,13 +605,11 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, GNUNET_assert (NULL != state); GNUNET_assert (NULL == state->th); - - GNUNET_MQ_impl_send_commit (mq); - - state->th = - GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), + state->th = + GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, &connection_client_transmit_queued, mq); + GNUNET_assert (NULL != state->th); } @@ -672,7 +676,6 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, } - void * GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) { @@ -712,7 +715,11 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { mq->destroy_impl (mq, mq->impl_state); } - + if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task) + { + GNUNET_SCHEDULER_cancel (mq->continue_task); + mq->continue_task = GNUNET_SCHEDULER_NO_TASK; + } while (NULL != mq->envelope_head) { struct GNUNET_MQ_Envelope *ev;