X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Futil%2Fmq.c;h=eaac85575c64d852e50ff78c88ef447ae1d40ef1;hb=5c7f4f919d2569f49e4223d77000452dd2ec4e97;hp=fe47f6ab421d0c7b4648622807d2065f5bf4d15e;hpb=ba5817a7dbaef67b871606431d9a9a7f82d5bdf8;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index fe47f6ab4..eaac85575 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -1,21 +1,19 @@ /* This file is part of GNUnet. - Copyright (C) 2012-2014 GNUnet e.V. + Copyright (C) 2012-2017 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ /** @@ -26,7 +24,7 @@ #include "platform.h" #include "gnunet_util_lib.h" -#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) +#define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__) struct GNUNET_MQ_Envelope @@ -201,24 +199,6 @@ struct GNUNET_MQ_Handle }; -/** - * Implementation-specific state for connection to - * client (MQ for server). - */ -struct ServerClientSocketState -{ - /** - * Handle of the client that connected to the server. - */ - struct GNUNET_SERVER_Client *client; - - /** - * Active transmission request to the client. - */ - struct GNUNET_SERVER_TransmitHandle *th; -}; - - /** * Call the message message handler that was registered * for the type of the given message in the given message queue. @@ -235,24 +215,29 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, { const struct GNUNET_MQ_MessageHandler *handler; int handled = GNUNET_NO; - uint16_t ms = ntohs (mh->size); + uint16_t msize = ntohs (mh->size); + uint16_t mtype = ntohs (mh->type); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u and size %u\n", + mtype, msize); if (NULL == mq->handlers) goto done; for (handler = mq->handlers; NULL != handler->cb; handler++) { - if (handler->type == ntohs (mh->type)) + if (handler->type == mtype) { handled = GNUNET_YES; - if ( (handler->expected_size > ms) || - ( (handler->expected_size != ms) && + if ( (handler->expected_size > msize) || + ( (handler->expected_size != msize) && (NULL == handler->mv) ) ) { /* Too small, or not an exact size and no 'mv' handler to check rest */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Received malformed message of type %u\n", - (unsigned int) handler->type); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received malformed message of type %u\n", + (unsigned int) handler->type); GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED); break; @@ -267,9 +252,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, else { /* Message rejected by check routine */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Received malformed message of type %u\n", - (unsigned int) handler->type); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received malformed message of type %u\n", + (unsigned int) handler->type); GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED); } @@ -279,9 +264,8 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, done: if (GNUNET_NO == handled) LOG (GNUNET_ERROR_TYPE_INFO, - "No handler for message of type %d and size %d\n", - ntohs (mh->type), - ntohs (mh->size)); + "No handler for message of type %u and size %u\n", + mtype, msize); } @@ -358,6 +342,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, GNUNET_assert (NULL == ev->parent_queue); mq->queue_length++; + GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */ ev->parent_queue = mq; /* is the implementation busy? queue it! */ if ( (NULL != mq->current_envelope) || @@ -370,12 +355,58 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, } GNUNET_assert (NULL == mq->envelope_head); mq->current_envelope = ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending message of type %u, queue empty (MQ: %p)\n", + ntohs(ev->mh->type), + mq); + mq->send_impl (mq, ev->mh, mq->impl_state); } +/** + * Remove the first envelope that has not yet been sent from the message + * queue and return it. + * + * @param mq queue to remove envelope from + * @return NULL if queue is empty (or has no envelope that is not under transmission) + */ +struct GNUNET_MQ_Envelope * +GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq) +{ + struct GNUNET_MQ_Envelope *env; + + env = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + env); + mq->queue_length--; + env->parent_queue = NULL; + return env; +} + + +/** + * Function to copy an envelope. The envelope must not yet + * be in any queue or have any options or callbacks set. + * + * @param env envelope to copy + * @return copy of @a env + */ +struct GNUNET_MQ_Envelope * +GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env) +{ + GNUNET_assert (NULL == env->next); + GNUNET_assert (NULL == env->parent_queue); + GNUNET_assert (NULL == env->sent_cb); + GNUNET_assert (GNUNET_NO == env->have_custom_options); + return GNUNET_MQ_msg_copy (env->mh); +} + + /** * Send a copy of a message with the given message queue. * Can be called repeatedly on the same envelope. @@ -425,6 +456,11 @@ impl_send_continue (void *cls) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending message of type %u from queue\n", + ntohs(mq->current_envelope->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -540,11 +576,9 @@ void GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls) { - unsigned int i; - if (NULL == mq->handlers) return; - for (i=0;NULL != mq->handlers[i].cb; i++) + for (unsigned int i=0;NULL != mq->handlers[i].cb; i++) mq->handlers[i].cls = handlers_cls; } @@ -662,88 +696,6 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, } -/** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf - */ -static size_t -transmit_queued (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_MQ_Handle *mq = cls; - struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq); - const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); - size_t msg_size; - - GNUNET_assert (NULL != buf); - msg_size = ntohs (msg->size); - GNUNET_assert (size >= msg_size); - GNUNET_memcpy (buf, msg, msg_size); - state->th = NULL; - - GNUNET_MQ_impl_send_continue (mq); - - return msg_size; -} - - -static void -server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct ServerClientSocketState *state = impl_state; - - if (NULL != state->th) - { - GNUNET_SERVER_notify_transmit_ready_cancel (state->th); - state->th = NULL; - } - - GNUNET_assert (NULL != mq); - GNUNET_assert (NULL != state); - GNUNET_SERVER_client_drop (state->client); - GNUNET_free (state); -} - - -static void -server_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) -{ - struct ServerClientSocketState *state = impl_state; - - GNUNET_assert (NULL != mq); - state->th = GNUNET_SERVER_notify_transmit_ready (state->client, - ntohs (msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, - mq); -} - - -struct GNUNET_MQ_Handle * -GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) -{ - struct GNUNET_MQ_Handle *mq; - struct ServerClientSocketState *scss; - - mq = GNUNET_new (struct GNUNET_MQ_Handle); - scss = GNUNET_new (struct ServerClientSocketState); - mq->impl_state = scss; - scss->client = client; - GNUNET_SERVER_client_keep (client); - mq->send_impl = &server_client_send_impl; - mq->destroy_impl = &server_client_destroy_impl; - return mq; -} - - /** * Associate the assoc_data in mq with a unique request id. * @@ -762,22 +714,40 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, mq->assoc_id = 1; } id = mq->assoc_id++; - GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, + id, + assoc_data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); return id; } +/** + * Get the data associated with a @a request_id in a queue + * + * @param mq the message queue with the association + * @param request_id the request id we are interested in + * @return the associated data + */ void * GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; - return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); + return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, + request_id); } +/** + * Remove the association for a @a request_id + * + * @param mq the message queue with the association + * @param request_id the request id we want to remove + * @return the associated data + */ void * GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) @@ -808,7 +778,9 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls) { - GNUNET_assert (NULL == ev->sent_cb); + /* allow setting *OR* clearing callback */ + GNUNET_assert ( (NULL == ev->sent_cb) || + (NULL == cb) ); ev->sent_cb = cb; ev->sent_cls = cb_cls; } @@ -877,6 +849,9 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) ev); GNUNET_assert (0 < mq->queue_length); mq->queue_length--; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroy drops message of type %u\n", + ntohs (ev->mh->type)); GNUNET_MQ_discard (ev); } if (NULL != mq->current_envelope) @@ -884,6 +859,9 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) /* we can only discard envelopes that * are not queued! */ mq->current_envelope->parent_queue = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroy drops current message of type %u\n", + ntohs (mq->current_envelope->mh->type)); GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; GNUNET_assert (0 < mq->queue_length); @@ -965,6 +943,11 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending canceled message of type %u queue\n", + ntohs(ev->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -1085,6 +1068,32 @@ GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, } +/** + * Obtain message contained in envelope. + * + * @param env the envelope + * @return message contained in the envelope + */ +const struct GNUNET_MessageHeader * +GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env) +{ + return env->mh; +} + + +/** + * Return next envelope in queue. + * + * @param env a queued envelope + * @return next one, or NULL + */ +const struct GNUNET_MQ_Envelope * +GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) +{ + return env->next; +} + + /** * Register function to be called whenever @a mq is being * destroyed. @@ -1204,6 +1213,43 @@ GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) } +/** + * Copy an array of handlers, appending AGPL handler. + * + * Useful if the array has been delared in local memory and needs to be + * persisted for future use. + * + * @param handlers Array of handlers to be copied. Can be NULL (nothing done). + * @param agpl_handler function to call for AGPL handling + * @param agpl_cls closure for @a agpl_handler + * @return A newly allocated array of handlers. + * Needs to be freed with #GNUNET_free. + */ +struct GNUNET_MQ_MessageHandler * +GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_MQ_MessageCallback agpl_handler, + void *agpl_cls) +{ + struct GNUNET_MQ_MessageHandler *copy; + unsigned int count; + + if (NULL == handlers) + return NULL; + count = GNUNET_MQ_count_handlers (handlers); + copy = GNUNET_new_array (count + 2, + struct GNUNET_MQ_MessageHandler); + GNUNET_memcpy (copy, + handlers, + count * sizeof (struct GNUNET_MQ_MessageHandler)); + copy[count].mv = NULL; + copy[count].cb = agpl_handler; + copy[count].cls = agpl_cls; + copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL; + copy[count].expected_size = sizeof (struct GNUNET_MessageHeader); + return copy; +} + + /** * Count the handlers in a handler array. *