X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fmq.c;h=14e0816e28b3f2aa6e989320ed1ff463321bbf79;hb=34f34474b6137233d6700d4599f42257e8208af2;hp=c4f9e0d0b3c2ecd4f7f4ab83b6a970e2d3b4c4be;hpb=61c39c60565b386e0e12ea669556b030e8cd7180;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index c4f9e0d0b..14e0816e2 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -60,7 +60,7 @@ struct GNUNET_MQ_Envelope GNUNET_MQ_NotifyCallback sent_cb; /** - * Closure for send_cb + * Closure for @e send_cb */ void *sent_cls; }; @@ -93,6 +93,11 @@ struct GNUNET_MQ_Handle */ GNUNET_MQ_DestroyImpl destroy_impl; + /** + * Implementation-dependent send cancel function + */ + GNUNET_MQ_CancelImpl cancel_impl; + /** * Implementation-specific state */ @@ -121,34 +126,46 @@ struct GNUNET_MQ_Handle struct GNUNET_MQ_Envelope *current_envelope; /** - * Has the current envelope been commited? - * Either GNUNET_YES or GNUNET_NO. + * Map of associations, lazily allocated */ - int commited; + struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; /** - * Map of associations, lazily allocated + * Task scheduled during #GNUNET_MQ_impl_send_continue. */ - struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; + struct GNUNET_SCHEDULER_Task * continue_task; /** - * Next id that should be used for the assoc_map, + * Next id that should be used for the @e assoc_map, * initialized lazily to a random value together with - * assoc_map + * @e assoc_map */ uint32_t assoc_id; }; - - +/** + * Implementation-specific state for connection to + * client (MQ for server). + */ struct ServerClientSocketState { + /** + * Handle of the client that connected to the server. + */ struct GNUNET_SERVER_Client *client; + + /** + * Active transmission request to the client. + */ struct GNUNET_SERVER_TransmitHandle* th; }; +/** + * Implementation-specific state for connection to + * service (MQ for clients). + */ struct ClientConnectionState { /** @@ -160,7 +177,15 @@ struct ClientConnectionState * Do we also want to receive? */ int receive_requested; + + /** + * Connection to the service. + */ struct GNUNET_CLIENT_Connection *connection; + + /** + * Active transmission request (or NULL). + */ struct GNUNET_CLIENT_TransmitHandle *th; }; @@ -176,25 +201,32 @@ struct ClientConnectionState * @param mh message to dispatch */ void -GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh) +GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *mh) { const struct GNUNET_MQ_MessageHandler *handler; int handled = GNUNET_NO; - handler = mq->handlers; - if (NULL == handler) + if (NULL == mq->handlers) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "No handler for message of type %d\n", + ntohs (mh->type)); return; - for (; NULL != handler->cb; handler++) + } + for (handler = mq->handlers; NULL != handler->cb; handler++) { if (handler->type == ntohs (mh->type)) { handler->cb (mq->handlers_cls, mh); handled = GNUNET_YES; + break; } } - if (GNUNET_NO == handled) - LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); + LOG (GNUNET_ERROR_TYPE_WARNING, + "No handler for message of type %d\n", + ntohs (mh->type)); } @@ -214,8 +246,9 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, { if (NULL == mq->error_handler) { - /* FIXME: log what kind of error occured */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "mq: got error %d, but no handler installed\n", + (int) error); return; } mq->error_handler (mq->handlers_cls, error); @@ -238,15 +271,19 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) * @param ev the envelope with the message to send. */ void -GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) +GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, + struct GNUNET_MQ_Envelope *ev) { GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); + ev->parent_queue = mq; /* is the implementation busy? queue it! */ if (NULL != mq->current_envelope) { - GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); + GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, + mq->envelope_tail, + ev); return; } mq->current_envelope = ev; @@ -255,36 +292,61 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) /** - * Call the send implementation for the next queued message, - * if any. - * Only useful for implementing message queues, + * Task run to call the send implementation for the next queued + * message, if any. Only useful for implementing message queues, * results in undefined behavior if not used carefully. * - * @param mq message queue to send the next message with + * @param cls message queue to send the next message with + * @param tc scheduler context */ -void -GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) +static void +impl_send_continue (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct GNUNET_MQ_Handle *mq = cls; + struct GNUNET_MQ_Envelope *current_envelope; + + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + mq->continue_task = NULL; /* call is only valid if we're actually currently sending * a message */ - GNUNET_assert (NULL != mq); - GNUNET_assert (NULL != mq->current_envelope); - GNUNET_assert (GNUNET_YES == mq->commited); - mq->commited = GNUNET_NO; - GNUNET_free (mq->current_envelope); + current_envelope = mq->current_envelope; + GNUNET_assert (NULL != current_envelope); + current_envelope->parent_queue = NULL; if (NULL == mq->envelope_head) { mq->current_envelope = NULL; - return; } + else + { + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); + } + if (NULL != current_envelope->sent_cb) + current_envelope->sent_cb (current_envelope->sent_cls); + GNUNET_free (current_envelope); +} - GNUNET_assert (NULL != mq->envelope_tail); - GNUNET_assert (NULL != mq->envelope_head); - mq->current_envelope = mq->envelope_head; - GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, - mq->current_envelope); - mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); +/** + * Call the send implementation for the next queued message, + * if any. + * Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param mq message queue to send the next message with + */ +void +GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) +{ + GNUNET_assert (NULL == mq->continue_task); + mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, + mq); } @@ -314,6 +376,7 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, mq = GNUNET_new (struct GNUNET_MQ_Handle); mq->send_impl = send; mq->destroy_impl = destroy; + mq->cancel_impl = cancel; mq->handlers = handlers; mq->handlers_cls = cls; mq->impl_state = impl_state; @@ -335,9 +398,9 @@ const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) { if (NULL == mq->current_envelope) - GNUNET_abort (); + GNUNET_assert (0); if (NULL == mq->current_envelope->mh) - GNUNET_abort (); + GNUNET_assert (0); return mq->current_envelope->mh; } @@ -363,27 +426,10 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) } - -/** - * Mark the current message as irrevocably sent, but do not - * proceed with sending the next message. - * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. - * - * @param mq message queue - */ -void -GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq) -{ - GNUNET_assert (NULL != mq->current_envelope); - GNUNET_assert (GNUNET_NO == mq->commited); - mq->commited = GNUNET_YES; - if (NULL != mq->current_envelope->sent_cb) - mq->current_envelope->sent_cb (mq->current_envelope->sent_cls); -} - - struct GNUNET_MQ_Envelope * -GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) +GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, + uint16_t size, + uint16_t type) { struct GNUNET_MQ_Envelope *mqm; @@ -407,7 +453,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) * @param nested_mh the message to append to the message after base_size */ struct GNUNET_MQ_Envelope * -GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, +GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, + uint16_t base_size, + uint16_t type, const struct GNUNET_MessageHeader *nested_mh) { struct GNUNET_MQ_Envelope *mqm; @@ -459,30 +507,34 @@ transmit_queued (void *cls, size_t size, } - static void server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct ServerClientSocketState *state = impl_state; + if (NULL != state->th) + { + GNUNET_SERVER_notify_transmit_ready_cancel (state->th); + state->th = NULL; + } + GNUNET_assert (NULL != mq); GNUNET_assert (NULL != state); GNUNET_SERVER_client_drop (state->client); GNUNET_free (state); } + static void server_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, void *impl_state) + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ServerClientSocketState *state = impl_state; GNUNET_assert (NULL != mq); GNUNET_assert (NULL != state); - - GNUNET_MQ_impl_send_commit (mq); - state->th = GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, @@ -540,13 +592,14 @@ handle_client_message (void *cls, * Transmit a queued message to the session's client. * * @param cls consensus session - * @param size number of bytes available in buf + * @param size number of bytes available in @a buf * @param buf where the callee should write the message * @return number of bytes written to buf */ static size_t -connection_client_transmit_queued (void *cls, size_t size, - void *buf) +connection_client_transmit_queued (void *cls, + size_t size, + void *buf) { struct GNUNET_MQ_Handle *mq = cls; const struct GNUNET_MessageHeader *msg; @@ -570,7 +623,6 @@ connection_client_transmit_queued (void *cls, size_t size, GNUNET_TIME_UNIT_FOREVER_REL); } - msg_size = ntohs (msg->size); GNUNET_assert (size >= msg_size); memcpy (buf, msg, msg_size); @@ -582,28 +634,39 @@ connection_client_transmit_queued (void *cls, size_t size, } - static void -connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) +connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) { GNUNET_free (impl_state); } + static void connection_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, void *impl_state) + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ClientConnectionState *state = impl_state; GNUNET_assert (NULL != state); GNUNET_assert (NULL == state->th); - - GNUNET_MQ_impl_send_commit (mq); - state->th = GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, &connection_client_transmit_queued, mq); + GNUNET_assert (NULL != state->th); +} + + +static void +connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct ClientConnectionState *state = impl_state; + GNUNET_assert (NULL != state->th); + GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); + state->th = NULL; } @@ -627,6 +690,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti mq->impl_state = state; mq->send_impl = connection_client_send_impl; mq->destroy_impl = connection_client_destroy_impl; + mq->cancel_impl = connection_client_cancel_impl; if (NULL != handlers) state->receive_requested = GNUNET_YES; @@ -670,9 +734,9 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, } - void * -GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) +GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, + uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; @@ -681,7 +745,8 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) void * -GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) +GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, + uint32_t request_id) { void *val; @@ -710,17 +775,25 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { mq->destroy_impl (mq, mq->impl_state); } - + if (NULL != mq->continue_task) + { + GNUNET_SCHEDULER_cancel (mq->continue_task); + mq->continue_task = NULL; + } while (NULL != mq->envelope_head) { struct GNUNET_MQ_Envelope *ev; ev = mq->envelope_head; + ev->parent_queue = NULL; GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); GNUNET_MQ_discard (ev); } if (NULL != mq->current_envelope) { + /* we can only discard envelopes that + * are not queued! */ + mq->current_envelope->parent_queue = NULL; GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; } @@ -735,36 +808,74 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) } -struct GNUNET_MessageHeader * -GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) +const struct GNUNET_MessageHeader * +GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, + uint16_t base_size) { uint16_t whole_size; uint16_t nested_size; - struct GNUNET_MessageHeader *nested_msg; + const struct GNUNET_MessageHeader *nested_msg; whole_size = ntohs (mh->size); GNUNET_assert (whole_size >= base_size); - nested_size = whole_size - base_size; - if (0 == nested_size) return NULL; - if (nested_size < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); return NULL; } - - nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size); - + nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size); if (ntohs (nested_msg->size) != nested_size) { GNUNET_break_op (0); - nested_msg->size = htons (nested_size); + return NULL; } - return nested_msg; } +/** + * Cancel sending the message. Message must have been sent with + * #GNUNET_MQ_send before. May not be called after the notify sent + * callback has been called + * + * @param ev queued envelope to cancel + */ +void +GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) +{ + struct GNUNET_MQ_Handle *mq = ev->parent_queue; + + GNUNET_assert (NULL != mq); + GNUNET_assert (NULL != mq->cancel_impl); + + if (mq->current_envelope == ev) { + // complex case, we already started with transmitting + // the message + mq->cancel_impl (mq, mq->impl_state); + // continue sending the next message, if any + if (NULL == mq->envelope_head) + { + mq->current_envelope = NULL; + } + else + { + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); + } + } else { + // simple case, message is still waiting in the queue + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); + } + + ev->parent_queue = NULL; + ev->mh = NULL; + GNUNET_free (ev); +} + +/* end of mq.c */