X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fmq.c;h=eaac85575c64d852e50ff78c88ef447ae1d40ef1;hb=5c7f4f919d2569f49e4223d77000452dd2ec4e97;hp=71619bda4f92a2d34fbb0cdeaafa4fa3c011c2c3;hpb=b7002d3f8016478d716236238bd43a7c06c924d2;p=oweals%2Fgnunet.git diff --git a/src/util/mq.c b/src/util/mq.c index 71619bda4..eaac85575 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -2,20 +2,18 @@ This file is part of GNUnet. Copyright (C) 2012-2017 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ /** @@ -201,24 +199,6 @@ struct GNUNET_MQ_Handle }; -/** - * Implementation-specific state for connection to - * client (MQ for server). - */ -struct ServerClientSocketState -{ - /** - * Handle of the client that connected to the server. - */ - struct GNUNET_SERVER_Client *client; - - /** - * Active transmission request to the client. - */ - struct GNUNET_SERVER_TransmitHandle *th; -}; - - /** * Call the message message handler that was registered * for the type of the given message in the given message queue. @@ -375,6 +355,12 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, } GNUNET_assert (NULL == mq->envelope_head); mq->current_envelope = ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending message of type %u, queue empty (MQ: %p)\n", + ntohs(ev->mh->type), + mq); + mq->send_impl (mq, ev->mh, mq->impl_state); @@ -470,6 +456,11 @@ impl_send_continue (void *cls) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending message of type %u from queue\n", + ntohs(mq->current_envelope->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -585,11 +576,9 @@ void GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls) { - unsigned int i; - if (NULL == mq->handlers) return; - for (i=0;NULL != mq->handlers[i].cb; i++) + for (unsigned int i=0;NULL != mq->handlers[i].cb; i++) mq->handlers[i].cls = handlers_cls; } @@ -707,92 +696,6 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, } -/** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf - */ -static size_t -transmit_queued (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_MQ_Handle *mq = cls; - struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq); - const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); - size_t msg_size; - - GNUNET_assert (NULL != buf); - msg_size = ntohs (msg->size); - GNUNET_assert (size >= msg_size); - GNUNET_memcpy (buf, msg, msg_size); - state->th = NULL; - - GNUNET_MQ_impl_send_continue (mq); - - return msg_size; -} - - -static void -server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct ServerClientSocketState *state = impl_state; - - if (NULL != state->th) - { - GNUNET_SERVER_notify_transmit_ready_cancel (state->th); - state->th = NULL; - } - - GNUNET_assert (NULL != mq); - GNUNET_assert (NULL != state); - GNUNET_SERVER_client_drop (state->client); - GNUNET_free (state); -} - - -static void -server_client_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) -{ - GNUNET_assert (NULL != mq); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting message of type %u and size %u\n", - ntohs (msg->type), ntohs (msg->size)); - - struct ServerClientSocketState *state = impl_state; - state->th = GNUNET_SERVER_notify_transmit_ready (state->client, - ntohs (msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, - mq); -} - - -struct GNUNET_MQ_Handle * -GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) -{ - struct GNUNET_MQ_Handle *mq; - struct ServerClientSocketState *scss; - - mq = GNUNET_new (struct GNUNET_MQ_Handle); - scss = GNUNET_new (struct ServerClientSocketState); - mq->impl_state = scss; - scss->client = client; - GNUNET_SERVER_client_keep (client); - mq->send_impl = &server_client_send_impl; - mq->destroy_impl = &server_client_destroy_impl; - return mq; -} - - /** * Associate the assoc_data in mq with a unique request id. * @@ -811,22 +714,40 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, mq->assoc_id = 1; } id = mq->assoc_id++; - GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, + id, + assoc_data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); return id; } +/** + * Get the data associated with a @a request_id in a queue + * + * @param mq the message queue with the association + * @param request_id the request id we are interested in + * @return the associated data + */ void * GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; - return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); + return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, + request_id); } +/** + * Remove the association for a @a request_id + * + * @param mq the message queue with the association + * @param request_id the request id we want to remove + * @return the associated data + */ void * GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) @@ -857,7 +778,9 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls) { - GNUNET_assert (NULL == ev->sent_cb); + /* allow setting *OR* clearing callback */ + GNUNET_assert ( (NULL == ev->sent_cb) || + (NULL == cb) ); ev->sent_cb = cb; ev->sent_cls = cb_cls; } @@ -926,6 +849,9 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) ev); GNUNET_assert (0 < mq->queue_length); mq->queue_length--; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroy drops message of type %u\n", + ntohs (ev->mh->type)); GNUNET_MQ_discard (ev); } if (NULL != mq->current_envelope) @@ -933,6 +859,9 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) /* we can only discard envelopes that * are not queued! */ mq->current_envelope->parent_queue = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroy drops current message of type %u\n", + ntohs (mq->current_envelope->mh->type)); GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; GNUNET_assert (0 < mq->queue_length); @@ -1014,6 +943,11 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending canceled message of type %u queue\n", + ntohs(ev->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -1134,6 +1068,32 @@ GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, } +/** + * Obtain message contained in envelope. + * + * @param env the envelope + * @return message contained in the envelope + */ +const struct GNUNET_MessageHeader * +GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env) +{ + return env->mh; +} + + +/** + * Return next envelope in queue. + * + * @param env a queued envelope + * @return next one, or NULL + */ +const struct GNUNET_MQ_Envelope * +GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) +{ + return env->next; +} + + /** * Register function to be called whenever @a mq is being * destroyed. @@ -1253,6 +1213,43 @@ GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) } +/** + * Copy an array of handlers, appending AGPL handler. + * + * Useful if the array has been delared in local memory and needs to be + * persisted for future use. + * + * @param handlers Array of handlers to be copied. Can be NULL (nothing done). + * @param agpl_handler function to call for AGPL handling + * @param agpl_cls closure for @a agpl_handler + * @return A newly allocated array of handlers. + * Needs to be freed with #GNUNET_free. + */ +struct GNUNET_MQ_MessageHandler * +GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_MQ_MessageCallback agpl_handler, + void *agpl_cls) +{ + struct GNUNET_MQ_MessageHandler *copy; + unsigned int count; + + if (NULL == handlers) + return NULL; + count = GNUNET_MQ_count_handlers (handlers); + copy = GNUNET_new_array (count + 2, + struct GNUNET_MQ_MessageHandler); + GNUNET_memcpy (copy, + handlers, + count * sizeof (struct GNUNET_MQ_MessageHandler)); + copy[count].mv = NULL; + copy[count].cb = agpl_handler; + copy[count].cls = agpl_cls; + copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL; + copy[count].expected_size = sizeof (struct GNUNET_MessageHeader); + return copy; +} + + /** * Count the handlers in a handler array. *