Don't shadow the system() function
[oweals/gnunet.git] / src / util / mq.c
index 68134881eb8d8522a929430141ce7841424ddfdd..14e0816e28b3f2aa6e989320ed1ff463321bbf79 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2012 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -23,9 +23,7 @@
  * @file util/mq.c
  * @brief general purpose request queue
  */
  * @file util/mq.c
  * @brief general purpose request queue
  */
-
 #include "platform.h"
 #include "platform.h"
-#include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
 #include "gnunet_util_lib.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
@@ -62,7 +60,7 @@ struct GNUNET_MQ_Envelope
   GNUNET_MQ_NotifyCallback sent_cb;
 
   /**
   GNUNET_MQ_NotifyCallback sent_cb;
 
   /**
-   * Closure for send_cb
+   * Closure for @e send_cb
    */
   void *sent_cls;
 };
    */
   void *sent_cls;
 };
@@ -95,6 +93,11 @@ struct GNUNET_MQ_Handle
    */
   GNUNET_MQ_DestroyImpl destroy_impl;
 
    */
   GNUNET_MQ_DestroyImpl destroy_impl;
 
+  /**
+   * Implementation-dependent send cancel function
+   */
+  GNUNET_MQ_CancelImpl cancel_impl;
+
   /**
    * Implementation-specific state
    */
   /**
    * Implementation-specific state
    */
@@ -123,34 +126,46 @@ struct GNUNET_MQ_Handle
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
-   * Has the current envelope been commited?
-   * Either GNUNET_YES or GNUNET_NO.
+   * Map of associations, lazily allocated
    */
    */
-  int commited;
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
   /**
 
   /**
-   * Map of associations, lazily allocated
+   * Task scheduled during #GNUNET_MQ_impl_send_continue.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+  struct GNUNET_SCHEDULER_Task * continue_task;
 
   /**
 
   /**
-   * Next id that should be used for the assoc_map,
+   * Next id that should be used for the @e assoc_map,
    * initialized lazily to a random value together with
    * initialized lazily to a random value together with
-   * assoc_map
+   * @e assoc_map
    */
   uint32_t assoc_id;
 };
 
 
    */
   uint32_t assoc_id;
 };
 
 
-
-
+/**
+ * Implementation-specific state for connection to
+ * client (MQ for server).
+ */
 struct ServerClientSocketState
 {
 struct ServerClientSocketState
 {
+  /**
+   * Handle of the client that connected to the server.
+   */
   struct GNUNET_SERVER_Client *client;
   struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Active transmission request to the client.
+   */
   struct GNUNET_SERVER_TransmitHandle* th;
 };
 
 
   struct GNUNET_SERVER_TransmitHandle* th;
 };
 
 
+/**
+ * Implementation-specific state for connection to
+ * service (MQ for clients).
+ */
 struct ClientConnectionState
 {
   /**
 struct ClientConnectionState
 {
   /**
@@ -162,7 +177,15 @@ struct ClientConnectionState
    * Do we also want to receive?
    */
   int receive_requested;
    * Do we also want to receive?
    */
   int receive_requested;
+
+  /**
+   * Connection to the service.
+   */
   struct GNUNET_CLIENT_Connection *connection;
   struct GNUNET_CLIENT_Connection *connection;
+
+  /**
+   * Active transmission request (or NULL).
+   */
   struct GNUNET_CLIENT_TransmitHandle *th;
 };
 
   struct GNUNET_CLIENT_TransmitHandle *th;
 };
 
@@ -178,25 +201,32 @@ struct ClientConnectionState
  * @param mh message to dispatch
  */
 void
  * @param mh message to dispatch
  */
 void
-GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
+GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
+                          const struct GNUNET_MessageHeader *mh)
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
 
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
 
-  handler = mq->handlers;
-  if (NULL == handler)
+  if (NULL == mq->handlers)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "No handler for message of type %d\n",
+         ntohs (mh->type));
     return;
     return;
-  for (; NULL != handler->cb; handler++)
+  }
+  for (handler = mq->handlers; NULL != handler->cb; handler++)
   {
     if (handler->type == ntohs (mh->type))
     {
       handler->cb (mq->handlers_cls, mh);
       handled = GNUNET_YES;
   {
     if (handler->type == ntohs (mh->type))
     {
       handler->cb (mq->handlers_cls, mh);
       handled = GNUNET_YES;
+      break;
     }
   }
     }
   }
-  
   if (GNUNET_NO == handled)
   if (GNUNET_NO == handled)
-    LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "No handler for message of type %d\n",
+         ntohs (mh->type));
 }
 
 
 }
 
 
@@ -216,8 +246,9 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
 {
   if (NULL == mq->error_handler)
   {
 {
   if (NULL == mq->error_handler)
   {
-    /* FIXME: log what kind of error occured */
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "mq: got error %d, but no handler installed\n",
+                (int) error);
     return;
   }
   mq->error_handler (mq->handlers_cls, error);
     return;
   }
   mq->error_handler (mq->handlers_cls, error);
@@ -235,20 +266,24 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
- * 
+ *
  * @param mq message queue
  * @param ev the envelope with the message to send.
  */
 void
  * @param mq message queue
  * @param ev the envelope with the message to send.
  */
 void
-GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
+GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
+                struct GNUNET_MQ_Envelope *ev)
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
-  
+
+  ev->parent_queue = mq;
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
-    GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
+    GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
+                                      mq->envelope_tail,
+                                      ev);
     return;
   }
   mq->current_envelope = ev;
     return;
   }
   mq->current_envelope = ev;
@@ -257,36 +292,61 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 
 
 /**
 
 
 /**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any.  Only useful for implementing message queues,
  * results in undefined behavior if not used carefully.
  *
  * results in undefined behavior if not used carefully.
  *
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
  */
  */
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct GNUNET_MQ_Envelope *current_envelope;
+
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+
+  mq->continue_task = NULL;
   /* call is only valid if we're actually currently sending
    * a message */
   /* call is only valid if we're actually currently sending
    * a message */
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_YES == mq->commited);
-  mq->commited = GNUNET_NO;
-  GNUNET_free (mq->current_envelope);
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
+  current_envelope->parent_queue = NULL;
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
-    return;
   }
   }
+  else
+  {
+    mq->current_envelope = mq->envelope_head;
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                 mq->envelope_tail,
+                                 mq->current_envelope);
+    mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+  }
+  if (NULL != current_envelope->sent_cb)
+    current_envelope->sent_cb (current_envelope->sent_cls);
+  GNUNET_free (current_envelope);
+}
 
 
 
 
-  GNUNET_assert (NULL != mq->envelope_tail);
-  GNUNET_assert (NULL != mq->envelope_head);
-  mq->current_envelope = mq->envelope_head;
-  GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
-                               mq->current_envelope);
-  mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_assert (NULL == mq->continue_task);
+  mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+                                                mq);
 }
 
 
 }
 
 
@@ -316,6 +376,7 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->send_impl = send;
   mq->destroy_impl = destroy;
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->send_impl = send;
   mq->destroy_impl = destroy;
+  mq->cancel_impl = cancel;
   mq->handlers = handlers;
   mq->handlers_cls = cls;
   mq->impl_state = impl_state;
   mq->handlers = handlers;
   mq->handlers_cls = cls;
   mq->impl_state = impl_state;
@@ -337,9 +398,9 @@ const struct GNUNET_MessageHeader *
 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
 {
   if (NULL == mq->current_envelope)
 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
 {
   if (NULL == mq->current_envelope)
-    GNUNET_abort ();
+    GNUNET_assert (0);
   if (NULL == mq->current_envelope->mh)
   if (NULL == mq->current_envelope->mh)
-    GNUNET_abort ();
+    GNUNET_assert (0);
   return mq->current_envelope->mh;
 }
 
   return mq->current_envelope->mh;
 }
 
@@ -365,27 +426,10 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
 }
 
 
 }
 
 
-
-/**
- * Mark the current message as irrevocably sent, but do not
- * proceed with sending the next message.
- * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
- *
- * @param mq message queue
- */
-void
-GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
-{
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_NO == mq->commited);
-  mq->commited = GNUNET_YES;
-  if (NULL != mq->current_envelope->sent_cb)
-    mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
-}
-
-
 struct GNUNET_MQ_Envelope *
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
+GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
+                uint16_t size,
+                uint16_t type)
 {
   struct GNUNET_MQ_Envelope *mqm;
 
 {
   struct GNUNET_MQ_Envelope *mqm;
 
@@ -409,7 +453,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
  * @param nested_mh the message to append to the message after base_size
  */
 struct GNUNET_MQ_Envelope *
  * @param nested_mh the message to append to the message after base_size
  */
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
+GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
+                          uint16_t base_size,
+                          uint16_t type,
                           const struct GNUNET_MessageHeader *nested_mh)
 {
   struct GNUNET_MQ_Envelope *mqm;
                           const struct GNUNET_MessageHeader *nested_mh)
 {
   struct GNUNET_MQ_Envelope *mqm;
@@ -461,31 +507,35 @@ transmit_queued (void *cls, size_t size,
 }
 
 
 }
 
 
-
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
-  
+
+  if (NULL != state->th)
+  {
+    GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
+    state->th = NULL;
+  }
+
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
+
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
-                         const struct GNUNET_MessageHeader *msg, void *impl_state)
+                         const struct GNUNET_MessageHeader *msg,
+                         void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
 {
   struct ServerClientSocketState *state = impl_state;
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
+  state->th =
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
@@ -524,7 +574,7 @@ handle_client_message (void *cls,
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
-  
+
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
@@ -542,13 +592,14 @@ handle_client_message (void *cls,
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
-connection_client_transmit_queued (void *cls, size_t size,
-                 void *buf)
+connection_client_transmit_queued (void *cls,
+                                   size_t size,
+                                   void *buf)
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
@@ -572,7 +623,6 @@ connection_client_transmit_queued (void *cls, size_t size,
                            GNUNET_TIME_UNIT_FOREVER_REL);
   }
 
                            GNUNET_TIME_UNIT_FOREVER_REL);
   }
 
-
   msg_size = ntohs (msg->size);
   GNUNET_assert (size >= msg_size);
   memcpy (buf, msg, msg_size);
   msg_size = ntohs (msg->size);
   GNUNET_assert (size >= msg_size);
   memcpy (buf, msg, msg_size);
@@ -584,28 +634,39 @@ connection_client_transmit_queued (void *cls, size_t size,
 }
 
 
 }
 
 
-
 static void
 static void
-connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
+connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
+                                void *impl_state)
 {
   GNUNET_free (impl_state);
 }
 
 {
   GNUNET_free (impl_state);
 }
 
+
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
-                             const struct GNUNET_MessageHeader *msg, void *impl_state)
+                             const struct GNUNET_MessageHeader *msg,
+                             void *impl_state)
 {
   struct ClientConnectionState *state = impl_state;
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
 {
   struct ClientConnectionState *state = impl_state;
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
-      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), 
+  state->th =
+      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
+  GNUNET_assert (NULL != state->th);
+}
+
+
+static void
+connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
+                               void *impl_state)
+{
+  struct ClientConnectionState *state = impl_state;
+  GNUNET_assert (NULL != state->th);
+  GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
+  state->th = NULL;
 }
 
 
 }
 
 
@@ -629,6 +690,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
   mq->impl_state = state;
   mq->send_impl = connection_client_send_impl;
   mq->destroy_impl = connection_client_destroy_impl;
   mq->impl_state = state;
   mq->send_impl = connection_client_send_impl;
   mq->destroy_impl = connection_client_destroy_impl;
+  mq->cancel_impl = connection_client_cancel_impl;
   if (NULL != handlers)
     state->receive_requested = GNUNET_YES;
 
   if (NULL != handlers)
     state->receive_requested = GNUNET_YES;
 
@@ -672,9 +734,9 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
 }
 
 
 }
 
 
-
 void *
 void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
+                     uint32_t request_id)
 {
   if (NULL == mq->assoc_map)
     return NULL;
 {
   if (NULL == mq->assoc_map)
     return NULL;
@@ -683,16 +745,15 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 
 
 void *
 
 
 void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
+                        uint32_t request_id)
 {
   void *val;
 
   if (NULL == mq->assoc_map)
     return NULL;
   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
 {
   void *val;
 
   if (NULL == mq->assoc_map)
     return NULL;
   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
-  GNUNET_assert (NULL != val);
-  GNUNET_assert (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val));
+  GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
   return val;
 }
 
   return val;
 }
 
@@ -714,55 +775,107 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
-
+  if (NULL != mq->continue_task)
+  {
+    GNUNET_SCHEDULER_cancel (mq->continue_task);
+    mq->continue_task = NULL;
+  }
   while (NULL != mq->envelope_head)
   {
     struct GNUNET_MQ_Envelope *ev;
     ev = mq->envelope_head;
   while (NULL != mq->envelope_head)
   {
     struct GNUNET_MQ_Envelope *ev;
     ev = mq->envelope_head;
+    ev->parent_queue = NULL;
     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
     GNUNET_MQ_discard (ev);
   }
 
   if (NULL != mq->current_envelope)
   {
     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
     GNUNET_MQ_discard (ev);
   }
 
   if (NULL != mq->current_envelope)
   {
+    /* we can only discard envelopes that
+     * are not queued! */
+    mq->current_envelope->parent_queue = NULL;
     GNUNET_MQ_discard (mq->current_envelope);
     mq->current_envelope = NULL;
   }
 
     GNUNET_MQ_discard (mq->current_envelope);
     mq->current_envelope = NULL;
   }
 
+  if (NULL != mq->assoc_map)
+  {
+    GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
+    mq->assoc_map = NULL;
+  }
+
   GNUNET_free (mq);
 }
 
 
   GNUNET_free (mq);
 }
 
 
-struct GNUNET_MessageHeader *
-GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
+                              uint16_t base_size)
 {
   uint16_t whole_size;
   uint16_t nested_size;
 {
   uint16_t whole_size;
   uint16_t nested_size;
-  struct GNUNET_MessageHeader *nested_msg;
+  const struct GNUNET_MessageHeader *nested_msg;
 
   whole_size = ntohs (mh->size);
   GNUNET_assert (whole_size >= base_size);
 
   whole_size = ntohs (mh->size);
   GNUNET_assert (whole_size >= base_size);
-
   nested_size = whole_size - base_size;
   nested_size = whole_size - base_size;
-
   if (0 == nested_size)
     return NULL;
   if (0 == nested_size)
     return NULL;
-
   if (nested_size < sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
     return NULL;
   }
   if (nested_size < sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
     return NULL;
   }
-
-  nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
-
+  nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
   if (ntohs (nested_msg->size) != nested_size)
   {
     GNUNET_break_op (0);
   if (ntohs (nested_msg->size) != nested_size)
   {
     GNUNET_break_op (0);
-    nested_msg->size = htons (nested_size);
+    return NULL;
   }
   }
-
   return nested_msg;
 }
 
 
   return nested_msg;
 }
 
 
+/**
+ * Cancel sending the message. Message must have been sent with
+ * #GNUNET_MQ_send before.  May not be called after the notify sent
+ * callback has been called
+ *
+ * @param ev queued envelope to cancel
+ */
+void
+GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
+{
+  struct GNUNET_MQ_Handle *mq = ev->parent_queue;
+
+  GNUNET_assert (NULL != mq);
+  GNUNET_assert (NULL != mq->cancel_impl);
+
+  if (mq->current_envelope == ev) {
+    // complex case, we already started with transmitting
+    // the message
+    mq->cancel_impl (mq, mq->impl_state);
+    // continue sending the next message, if any
+    if (NULL == mq->envelope_head)
+    {
+      mq->current_envelope = NULL;
+    }
+    else
+    {
+      mq->current_envelope = mq->envelope_head;
+      GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                   mq->envelope_tail,
+                                   mq->current_envelope);
+      mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+    }
+  } else {
+    // simple case, message is still waiting in the queue
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
+  }
+
+  ev->parent_queue = NULL;
+  ev->mh = NULL;
+  GNUNET_free (ev);
+}
+
+/* end of mq.c */