Add a third default.
[oweals/gnunet.git] / src / util / mq.c
index c4f9e0d0b3c2ecd4f7f4ab83b6a970e2d3b4c4be..bc13bbb36660835e6bc5307c01b0d6e26f236c45 100644 (file)
@@ -121,15 +121,14 @@ struct GNUNET_MQ_Handle
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
-   * Has the current envelope been commited?
-   * Either GNUNET_YES or GNUNET_NO.
+   * Map of associations, lazily allocated
    */
-  int commited;
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
   /**
-   * Map of associations, lazily allocated
+   * Task scheduled during #GNUNET_MQ_impl_send_continue.
    */
-  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+  GNUNET_SCHEDULER_TaskIdentifier continue_task;
 
   /**
    * Next id that should be used for the assoc_map,
@@ -255,36 +254,57 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 
 
 /**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any.  Only useful for implementing message queues,
  * results in undefined behavior if not used carefully.
  *
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
  */
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct GNUNET_MQ_Envelope *current_envelope;
+
+  mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
   /* call is only valid if we're actually currently sending
    * a message */
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_YES == mq->commited);
-  mq->commited = GNUNET_NO;
-  GNUNET_free (mq->current_envelope);
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
-    return;
   }
+  else
+  {
+    mq->current_envelope = mq->envelope_head;
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                 mq->envelope_tail,
+                                 mq->current_envelope);
+    mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+  }
+  if (NULL != current_envelope->sent_cb)
+    current_envelope->sent_cb (current_envelope->sent_cls);
+  GNUNET_free (current_envelope);
+}
 
 
-  GNUNET_assert (NULL != mq->envelope_tail);
-  GNUNET_assert (NULL != mq->envelope_head);
-  mq->current_envelope = mq->envelope_head;
-  GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
-                               mq->current_envelope);
-  mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task);
+  mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+                                                mq);
 }
 
 
@@ -363,25 +383,6 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
 }
 
 
-
-/**
- * Mark the current message as irrevocably sent, but do not
- * proceed with sending the next message.
- * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
- *
- * @param mq message queue
- */
-void
-GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
-{
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_NO == mq->commited);
-  mq->commited = GNUNET_YES;
-  if (NULL != mq->current_envelope->sent_cb)
-    mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
-}
-
-
 struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
 {
@@ -459,7 +460,6 @@ transmit_queued (void *cls, size_t size,
 }
 
 
-
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
@@ -472,6 +472,7 @@ server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
   GNUNET_free (state);
 }
 
+
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
                          const struct GNUNET_MessageHeader *msg, void *impl_state)
@@ -480,9 +481,6 @@ server_client_send_impl (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
   state->th =
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
@@ -540,13 +538,14 @@ handle_client_message (void *cls,
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
-connection_client_transmit_queued (void *cls, size_t size,
-                 void *buf)
+connection_client_transmit_queued (void *cls,
+                                   size_t size,
+                                   void *buf)
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
@@ -582,13 +581,13 @@ connection_client_transmit_queued (void *cls, size_t size,
 }
 
 
-
 static void
 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
   GNUNET_free (impl_state);
 }
 
+
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
                              const struct GNUNET_MessageHeader *msg, void *impl_state)
@@ -597,13 +596,11 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
   state->th =
       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
+  GNUNET_assert (NULL != state->th);
 }
 
 
@@ -670,7 +667,6 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
 }
 
 
-
 void *
 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
@@ -710,7 +706,11 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
-
+  if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task)
+  {
+    GNUNET_SCHEDULER_cancel (mq->continue_task);
+    mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
+  }
   while (NULL != mq->envelope_head)
   {
     struct GNUNET_MQ_Envelope *ev;