Don't shadow the system() function
[oweals/gnunet.git] / src / util / mq.c
index d0253c40f073ebe0f597fe72b0b3aac7f0c7854c..14e0816e28b3f2aa6e989320ed1ff463321bbf79 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2012 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -23,9 +23,7 @@
  * @file util/mq.c
  * @brief general purpose request queue
  */
  * @file util/mq.c
  * @brief general purpose request queue
  */
-
 #include "platform.h"
 #include "platform.h"
-#include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
 #include "gnunet_util_lib.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
@@ -62,7 +60,7 @@ struct GNUNET_MQ_Envelope
   GNUNET_MQ_NotifyCallback sent_cb;
 
   /**
   GNUNET_MQ_NotifyCallback sent_cb;
 
   /**
-   * Closure for send_cb
+   * Closure for @e send_cb
    */
   void *sent_cls;
 };
    */
   void *sent_cls;
 };
@@ -95,6 +93,11 @@ struct GNUNET_MQ_Handle
    */
   GNUNET_MQ_DestroyImpl destroy_impl;
 
    */
   GNUNET_MQ_DestroyImpl destroy_impl;
 
+  /**
+   * Implementation-dependent send cancel function
+   */
+  GNUNET_MQ_CancelImpl cancel_impl;
+
   /**
    * Implementation-specific state
    */
   /**
    * Implementation-specific state
    */
@@ -123,34 +126,46 @@ struct GNUNET_MQ_Handle
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
-   * Has the current envelope been commited?
-   * Either GNUNET_YES or GNUNET_NO.
+   * Map of associations, lazily allocated
    */
    */
-  int commited;
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
   /**
 
   /**
-   * Map of associations, lazily allocated
+   * Task scheduled during #GNUNET_MQ_impl_send_continue.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+  struct GNUNET_SCHEDULER_Task * continue_task;
 
   /**
 
   /**
-   * Next id that should be used for the assoc_map,
+   * Next id that should be used for the @e assoc_map,
    * initialized lazily to a random value together with
    * initialized lazily to a random value together with
-   * assoc_map
+   * @e assoc_map
    */
   uint32_t assoc_id;
 };
 
 
    */
   uint32_t assoc_id;
 };
 
 
-
-
+/**
+ * Implementation-specific state for connection to
+ * client (MQ for server).
+ */
 struct ServerClientSocketState
 {
 struct ServerClientSocketState
 {
+  /**
+   * Handle of the client that connected to the server.
+   */
   struct GNUNET_SERVER_Client *client;
   struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Active transmission request to the client.
+   */
   struct GNUNET_SERVER_TransmitHandle* th;
 };
 
 
   struct GNUNET_SERVER_TransmitHandle* th;
 };
 
 
+/**
+ * Implementation-specific state for connection to
+ * service (MQ for clients).
+ */
 struct ClientConnectionState
 {
   /**
 struct ClientConnectionState
 {
   /**
@@ -162,46 +177,68 @@ struct ClientConnectionState
    * Do we also want to receive?
    */
   int receive_requested;
    * Do we also want to receive?
    */
   int receive_requested;
+
+  /**
+   * Connection to the service.
+   */
   struct GNUNET_CLIENT_Connection *connection;
   struct GNUNET_CLIENT_Connection *connection;
+
+  /**
+   * Active transmission request (or NULL).
+   */
   struct GNUNET_CLIENT_TransmitHandle *th;
 };
 
 
   struct GNUNET_CLIENT_TransmitHandle *th;
 };
 
 
-
-
 /**
 /**
- * Call the right callback for a message.
+ * Call the message message handler that was registered
+ * for the type of the given message in the given message queue.
+ *
+ * This function is indended to be used for the implementation
+ * of message queues.
  *
  * @param mq message queue with the handlers
  * @param mh message to dispatch
  */
 void
  *
  * @param mq message queue with the handlers
  * @param mh message to dispatch
  */
 void
-GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
+GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
+                          const struct GNUNET_MessageHeader *mh)
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
 
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
 
-  handler = mq->handlers;
-  if (NULL == handler)
+  if (NULL == mq->handlers)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "No handler for message of type %d\n",
+         ntohs (mh->type));
     return;
     return;
-  for (; NULL != handler->cb; handler++)
+  }
+  for (handler = mq->handlers; NULL != handler->cb; handler++)
   {
     if (handler->type == ntohs (mh->type))
     {
       handler->cb (mq->handlers_cls, mh);
       handled = GNUNET_YES;
   {
     if (handler->type == ntohs (mh->type))
     {
       handler->cb (mq->handlers_cls, mh);
       handled = GNUNET_YES;
+      break;
     }
   }
     }
   }
-  
   if (GNUNET_NO == handled)
   if (GNUNET_NO == handled)
-    LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "No handler for message of type %d\n",
+         ntohs (mh->type));
 }
 
 
 /**
 }
 
 
 /**
- * Call the right callback for an error condition.
+ * Call the error handler of a message queue with the given
+ * error code.  If there is no error handler, log a warning.
+ *
+ * This function is intended to be used by the implementation
+ * of message queues.
  *
  * @param mq message queue
  *
  * @param mq message queue
+ * @param error the error type
  */
 void
 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
  */
 void
 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
@@ -209,8 +246,9 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
 {
   if (NULL == mq->error_handler)
   {
 {
   if (NULL == mq->error_handler)
   {
-    /* FIXME: log what kind of error occured */
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "mq: got error %d, but no handler installed\n",
+                (int) error);
     return;
   }
   mq->error_handler (mq->handlers_cls, error);
     return;
   }
   mq->error_handler (mq->handlers_cls, error);
@@ -228,20 +266,24 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
 /**
  * Send a message with the give message queue.
  * May only be called once per message.
- * 
+ *
  * @param mq message queue
  * @param mq message queue
- * @param ev the message to send.
+ * @param ev the envelope with the message to send.
  */
 void
  */
 void
-GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
+GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
+                struct GNUNET_MQ_Envelope *ev)
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
 {
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL == ev->parent_queue);
-  
+
+  ev->parent_queue = mq;
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
   /* is the implementation busy? queue it! */
   if (NULL != mq->current_envelope)
   {
-    GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
+    GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
+                                      mq->envelope_tail,
+                                      ev);
     return;
   }
   mq->current_envelope = ev;
     return;
   }
   mq->current_envelope = ev;
@@ -250,36 +292,61 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 
 
 /**
 
 
 /**
- * Call the send implementation for the next queued message,
- * if any.
- * Only useful for implementing message queues,
+ * Task run to call the send implementation for the next queued
+ * message, if any.  Only useful for implementing message queues,
  * results in undefined behavior if not used carefully.
  *
  * results in undefined behavior if not used carefully.
  *
- * @param mq message queue to send the next message with
+ * @param cls message queue to send the next message with
+ * @param tc scheduler context
  */
  */
-void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+static void
+impl_send_continue (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct GNUNET_MQ_Envelope *current_envelope;
+
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+
+  mq->continue_task = NULL;
   /* call is only valid if we're actually currently sending
    * a message */
   /* call is only valid if we're actually currently sending
    * a message */
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_YES == mq->commited);
-  mq->commited = GNUNET_NO;
-  GNUNET_free (mq->current_envelope);
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
+  current_envelope->parent_queue = NULL;
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
   if (NULL == mq->envelope_head)
   {
     mq->current_envelope = NULL;
-    return;
   }
   }
+  else
+  {
+    mq->current_envelope = mq->envelope_head;
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                 mq->envelope_tail,
+                                 mq->current_envelope);
+    mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+  }
+  if (NULL != current_envelope->sent_cb)
+    current_envelope->sent_cb (current_envelope->sent_cls);
+  GNUNET_free (current_envelope);
+}
 
 
 
 
-  GNUNET_assert (NULL != mq->envelope_tail);
-  GNUNET_assert (NULL != mq->envelope_head);
-  mq->current_envelope = mq->envelope_head;
-  GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
-                               mq->current_envelope);
-  mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_assert (NULL == mq->continue_task);
+  mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+                                                mq);
 }
 
 
 }
 
 
@@ -288,10 +355,11 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
  *
  * @param send function the implements sending messages
  * @param destroy function that implements destroying the queue
  *
  * @param send function the implements sending messages
  * @param destroy function that implements destroying the queue
- * @param destroy function that implements canceling a message
- * @param state for the queue, passed to 'send' and 'destroy'
+ * @param cancel function that implements canceling a message
+ * @param impl_state for the queue, passed to 'send' and 'destroy'
  * @param handlers array of message handlers
  * @param error_handler handler for read and write errors
  * @param handlers array of message handlers
  * @param error_handler handler for read and write errors
+ * @param cls closure for message handlers and error handler
  * @return a new message queue
  */
 struct GNUNET_MQ_Handle *
  * @return a new message queue
  */
 struct GNUNET_MQ_Handle *
@@ -308,6 +376,7 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->send_impl = send;
   mq->destroy_impl = destroy;
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->send_impl = send;
   mq->destroy_impl = destroy;
+  mq->cancel_impl = cancel;
   mq->handlers = handlers;
   mq->handlers_cls = cls;
   mq->impl_state = impl_state;
   mq->handlers = handlers;
   mq->handlers_cls = cls;
   mq->impl_state = impl_state;
@@ -329,9 +398,9 @@ const struct GNUNET_MessageHeader *
 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
 {
   if (NULL == mq->current_envelope)
 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
 {
   if (NULL == mq->current_envelope)
-    GNUNET_abort ();
+    GNUNET_assert (0);
   if (NULL == mq->current_envelope->mh)
   if (NULL == mq->current_envelope->mh)
-    GNUNET_abort ();
+    GNUNET_assert (0);
   return mq->current_envelope->mh;
 }
 
   return mq->current_envelope->mh;
 }
 
@@ -357,27 +426,10 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
 }
 
 
 }
 
 
-
-/**
- * Mark the current message as irrevocably sent, but do not
- * proceed with sending the next message.
- * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
- *
- * @param mq message queue
- */
-void
-GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
-{
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (GNUNET_NO == mq->commited);
-  mq->commited = GNUNET_YES;
-  if (NULL != mq->current_envelope->sent_cb)
-    mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
-}
-
-
 struct GNUNET_MQ_Envelope *
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
+GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
+                uint16_t size,
+                uint16_t type)
 {
   struct GNUNET_MQ_Envelope *mqm;
 
 {
   struct GNUNET_MQ_Envelope *mqm;
 
@@ -391,8 +443,19 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
 }
 
 
 }
 
 
+/**
+ * Implementation of the GNUNET_MQ_msg_nested_mh macro.
+ *
+ * @param mhp pointer to the message header pointer that will be changed to allocate at
+ *        the newly allocated space for the message.
+ * @param base_size size of the data before the nested message
+ * @param type type of the message in the envelope
+ * @param nested_mh the message to append to the message after base_size
+ */
 struct GNUNET_MQ_Envelope *
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
+GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
+                          uint16_t base_size,
+                          uint16_t type,
                           const struct GNUNET_MessageHeader *nested_mh)
 {
   struct GNUNET_MQ_Envelope *mqm;
                           const struct GNUNET_MessageHeader *nested_mh)
 {
   struct GNUNET_MQ_Envelope *mqm;
@@ -444,31 +507,35 @@ transmit_queued (void *cls, size_t size,
 }
 
 
 }
 
 
-
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
 static void
 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
                             void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
-  
+
+  if (NULL != state->th)
+  {
+    GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
+    state->th = NULL;
+  }
+
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
+
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
 static void
 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
-                         const struct GNUNET_MessageHeader *msg, void *impl_state)
+                         const struct GNUNET_MessageHeader *msg,
+                         void *impl_state)
 {
   struct ServerClientSocketState *state = impl_state;
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
 {
   struct ServerClientSocketState *state = impl_state;
 
   GNUNET_assert (NULL != mq);
   GNUNET_assert (NULL != state);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
+  state->th =
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
@@ -507,7 +574,7 @@ handle_client_message (void *cls,
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
-  
+
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
   if (NULL == msg)
   {
     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
@@ -525,13 +592,14 @@ handle_client_message (void *cls,
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
  * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
 static size_t
-connection_client_transmit_queued (void *cls, size_t size,
-                 void *buf)
+connection_client_transmit_queued (void *cls,
+                                   size_t size,
+                                   void *buf)
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
 {
   struct GNUNET_MQ_Handle *mq = cls;
   const struct GNUNET_MessageHeader *msg;
@@ -555,7 +623,6 @@ connection_client_transmit_queued (void *cls, size_t size,
                            GNUNET_TIME_UNIT_FOREVER_REL);
   }
 
                            GNUNET_TIME_UNIT_FOREVER_REL);
   }
 
-
   msg_size = ntohs (msg->size);
   GNUNET_assert (size >= msg_size);
   memcpy (buf, msg, msg_size);
   msg_size = ntohs (msg->size);
   GNUNET_assert (size >= msg_size);
   memcpy (buf, msg, msg_size);
@@ -567,34 +634,46 @@ connection_client_transmit_queued (void *cls, size_t size,
 }
 
 
 }
 
 
-
 static void
 static void
-connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
+connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
+                                void *impl_state)
 {
   GNUNET_free (impl_state);
 }
 
 {
   GNUNET_free (impl_state);
 }
 
+
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
 static void
 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
-                             const struct GNUNET_MessageHeader *msg, void *impl_state)
+                             const struct GNUNET_MessageHeader *msg,
+                             void *impl_state)
 {
   struct ClientConnectionState *state = impl_state;
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
 {
   struct ClientConnectionState *state = impl_state;
 
   GNUNET_assert (NULL != state);
   GNUNET_assert (NULL == state->th);
-
-  GNUNET_MQ_impl_send_commit (mq);
-
-  state->th = 
-      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), 
+  state->th =
+      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
                                            &connection_client_transmit_queued, mq);
+  GNUNET_assert (NULL != state->th);
+}
+
+
+static void
+connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
+                               void *impl_state)
+{
+  struct ClientConnectionState *state = impl_state;
+  GNUNET_assert (NULL != state->th);
+  GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
+  state->th = NULL;
 }
 
 
 struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
                                        const struct GNUNET_MQ_MessageHandler *handlers,
 }
 
 
 struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
                                        const struct GNUNET_MQ_MessageHandler *handlers,
+                                       GNUNET_MQ_ErrorHandler error_handler,
                                        void *cls)
 {
   struct GNUNET_MQ_Handle *mq;
                                        void *cls)
 {
   struct GNUNET_MQ_Handle *mq;
@@ -604,12 +683,14 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
 
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->handlers = handlers;
 
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->handlers = handlers;
+  mq->error_handler = error_handler;
   mq->handlers_cls = cls;
   state = GNUNET_new (struct ClientConnectionState);
   state->connection = connection;
   mq->impl_state = state;
   mq->send_impl = connection_client_send_impl;
   mq->destroy_impl = connection_client_destroy_impl;
   mq->handlers_cls = cls;
   state = GNUNET_new (struct ClientConnectionState);
   state->connection = connection;
   mq->impl_state = state;
   mq->send_impl = connection_client_send_impl;
   mq->destroy_impl = connection_client_destroy_impl;
+  mq->cancel_impl = connection_client_cancel_impl;
   if (NULL != handlers)
     state->receive_requested = GNUNET_YES;
 
   if (NULL != handlers)
     state->receive_requested = GNUNET_YES;
 
@@ -633,7 +714,6 @@ GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
  * Associate the assoc_data in mq with a unique request id.
  *
  * @param mq message queue, id will be unique for the queue
  * Associate the assoc_data in mq with a unique request id.
  *
  * @param mq message queue, id will be unique for the queue
- * @param mqm message to associate
  * @param assoc_data to associate
  */
 uint32_t
  * @param assoc_data to associate
  */
 uint32_t
@@ -654,9 +734,9 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
 }
 
 
 }
 
 
-
 void *
 void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
+                     uint32_t request_id)
 {
   if (NULL == mq->assoc_map)
     return NULL;
 {
   if (NULL == mq->assoc_map)
     return NULL;
@@ -665,15 +745,15 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 
 
 void *
 
 
 void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
+                        uint32_t request_id)
 {
   void *val;
 
   if (NULL == mq->assoc_map)
     return NULL;
   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
 {
   void *val;
 
   if (NULL == mq->assoc_map)
     return NULL;
   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
-  GNUNET_assert (NULL != val);
-  GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
+  GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
   return val;
 }
 
   return val;
 }
 
@@ -691,48 +771,111 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
 void
 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
 {
 void
 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
 {
-  /* FIXME: destroy all pending messages in the queue */
-
   if (NULL != mq->destroy_impl)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
   if (NULL != mq->destroy_impl)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
+  if (NULL != mq->continue_task)
+  {
+    GNUNET_SCHEDULER_cancel (mq->continue_task);
+    mq->continue_task = NULL;
+  }
+  while (NULL != mq->envelope_head)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+    ev = mq->envelope_head;
+    ev->parent_queue = NULL;
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
+    GNUNET_MQ_discard (ev);
+  }
+
+  if (NULL != mq->current_envelope)
+  {
+    /* we can only discard envelopes that
+     * are not queued! */
+    mq->current_envelope->parent_queue = NULL;
+    GNUNET_MQ_discard (mq->current_envelope);
+    mq->current_envelope = NULL;
+  }
+
+  if (NULL != mq->assoc_map)
+  {
+    GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
+    mq->assoc_map = NULL;
+  }
 
   GNUNET_free (mq);
 }
 
 
 
   GNUNET_free (mq);
 }
 
 
-
-struct GNUNET_MessageHeader *
-GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
+                              uint16_t base_size)
 {
   uint16_t whole_size;
   uint16_t nested_size;
 {
   uint16_t whole_size;
   uint16_t nested_size;
-  struct GNUNET_MessageHeader *nested_msg;
+  const struct GNUNET_MessageHeader *nested_msg;
 
   whole_size = ntohs (mh->size);
   GNUNET_assert (whole_size >= base_size);
 
   whole_size = ntohs (mh->size);
   GNUNET_assert (whole_size >= base_size);
-
   nested_size = whole_size - base_size;
   nested_size = whole_size - base_size;
-
   if (0 == nested_size)
     return NULL;
   if (0 == nested_size)
     return NULL;
-
   if (nested_size < sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
     return NULL;
   }
   if (nested_size < sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
     return NULL;
   }
-
-  nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
-
+  nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
   if (ntohs (nested_msg->size) != nested_size)
   {
     GNUNET_break_op (0);
   if (ntohs (nested_msg->size) != nested_size)
   {
     GNUNET_break_op (0);
-    nested_msg->size = htons (nested_size);
+    return NULL;
   }
   }
-
   return nested_msg;
 }
 
 
   return nested_msg;
 }
 
 
+/**
+ * Cancel sending the message. Message must have been sent with
+ * #GNUNET_MQ_send before.  May not be called after the notify sent
+ * callback has been called
+ *
+ * @param ev queued envelope to cancel
+ */
+void
+GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
+{
+  struct GNUNET_MQ_Handle *mq = ev->parent_queue;
+
+  GNUNET_assert (NULL != mq);
+  GNUNET_assert (NULL != mq->cancel_impl);
+
+  if (mq->current_envelope == ev) {
+    // complex case, we already started with transmitting
+    // the message
+    mq->cancel_impl (mq, mq->impl_state);
+    // continue sending the next message, if any
+    if (NULL == mq->envelope_head)
+    {
+      mq->current_envelope = NULL;
+    }
+    else
+    {
+      mq->current_envelope = mq->envelope_head;
+      GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                                   mq->envelope_tail,
+                                   mq->current_envelope);
+      mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+    }
+  } else {
+    // simple case, message is still waiting in the queue
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
+  }
+
+  ev->parent_queue = NULL;
+  ev->mh = NULL;
+  GNUNET_free (ev);
+}
+
+/* end of mq.c */