X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fmq.c;h=14e0816e28b3f2aa6e989320ed1ff463321bbf79;hb=34f34474b6137233d6700d4599f42257e8208af2;hp=941a5c43eeca094d3f74509f14a42d8594980b8a;hpb=4c1a6478b3ae43bb009addd982390a5db949913b;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index 941a5c43e..14e0816e2 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -60,7 +60,7 @@ struct GNUNET_MQ_Envelope GNUNET_MQ_NotifyCallback sent_cb; /** - * Closure for send_cb + * Closure for @e send_cb */ void *sent_cls; }; @@ -133,26 +133,39 @@ struct GNUNET_MQ_Handle /** * Task scheduled during #GNUNET_MQ_impl_send_continue. */ - GNUNET_SCHEDULER_TaskIdentifier continue_task; + struct GNUNET_SCHEDULER_Task * continue_task; /** - * Next id that should be used for the assoc_map, + * Next id that should be used for the @e assoc_map, * initialized lazily to a random value together with - * assoc_map + * @e assoc_map */ uint32_t assoc_id; }; - - +/** + * Implementation-specific state for connection to + * client (MQ for server). + */ struct ServerClientSocketState { + /** + * Handle of the client that connected to the server. + */ struct GNUNET_SERVER_Client *client; + + /** + * Active transmission request to the client. + */ struct GNUNET_SERVER_TransmitHandle* th; }; +/** + * Implementation-specific state for connection to + * service (MQ for clients). + */ struct ClientConnectionState { /** @@ -164,7 +177,15 @@ struct ClientConnectionState * Do we also want to receive? */ int receive_requested; + + /** + * Connection to the service. + */ struct GNUNET_CLIENT_Connection *connection; + + /** + * Active transmission request (or NULL). + */ struct GNUNET_CLIENT_TransmitHandle *th; }; @@ -180,25 +201,32 @@ struct ClientConnectionState * @param mh message to dispatch */ void -GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh) +GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *mh) { const struct GNUNET_MQ_MessageHandler *handler; int handled = GNUNET_NO; - handler = mq->handlers; - if (NULL == handler) + if (NULL == mq->handlers) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "No handler for message of type %d\n", + ntohs (mh->type)); return; - for (; NULL != handler->cb; handler++) + } + for (handler = mq->handlers; NULL != handler->cb; handler++) { if (handler->type == ntohs (mh->type)) { handler->cb (mq->handlers_cls, mh); handled = GNUNET_YES; + break; } } - if (GNUNET_NO == handled) - LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); + LOG (GNUNET_ERROR_TYPE_WARNING, + "No handler for message of type %d\n", + ntohs (mh->type)); } @@ -218,8 +246,9 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, { if (NULL == mq->error_handler) { - /* FIXME: log what kind of error occured */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "mq: got error %d, but no handler installed\n", + (int) error); return; } mq->error_handler (mq->handlers_cls, error); @@ -242,17 +271,19 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) * @param ev the envelope with the message to send. */ void -GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) +GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, + struct GNUNET_MQ_Envelope *ev) { GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); ev->parent_queue = mq; - /* is the implementation busy? queue it! */ if (NULL != mq->current_envelope) { - GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); + GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, + mq->envelope_tail, + ev); return; } mq->current_envelope = ev; @@ -275,10 +306,10 @@ impl_send_continue (void *cls, struct GNUNET_MQ_Handle *mq = cls; struct GNUNET_MQ_Envelope *current_envelope; - if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - mq->continue_task = GNUNET_SCHEDULER_NO_TASK; + mq->continue_task = NULL; /* call is only valid if we're actually currently sending * a message */ current_envelope = mq->current_envelope; @@ -313,7 +344,7 @@ impl_send_continue (void *cls, void GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) { - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task); + GNUNET_assert (NULL == mq->continue_task); mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq); } @@ -367,9 +398,9 @@ const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) { if (NULL == mq->current_envelope) - GNUNET_abort (); + GNUNET_assert (0); if (NULL == mq->current_envelope->mh) - GNUNET_abort (); + GNUNET_assert (0); return mq->current_envelope->mh; } @@ -396,7 +427,9 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) struct GNUNET_MQ_Envelope * -GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) +GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, + uint16_t size, + uint16_t type) { struct GNUNET_MQ_Envelope *mqm; @@ -420,7 +453,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) * @param nested_mh the message to append to the message after base_size */ struct GNUNET_MQ_Envelope * -GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, +GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, + uint16_t base_size, + uint16_t type, const struct GNUNET_MessageHeader *nested_mh) { struct GNUNET_MQ_Envelope *mqm; @@ -493,7 +528,8 @@ server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, static void server_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, void *impl_state) + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ServerClientSocketState *state = impl_state; @@ -587,7 +623,6 @@ connection_client_transmit_queued (void *cls, GNUNET_TIME_UNIT_FOREVER_REL); } - msg_size = ntohs (msg->size); GNUNET_assert (size >= msg_size); memcpy (buf, msg, msg_size); @@ -600,7 +635,8 @@ connection_client_transmit_queued (void *cls, static void -connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) +connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) { GNUNET_free (impl_state); } @@ -608,7 +644,8 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) static void connection_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, void *impl_state) + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ClientConnectionState *state = impl_state; @@ -698,7 +735,8 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void * -GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) +GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, + uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; @@ -707,7 +745,8 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) void * -GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) +GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, + uint32_t request_id) { void *val; @@ -736,21 +775,25 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { mq->destroy_impl (mq, mq->impl_state); } - if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task) + if (NULL != mq->continue_task) { GNUNET_SCHEDULER_cancel (mq->continue_task); - mq->continue_task = GNUNET_SCHEDULER_NO_TASK; + mq->continue_task = NULL; } while (NULL != mq->envelope_head) { struct GNUNET_MQ_Envelope *ev; ev = mq->envelope_head; + ev->parent_queue = NULL; GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); GNUNET_MQ_discard (ev); } if (NULL != mq->current_envelope) { + /* we can only discard envelopes that + * are not queued! */ + mq->current_envelope->parent_queue = NULL; GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; } @@ -765,35 +808,30 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) } -struct GNUNET_MessageHeader * -GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) +const struct GNUNET_MessageHeader * +GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, + uint16_t base_size) { uint16_t whole_size; uint16_t nested_size; - struct GNUNET_MessageHeader *nested_msg; + const struct GNUNET_MessageHeader *nested_msg; whole_size = ntohs (mh->size); GNUNET_assert (whole_size >= base_size); - nested_size = whole_size - base_size; - if (0 == nested_size) return NULL; - if (nested_size < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); return NULL; } - - nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size); - + nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size); if (ntohs (nested_msg->size) != nested_size) { GNUNET_break_op (0); - nested_msg->size = htons (nested_size); + return NULL; } - return nested_msg; } @@ -840,3 +878,4 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_free (ev); } +/* end of mq.c */