*/
struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+ /**
+ * Task scheduled during #GNUNET_MQ_impl_send_continue.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier continue_task;
+
/**
* Next id that should be used for the assoc_map,
* initialized lazily to a random value together with
/**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any. Only useful for implementing message queues,
* results in undefined behavior if not used carefully.
*
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
*/
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GNUNET_MQ_Handle *mq = cls;
struct GNUNET_MQ_Envelope *current_envelope;
+ if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+ return;
+
+ mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
/* call is only valid if we're actually currently sending
* a message */
current_envelope = mq->current_envelope;
}
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task);
+ mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+ mq);
+}
+
+
/**
* Create a message queue for the specified handlers.
*
}
-
static void
server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
void *impl_state)
{
struct ServerClientSocketState *state = impl_state;
+ if (NULL != state->th)
+ {
+ GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
+ state->th = NULL;
+ }
+
GNUNET_assert (NULL != mq);
GNUNET_assert (NULL != state);
GNUNET_SERVER_client_drop (state->client);
GNUNET_free (state);
}
+
static void
server_client_send_impl (struct GNUNET_MQ_Handle *mq,
const struct GNUNET_MessageHeader *msg, void *impl_state)
* Transmit a queued message to the session's client.
*
* @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
* @param buf where the callee should write the message
* @return number of bytes written to buf
*/
static size_t
-connection_client_transmit_queued (void *cls, size_t size,
- void *buf)
+connection_client_transmit_queued (void *cls,
+ size_t size,
+ void *buf)
{
struct GNUNET_MQ_Handle *mq = cls;
const struct GNUNET_MessageHeader *msg;
}
-
static void
connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
GNUNET_free (impl_state);
}
+
static void
connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
const struct GNUNET_MessageHeader *msg, void *impl_state)
}
-
void *
GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
{
{
mq->destroy_impl (mq, mq->impl_state);
}
-
+ if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task)
+ {
+ GNUNET_SCHEDULER_cancel (mq->continue_task);
+ mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (NULL != mq->envelope_head)
{
struct GNUNET_MQ_Envelope *ev;