X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fmq.c;h=f5ebe33a5bee46ed5d34b7a29af998a87bb8552e;hb=60de5f48cbfc3868570284e91415ca7e06c390e1;hp=bc13bbb36660835e6bc5307c01b0d6e26f236c45;hpb=c1d130fae9a49d5b68e7f29d59f0543f4c5a88d6;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index bc13bbb36..f5ebe33a5 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2012-2014 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -60,7 +60,7 @@ struct GNUNET_MQ_Envelope GNUNET_MQ_NotifyCallback sent_cb; /** - * Closure for send_cb + * Closure for @e send_cb */ void *sent_cls; }; @@ -93,6 +93,11 @@ struct GNUNET_MQ_Handle */ GNUNET_MQ_DestroyImpl destroy_impl; + /** + * Implementation-dependent send cancel function + */ + GNUNET_MQ_CancelImpl cancel_impl; + /** * Implementation-specific state */ @@ -128,26 +133,39 @@ struct GNUNET_MQ_Handle /** * Task scheduled during #GNUNET_MQ_impl_send_continue. */ - GNUNET_SCHEDULER_TaskIdentifier continue_task; + struct GNUNET_SCHEDULER_Task * continue_task; /** - * Next id that should be used for the assoc_map, + * Next id that should be used for the @e assoc_map, * initialized lazily to a random value together with - * assoc_map + * @e assoc_map */ uint32_t assoc_id; }; - - +/** + * Implementation-specific state for connection to + * client (MQ for server). + */ struct ServerClientSocketState { + /** + * Handle of the client that connected to the server. + */ struct GNUNET_SERVER_Client *client; + + /** + * Active transmission request to the client. + */ struct GNUNET_SERVER_TransmitHandle* th; }; +/** + * Implementation-specific state for connection to + * service (MQ for clients). + */ struct ClientConnectionState { /** @@ -159,7 +177,15 @@ struct ClientConnectionState * Do we also want to receive? */ int receive_requested; + + /** + * Connection to the service. + */ struct GNUNET_CLIENT_Connection *connection; + + /** + * Active transmission request (or NULL). + */ struct GNUNET_CLIENT_TransmitHandle *th; }; @@ -175,25 +201,32 @@ struct ClientConnectionState * @param mh message to dispatch */ void -GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh) +GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *mh) { const struct GNUNET_MQ_MessageHandler *handler; int handled = GNUNET_NO; - handler = mq->handlers; - if (NULL == handler) + if (NULL == mq->handlers) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "No handler for message of type %d\n", + ntohs (mh->type)); return; - for (; NULL != handler->cb; handler++) + } + for (handler = mq->handlers; NULL != handler->cb; handler++) { if (handler->type == ntohs (mh->type)) { handler->cb (mq->handlers_cls, mh); handled = GNUNET_YES; + break; } } - if (GNUNET_NO == handled) - LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); + LOG (GNUNET_ERROR_TYPE_WARNING, + "No handler for message of type %d\n", + ntohs (mh->type)); } @@ -213,8 +246,9 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, { if (NULL == mq->error_handler) { - /* FIXME: log what kind of error occured */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "mq: got error %d, but no handler installed\n", + (int) error); return; } mq->error_handler (mq->handlers_cls, error); @@ -237,15 +271,19 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) * @param ev the envelope with the message to send. */ void -GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) +GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, + struct GNUNET_MQ_Envelope *ev) { GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); + ev->parent_queue = mq; /* is the implementation busy? queue it! */ if (NULL != mq->current_envelope) { - GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); + GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, + mq->envelope_tail, + ev); return; } mq->current_envelope = ev; @@ -268,11 +306,15 @@ impl_send_continue (void *cls, struct GNUNET_MQ_Handle *mq = cls; struct GNUNET_MQ_Envelope *current_envelope; - mq->continue_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + mq->continue_task = NULL; /* call is only valid if we're actually currently sending * a message */ current_envelope = mq->current_envelope; GNUNET_assert (NULL != current_envelope); + current_envelope->parent_queue = NULL; if (NULL == mq->envelope_head) { mq->current_envelope = NULL; @@ -302,7 +344,7 @@ impl_send_continue (void *cls, void GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) { - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task); + GNUNET_assert (NULL == mq->continue_task); mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq); } @@ -334,6 +376,7 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, mq = GNUNET_new (struct GNUNET_MQ_Handle); mq->send_impl = send; mq->destroy_impl = destroy; + mq->cancel_impl = cancel; mq->handlers = handlers; mq->handlers_cls = cls; mq->impl_state = impl_state; @@ -355,9 +398,9 @@ const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) { if (NULL == mq->current_envelope) - GNUNET_abort (); + GNUNET_assert (0); if (NULL == mq->current_envelope->mh) - GNUNET_abort (); + GNUNET_assert (0); return mq->current_envelope->mh; } @@ -384,7 +427,9 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) struct GNUNET_MQ_Envelope * -GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) +GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, + uint16_t size, + uint16_t type) { struct GNUNET_MQ_Envelope *mqm; @@ -408,7 +453,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) * @param nested_mh the message to append to the message after base_size */ struct GNUNET_MQ_Envelope * -GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, +GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, + uint16_t base_size, + uint16_t type, const struct GNUNET_MessageHeader *nested_mh) { struct GNUNET_MQ_Envelope *mqm; @@ -466,6 +513,12 @@ server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, { struct ServerClientSocketState *state = impl_state; + if (NULL != state->th) + { + GNUNET_SERVER_notify_transmit_ready_cancel (state->th); + state->th = NULL; + } + GNUNET_assert (NULL != mq); GNUNET_assert (NULL != state); GNUNET_SERVER_client_drop (state->client); @@ -475,7 +528,8 @@ server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, static void server_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, void *impl_state) + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ServerClientSocketState *state = impl_state; @@ -569,7 +623,6 @@ connection_client_transmit_queued (void *cls, GNUNET_TIME_UNIT_FOREVER_REL); } - msg_size = ntohs (msg->size); GNUNET_assert (size >= msg_size); memcpy (buf, msg, msg_size); @@ -582,7 +635,8 @@ connection_client_transmit_queued (void *cls, static void -connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) +connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) { GNUNET_free (impl_state); } @@ -590,7 +644,8 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) static void connection_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, void *impl_state) + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ClientConnectionState *state = impl_state; @@ -604,6 +659,17 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, } +static void +connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct ClientConnectionState *state = impl_state; + GNUNET_assert (NULL != state->th); + GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); + state->th = NULL; +} + + struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, const struct GNUNET_MQ_MessageHandler *handlers, @@ -624,6 +690,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti mq->impl_state = state; mq->send_impl = connection_client_send_impl; mq->destroy_impl = connection_client_destroy_impl; + mq->cancel_impl = connection_client_cancel_impl; if (NULL != handlers) state->receive_requested = GNUNET_YES; @@ -668,7 +735,8 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void * -GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) +GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, + uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; @@ -677,7 +745,8 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) void * -GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) +GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, + uint32_t request_id) { void *val; @@ -706,21 +775,25 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { mq->destroy_impl (mq, mq->impl_state); } - if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task) + if (NULL != mq->continue_task) { GNUNET_SCHEDULER_cancel (mq->continue_task); - mq->continue_task = GNUNET_SCHEDULER_NO_TASK; + mq->continue_task = NULL; } while (NULL != mq->envelope_head) { struct GNUNET_MQ_Envelope *ev; ev = mq->envelope_head; + ev->parent_queue = NULL; GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); GNUNET_MQ_discard (ev); } if (NULL != mq->current_envelope) { + /* we can only discard envelopes that + * are not queued! */ + mq->current_envelope->parent_queue = NULL; GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; } @@ -735,36 +808,74 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) } -struct GNUNET_MessageHeader * -GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) +const struct GNUNET_MessageHeader * +GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, + uint16_t base_size) { uint16_t whole_size; uint16_t nested_size; - struct GNUNET_MessageHeader *nested_msg; + const struct GNUNET_MessageHeader *nested_msg; whole_size = ntohs (mh->size); GNUNET_assert (whole_size >= base_size); - nested_size = whole_size - base_size; - if (0 == nested_size) return NULL; - if (nested_size < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); return NULL; } - - nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size); - + nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size); if (ntohs (nested_msg->size) != nested_size) { GNUNET_break_op (0); - nested_msg->size = htons (nested_size); + return NULL; } - return nested_msg; } +/** + * Cancel sending the message. Message must have been sent with + * #GNUNET_MQ_send before. May not be called after the notify sent + * callback has been called + * + * @param ev queued envelope to cancel + */ +void +GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) +{ + struct GNUNET_MQ_Handle *mq = ev->parent_queue; + + GNUNET_assert (NULL != mq); + GNUNET_assert (NULL != mq->cancel_impl); + + if (mq->current_envelope == ev) { + // complex case, we already started with transmitting + // the message + mq->cancel_impl (mq, mq->impl_state); + // continue sending the next message, if any + if (NULL == mq->envelope_head) + { + mq->current_envelope = NULL; + } + else + { + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); + } + } else { + // simple case, message is still waiting in the queue + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); + } + + ev->parent_queue = NULL; + ev->mh = NULL; + GNUNET_free (ev); +} + +/* end of mq.c */