X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fmq.c;h=eaac85575c64d852e50ff78c88ef447ae1d40ef1;hb=5c7f4f919d2569f49e4223d77000452dd2ec4e97;hp=b9bc4f2f1cafa7de86b438cbb3c4cedb0a41dc43;hpb=4ab9f527078552f8a26008f6856fec0b21607a54;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index b9bc4f2f1..eaac85575 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -1,21 +1,19 @@ /* This file is part of GNUnet. - Copyright (C) 2012-2014 GNUnet e.V. + Copyright (C) 2012-2017 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ /** @@ -26,7 +24,7 @@ #include "platform.h" #include "gnunet_util_lib.h" -#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) +#define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__) struct GNUNET_MQ_Envelope @@ -44,8 +42,9 @@ struct GNUNET_MQ_Envelope struct GNUNET_MQ_Envelope *prev; /** - * Actual allocated message header, - * usually points to the end of the containing GNUNET_MQ_Envelope + * Actual allocated message header. + * The GNUNET_MQ_Envelope header is allocated at + * the end of the message. */ struct GNUNET_MessageHeader *mh; @@ -57,12 +56,31 @@ struct GNUNET_MQ_Envelope /** * Called after the message was sent irrevocably. */ - GNUNET_MQ_NotifyCallback sent_cb; + GNUNET_SCHEDULER_TaskCallback sent_cb; /** * Closure for @e send_cb */ void *sent_cls; + + /** + * Flags that were set for this envelope by + * #GNUNET_MQ_env_set_options(). Only valid if + * @e have_custom_options is set. + */ + uint64_t flags; + + /** + * Additional options buffer set for this envelope by + * #GNUNET_MQ_env_set_options(). Only valid if + * @e have_custom_options is set. + */ + const void *extra; + + /** + * Did the application call #GNUNET_MQ_env_set_options()? + */ + int have_custom_options; }; @@ -107,6 +125,11 @@ struct GNUNET_MQ_Handle */ void *error_handler_cls; + /** + * Task to asynchronously run #impl_send_continue(). + */ + struct GNUNET_SCHEDULER_Task *send_task; + /** * Linked list of messages pending to be sent */ @@ -130,67 +153,49 @@ struct GNUNET_MQ_Handle struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; /** - * Task scheduled during #GNUNET_MQ_impl_send_continue. + * Functions to call on queue destruction; kept in a DLL. */ - struct GNUNET_SCHEDULER_Task *continue_task; + struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; /** - * Next id that should be used for the @e assoc_map, - * initialized lazily to a random value together with - * @e assoc_map - */ - uint32_t assoc_id; - - /** - * Number of entries we have in the envelope-DLL. + * Functions to call on queue destruction; kept in a DLL. */ - unsigned int queue_length; -}; - + struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail; -/** - * Implementation-specific state for connection to - * client (MQ for server). - */ -struct ServerClientSocketState -{ /** - * Handle of the client that connected to the server. + * Additional options buffer set for this queue by + * #GNUNET_MQ_set_options(). Default is 0. */ - struct GNUNET_SERVER_Client *client; + const void *default_extra; /** - * Active transmission request to the client. + * Flags that were set for this queue by + * #GNUNET_MQ_set_options(). Default is 0. */ - struct GNUNET_SERVER_TransmitHandle *th; -}; - + uint64_t default_flags; -/** - * Implementation-specific state for connection to - * service (MQ for clients). - */ -struct ClientConnectionState -{ /** - * Did we call receive alread alreadyy? + * Next id that should be used for the @e assoc_map, + * initialized lazily to a random value together with + * @e assoc_map */ - int receive_active; + uint32_t assoc_id; /** - * Do we also want to receive? + * Number of entries we have in the envelope-DLL. */ - int receive_requested; + unsigned int queue_length; /** - * Connection to the service. + * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called. + * FIXME: is this dead? */ - struct GNUNET_CLIENT_Connection *connection; + int evacuate_called; /** - * Active transmission request (or NULL). + * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called. */ - struct GNUNET_CLIENT_TransmitHandle *th; + int in_flight; }; @@ -210,24 +215,29 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, { const struct GNUNET_MQ_MessageHandler *handler; int handled = GNUNET_NO; - uint16_t ms = ntohs (mh->size); + uint16_t msize = ntohs (mh->size); + uint16_t mtype = ntohs (mh->type); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u and size %u\n", + mtype, msize); if (NULL == mq->handlers) goto done; for (handler = mq->handlers; NULL != handler->cb; handler++) { - if (handler->type == ntohs (mh->type)) + if (handler->type == mtype) { handled = GNUNET_YES; - if ( (handler->expected_size > ms) || - ( (handler->expected_size != ms) && + if ( (handler->expected_size > msize) || + ( (handler->expected_size != msize) && (NULL == handler->mv) ) ) { /* Too small, or not an exact size and no 'mv' handler to check rest */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Received malformed message of type %u\n", - (unsigned int) handler->type); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received malformed message of type %u\n", + (unsigned int) handler->type); GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED); break; @@ -242,9 +252,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, else { /* Message rejected by check routine */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Received malformed message of type %u\n", - (unsigned int) handler->type); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received malformed message of type %u\n", + (unsigned int) handler->type); GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED); } @@ -253,9 +263,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, } done: if (GNUNET_NO == handled) - LOG (GNUNET_ERROR_TYPE_WARNING, - "No handler for message of type %d\n", - ntohs (mh->type)); + LOG (GNUNET_ERROR_TYPE_INFO, + "No handler for message of type %u and size %u\n", + mtype, msize); } @@ -293,10 +303,10 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, * @param mqm the message to discard */ void -GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) +GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) { - GNUNET_assert (NULL == mqm->parent_queue); - GNUNET_free (mqm); + GNUNET_assert (NULL == ev->parent_queue); + GNUNET_free (ev); } @@ -309,7 +319,11 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) unsigned int GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) { - return mq->queue_length; + if (GNUNET_YES != mq->in_flight) + { + return mq->queue_length; + } + return mq->queue_length - 1; } @@ -327,18 +341,69 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); + mq->queue_length++; + GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */ ev->parent_queue = mq; /* is the implementation busy? queue it! */ - if (NULL != mq->current_envelope) + if ( (NULL != mq->current_envelope) || + (NULL != mq->send_task) ) { GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); - mq->queue_length++; return; } + GNUNET_assert (NULL == mq->envelope_head); mq->current_envelope = ev; - mq->send_impl (mq, ev->mh, mq->impl_state); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending message of type %u, queue empty (MQ: %p)\n", + ntohs(ev->mh->type), + mq); + + mq->send_impl (mq, + ev->mh, + mq->impl_state); +} + + +/** + * Remove the first envelope that has not yet been sent from the message + * queue and return it. + * + * @param mq queue to remove envelope from + * @return NULL if queue is empty (or has no envelope that is not under transmission) + */ +struct GNUNET_MQ_Envelope * +GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq) +{ + struct GNUNET_MQ_Envelope *env; + + env = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + env); + mq->queue_length--; + env->parent_queue = NULL; + return env; +} + + +/** + * Function to copy an envelope. The envelope must not yet + * be in any queue or have any options or callbacks set. + * + * @param env envelope to copy + * @return copy of @a env + */ +struct GNUNET_MQ_Envelope * +GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env) +{ + GNUNET_assert (NULL == env->next); + GNUNET_assert (NULL == env->parent_queue); + GNUNET_assert (NULL == env->sent_cb); + GNUNET_assert (GNUNET_NO == env->have_custom_options); + return GNUNET_MQ_msg_copy (env->mh); } @@ -362,7 +427,7 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, env->mh = (struct GNUNET_MessageHeader *) &env[1]; env->sent_cb = ev->sent_cb; env->sent_cls = ev->sent_cls; - memcpy (&env[1], + GNUNET_memcpy (&env[1], ev->mh, msize); GNUNET_MQ_send (mq, @@ -370,7 +435,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, } - /** * Task run to call the send implementation for the next queued * message, if any. Only useful for implementing message queues, @@ -382,48 +446,86 @@ static void impl_send_continue (void *cls) { struct GNUNET_MQ_Handle *mq = cls; - struct GNUNET_MQ_Envelope *current_envelope; - mq->continue_task = NULL; + mq->send_task = NULL; /* call is only valid if we're actually currently sending * a message */ + if (NULL == mq->envelope_head) + return; + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending message of type %u from queue\n", + ntohs(mq->current_envelope->mh->type)); + + mq->send_impl (mq, + mq->current_envelope->mh, + mq->impl_state); +} + + +/** + * Call the send implementation for the next queued message, if any. + * Only useful for implementing message queues, results in undefined + * behavior if not used carefully. + * + * @param mq message queue to send the next message with + */ +void +GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) +{ + struct GNUNET_MQ_Envelope *current_envelope; + GNUNET_SCHEDULER_TaskCallback cb; + + GNUNET_assert (0 < mq->queue_length); + mq->queue_length--; + mq->in_flight = GNUNET_NO; current_envelope = mq->current_envelope; - GNUNET_assert (NULL != current_envelope); current_envelope->parent_queue = NULL; - if (NULL == mq->envelope_head) - { - mq->current_envelope = NULL; - } - else + mq->current_envelope = NULL; + GNUNET_assert (NULL == mq->send_task); + mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, + mq); + if (NULL != (cb = current_envelope->sent_cb)) { - mq->current_envelope = mq->envelope_head; - GNUNET_CONTAINER_DLL_remove (mq->envelope_head, - mq->envelope_tail, - mq->current_envelope); - mq->queue_length--; - mq->send_impl (mq, - mq->current_envelope->mh, - mq->impl_state); + current_envelope->sent_cb = NULL; + cb (current_envelope->sent_cls); } - if (NULL != current_envelope->sent_cb) - current_envelope->sent_cb (current_envelope->sent_cls); GNUNET_free (current_envelope); } /** - * Call the send implementation for the next queued message, if any. + * Call the send notification for the current message, but do not + * try to send the next message until #GNUNET_MQ_impl_send_continue + * is called. + * * Only useful for implementing message queues, results in undefined * behavior if not used carefully. * * @param mq message queue to send the next message with */ void -GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) +GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) { - GNUNET_assert (NULL == mq->continue_task); - mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, - mq); + struct GNUNET_MQ_Envelope *current_envelope; + GNUNET_SCHEDULER_TaskCallback cb; + + mq->in_flight = GNUNET_YES; + /* call is only valid if we're actually currently sending + * a message */ + current_envelope = mq->current_envelope; + GNUNET_assert (NULL != current_envelope); + /* can't call cancel from now on anymore */ + current_envelope->parent_queue = NULL; + if (NULL != (cb = current_envelope->sent_cb)) + { + current_envelope->sent_cb = NULL; + cb (current_envelope->sent_cls); + } } @@ -449,21 +551,12 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, void *error_handler_cls) { struct GNUNET_MQ_Handle *mq; - unsigned int i; mq = GNUNET_new (struct GNUNET_MQ_Handle); mq->send_impl = send; mq->destroy_impl = destroy; mq->cancel_impl = cancel; - if (NULL != handlers) - { - for (i=0;NULL != handlers[i].cb; i++) ; - mq->handlers = GNUNET_new_array (i + 1, - struct GNUNET_MQ_MessageHandler); - memcpy (mq->handlers, - handlers, - i * sizeof (struct GNUNET_MQ_MessageHandler)); - } + mq->handlers = GNUNET_MQ_copy_handlers (handlers); mq->error_handler = error_handler; mq->error_handler_cls = error_handler_cls; mq->impl_state = impl_state; @@ -483,11 +576,9 @@ void GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls) { - unsigned int i; - if (NULL == mq->handlers) return; - for (i=0;NULL != mq->handlers[i].cb; i++) + for (unsigned int i=0;NULL != mq->handlers[i].cb; i++) mq->handlers[i].cls = handlers_cls; } @@ -504,10 +595,8 @@ GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) { - if (NULL == mq->current_envelope) - GNUNET_assert (0); - if (NULL == mq->current_envelope->mh) - GNUNET_assert (0); + GNUNET_assert (NULL != mq->current_envelope); + GNUNET_assert (NULL != mq->current_envelope->mh); return mq->current_envelope->mh; } @@ -538,15 +627,15 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; - mqm = GNUNET_malloc (sizeof *mqm + size); - mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; - mqm->mh->size = htons (size); - mqm->mh->type = htons (type); + ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope)); + ev->mh = (struct GNUNET_MessageHeader *) &ev[1]; + ev->mh->size = htons (size); + ev->mh->type = htons (type); if (NULL != mhp) - *mhp = mqm->mh; - return mqm; + *mhp = ev->mh; + return ev; } @@ -564,7 +653,7 @@ GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr) mqm = GNUNET_malloc (sizeof (*mqm) + size); mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; - memcpy (mqm->mh, + GNUNET_memcpy (mqm->mh, hdr, size); return mqm; @@ -599,255 +688,14 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, return NULL; mqm = GNUNET_MQ_msg_ (mhp, size, type); - memcpy ((char *) mqm->mh + base_size, - nested_mh, - ntohs (nested_mh->size)); + GNUNET_memcpy ((char *) mqm->mh + base_size, + nested_mh, + ntohs (nested_mh->size)); return mqm; } -/** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf - */ -static size_t -transmit_queued (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_MQ_Handle *mq = cls; - struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq); - const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); - size_t msg_size; - - GNUNET_assert (NULL != buf); - msg_size = ntohs (msg->size); - GNUNET_assert (size >= msg_size); - memcpy (buf, msg, msg_size); - state->th = NULL; - - GNUNET_MQ_impl_send_continue (mq); - - return msg_size; -} - - -static void -server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct ServerClientSocketState *state = impl_state; - - if (NULL != state->th) - { - GNUNET_SERVER_notify_transmit_ready_cancel (state->th); - state->th = NULL; - } - - GNUNET_assert (NULL != mq); - GNUNET_assert (NULL != state); - GNUNET_SERVER_client_drop (state->client); - GNUNET_free (state); -} - - -static void -server_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) -{ - struct ServerClientSocketState *state = impl_state; - - GNUNET_assert (NULL != mq); - state->th = GNUNET_SERVER_notify_transmit_ready (state->client, - ntohs (msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, mq); -} - - -struct GNUNET_MQ_Handle * -GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) -{ - struct GNUNET_MQ_Handle *mq; - struct ServerClientSocketState *scss; - - mq = GNUNET_new (struct GNUNET_MQ_Handle); - scss = GNUNET_new (struct ServerClientSocketState); - mq->impl_state = scss; - scss->client = client; - GNUNET_SERVER_client_keep (client); - mq->send_impl = &server_client_send_impl; - mq->destroy_impl = &server_client_destroy_impl; - return mq; -} - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -handle_client_message (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_MQ_Handle *mq = cls; - struct ClientConnectionState *state; - - state = mq->impl_state; - if (NULL == msg) - { - GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); - return; - } - GNUNET_CLIENT_receive (state->connection, - &handle_client_message, - mq, - GNUNET_TIME_UNIT_FOREVER_REL); - GNUNET_MQ_inject_message (mq, msg); -} - - -/** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -connection_client_transmit_queued (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_MQ_Handle *mq = cls; - const struct GNUNET_MessageHeader *msg; - struct ClientConnectionState *state = mq->impl_state; - size_t msg_size; - - GNUNET_assert (NULL != mq); - state->th = NULL; - msg = GNUNET_MQ_impl_current (mq); - - if (NULL == buf) - { - GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); - return 0; - } - - if ( (GNUNET_YES == state->receive_requested) && - (GNUNET_NO == state->receive_active) ) - { - state->receive_active = GNUNET_YES; - GNUNET_CLIENT_receive (state->connection, - &handle_client_message, - mq, - GNUNET_TIME_UNIT_FOREVER_REL); - } - - msg_size = ntohs (msg->size); - GNUNET_assert (size >= msg_size); - memcpy (buf, msg, msg_size); - state->th = NULL; - - GNUNET_MQ_impl_send_continue (mq); - - return msg_size; -} - - -static void -connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct ClientConnectionState *state = impl_state; - - if (NULL != state->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); - state->th = NULL; - } - GNUNET_CLIENT_disconnect (state->connection); - GNUNET_free (impl_state); -} - - -static void -connection_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) -{ - struct ClientConnectionState *state = impl_state; - - GNUNET_assert (NULL != state); - GNUNET_assert (NULL == state->th); - state->th = - GNUNET_CLIENT_notify_transmit_ready (state->connection, - ntohs (msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &connection_client_transmit_queued, - mq); - GNUNET_assert (NULL != state->th); -} - - -static void -connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct ClientConnectionState *state = impl_state; - - GNUNET_assert (NULL != state->th); - GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); - state->th = NULL; -} - - -struct GNUNET_MQ_Handle * -GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, - const struct GNUNET_MQ_MessageHandler *handlers, - GNUNET_MQ_ErrorHandler error_handler, - void *error_handler_cls) -{ - struct GNUNET_MQ_Handle *mq; - struct ClientConnectionState *state; - unsigned int i; - - mq = GNUNET_new (struct GNUNET_MQ_Handle); - if (NULL != handlers) - { - for (i=0;NULL != handlers[i].cb; i++) ; - mq->handlers = GNUNET_new_array (i, - struct GNUNET_MQ_MessageHandler); - memcpy (mq->handlers, - handlers, - i * sizeof (struct GNUNET_MQ_MessageHandler)); - } - mq->error_handler = error_handler; - mq->error_handler_cls = error_handler_cls; - state = GNUNET_new (struct ClientConnectionState); - state->connection = connection; - mq->impl_state = state; - mq->send_impl = &connection_client_send_impl; - mq->destroy_impl = &connection_client_destroy_impl; - mq->cancel_impl = &connection_client_cancel_impl; - if (NULL != handlers) - state->receive_requested = GNUNET_YES; - - return mq; -} - - /** * Associate the assoc_data in mq with a unique request id. * @@ -866,22 +714,40 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, mq->assoc_id = 1; } id = mq->assoc_id++; - GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, + id, + assoc_data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); return id; } +/** + * Get the data associated with a @a request_id in a queue + * + * @param mq the message queue with the association + * @param request_id the request id we are interested in + * @return the associated data + */ void * GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; - return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); + return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, + request_id); } +/** + * Remove the association for a @a request_id + * + * @param mq the message queue with the association + * @param request_id the request id we want to remove + * @return the associated data + */ void * GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) @@ -898,27 +764,79 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, } +/** + * Call a callback once the envelope has been sent, that is, + * sending it can not be canceled anymore. + * There can be only one notify sent callback per envelope. + * + * @param ev message to call the notify callback for + * @param cb the notify callback + * @param cb_cls closure for the callback + */ void -GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm, - GNUNET_MQ_NotifyCallback cb, - void *cls) +GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, + GNUNET_SCHEDULER_TaskCallback cb, + void *cb_cls) { - mqm->sent_cb = cb; - mqm->sent_cls = cls; + /* allow setting *OR* clearing callback */ + GNUNET_assert ( (NULL == ev->sent_cb) || + (NULL == cb) ); + ev->sent_cb = cb; + ev->sent_cls = cb_cls; } +/** + * Handle we return for callbacks registered to be + * notified when #GNUNET_MQ_destroy() is called on a queue. + */ +struct GNUNET_MQ_DestroyNotificationHandle +{ + /** + * Kept in a DLL. + */ + struct GNUNET_MQ_DestroyNotificationHandle *prev; + + /** + * Kept in a DLL. + */ + struct GNUNET_MQ_DestroyNotificationHandle *next; + + /** + * Queue to notify about. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Function to call. + */ + GNUNET_SCHEDULER_TaskCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; +}; + + +/** + * Destroy the message queue. + * + * @param mq message queue to destroy + */ void GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { + struct GNUNET_MQ_DestroyNotificationHandle *dnh; + if (NULL != mq->destroy_impl) { mq->destroy_impl (mq, mq->impl_state); } - if (NULL != mq->continue_task) + if (NULL != mq->send_task) { - GNUNET_SCHEDULER_cancel (mq->continue_task); - mq->continue_task = NULL; + GNUNET_SCHEDULER_cancel (mq->send_task); + mq->send_task = NULL; } while (NULL != mq->envelope_head) { @@ -929,17 +847,31 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); + GNUNET_assert (0 < mq->queue_length); mq->queue_length--; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroy drops message of type %u\n", + ntohs (ev->mh->type)); GNUNET_MQ_discard (ev); } - GNUNET_assert (0 == mq->queue_length); if (NULL != mq->current_envelope) { /* we can only discard envelopes that * are not queued! */ mq->current_envelope->parent_queue = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroy drops current message of type %u\n", + ntohs (mq->current_envelope->mh->type)); GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; + GNUNET_assert (0 < mq->queue_length); + mq->queue_length--; + } + GNUNET_assert (0 == mq->queue_length); + while (NULL != (dnh = mq->dnh_head)) + { + dnh->cb (dnh->cb_cls); + GNUNET_MQ_destroy_notify_cancel (dnh); } if (NULL != mq->assoc_map) { @@ -994,24 +926,28 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_assert (NULL != mq); GNUNET_assert (NULL != mq->cancel_impl); + mq->evacuate_called = GNUNET_NO; + if (mq->current_envelope == ev) { - // complex case, we already started with transmitting - // the message + /* complex case, we already started with transmitting + the message using the callbacks. */ + GNUNET_assert (0 < mq->queue_length); + mq->queue_length--; mq->cancel_impl (mq, mq->impl_state); - // continue sending the next message, if any - if (NULL == mq->envelope_head) - { - mq->current_envelope = NULL; - } - else + /* continue sending the next message, if any */ + mq->current_envelope = mq->envelope_head; + if (NULL != mq->current_envelope) { - mq->current_envelope = mq->envelope_head; GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); - mq->queue_length--; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending canceled message of type %u queue\n", + ntohs(ev->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -1019,16 +955,320 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) } else { - // simple case, message is still waiting in the queue + /* simple case, message is still waiting in the queue */ GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); + GNUNET_assert (0 < mq->queue_length); mq->queue_length--; } - ev->parent_queue = NULL; - ev->mh = NULL; - GNUNET_free (ev); + if (GNUNET_YES != mq->evacuate_called) + { + ev->parent_queue = NULL; + ev->mh = NULL; + /* also frees ev */ + GNUNET_free (ev); + } +} + + +/** + * Function to obtain the current envelope + * from within #GNUNET_MQ_SendImpl implementations. + * + * @param mq message queue to interrogate + * @return the current envelope + */ +struct GNUNET_MQ_Envelope * +GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq) +{ + return mq->current_envelope; +} + + +/** + * Function to obtain the last envelope in the queue. + * + * @param mq message queue to interrogate + * @return the last envelope in the queue + */ +struct GNUNET_MQ_Envelope * +GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq) +{ + if (NULL != mq->envelope_tail) + return mq->envelope_tail; + + return mq->current_envelope; +} + + +/** + * Set application-specific options for this envelope. + * Overrides the options set for the queue with + * #GNUNET_MQ_set_options() for this message only. + * + * @param env message to set options for + * @param flags flags to use (meaning is queue-specific) + * @param extra additional buffer for further data (also queue-specific) + */ +void +GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, + uint64_t flags, + const void *extra) +{ + env->flags = flags; + env->extra = extra; + env->have_custom_options = GNUNET_YES; +} + + +/** + * Get application-specific options for this envelope. + * + * @param env message to set options for + * @param[out] flags set to flags to use (meaning is queue-specific) + * @return extra additional buffer for further data (also queue-specific) + */ +const void * +GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env, + uint64_t *flags) +{ + struct GNUNET_MQ_Handle *mq = env->parent_queue; + + if (GNUNET_YES == env->have_custom_options) + { + *flags = env->flags; + return env->extra; + } + if (NULL == mq) + { + *flags = 0; + return NULL; + } + *flags = mq->default_flags; + return mq->default_extra; +} + + +/** + * Set application-specific options for this queue. + * + * @param mq message queue to set options for + * @param flags flags to use (meaning is queue-specific) + * @param extra additional buffer for further data (also queue-specific) + */ +void +GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, + uint64_t flags, + const void *extra) +{ + mq->default_flags = flags; + mq->default_extra = extra; +} + + +/** + * Obtain message contained in envelope. + * + * @param env the envelope + * @return message contained in the envelope + */ +const struct GNUNET_MessageHeader * +GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env) +{ + return env->mh; +} + + +/** + * Return next envelope in queue. + * + * @param env a queued envelope + * @return next one, or NULL + */ +const struct GNUNET_MQ_Envelope * +GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) +{ + return env->next; +} + + +/** + * Register function to be called whenever @a mq is being + * destroyed. + * + * @param mq message queue to watch + * @param cb function to call on @a mq destruction + * @param cb_cls closure for @a cb + * @return handle for #GNUNET_MQ_destroy_notify_cancel(). + */ +struct GNUNET_MQ_DestroyNotificationHandle * +GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, + GNUNET_SCHEDULER_TaskCallback cb, + void *cb_cls) +{ + struct GNUNET_MQ_DestroyNotificationHandle *dnh; + + dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle); + dnh->mq = mq; + dnh->cb = cb; + dnh->cb_cls = cb_cls; + GNUNET_CONTAINER_DLL_insert (mq->dnh_head, + mq->dnh_tail, + dnh); + return dnh; } + +/** + * Cancel registration from #GNUNET_MQ_destroy_notify(). + * + * @param dnh handle for registration to cancel + */ +void +GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh) +{ + struct GNUNET_MQ_Handle *mq = dnh->mq; + + GNUNET_CONTAINER_DLL_remove (mq->dnh_head, + mq->dnh_tail, + dnh); + GNUNET_free (dnh); +} + + +/** + * Insert @a env into the envelope DLL starting at @a env_head + * Note that @a env must not be in any MQ while this function + * is used with DLLs defined outside of the MQ module. This + * is just in case some application needs to also manage a + * FIFO of envelopes independent of MQ itself and wants to + * re-use the pointers internal to @a env. Use with caution. + * + * @param[in|out] env_head of envelope DLL + * @param[in|out] env_tail tail of envelope DLL + * @param[in|out] env element to insert at the tail + */ +void +GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head, + struct GNUNET_MQ_Envelope **env_tail, + struct GNUNET_MQ_Envelope *env) +{ + GNUNET_CONTAINER_DLL_insert_tail (*env_head, + *env_tail, + env); +} + + +/** + * Remove @a env from the envelope DLL starting at @a env_head. + * Note that @a env must not be in any MQ while this function + * is used with DLLs defined outside of the MQ module. This + * is just in case some application needs to also manage a + * FIFO of envelopes independent of MQ itself and wants to + * re-use the pointers internal to @a env. Use with caution. + * + * @param[in|out] env_head of envelope DLL + * @param[in|out] env_tail tail of envelope DLL + * @param[in|out] env element to remove from the DLL + */ +void +GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head, + struct GNUNET_MQ_Envelope **env_tail, + struct GNUNET_MQ_Envelope *env) +{ + GNUNET_CONTAINER_DLL_remove (*env_head, + *env_tail, + env); +} + + +/** + * Copy an array of handlers. + * + * Useful if the array has been delared in local memory and needs to be + * persisted for future use. + * + * @param handlers Array of handlers to be copied. Can be NULL (nothing done). + * @return A newly allocated array of handlers. + * Needs to be freed with #GNUNET_free. + */ +struct GNUNET_MQ_MessageHandler * +GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) +{ + struct GNUNET_MQ_MessageHandler *copy; + unsigned int count; + + if (NULL == handlers) + return NULL; + + count = GNUNET_MQ_count_handlers (handlers); + copy = GNUNET_new_array (count + 1, + struct GNUNET_MQ_MessageHandler); + GNUNET_memcpy (copy, + handlers, + count * sizeof (struct GNUNET_MQ_MessageHandler)); + return copy; +} + + +/** + * Copy an array of handlers, appending AGPL handler. + * + * Useful if the array has been delared in local memory and needs to be + * persisted for future use. + * + * @param handlers Array of handlers to be copied. Can be NULL (nothing done). + * @param agpl_handler function to call for AGPL handling + * @param agpl_cls closure for @a agpl_handler + * @return A newly allocated array of handlers. + * Needs to be freed with #GNUNET_free. + */ +struct GNUNET_MQ_MessageHandler * +GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_MQ_MessageCallback agpl_handler, + void *agpl_cls) +{ + struct GNUNET_MQ_MessageHandler *copy; + unsigned int count; + + if (NULL == handlers) + return NULL; + count = GNUNET_MQ_count_handlers (handlers); + copy = GNUNET_new_array (count + 2, + struct GNUNET_MQ_MessageHandler); + GNUNET_memcpy (copy, + handlers, + count * sizeof (struct GNUNET_MQ_MessageHandler)); + copy[count].mv = NULL; + copy[count].cb = agpl_handler; + copy[count].cls = agpl_cls; + copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL; + copy[count].expected_size = sizeof (struct GNUNET_MessageHeader); + return copy; +} + + +/** + * Count the handlers in a handler array. + * + * @param handlers Array of handlers to be counted. + * @return The number of handlers in the array. + */ +unsigned int +GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers) +{ + unsigned int i; + + if (NULL == handlers) + return 0; + + for (i=0; NULL != handlers[i].cb; i++) ; + + return i; +} + + + /* end of mq.c */