+ GNUNET_assert (NULL == ev->parent_queue);
+
+ ev->parent_queue = mq;
+ /* is the implementation busy? queue it! */
+ if (NULL != mq->current_envelope)
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
+ mq->envelope_tail,
+ ev);
+ return;
+ }
+ mq->current_envelope = ev;
+ mq->send_impl (mq, ev->mh, mq->impl_state);
+}
+
+
+/**
+ * Task run to call the send implementation for the next queued
+ * message, if any. Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
+ */
+static void
+impl_send_continue (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct GNUNET_MQ_Envelope *current_envelope;
+
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ mq->continue_task = NULL;
+ /* call is only valid if we're actually currently sending
+ * a message */
+ current_envelope = mq->current_envelope;
+ GNUNET_assert (NULL != current_envelope);
+ current_envelope->parent_queue = NULL;
+ if (NULL == mq->envelope_head)
+ {
+ mq->current_envelope = NULL;
+ }
+ else
+ {
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+ }
+ if (NULL != current_envelope->sent_cb)
+ current_envelope->sent_cb (current_envelope->sent_cls);
+ GNUNET_free (current_envelope);
+}
+
+
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+ GNUNET_assert (NULL == mq->continue_task);
+ mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+ mq);
+}
+
+
+/**
+ * Create a message queue for the specified handlers.
+ *
+ * @param send function the implements sending messages
+ * @param destroy function that implements destroying the queue
+ * @param cancel function that implements canceling a message
+ * @param impl_state for the queue, passed to 'send' and 'destroy'
+ * @param handlers array of message handlers
+ * @param error_handler handler for read and write errors
+ * @param cls closure for message handlers and error handler
+ * @return a new message queue
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
+ GNUNET_MQ_DestroyImpl destroy,
+ GNUNET_MQ_CancelImpl cancel,
+ void *impl_state,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
+ void *cls)
+{
+ struct GNUNET_MQ_Handle *mq;
+
+ mq = GNUNET_new (struct GNUNET_MQ_Handle);
+ mq->send_impl = send;
+ mq->destroy_impl = destroy;
+ mq->cancel_impl = cancel;
+ mq->handlers = handlers;
+ mq->handlers_cls = cls;
+ mq->impl_state = impl_state;
+
+ return mq;
+}
+
+
+/**
+ * Get the message that should currently be sent.
+ * Fails if there is no current message.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
+{
+ if (NULL == mq->current_envelope)
+ GNUNET_assert (0);
+ if (NULL == mq->current_envelope->mh)
+ GNUNET_assert (0);
+ return mq->current_envelope->mh;
+}
+
+
+/**
+ * Get the implementation state associated with the
+ * message queue.
+ *
+ * While the GNUNET_MQ_Impl* callbacks receive the
+ * implementation state, continuations that are scheduled
+ * by the implementation function often only have one closure
+ * argument, with this function it is possible to get at the
+ * implementation state when only passing the GNUNET_MQ_Handle
+ * as closure.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+void *
+GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
+{
+ return mq->impl_state;