plibc: win32 related, socket
[oweals/gnunet.git] / src / util / mq.c
index cb380de4303af45b065e1c0d9af8ae8dff9e4f18..188606fb48299ce4fbf79e641bb4e06f58b67eaf 100644 (file)
@@ -1,22 +1,22 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012-2014 GNUnet e.V.
+     Copyright (C) 2012-2019 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
-*/
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
 
 /**
  * @author Florian Dold
 #include "platform.h"
 #include "gnunet_util_lib.h"
 
-#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
+#define LOG(kind, ...) GNUNET_log_from(kind, "util-mq", __VA_ARGS__)
 
 
-struct GNUNET_MQ_Envelope
-{
+struct GNUNET_MQ_Envelope {
   /**
    * Messages are stored in a linked list.
    * Each queue has its own list of envelopes.
@@ -58,7 +57,7 @@ struct GNUNET_MQ_Envelope
   /**
    * Called after the message was sent irrevocably.
    */
-  GNUNET_MQ_NotifyCallback sent_cb;
+  GNUNET_SCHEDULER_TaskCallback sent_cb;
 
   /**
    * Closure for @e send_cb
@@ -70,14 +69,7 @@ struct GNUNET_MQ_Envelope
    * #GNUNET_MQ_env_set_options().   Only valid if
    * @e have_custom_options is set.
    */
-  uint64_t flags;
-
-  /**
-   * Additional options buffer set for this envelope by
-   * #GNUNET_MQ_env_set_options().  Only valid if
-   * @e have_custom_options is set.
-   */
-  const void *extra;
+  enum GNUNET_MQ_PriorityPreferences priority;
 
   /**
    * Did the application call #GNUNET_MQ_env_set_options()?
@@ -89,8 +81,7 @@ struct GNUNET_MQ_Envelope
 /**
  * Handle to a message queue.
  */
-struct GNUNET_MQ_Handle
-{
+struct GNUNET_MQ_Handle {
   /**
    * Handlers array, or NULL if the queue should not receive messages
    */
@@ -128,10 +119,10 @@ struct GNUNET_MQ_Handle
   void *error_handler_cls;
 
   /**
-   * Task to asynchronously run #impl_send_continue(). 
+   * Task to asynchronously run #impl_send_continue().
    */
   struct GNUNET_SCHEDULER_Task *send_task;
-  
+
   /**
    * Linked list of messages pending to be sent
    */
@@ -164,17 +155,11 @@ struct GNUNET_MQ_Handle
    */
   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
 
-  /**
-   * Additional options buffer set for this queue by
-   * #GNUNET_MQ_set_options().  Default is 0.
-   */
-  const void *default_extra;
-
   /**
    * Flags that were set for this queue by
    * #GNUNET_MQ_set_options().   Default is 0.
    */
-  uint64_t default_flags;
+  enum GNUNET_MQ_PriorityPreferences priority;
 
   /**
    * Next id that should be used for the @e assoc_map,
@@ -202,86 +187,100 @@ struct GNUNET_MQ_Handle
 
 
 /**
- * Implementation-specific state for connection to
- * client (MQ for server).
+ * Call the message message handler that was registered
+ * for the type of the given message in the given message queue.
+ *
+ * This function is indended to be used for the implementation
+ * of message queues.
+ *
+ * @param mq message queue with the handlers
+ * @param mh message to dispatch
  */
-struct ServerClientSocketState
+void
+GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq,
+                         const struct GNUNET_MessageHeader *mh)
 {
-  /**
-   * Handle of the client that connected to the server.
-   */
-  struct GNUNET_SERVER_Client *client;
+  int ret;
 
-  /**
-   * Active transmission request to the client.
-   */
-  struct GNUNET_SERVER_TransmitHandle *th;
-};
+  ret = GNUNET_MQ_handle_message(mq->handlers, mh);
+  if (GNUNET_SYSERR == ret)
+    {
+      GNUNET_MQ_inject_error(mq, GNUNET_MQ_ERROR_MALFORMED);
+      return;
+    }
+}
 
 
 /**
  * Call the message message handler that was registered
- * for the type of the given message in the given message queue.
+ * for the type of the given message in the given @a handlers list.
  *
  * This function is indended to be used for the implementation
  * of message queues.
  *
- * @param mq message queue with the handlers
+ * @param handlers a set of handlers
  * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ *         #GNUNET_SYSERR if message was rejected by check function
  */
-void
-GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
-                          const struct GNUNET_MessageHeader *mh)
+int
+GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers,
+                         const struct GNUNET_MessageHeader *mh)
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
-  uint16_t ms = ntohs (mh->size);
+  uint16_t msize = ntohs(mh->size);
+  uint16_t mtype = ntohs(mh->type);
 
-  if (NULL == mq->handlers)
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "Received message of type %u and size %u\n",
+      mtype,
+      msize);
+
+  if (NULL == handlers)
     goto done;
-  for (handler = mq->handlers; NULL != handler->cb; handler++)
-  {
-    if (handler->type == ntohs (mh->type))
+  for (handler = handlers; NULL != handler->cb; handler++)
     {
-      handled = GNUNET_YES;
-      if ( (handler->expected_size > ms) ||
-          ( (handler->expected_size != ms) &&
-            (NULL == handler->mv) ) )
-      {
-       /* Too small, or not an exact size and
-          no 'mv' handler to check rest */
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    "Received malformed message of type %u\n",
-                    (unsigned int) handler->type);
-       GNUNET_MQ_inject_error (mq,
-                               GNUNET_MQ_ERROR_MALFORMED);
-       break;
-      }
-      if ( (NULL == handler->mv) ||
-          (GNUNET_OK ==
-           handler->mv (handler->cls, mh)) )
-      {
-       /* message well-formed, pass to handler */
-       handler->cb (handler->cls, mh);
-      }
-      else
-      {
-       /* Message rejected by check routine */
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    "Received malformed message of type %u\n",
-                    (unsigned int) handler->type);
-       GNUNET_MQ_inject_error (mq,
-                               GNUNET_MQ_ERROR_MALFORMED);
-      }
-      break;
+      if (handler->type == mtype)
+        {
+          handled = GNUNET_YES;
+          if ((handler->expected_size > msize) ||
+              ((handler->expected_size != msize) && (NULL == handler->mv)))
+            {
+              /* Too small, or not an exact size and
+                 no 'mv' handler to check rest */
+              LOG(GNUNET_ERROR_TYPE_ERROR,
+                  "Received malformed message of type %u\n",
+                  (unsigned int)handler->type);
+              return GNUNET_SYSERR;
+            }
+          if ((NULL == handler->mv) ||
+              (GNUNET_OK == handler->mv(handler->cls, mh)))
+            {
+              /* message well-formed, pass to handler */
+              handler->cb(handler->cls, mh);
+            }
+          else
+            {
+              /* Message rejected by check routine */
+              LOG(GNUNET_ERROR_TYPE_ERROR,
+                  "Received malformed message of type %u\n",
+                  (unsigned int)handler->type);
+              return GNUNET_SYSERR;
+            }
+          break;
+        }
     }
-  }
- done:
+done:
   if (GNUNET_NO == handled)
-    LOG (GNUNET_ERROR_TYPE_INFO,
-         "No handler for message of type %d and size %d\n",
-         ntohs (mh->type),
-         ntohs (mh->size));
+    {
+      LOG(GNUNET_ERROR_TYPE_INFO,
+          "No handler for message of type %u and size %u\n",
+          mtype,
+          msize);
+      return GNUNET_NO;
+    }
+  return GNUNET_OK;
 }
 
 
@@ -296,18 +295,16 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
  * @param error the error type
  */
 void
-GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
-                        enum GNUNET_MQ_Error error)
+GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
 {
   if (NULL == mq->error_handler)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "Got error %d, but no handler installed\n",
-         (int) error);
-    return;
-  }
-  mq->error_handler (mq->error_handler_cls,
-                     error);
+    {
+      LOG(GNUNET_ERROR_TYPE_WARNING,
+          "Got error %d, but no handler installed\n",
+          (int)error);
+      return;
+    }
+  mq->error_handler(mq->error_handler_cls, error);
 }
 
 
@@ -319,10 +316,10 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
  * @param mqm the message to discard
  */
 void
-GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
+GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
 {
-  GNUNET_assert (NULL == ev->parent_queue);
-  GNUNET_free (ev);
+  GNUNET_assert(NULL == ev->parent_queue);
+  GNUNET_free(ev);
 }
 
 
@@ -333,12 +330,12 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
  * @return number of queued, non-transmitted messages
  */
 unsigned int
-GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
 {
   if (GNUNET_YES != mq->in_flight)
-  {
-    return mq->queue_length;
-  }
+    {
+      return mq->queue_length;
+    }
   return mq->queue_length - 1;
 }
 
@@ -351,27 +348,74 @@ GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
  * @param ev the envelope with the message to send.
  */
 void
-GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
-                struct GNUNET_MQ_Envelope *ev)
+GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 {
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL == ev->parent_queue);
+  GNUNET_assert(NULL != mq);
+  GNUNET_assert(NULL == ev->parent_queue);
 
   mq->queue_length++;
+  if (mq->queue_length >= 10000)
+    {
+      /* This would seem like a bug... */
+      GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
+                 "MQ with %u entries extended by message of type %u (FC broken?)\n",
+                 (unsigned int)mq->queue_length,
+                 (unsigned int)ntohs(ev->mh->type));
+    }
   ev->parent_queue = mq;
   /* is the implementation busy? queue it! */
-  if ( (NULL != mq->current_envelope) ||
-       (NULL != mq->send_task) )
-  {
-    GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
-                                      mq->envelope_tail,
-                                      ev);
-    return;
-  }
+  if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
+    {
+      GNUNET_CONTAINER_DLL_insert_tail(mq->envelope_head, mq->envelope_tail, ev);
+      return;
+    }
+  GNUNET_assert(NULL == mq->envelope_head);
   mq->current_envelope = ev;
-  mq->send_impl (mq,
-                ev->mh,
-                mq->impl_state);
+
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "sending message of type %u, queue empty (MQ: %p)\n",
+      ntohs(ev->mh->type),
+      mq);
+
+  mq->send_impl(mq, ev->mh, mq->impl_state);
+}
+
+
+/**
+ * Remove the first envelope that has not yet been sent from the message
+ * queue and return it.
+ *
+ * @param mq queue to remove envelope from
+ * @return NULL if queue is empty (or has no envelope that is not under transmission)
+ */
+struct GNUNET_MQ_Envelope *
+GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
+{
+  struct GNUNET_MQ_Envelope *env;
+
+  env = mq->envelope_head;
+  GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, env);
+  mq->queue_length--;
+  env->parent_queue = NULL;
+  return env;
+}
+
+
+/**
+ * Function to copy an envelope.  The envelope must not yet
+ * be in any queue or have any options or callbacks set.
+ *
+ * @param env envelope to copy
+ * @return copy of @a env
+ */
+struct GNUNET_MQ_Envelope *
+GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_assert(NULL == env->next);
+  GNUNET_assert(NULL == env->parent_queue);
+  GNUNET_assert(NULL == env->sent_cb);
+  GNUNET_assert(GNUNET_NO == env->have_custom_options);
+  return GNUNET_MQ_msg_copy(env->mh);
 }
 
 
@@ -383,23 +427,19 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
  * @param ev the envelope with the message to send.
  */
 void
-GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
-                     const struct GNUNET_MQ_Envelope *ev)
+GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq,
+                    const struct GNUNET_MQ_Envelope *ev)
 {
   struct GNUNET_MQ_Envelope *env;
   uint16_t msize;
 
-  msize = ntohs (ev->mh->size);
-  env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
-                       msize);
-  env->mh = (struct GNUNET_MessageHeader *) &env[1];
+  msize = ntohs(ev->mh->size);
+  env = GNUNET_malloc(sizeof(struct GNUNET_MQ_Envelope) + msize);
+  env->mh = (struct GNUNET_MessageHeader *)&env[1];
   env->sent_cb = ev->sent_cb;
   env->sent_cls = ev->sent_cls;
-  GNUNET_memcpy (&env[1],
-          ev->mh,
-          msize);
-  GNUNET_MQ_send (mq,
-                  env);
+  GNUNET_memcpy(&env[1], ev->mh, msize);
+  GNUNET_MQ_send(mq, env);
 }
 
 
@@ -411,22 +451,25 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
  * @param cls message queue to send the next message with
  */
 static void
-impl_send_continue (void *cls)
+impl_send_continue(void *cls)
 {
   struct GNUNET_MQ_Handle *mq = cls;
-  
+
   mq->send_task = NULL;
   /* call is only valid if we're actually currently sending
    * a message */
   if (NULL == mq->envelope_head)
     return;
   mq->current_envelope = mq->envelope_head;
-  GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
-                              mq->envelope_tail,
-                              mq->current_envelope);
-  mq->send_impl (mq,
-                mq->current_envelope->mh,
-                mq->impl_state);
+  GNUNET_CONTAINER_DLL_remove(mq->envelope_head,
+                              mq->envelope_tail,
+                              mq->current_envelope);
+
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "sending message of type %u from queue\n",
+      ntohs(mq->current_envelope->mh->type));
+
+  mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state);
 }
 
 
@@ -438,25 +481,25 @@ impl_send_continue (void *cls)
  * @param mq message queue to send the next message with
  */
 void
-GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
 {
   struct GNUNET_MQ_Envelope *current_envelope;
-  GNUNET_MQ_NotifyCallback cb;
-  
-  GNUNET_assert (0 < mq->queue_length);
+  GNUNET_SCHEDULER_TaskCallback cb;
+
+  GNUNET_assert(0 < mq->queue_length);
   mq->queue_length--;
+  mq->in_flight = GNUNET_NO;
   current_envelope = mq->current_envelope;
   current_envelope->parent_queue = NULL;
   mq->current_envelope = NULL;
-  GNUNET_assert (NULL == mq->send_task);
-  mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
-                                           mq);
+  GNUNET_assert(NULL == mq->send_task);
+  mq->send_task = GNUNET_SCHEDULER_add_now(&impl_send_continue, mq);
   if (NULL != (cb = current_envelope->sent_cb))
-  {
-    current_envelope->sent_cb = NULL;
-    cb (current_envelope->sent_cls);
-  }  
-  GNUNET_free (current_envelope);
+    {
+      current_envelope->sent_cb = NULL;
+      cb(current_envelope->sent_cls);
+    }
+  GNUNET_free(current_envelope);
 }
 
 
@@ -471,23 +514,23 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
  * @param mq message queue to send the next message with
  */
 void
-GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
 {
   struct GNUNET_MQ_Envelope *current_envelope;
-  GNUNET_MQ_NotifyCallback cb;
-  
+  GNUNET_SCHEDULER_TaskCallback cb;
+
   mq->in_flight = GNUNET_YES;
   /* call is only valid if we're actually currently sending
    * a message */
   current_envelope = mq->current_envelope;
-  GNUNET_assert (NULL != current_envelope);
+  GNUNET_assert(NULL != current_envelope);
   /* can't call cancel from now on anymore */
   current_envelope->parent_queue = NULL;
   if (NULL != (cb = current_envelope->sent_cb))
-  {
-    current_envelope->sent_cb = NULL;
-    cb (current_envelope->sent_cls);
-  }
+    {
+      current_envelope->sent_cb = NULL;
+      cb(current_envelope->sent_cls);
+    }
 }
 
 
@@ -504,30 +547,21 @@ GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
  * @return a new message queue
  */
 struct GNUNET_MQ_Handle *
-GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
-                               GNUNET_MQ_DestroyImpl destroy,
-                               GNUNET_MQ_CancelImpl cancel,
-                               void *impl_state,
-                               const struct GNUNET_MQ_MessageHandler *handlers,
-                               GNUNET_MQ_ErrorHandler error_handler,
-                               void *error_handler_cls)
+GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send,
+                              GNUNET_MQ_DestroyImpl destroy,
+                              GNUNET_MQ_CancelImpl cancel,
+                              void *impl_state,
+                              const struct GNUNET_MQ_MessageHandler *handlers,
+                              GNUNET_MQ_ErrorHandler error_handler,
+                              void *error_handler_cls)
 {
   struct GNUNET_MQ_Handle *mq;
-  unsigned int i;
 
-  mq = GNUNET_new (struct GNUNET_MQ_Handle);
+  mq = GNUNET_new(struct GNUNET_MQ_Handle);
   mq->send_impl = send;
   mq->destroy_impl = destroy;
   mq->cancel_impl = cancel;
-  if (NULL != handlers)
-  {
-    for (i=0;NULL != handlers[i].cb; i++) ;
-    mq->handlers = GNUNET_new_array (i + 1,
-                                    struct GNUNET_MQ_MessageHandler);
-    GNUNET_memcpy (mq->handlers,
-           handlers,
-           i * sizeof (struct GNUNET_MQ_MessageHandler));
-  }
+  mq->handlers = GNUNET_MQ_copy_handlers(handlers);
   mq->error_handler = error_handler;
   mq->error_handler_cls = error_handler_cls;
   mq->impl_state = impl_state;
@@ -544,14 +578,11 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
  * @param handlers_cls new closure to use
  */
 void
-GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
-                                void *handlers_cls)
+GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
 {
-  unsigned int i;
-
   if (NULL == mq->handlers)
     return;
-  for (i=0;NULL != mq->handlers[i].cb; i++)
+  for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
     mq->handlers[i].cls = handlers_cls;
 }
 
@@ -566,10 +597,10 @@ GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
  * @return message to send, never NULL
  */
 const struct GNUNET_MessageHeader *
-GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
 {
-  GNUNET_assert (NULL != mq->current_envelope);
-  GNUNET_assert (NULL != mq->current_envelope->mh);
+  GNUNET_assert(NULL != mq->current_envelope);
+  GNUNET_assert(NULL != mq->current_envelope->mh);
   return mq->current_envelope->mh;
 }
 
@@ -589,23 +620,21 @@ GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
  * @return message to send, never NULL
  */
 void *
-GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
 {
   return mq->impl_state;
 }
 
 
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
-                uint16_t size,
-                uint16_t type)
+GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
 {
   struct GNUNET_MQ_Envelope *ev;
 
-  ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
-  ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
-  ev->mh->size = htons (size);
-  ev->mh->type = htons (type);
+  ev = GNUNET_malloc(size + sizeof(struct GNUNET_MQ_Envelope));
+  ev->mh = (struct GNUNET_MessageHeader *)&ev[1];
+  ev->mh->size = htons(size);
+  ev->mh->type = htons(type);
   if (NULL != mhp)
     *mhp = ev->mh;
   return ev;
@@ -619,16 +648,14 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
  * @return envelope containing @a hdr
  */
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
+GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
 {
   struct GNUNET_MQ_Envelope *mqm;
-  uint16_t size = ntohs (hdr->size);
+  uint16_t size = ntohs(hdr->size);
 
-  mqm = GNUNET_malloc (sizeof (*mqm) + size);
-  mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
-  GNUNET_memcpy (mqm->mh,
-          hdr,
-          size);
+  mqm = GNUNET_malloc(sizeof(*mqm) + size);
+  mqm->mh = (struct GNUNET_MessageHeader *)&mqm[1];
+  GNUNET_memcpy(mqm->mh, hdr, size);
   return mqm;
 }
 
@@ -643,113 +670,32 @@ GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
  * @param nested_mh the message to append to the message after base_size
  */
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
-                          uint16_t base_size,
-                          uint16_t type,
-                          const struct GNUNET_MessageHeader *nested_mh)
+GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp,
+                         uint16_t base_size,
+                         uint16_t type,
+                         const struct GNUNET_MessageHeader *nested_mh)
 {
   struct GNUNET_MQ_Envelope *mqm;
   uint16_t size;
 
   if (NULL == nested_mh)
-    return GNUNET_MQ_msg_ (mhp, base_size, type);
+    return GNUNET_MQ_msg_(mhp, base_size, type);
 
-  size = base_size + ntohs (nested_mh->size);
+  size = base_size + ntohs(nested_mh->size);
 
   /* check for uint16_t overflow */
   if (size < base_size)
     return NULL;
 
-  mqm = GNUNET_MQ_msg_ (mhp, size, type);
-  GNUNET_memcpy ((char *) mqm->mh + base_size,
-                nested_mh,
-                ntohs (nested_mh->size));
+  mqm = GNUNET_MQ_msg_(mhp, size, type);
+  GNUNET_memcpy((char *)mqm->mh + base_size,
+                nested_mh,
+                ntohs(nested_mh->size));
 
   return mqm;
 }
 
 
-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_queued (void *cls,
-                 size_t size,
-                 void *buf)
-{
-  struct GNUNET_MQ_Handle *mq = cls;
-  struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
-  const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
-  size_t msg_size;
-
-  GNUNET_assert (NULL != buf);
-  msg_size = ntohs (msg->size);
-  GNUNET_assert (size >= msg_size);
-  GNUNET_memcpy (buf, msg, msg_size);
-  state->th = NULL;
-
-  GNUNET_MQ_impl_send_continue (mq);
-
-  return msg_size;
-}
-
-
-static void
-server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
-                            void *impl_state)
-{
-  struct ServerClientSocketState *state = impl_state;
-
-  if (NULL != state->th)
-  {
-    GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
-    state->th = NULL;
-  }
-
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != state);
-  GNUNET_SERVER_client_drop (state->client);
-  GNUNET_free (state);
-}
-
-
-static void
-server_client_send_impl (struct GNUNET_MQ_Handle *mq,
-                         const struct GNUNET_MessageHeader *msg,
-                         void *impl_state)
-{
-  struct ServerClientSocketState *state = impl_state;
-
-  GNUNET_assert (NULL != mq);
-  state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
-                                                  ntohs (msg->size),
-                                                  GNUNET_TIME_UNIT_FOREVER_REL,
-                                                  &transmit_queued, mq);
-}
-
-
-struct GNUNET_MQ_Handle *
-GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
-{
-  struct GNUNET_MQ_Handle *mq;
-  struct ServerClientSocketState *scss;
-
-  mq = GNUNET_new (struct GNUNET_MQ_Handle);
-  scss = GNUNET_new (struct ServerClientSocketState);
-  mq->impl_state = scss;
-  scss->client = client;
-  GNUNET_SERVER_client_keep (client);
-  mq->send_impl = &server_client_send_impl;
-  mq->destroy_impl = &server_client_destroy_impl;
-  return mq;
-}
-
-
 /**
  * Associate the assoc_data in mq with a unique request id.
  *
@@ -757,45 +703,58 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
  * @param assoc_data to associate
  */
 uint32_t
-GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
-                     void *assoc_data)
+GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
 {
   uint32_t id;
 
   if (NULL == mq->assoc_map)
-  {
-    mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
-    mq->assoc_id = 1;
-  }
+    {
+      mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create(8);
+      mq->assoc_id = 1;
+    }
   id = mq->assoc_id++;
-  GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  GNUNET_assert(GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap32_put(
+                  mq->assoc_map,
+                  id,
+                  assoc_data,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   return id;
 }
 
 
+/**
+ * Get the data associated with a @a request_id in a queue
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we are interested in
+ * @return the associated data
+ */
 void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
-                     uint32_t request_id)
+GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
   if (NULL == mq->assoc_map)
     return NULL;
-  return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
+  return GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id);
 }
 
 
+/**
+ * Remove the association for a @a request_id
+ *
+ * @param mq the message queue with the association
+ * @param request_id the request id we want to remove
+ * @return the associated data
+ */
 void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
-                        uint32_t request_id)
+GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
   void *val;
 
   if (NULL == mq->assoc_map)
     return NULL;
-  val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
-                                            request_id);
-  GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
-                                             request_id);
+  val = GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id);
+  GNUNET_CONTAINER_multihashmap32_remove_all(mq->assoc_map, request_id);
   return val;
 }
 
@@ -810,12 +769,14 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
  * @param cb_cls closure for the callback
  */
 void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
-                       GNUNET_MQ_NotifyCallback cb,
-                       void *cb_cls)
+GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev,
+                      GNUNET_SCHEDULER_TaskCallback cb,
+                      void *cb_cls)
 {
-  mqm->sent_cb = cb;
-  mqm->sent_cls = cb_cls;
+  /* allow setting *OR* clearing callback */
+  GNUNET_assert((NULL == ev->sent_cb) || (NULL == cb));
+  ev->sent_cb = cb;
+  ev->sent_cls = cb_cls;
 }
 
 
@@ -823,8 +784,7 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
  * Handle we return for callbacks registered to be
  * notified when #GNUNET_MQ_destroy() is called on a queue.
  */
-struct GNUNET_MQ_DestroyNotificationHandle
-{
+struct GNUNET_MQ_DestroyNotificationHandle {
   /**
    * Kept in a DLL.
    */
@@ -858,82 +818,86 @@ struct GNUNET_MQ_DestroyNotificationHandle
  * @param mq message queue to destroy
  */
 void
-GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
 {
   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
 
   if (NULL != mq->destroy_impl)
-  {
-    mq->destroy_impl (mq, mq->impl_state);
-  }
+    {
+      mq->destroy_impl(mq, mq->impl_state);
+    }
   if (NULL != mq->send_task)
-  {
-    GNUNET_SCHEDULER_cancel (mq->send_task);
-    mq->send_task = NULL;
-  }
+    {
+      GNUNET_SCHEDULER_cancel(mq->send_task);
+      mq->send_task = NULL;
+    }
   while (NULL != mq->envelope_head)
-  {
-    struct GNUNET_MQ_Envelope *ev;
-
-    ev = mq->envelope_head;
-    ev->parent_queue = NULL;
-    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
-                                mq->envelope_tail,
-                                ev);
-    GNUNET_assert (0 < mq->queue_length);
-    mq->queue_length--;
-    GNUNET_MQ_discard (ev);
-  }
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      ev = mq->envelope_head;
+      ev->parent_queue = NULL;
+      GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, ev);
+      GNUNET_assert(0 < mq->queue_length);
+      mq->queue_length--;
+      LOG(GNUNET_ERROR_TYPE_DEBUG,
+          "MQ destroy drops message of type %u\n",
+          ntohs(ev->mh->type));
+      GNUNET_MQ_discard(ev);
+    }
   if (NULL != mq->current_envelope)
-  {
-    /* we can only discard envelopes that
-     * are not queued! */
-    mq->current_envelope->parent_queue = NULL;
-    GNUNET_MQ_discard (mq->current_envelope);
-    mq->current_envelope = NULL;
-    GNUNET_assert (0 < mq->queue_length);
-    mq->queue_length--;
-  }
-  GNUNET_assert (0 == mq->queue_length);
+    {
+      /* we can only discard envelopes that
+       * are not queued! */
+      mq->current_envelope->parent_queue = NULL;
+      LOG(GNUNET_ERROR_TYPE_DEBUG,
+          "MQ destroy drops current message of type %u\n",
+          ntohs(mq->current_envelope->mh->type));
+      GNUNET_MQ_discard(mq->current_envelope);
+      mq->current_envelope = NULL;
+      GNUNET_assert(0 < mq->queue_length);
+      mq->queue_length--;
+    }
+  GNUNET_assert(0 == mq->queue_length);
   while (NULL != (dnh = mq->dnh_head))
-  {
-    dnh->cb (dnh->cb_cls);
-    GNUNET_MQ_destroy_notify_cancel (dnh);
-  }
+    {
+      dnh->cb(dnh->cb_cls);
+      GNUNET_MQ_destroy_notify_cancel(dnh);
+    }
   if (NULL != mq->assoc_map)
-  {
-    GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
-    mq->assoc_map = NULL;
-  }
-  GNUNET_free_non_null (mq->handlers);
-  GNUNET_free (mq);
+    {
+      GNUNET_CONTAINER_multihashmap32_destroy(mq->assoc_map);
+      mq->assoc_map = NULL;
+    }
+  GNUNET_free_non_null(mq->handlers);
+  GNUNET_free(mq);
 }
 
 
 const struct GNUNET_MessageHeader *
-GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
-                              uint16_t base_size)
+GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh,
+                             uint16_t base_size)
 {
   uint16_t whole_size;
   uint16_t nested_size;
   const struct GNUNET_MessageHeader *nested_msg;
 
-  whole_size = ntohs (mh->size);
-  GNUNET_assert (whole_size >= base_size);
+  whole_size = ntohs(mh->size);
+  GNUNET_assert(whole_size >= base_size);
   nested_size = whole_size - base_size;
   if (0 == nested_size)
     return NULL;
-  if (nested_size < sizeof (struct GNUNET_MessageHeader))
-  {
-    GNUNET_break_op (0);
-    return NULL;
-  }
-  nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
-  if (ntohs (nested_msg->size) != nested_size)
-  {
-    GNUNET_break_op (0);
-    return NULL;
-  }
+  if (nested_size < sizeof(struct GNUNET_MessageHeader))
+    {
+      GNUNET_break_op(0);
+      return NULL;
+    }
+  nested_msg = (const struct GNUNET_MessageHeader *)((char *)mh + base_size);
+  if (ntohs(nested_msg->size) != nested_size)
+    {
+      GNUNET_break_op(0);
+      return NULL;
+    }
   return nested_msg;
 }
 
@@ -946,56 +910,53 @@ GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
  * @param ev queued envelope to cancel
  */
 void
-GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
+GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
 {
   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
 
-  GNUNET_assert (NULL != mq);
-  GNUNET_assert (NULL != mq->cancel_impl);
-  
+  GNUNET_assert(NULL != mq);
+  GNUNET_assert(NULL != mq->cancel_impl);
+
   mq->evacuate_called = GNUNET_NO;
 
   if (mq->current_envelope == ev)
-  {
-    // complex case, we already started with transmitting
-    // the message
-    GNUNET_assert (0 < mq->queue_length);
-    mq->queue_length--;
-    mq->cancel_impl (mq,
-                    mq->impl_state);
-    // continue sending the next message, if any
-    if (NULL == mq->envelope_head)
-    {
-      mq->current_envelope = NULL;
-    }
-    else
     {
+      /* complex case, we already started with transmitting
+         the message using the callbacks. */
+      GNUNET_assert(GNUNET_NO == mq->in_flight);
+      GNUNET_assert(0 < mq->queue_length);
+      mq->queue_length--;
+      mq->cancel_impl(mq, mq->impl_state);
+      /* continue sending the next message, if any */
       mq->current_envelope = mq->envelope_head;
-      GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
-                                   mq->envelope_tail,
-                                   mq->current_envelope);
-      mq->send_impl (mq,
-                    mq->current_envelope->mh,
-                    mq->impl_state);
+      if (NULL != mq->current_envelope)
+        {
+          GNUNET_CONTAINER_DLL_remove(mq->envelope_head,
+                                      mq->envelope_tail,
+                                      mq->current_envelope);
+
+          LOG(GNUNET_ERROR_TYPE_DEBUG,
+              "sending canceled message of type %u queue\n",
+              ntohs(ev->mh->type));
+
+          mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state);
+        }
     }
-  }
   else
-  {
-    // simple case, message is still waiting in the queue
-    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
-                                mq->envelope_tail,
-                                ev);
-    GNUNET_assert (0 < mq->queue_length);
-    mq->queue_length--;
-  }
+    {
+      /* simple case, message is still waiting in the queue */
+      GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, ev);
+      GNUNET_assert(0 < mq->queue_length);
+      mq->queue_length--;
+    }
 
   if (GNUNET_YES != mq->evacuate_called)
-  {
-    ev->parent_queue = NULL;
-    ev->mh = NULL;
-    /* also frees ev */
-    GNUNET_free (ev);
-  }
+    {
+      ev->parent_queue = NULL;
+      ev->mh = NULL;
+      /* also frees ev */
+      GNUNET_free(ev);
+    }
 }
 
 
@@ -1007,7 +968,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
  * @return the current envelope
  */
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
 {
   return mq->current_envelope;
 }
@@ -1020,7 +981,7 @@ GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
  * @return the last envelope in the queue
  */
 struct GNUNET_MQ_Envelope *
-GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
+GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
 {
   if (NULL != mq->envelope_tail)
     return mq->envelope_tail;
@@ -1030,21 +991,18 @@ GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
 
 
 /**
- * Set application-specific options for this envelope.
+ * Set application-specific preferences for this envelope.
  * Overrides the options set for the queue with
  * #GNUNET_MQ_set_options() for this message only.
  *
  * @param env message to set options for
- * @param flags flags to use (meaning is queue-specific)
- * @param extra additional buffer for further data (also queue-specific)
+ * @param pp priorities and preferences to apply
  */
 void
-GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
-                          uint64_t flags,
-                          const void *extra)
+GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env,
+                          enum GNUNET_MQ_PriorityPreferences pp)
 {
-  env->flags = flags;
-  env->extra = extra;
+  env->priority = pp;
   env->have_custom_options = GNUNET_YES;
 }
 
@@ -1053,44 +1011,85 @@ GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
  * Get application-specific options for this envelope.
  *
  * @param env message to set options for
- * @param[out] flags set to flags to use (meaning is queue-specific)
- * @return extra additional buffer for further data (also queue-specific)
+ * @return priorities and preferences to apply for @a env
  */
-const void *
-GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
-                          uint64_t *flags)
+enum GNUNET_MQ_PriorityPreferences
+GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
 {
   struct GNUNET_MQ_Handle *mq = env->parent_queue;
 
   if (GNUNET_YES == env->have_custom_options)
-  {
-    *flags = env->flags;
-    return env->extra;
-  }
+    return env->priority;
   if (NULL == mq)
-  {
-    *flags = 0;
-    return NULL;
-  }
-  *flags = mq->default_flags;
-  return mq->default_extra;
+    return 0;
+  return mq->priority;
 }
 
 
 /**
- * Set application-specific options for this queue.
+ * Combine performance preferences set for different
+ * envelopes that are being combined into one larger envelope.
+ *
+ * @param p1 one set of preferences
+ * @param p2 second set of preferences
+ * @return combined priority and preferences to use
+ */
+enum GNUNET_MQ_PriorityPreferences
+GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1,
+                              enum GNUNET_MQ_PriorityPreferences p2)
+{
+  enum GNUNET_MQ_PriorityPreferences ret;
+
+  ret = GNUNET_MAX(p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
+  ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
+  ret |=
+    ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
+  ret |=
+    ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
+  ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
+  ret |=
+    ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
+  return ret;
+}
+
+
+/**
+ * Set application-specific default options for this queue.
  *
  * @param mq message queue to set options for
- * @param flags flags to use (meaning is queue-specific)
- * @param extra additional buffer for further data (also queue-specific)
+ * @param pp priorities and preferences to apply
  */
 void
-GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
-                      uint64_t flags,
-                      const void *extra)
+GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq,
+                      enum GNUNET_MQ_PriorityPreferences pp)
+{
+  mq->priority = pp;
+}
+
+
+/**
+ * Obtain message contained in envelope.
+ *
+ * @param env the envelope
+ * @return message contained in the envelope
+ */
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
 {
-  mq->default_flags = flags;
-  mq->default_extra = extra;
+  return env->mh;
+}
+
+
+/**
+ * Return next envelope in queue.
+ *
+ * @param env a queued envelope
+ * @return next one, or NULL
+ */
+const struct GNUNET_MQ_Envelope *
+GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
+{
+  return env->next;
 }
 
 
@@ -1104,19 +1103,17 @@ GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
  */
 struct GNUNET_MQ_DestroyNotificationHandle *
-GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
-                         GNUNET_SCHEDULER_TaskCallback cb,
-                         void *cb_cls)
+GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq,
+                         GNUNET_SCHEDULER_TaskCallback cb,
+                         void *cb_cls)
 {
   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
 
-  dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
+  dnh = GNUNET_new(struct GNUNET_MQ_DestroyNotificationHandle);
   dnh->mq = mq;
   dnh->cb = cb;
   dnh->cb_cls = cb_cls;
-  GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
-                              mq->dnh_tail,
-                              dnh);
+  GNUNET_CONTAINER_DLL_insert(mq->dnh_head, mq->dnh_tail, dnh);
   return dnh;
 }
 
@@ -1127,14 +1124,189 @@ GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
  * @param dnh handle for registration to cancel
  */
 void
-GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
+GNUNET_MQ_destroy_notify_cancel(
+  struct GNUNET_MQ_DestroyNotificationHandle *dnh)
 {
   struct GNUNET_MQ_Handle *mq = dnh->mq;
 
-  GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
-                              mq->dnh_tail,
-                              dnh);
-  GNUNET_free (dnh);
+  GNUNET_CONTAINER_DLL_remove(mq->dnh_head, mq->dnh_tail, dnh);
+  GNUNET_free(dnh);
+}
+
+
+/**
+ * Insert @a env into the envelope DLL starting at @a env_head
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module.  This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env.  Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to insert at the tail
+ */
+void
+GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head,
+                          struct GNUNET_MQ_Envelope **env_tail,
+                          struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_CONTAINER_DLL_insert(*env_head, *env_tail, env);
+}
+
+
+/**
+ * Insert @a env into the envelope DLL starting at @a env_head
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module.  This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env.  Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to insert at the tail
+ */
+void
+GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head,
+                          struct GNUNET_MQ_Envelope **env_tail,
+                          struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_CONTAINER_DLL_insert_tail(*env_head, *env_tail, env);
+}
+
+
+/**
+ * Remove @a env from the envelope DLL starting at @a env_head.
+ * Note that @a env must not be in any MQ while this function
+ * is used with DLLs defined outside of the MQ module. This
+ * is just in case some application needs to also manage a
+ * FIFO of envelopes independent of MQ itself and wants to
+ * re-use the pointers internal to @a env.  Use with caution.
+ *
+ * @param[in|out] env_head of envelope DLL
+ * @param[in|out] env_tail tail of envelope DLL
+ * @param[in|out] env element to remove from the DLL
+ */
+void
+GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head,
+                     struct GNUNET_MQ_Envelope **env_tail,
+                     struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_CONTAINER_DLL_remove(*env_head, *env_tail, env);
+}
+
+
+/**
+ * Copy an array of handlers.
+ *
+ * Useful if the array has been delared in local memory and needs to be
+ * persisted for future use.
+ *
+ * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
+ * @return A newly allocated array of handlers.
+ *         Needs to be freed with #GNUNET_free.
+ */
+struct GNUNET_MQ_MessageHandler *
+GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  struct GNUNET_MQ_MessageHandler *copy;
+  unsigned int count;
+
+  if (NULL == handlers)
+    return NULL;
+
+  count = GNUNET_MQ_count_handlers(handlers);
+  copy = GNUNET_new_array(count + 1, struct GNUNET_MQ_MessageHandler);
+  GNUNET_memcpy(copy,
+                handlers,
+                count * sizeof(struct GNUNET_MQ_MessageHandler));
+  return copy;
+}
+
+
+/**
+ * Copy an array of handlers, appending AGPL handler.
+ *
+ * Useful if the array has been delared in local memory and needs to be
+ * persisted for future use.
+ *
+ * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
+ * @param agpl_handler function to call for AGPL handling
+ * @param agpl_cls closure for @a agpl_handler
+ * @return A newly allocated array of handlers.
+ *         Needs to be freed with #GNUNET_free.
+ */
+struct GNUNET_MQ_MessageHandler *
+GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers,
+                         GNUNET_MQ_MessageCallback agpl_handler,
+                         void *agpl_cls)
+{
+  struct GNUNET_MQ_MessageHandler *copy;
+  unsigned int count;
+
+  if (NULL == handlers)
+    return NULL;
+  count = GNUNET_MQ_count_handlers(handlers);
+  copy = GNUNET_new_array(count + 2, struct GNUNET_MQ_MessageHandler);
+  GNUNET_memcpy(copy,
+                handlers,
+                count * sizeof(struct GNUNET_MQ_MessageHandler));
+  copy[count].mv = NULL;
+  copy[count].cb = agpl_handler;
+  copy[count].cls = agpl_cls;
+  copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
+  copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
+  return copy;
+}
+
+
+/**
+ * Count the handlers in a handler array.
+ *
+ * @param handlers Array of handlers to be counted.
+ * @return The number of handlers in the array.
+ */
+unsigned int
+GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  unsigned int i;
+
+  if (NULL == handlers)
+    return 0;
+
+  for (i = 0; NULL != handlers[i].cb; i++)
+    ;
+
+  return i;
+}
+
+
+/**
+ * Convert an `enum GNUNET_MQ_PreferenceType` to a string
+ *
+ * @param type the preference type
+ * @return a string or NULL if invalid
+ */
+const char *
+GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
+{
+  switch (type)
+    {
+    case GNUNET_MQ_PREFERENCE_NONE:
+      return "NONE";
+
+    case GNUNET_MQ_PREFERENCE_BANDWIDTH:
+      return "BANDWIDTH";
+
+    case GNUNET_MQ_PREFERENCE_LATENCY:
+      return "LATENCY";
+
+    case GNUNET_MQ_PREFERENCE_RELIABILITY:
+      return "RELIABILITY";
+    }
+  ;
+  return NULL;
 }