/*
This file is part of GNUnet.
- Copyright (C) 2012-2014 GNUnet e.V.
+ Copyright (C) 2012-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "platform.h"
#include "gnunet_util_lib.h"
-#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
+#define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__)
struct GNUNET_MQ_Envelope
/**
* Called after the message was sent irrevocably.
*/
- GNUNET_MQ_NotifyCallback sent_cb;
+ GNUNET_SCHEDULER_TaskCallback sent_cb;
/**
* Closure for @e send_cb
void *error_handler_cls;
/**
- * Task to asynchronously run #impl_send_continue().
+ * Task to asynchronously run #impl_send_continue().
*/
struct GNUNET_SCHEDULER_Task *send_task;
-
+
/**
* Linked list of messages pending to be sent
*/
{
const struct GNUNET_MQ_MessageHandler *handler;
int handled = GNUNET_NO;
- uint16_t ms = ntohs (mh->size);
+ uint16_t msize = ntohs (mh->size);
+ uint16_t mtype = ntohs (mh->type);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u and size %u\n",
+ mtype, msize);
if (NULL == mq->handlers)
goto done;
for (handler = mq->handlers; NULL != handler->cb; handler++)
{
- if (handler->type == ntohs (mh->type))
+ if (handler->type == mtype)
{
handled = GNUNET_YES;
- if ( (handler->expected_size > ms) ||
- ( (handler->expected_size != ms) &&
+ if ( (handler->expected_size > msize) ||
+ ( (handler->expected_size != msize) &&
(NULL == handler->mv) ) )
{
/* Too small, or not an exact size and
no 'mv' handler to check rest */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Received malformed message of type %u\n",
- (unsigned int) handler->type);
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received malformed message of type %u\n",
+ (unsigned int) handler->type);
GNUNET_MQ_inject_error (mq,
GNUNET_MQ_ERROR_MALFORMED);
break;
else
{
/* Message rejected by check routine */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Received malformed message of type %u\n",
- (unsigned int) handler->type);
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received malformed message of type %u\n",
+ (unsigned int) handler->type);
GNUNET_MQ_inject_error (mq,
GNUNET_MQ_ERROR_MALFORMED);
}
done:
if (GNUNET_NO == handled)
LOG (GNUNET_ERROR_TYPE_INFO,
- "No handler for message of type %d and size %d\n",
- ntohs (mh->type),
- ntohs (mh->size));
+ "No handler for message of type %u and size %u\n",
+ mtype, msize);
}
GNUNET_assert (NULL == ev->parent_queue);
mq->queue_length++;
+ GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */
ev->parent_queue = mq;
/* is the implementation busy? queue it! */
if ( (NULL != mq->current_envelope) ||
ev);
return;
}
+ GNUNET_assert (NULL == mq->envelope_head);
mq->current_envelope = ev;
mq->send_impl (mq,
ev->mh,
}
+/**
+ * Remove the first envelope that has not yet been sent from the message
+ * queue and return it.
+ *
+ * @param mq queue to remove envelope from
+ * @return NULL if queue is empty (or has no envelope that is not under transmission)
+ */
+struct GNUNET_MQ_Envelope *
+GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
+{
+ struct GNUNET_MQ_Envelope *env;
+
+ env = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ env);
+ mq->queue_length--;
+ env->parent_queue = NULL;
+ return env;
+}
+
+
+/**
+ * Function to copy an envelope. The envelope must not yet
+ * be in any queue or have any options or callbacks set.
+ *
+ * @param env envelope to copy
+ * @return copy of @a env
+ */
+struct GNUNET_MQ_Envelope *
+GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
+{
+ GNUNET_assert (NULL == env->next);
+ GNUNET_assert (NULL == env->parent_queue);
+ GNUNET_assert (NULL == env->sent_cb);
+ GNUNET_assert (GNUNET_NO == env->have_custom_options);
+ return GNUNET_MQ_msg_copy (env->mh);
+}
+
+
/**
* Send a copy of a message with the given message queue.
* Can be called repeatedly on the same envelope.
impl_send_continue (void *cls)
{
struct GNUNET_MQ_Handle *mq = cls;
-
+
mq->send_task = NULL;
/* call is only valid if we're actually currently sending
* a message */
GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_MQ_Envelope *current_envelope;
- GNUNET_MQ_NotifyCallback cb;
-
+ GNUNET_SCHEDULER_TaskCallback cb;
+
GNUNET_assert (0 < mq->queue_length);
mq->queue_length--;
mq->in_flight = GNUNET_NO;
{
current_envelope->sent_cb = NULL;
cb (current_envelope->sent_cls);
- }
+ }
GNUNET_free (current_envelope);
}
GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_MQ_Envelope *current_envelope;
- GNUNET_MQ_NotifyCallback cb;
-
+ GNUNET_SCHEDULER_TaskCallback cb;
+
mq->in_flight = GNUNET_YES;
/* call is only valid if we're actually currently sending
* a message */
void *error_handler_cls)
{
struct GNUNET_MQ_Handle *mq;
- unsigned int i;
mq = GNUNET_new (struct GNUNET_MQ_Handle);
mq->send_impl = send;
mq->destroy_impl = destroy;
mq->cancel_impl = cancel;
- if (NULL != handlers)
- {
- for (i=0;NULL != handlers[i].cb; i++) ;
- mq->handlers = GNUNET_new_array (i + 1,
- struct GNUNET_MQ_MessageHandler);
- GNUNET_memcpy (mq->handlers,
- handlers,
- i * sizeof (struct GNUNET_MQ_MessageHandler));
- }
+ mq->handlers = GNUNET_MQ_copy_handlers (handlers);
mq->error_handler = error_handler;
mq->error_handler_cls = error_handler_cls;
mq->impl_state = impl_state;
const struct GNUNET_MessageHeader *msg,
void *impl_state)
{
- struct ServerClientSocketState *state = impl_state;
-
GNUNET_assert (NULL != mq);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of type %u and size %u\n",
+ ntohs (msg->type), ntohs (msg->size));
+
+ struct ServerClientSocketState *state = impl_state;
state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_queued, mq);
+ &transmit_queued,
+ mq);
}
* @param cb_cls closure for the callback
*/
void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
- GNUNET_MQ_NotifyCallback cb,
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
+ GNUNET_SCHEDULER_TaskCallback cb,
void *cb_cls)
{
- mqm->sent_cb = cb;
- mqm->sent_cls = cb_cls;
+ GNUNET_assert (NULL == ev->sent_cb);
+ ev->sent_cb = cb;
+ ev->sent_cls = cb_cls;
}
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL != mq->cancel_impl);
-
+
mq->evacuate_called = GNUNET_NO;
if (mq->current_envelope == ev)
{
- // complex case, we already started with transmitting
- // the message
+ /* complex case, we already started with transmitting
+ the message using the callbacks. */
GNUNET_assert (0 < mq->queue_length);
mq->queue_length--;
mq->cancel_impl (mq,
mq->impl_state);
- // continue sending the next message, if any
- if (NULL == mq->envelope_head)
+ /* continue sending the next message, if any */
+ mq->current_envelope = mq->envelope_head;
+ if (NULL != mq->current_envelope)
{
- mq->current_envelope = NULL;
- }
- else
- {
- mq->current_envelope = mq->envelope_head;
GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
mq->envelope_tail,
mq->current_envelope);
}
else
{
- // simple case, message is still waiting in the queue
+ /* simple case, message is still waiting in the queue */
GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
mq->envelope_tail,
ev);
}
+/**
+ * Insert @a env into the envelope DLL starting at @a env_head
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module. This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env. Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to insert at the tail
+ */
+void
+GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
+ struct GNUNET_MQ_Envelope **env_tail,
+ struct GNUNET_MQ_Envelope *env)
+{
+ GNUNET_CONTAINER_DLL_insert_tail (*env_head,
+ *env_tail,
+ env);
+}
+
+
+/**
+ * Remove @a env from the envelope DLL starting at @a env_head.
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module. This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env. Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to remove from the DLL
+ */
+void
+GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
+ struct GNUNET_MQ_Envelope **env_tail,
+ struct GNUNET_MQ_Envelope *env)
+{
+ GNUNET_CONTAINER_DLL_remove (*env_head,
+ *env_tail,
+ env);
+}
+
+
+/**
+ * Copy an array of handlers.
+ *
+ * Useful if the array has been delared in local memory and needs to be
+ * persisted for future use.
+ *
+ * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
+ * @return A newly allocated array of handlers.
+ * Needs to be freed with #GNUNET_free.
+ */
+struct GNUNET_MQ_MessageHandler *
+GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
+{
+ struct GNUNET_MQ_MessageHandler *copy;
+ unsigned int count;
+
+ if (NULL == handlers)
+ return NULL;
+
+ count = GNUNET_MQ_count_handlers (handlers);
+ copy = GNUNET_new_array (count + 1,
+ struct GNUNET_MQ_MessageHandler);
+ GNUNET_memcpy (copy,
+ handlers,
+ count * sizeof (struct GNUNET_MQ_MessageHandler));
+ return copy;
+}
+
+
+/**
+ * Count the handlers in a handler array.
+ *
+ * @param handlers Array of handlers to be counted.
+ * @return The number of handlers in the array.
+ */
+unsigned int
+GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
+{
+ unsigned int i;
+
+ if (NULL == handlers)
+ return 0;
+
+ for (i=0; NULL != handlers[i].cb; i++) ;
+
+ return i;
+}
+
+
+
/* end of mq.c */