-move tcp session check into extra checks condition
[oweals/gnunet.git] / src / util / mq.c
index 733329a2ce7a1499702af89ea1b2736db4a7a02e..a4691ff2c2eccef9d447a971c448090f8509b18c 100644 (file)
@@ -121,15 +121,14 @@ struct GNUNET_MQ_Handle
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
-   * Has the current envelope been commited?
-   * Either GNUNET_YES or GNUNET_NO.
+   * Map of associations, lazily allocated
    */
-  int commited;
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
   /**
-   * Map of associations, lazily allocated
+   * Task scheduled during #GNUNET_MQ_impl_send_continue.
    */
-  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+  GNUNET_SCHEDULER_TaskIdentifier continue_task;
 
   /**
    * Next id that should be used for the assoc_map,
@@ -192,7 +191,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_Messa
       handled = GNUNET_YES;
     }
   }
-  
+
   if (GNUNET_NO == handled)
     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
 }
@@ -233,7 +232,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
- * 
+ *
  * @param mq message queue
  * @param ev the envelope with the message to send.
  */
@@ -242,7 +241,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
-  
+
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
@@ -255,36 +254,60 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 
 
 /**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any.  Only useful for implementing message queues,
  * results in undefined behavior if not used carefully.
  *
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
  */
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct GNUNET_MQ_Envelope *current_envelope;
+
+  if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+    return;
+
+  mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
   /* call is only valid if we're actually currently sending
    * a message */
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_YES == mq->commited);
-  mq->commited = GNUNET_NO;
-  GNUNET_free (mq->current_envelope);
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
-    return;
   }
+  else
+  {
+    mq->current_envelope = mq->envelope_head;
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                 mq->envelope_tail,
+                                 mq->current_envelope);
+    mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+  }
+  if (NULL != current_envelope->sent_cb)
+    current_envelope->sent_cb (current_envelope->sent_cls);
+  GNUNET_free (current_envelope);
+}
 
 
-  GNUNET_assert (NULL != mq->envelope_tail);
-  GNUNET_assert (NULL != mq->envelope_head);
-  mq->current_envelope = mq->envelope_head;
-  GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
-                               mq->current_envelope);
-  mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task);
+  mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+                                                mq);
 }
 
 
@@ -363,25 +386,6 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
 }
 
 
-
-/**
- * Mark the current message as irrevocably sent, but do not
- * proceed with sending the next message.
- * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
- *
- * @param mq message queue
- */
-void
-GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
-{
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_NO == mq->commited);
-  mq->commited = GNUNET_YES;
-  if (NULL != mq->current_envelope->sent_cb)
-    mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
-}
-
-
 struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
 {
@@ -459,19 +463,25 @@ transmit_queued (void *cls, size_t size,
 }
 
 
-
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
-  
+
+  if (NULL != state->th)
+  {
+    GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
+    state->th = NULL;
+  }
+
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
+
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
                          const struct GNUNET_MessageHeader *msg, void *impl_state)
@@ -480,10 +490,7 @@ server_client_send_impl (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
+  state->th =
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
@@ -522,7 +529,7 @@ handle_client_message (void *cls,
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
-  
+
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
@@ -540,13 +547,14 @@ handle_client_message (void *cls,
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
-connection_client_transmit_queued (void *cls, size_t size,
-                 void *buf)
+connection_client_transmit_queued (void *cls,
+                                   size_t size,
+                                   void *buf)
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
@@ -582,13 +590,13 @@ connection_client_transmit_queued (void *cls, size_t size,
 }
 
 
-
 static void
 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
   GNUNET_free (impl_state);
 }
 
+
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
                              const struct GNUNET_MessageHeader *msg, void *impl_state)
@@ -597,13 +605,11 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
-      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), 
+  state->th =
+      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
+  GNUNET_assert (NULL != state->th);
 }
 
 
@@ -670,7 +676,6 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
 }
 
 
-
 void *
 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
@@ -710,7 +715,11 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
-
+  if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task)
+  {
+    GNUNET_SCHEDULER_cancel (mq->continue_task);
+    mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
+  }
   while (NULL != mq->envelope_head)
   {
     struct GNUNET_MQ_Envelope *ev;