GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
+ by the Free Software Foundation; either version 3, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
* @file util/mq.c
* @brief general purpose request queue
*/
-
#include "platform.h"
-#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
struct GNUNET_MQ_Envelope *current_envelope;
/**
- * Has the current envelope been commited?
- * Either GNUNET_YES or GNUNET_NO.
+ * Map of associations, lazily allocated
*/
- int commited;
+ struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
/**
- * Map of associations, lazily allocated
+ * Task scheduled during #GNUNET_MQ_impl_send_continue.
*/
- struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+ GNUNET_SCHEDULER_TaskIdentifier continue_task;
/**
* Next id that should be used for the assoc_map,
handled = GNUNET_YES;
}
}
-
+
if (GNUNET_NO == handled)
LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
}
/**
* Send a message with the give message queue.
* May only be called once per message.
- *
+ *
* @param mq message queue
* @param ev the envelope with the message to send.
*/
{
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL == ev->parent_queue);
-
+
/* is the implementation busy? queue it! */
if (NULL != mq->current_envelope)
{
/**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any. Only useful for implementing message queues,
* results in undefined behavior if not used carefully.
*
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
*/
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct GNUNET_MQ_Envelope *current_envelope;
+
+ if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+ return;
+
+ mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
/* call is only valid if we're actually currently sending
* a message */
- GNUNET_assert (NULL != mq);
- GNUNET_assert (NULL != mq->current_envelope);
- GNUNET_assert (GNUNET_YES == mq->commited);
- mq->commited = GNUNET_NO;
- GNUNET_free (mq->current_envelope);
+ current_envelope = mq->current_envelope;
+ GNUNET_assert (NULL != current_envelope);
if (NULL == mq->envelope_head)
{
mq->current_envelope = NULL;
- return;
}
+ else
+ {
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+ }
+ if (NULL != current_envelope->sent_cb)
+ current_envelope->sent_cb (current_envelope->sent_cls);
+ GNUNET_free (current_envelope);
+}
- GNUNET_assert (NULL != mq->envelope_tail);
- GNUNET_assert (NULL != mq->envelope_head);
- mq->current_envelope = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
- mq->current_envelope);
- mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task);
+ mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+ mq);
}
}
-
-/**
- * Mark the current message as irrevocably sent, but do not
- * proceed with sending the next message.
- * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
- *
- * @param mq message queue
- */
-void
-GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
-{
- GNUNET_assert (NULL != mq->current_envelope);
- GNUNET_assert (GNUNET_NO == mq->commited);
- mq->commited = GNUNET_YES;
- if (NULL != mq->current_envelope->sent_cb)
- mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
-}
-
-
struct GNUNET_MQ_Envelope *
GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
{
}
-
static void
server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
void *impl_state)
{
struct ServerClientSocketState *state = impl_state;
-
+
+ if (NULL != state->th)
+ {
+ GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
+ state->th = NULL;
+ }
+
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL != state);
GNUNET_SERVER_client_drop (state->client);
GNUNET_free (state);
}
+
static void
server_client_send_impl (struct GNUNET_MQ_Handle *mq,
const struct GNUNET_MessageHeader *msg, void *impl_state)
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL != state);
-
- GNUNET_MQ_impl_send_commit (mq);
-
- state->th =
+ state->th =
GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
&transmit_queued, mq);
struct ClientConnectionState *state;
state = mq->impl_state;
-
+
if (NULL == msg)
{
GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
* Transmit a queued message to the session's client.
*
* @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
* @param buf where the callee should write the message
* @return number of bytes written to buf
*/
static size_t
-connection_client_transmit_queued (void *cls, size_t size,
- void *buf)
+connection_client_transmit_queued (void *cls,
+ size_t size,
+ void *buf)
{
struct GNUNET_MQ_Handle *mq = cls;
const struct GNUNET_MessageHeader *msg;
}
-
static void
connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
GNUNET_free (impl_state);
}
+
static void
connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
const struct GNUNET_MessageHeader *msg, void *impl_state)
GNUNET_assert (NULL != state);
GNUNET_assert (NULL == state->th);
-
- GNUNET_MQ_impl_send_commit (mq);
-
- state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
+ state->th =
+ GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
&connection_client_transmit_queued, mq);
+ GNUNET_assert (NULL != state->th);
}
}
-
void *
GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
{
{
mq->destroy_impl (mq, mq->impl_state);
}
-
+ if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task)
+ {
+ GNUNET_SCHEDULER_cancel (mq->continue_task);
+ mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (NULL != mq->envelope_head)
{
struct GNUNET_MQ_Envelope *ev;
mq->current_envelope = NULL;
}
+ if (NULL != mq->assoc_map)
+ {
+ GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
+ mq->assoc_map = NULL;
+ }
+
GNUNET_free (mq);
}