-move tcp session check into extra checks condition
[oweals/gnunet.git] / src / util / mq.c
index 68134881eb8d8522a929430141ce7841424ddfdd..a4691ff2c2eccef9d447a971c448090f8509b18c 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -23,9 +23,7 @@
  * @file util/mq.c
  * @brief general purpose request queue
  */
  * @file util/mq.c
  * @brief general purpose request queue
  */
-
 #include "platform.h"
 #include "platform.h"
-#include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
 #include "gnunet_util_lib.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
@@ -123,15 +121,14 @@ struct GNUNET_MQ_Handle
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
-   * Has the current envelope been commited?
-   * Either GNUNET_YES or GNUNET_NO.
+   * Map of associations, lazily allocated
    */
    */
-  int commited;
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
   /**
 
   /**
-   * Map of associations, lazily allocated
+   * Task scheduled during #GNUNET_MQ_impl_send_continue.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+  GNUNET_SCHEDULER_TaskIdentifier continue_task;
 
   /**
    * Next id that should be used for the assoc_map,
 
   /**
    * Next id that should be used for the assoc_map,
@@ -194,7 +191,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_Messa
       handled = GNUNET_YES;
     }
   }
       handled = GNUNET_YES;
     }
   }
-  
+
   if (GNUNET_NO == handled)
     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
 }
   if (GNUNET_NO == handled)
     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
 }
@@ -235,7 +232,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
- * 
+ *
  * @param mq message queue
  * @param ev the envelope with the message to send.
  */
  * @param mq message queue
  * @param ev the envelope with the message to send.
  */
@@ -244,7 +241,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
-  
+
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
@@ -257,36 +254,60 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 
 
 /**
 
 
 /**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any.  Only useful for implementing message queues,
  * results in undefined behavior if not used carefully.
  *
  * results in undefined behavior if not used carefully.
  *
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
  */
  */
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct GNUNET_MQ_Envelope *current_envelope;
+
+  if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+    return;
+
+  mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
   /* call is only valid if we're actually currently sending
    * a message */
   /* call is only valid if we're actually currently sending
    * a message */
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_YES == mq->commited);
-  mq->commited = GNUNET_NO;
-  GNUNET_free (mq->current_envelope);
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
-    return;
   }
   }
+  else
+  {
+    mq->current_envelope = mq->envelope_head;
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                 mq->envelope_tail,
+                                 mq->current_envelope);
+    mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+  }
+  if (NULL != current_envelope->sent_cb)
+    current_envelope->sent_cb (current_envelope->sent_cls);
+  GNUNET_free (current_envelope);
+}
 
 
 
 
-  GNUNET_assert (NULL != mq->envelope_tail);
-  GNUNET_assert (NULL != mq->envelope_head);
-  mq->current_envelope = mq->envelope_head;
-  GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
-                               mq->current_envelope);
-  mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task);
+  mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+                                                mq);
 }
 
 
 }
 
 
@@ -365,25 +386,6 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
 }
 
 
 }
 
 
-
-/**
- * Mark the current message as irrevocably sent, but do not
- * proceed with sending the next message.
- * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
- *
- * @param mq message queue
- */
-void
-GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
-{
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_NO == mq->commited);
-  mq->commited = GNUNET_YES;
-  if (NULL != mq->current_envelope->sent_cb)
-    mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
-}
-
-
 struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
 {
 struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
 {
@@ -461,19 +463,25 @@ transmit_queued (void *cls, size_t size,
 }
 
 
 }
 
 
-
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
-  
+
+  if (NULL != state->th)
+  {
+    GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
+    state->th = NULL;
+  }
+
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
+
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
                          const struct GNUNET_MessageHeader *msg, void *impl_state)
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
                          const struct GNUNET_MessageHeader *msg, void *impl_state)
@@ -482,10 +490,7 @@ server_client_send_impl (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
+  state->th =
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
@@ -524,7 +529,7 @@ handle_client_message (void *cls,
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
-  
+
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
@@ -542,13 +547,14 @@ handle_client_message (void *cls,
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
-connection_client_transmit_queued (void *cls, size_t size,
-                 void *buf)
+connection_client_transmit_queued (void *cls,
+                                   size_t size,
+                                   void *buf)
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
@@ -584,13 +590,13 @@ connection_client_transmit_queued (void *cls, size_t size,
 }
 
 
 }
 
 
-
 static void
 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
   GNUNET_free (impl_state);
 }
 
 static void
 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
   GNUNET_free (impl_state);
 }
 
+
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
                              const struct GNUNET_MessageHeader *msg, void *impl_state)
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
                              const struct GNUNET_MessageHeader *msg, void *impl_state)
@@ -599,13 +605,11 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
-      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), 
+  state->th =
+      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
+  GNUNET_assert (NULL != state->th);
 }
 
 
 }
 
 
@@ -672,7 +676,6 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
 }
 
 
 }
 
 
-
 void *
 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
 void *
 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
@@ -690,9 +693,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
   if (NULL == mq->assoc_map)
     return NULL;
   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
   if (NULL == mq->assoc_map)
     return NULL;
   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
-  GNUNET_assert (NULL != val);
-  GNUNET_assert (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val));
+  GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
   return val;
 }
 
   return val;
 }
 
@@ -714,7 +715,11 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
-
+  if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task)
+  {
+    GNUNET_SCHEDULER_cancel (mq->continue_task);
+    mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
+  }
   while (NULL != mq->envelope_head)
   {
     struct GNUNET_MQ_Envelope *ev;
   while (NULL != mq->envelope_head)
   {
     struct GNUNET_MQ_Envelope *ev;
@@ -729,6 +734,12 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
     mq->current_envelope = NULL;
   }
 
     mq->current_envelope = NULL;
   }
 
+  if (NULL != mq->assoc_map)
+  {
+    GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
+    mq->assoc_map = NULL;
+  }
+
   GNUNET_free (mq);
 }
 
   GNUNET_free (mq);
 }