migrate secretsharing to new service API
[oweals/gnunet.git] / src / statistics / statistics_api.c
index f1a22c22a405d4e9a783673d3a316f7b0606f1dd..ad4453b2a62a0e8515317e0d972bdd1c740aabab 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
 */
 
 /**
  * @author Christian Grothoff
  */
 #include "platform.h"
  * @author Christian Grothoff
  */
 #include "platform.h"
-#include "gnunet_client_lib.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
 #include "gnunet_constants.h"
-#include "gnunet_container_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_protocols.h"
-#include "gnunet_server_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_statistics_service.h"
-#include "gnunet_strings_lib.h"
 #include "statistics.h"
 
 /**
 #include "statistics.h"
 
 /**
@@ -91,7 +88,7 @@ struct GNUNET_STATISTICS_WatchEntry
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc
+   * Closure for @e proc
    */
   void *proc_cls;
 
    */
   void *proc_cls;
 
@@ -140,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc and cont.
+   * Closure for @e proc and @e cont.
    */
   void *cls;
 
    */
   void *cls;
 
@@ -165,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle
   int aborted;
 
   /**
   int aborted;
 
   /**
-   * Is this a GET, SET, UPDATE or WATCH?
+   * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
    */
   enum ActionType type;
 
    */
   enum ActionType type;
 
@@ -193,14 +190,9 @@ struct GNUNET_STATISTICS_Handle
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Message queue to the service.
    */
    */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * Currently pending transmission request.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Head of the linked list of pending actions (first action
 
   /**
    * Head of the linked list of pending actions (first action
@@ -228,7 +220,12 @@ struct GNUNET_STATISTICS_Handle
   /**
    * Task doing exponential back-off trying to reconnect.
    */
   /**
    * Task doing exponential back-off trying to reconnect.
    */
-  GNUNET_SCHEDULER_TaskIdentifier backoff_task;
+  struct GNUNET_SCHEDULER_Task *backoff_task;
+
+  /**
+   * Task for running #do_destroy().
+   */
+  struct GNUNET_SCHEDULER_Task *destroy_task;
 
   /**
    * Time for next connect retry.
 
   /**
    * Time for next connect retry.
@@ -236,7 +233,17 @@ struct GNUNET_STATISTICS_Handle
   struct GNUNET_TIME_Relative backoff;
 
   /**
   struct GNUNET_TIME_Relative backoff;
 
   /**
-   * Size of the 'watches' array.
+   * Maximum heap size observed so far (if available).
+   */
+  uint64_t peak_heap_size;
+
+  /**
+   * Maximum resident set side observed so far (if available).
+   */
+  uint64_t peak_rss;
+
+  /**
+   * Size of the @e watches array.
    */
   unsigned int watches_size;
 
    */
   unsigned int watches_size;
 
@@ -254,13 +261,73 @@ struct GNUNET_STATISTICS_Handle
 };
 
 
 };
 
 
+/**
+ * Obtain statistics about this process's memory consumption and
+ * report those as well (if they changed).
+ */
+static void
+update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
+{
+#if ENABLE_HEAP_STATISTICS
+  uint64_t current_heap_size = 0;
+  uint64_t current_rss = 0;
+
+  if (GNUNET_NO != h->do_destroy)
+    return;
+#if HAVE_MALLINFO
+  {
+    struct mallinfo mi;
+
+    mi = mallinfo();
+    current_heap_size = mi.uordblks + mi.fordblks;
+  }
+#endif
+#if HAVE_GETRUSAGE
+  {
+    struct rusage ru;
+
+    if (0 == getrusage (RUSAGE_SELF, &ru))
+    {
+      current_rss = 1024LL * ru.ru_maxrss;
+    }
+  }
+#endif
+  if (current_heap_size > h->peak_heap_size)
+  {
+    h->peak_heap_size = current_heap_size;
+    GNUNET_STATISTICS_set (h,
+                          "# peak heap size",
+                          current_heap_size,
+                          GNUNET_NO);
+  }
+  if (current_rss > h->peak_rss)
+  {
+    h->peak_rss = current_rss;
+    GNUNET_STATISTICS_set (h,
+                          "# peak resident set size",
+                          current_rss,
+                          GNUNET_NO);
+  }
+#endif
+}
+
+
+/**
+ * Reconnect at a later time, respecting back-off.
+ *
+ * @param h statistics handle
+ */
+static void
+reconnect_later (struct GNUNET_STATISTICS_Handle *h);
+
+
 /**
  * Schedule the next action to be performed.
  *
 /**
  * Schedule the next action to be performed.
  *
- * @param h statistics handle to reconnect
+ * @param cls statistics handle to reconnect
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h);
+schedule_action (void *cls);
 
 
 /**
 
 
 /**
@@ -274,15 +341,11 @@ static void
 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
                         struct GNUNET_STATISTICS_WatchEntry *watch)
 {
 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
                         struct GNUNET_STATISTICS_WatchEntry *watch)
 {
-
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 
-  GNUNET_assert (h != NULL);
-  GNUNET_assert (watch != NULL);
-
   slen = strlen (watch->subsystem) + 1;
   nlen = strlen (watch->name) + 1;
   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
   slen = strlen (watch->subsystem) + 1;
   nlen = strlen (watch->name) + 1;
   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
@@ -291,7 +354,7 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
     GNUNET_break (0);
     return;
   }
     GNUNET_break (0);
     return;
   }
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (watch->subsystem);
   ai->name = GNUNET_strdup (watch->name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (watch->subsystem);
   ai->name = GNUNET_strdup (watch->name);
@@ -300,7 +363,8 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
   ai->type = ACTION_WATCH;
   ai->proc = watch->proc;
   ai->cls = watch->proc_cls;
   ai->type = ACTION_WATCH;
   ai->proc = watch->proc;
   ai->cls = watch->proc_cls;
-  GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
                                    ai);
   schedule_action (h);
 }
                                    ai);
   schedule_action (h);
 }
@@ -329,158 +393,94 @@ static void
 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
 {
   struct GNUNET_STATISTICS_GetHandle *c;
 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
 {
   struct GNUNET_STATISTICS_GetHandle *c;
-  
-  if (NULL != h->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
-  } 
-  if (NULL != h->client)
-  {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
-  }
+
   h->receiving = GNUNET_NO;
   if (NULL != (c = h->current))
   {
     h->current = NULL;
   h->receiving = GNUNET_NO;
   if (NULL != (c = h->current))
   {
     h->current = NULL;
-    if (c->cont != NULL)
-      c->cont (c->cls, GNUNET_SYSERR);
+    if ( (NULL != c->cont) &&
+        (GNUNET_YES != c->aborted) )
+    {
+      c->cont (c->cls,
+               GNUNET_SYSERR);
+      c->cont = NULL;
+    }
     free_action_item (c);
   }
     free_action_item (c);
   }
-}
-
-
-/**
- * Try to (re)connect to the statistics service.
- *
- * @param h statistics handle to reconnect
- * @return GNUNET_YES on success, GNUNET_NO on failure.
- */
-static int
-try_connect (struct GNUNET_STATISTICS_Handle *h)
-{
-  struct GNUNET_STATISTICS_GetHandle *gh;
-  struct GNUNET_STATISTICS_GetHandle *gn;
-  unsigned int i;
-
-  if (h->backoff_task != GNUNET_SCHEDULER_NO_TASK)
-    return GNUNET_NO;
-  if (h->client != NULL)
-    return GNUNET_YES;
-  h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
-  if (h->client != NULL)
+  if (NULL != h->mq)
   {
   {
-    gn = h->action_head; 
-    while (NULL != (gh = gn))
-    {
-      gn = gh->next;
-      if (gh->type == ACTION_WATCH)
-      {
-       GNUNET_CONTAINER_DLL_remove (h->action_head,
-                                    h->action_tail,
-                                    gh);
-       free_action_item (gh);  
-      }
-    }
-    for (i = 0; i < h->watches_size; i++)
-    {
-      if (NULL != h->watches[i])
-        schedule_watch_request (h, h->watches[i]);
-    }
-    return GNUNET_YES;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Failed to connect to statistics service!\n");
-  return GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * We've waited long enough, reconnect now.
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
  *
  *
- * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
- * @param tc scheduler context (unused)
+ * @param cls statistics handle
+ * @param smsg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
  */
  */
-static void
-reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+check_statistics_value (void *cls,
+                        const struct GNUNET_STATISTICS_ReplyMessage *smsg)
 {
 {
-  struct GNUNET_STATISTICS_Handle *h = cls;
-
-  h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
-  schedule_action (h);
-}
-
+  const char *service;
+  const char *name;
+  uint16_t size;
 
 
-/**
- * Reconnect at a later time, respecting back-off.
- *
- * @param h statistics handle
- */
-static void
-reconnect_later (struct GNUNET_STATISTICS_Handle *h)
-{
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
-  if (h->do_destroy)
+  size = ntohs (smsg->header.size);
+  size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
+  if (size !=
+      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                      size,
+                                      2,
+                                      &service,
+                                      &name))
   {
   {
-    /* So we are shutting down and the service is not reachable.
-     * Chances are that it's down for good and we are not going to connect to
-     * it anymore.
-     * Give up and don't sync the rest of the data.
-     */
     GNUNET_break (0);
     GNUNET_break (0);
-    h->do_destroy = GNUNET_NO;
-    GNUNET_STATISTICS_destroy (h, GNUNET_NO);
-    return;
+    return GNUNET_SYSERR;
   }
   }
-  h->backoff_task =
-    GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
-  h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
-  h->backoff =
-    GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
  *
  *
- * @param h statistics handle
+ * @param cls statistics handle
  * @param msg message received from the service, never NULL
  * @param msg message received from the service, never NULL
- * @return GNUNET_OK if the message was well-formed
+ * @return #GNUNET_OK if the message was well-formed
  */
  */
-static int
-process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
-                                 const struct GNUNET_MessageHeader *msg)
+static void
+handle_statistics_value (void *cls,
+                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
 {
 {
-  char *service;
-  char *name;
-  const struct GNUNET_STATISTICS_ReplyMessage *smsg;
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  const char *service;
+  const char *name;
   uint16_t size;
 
   if (h->current->aborted)
   uint16_t size;
 
   if (h->current->aborted)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
-    return GNUNET_OK;           /* don't bother */
-  }
-  size = ntohs (msg->size);
-  if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
+    return;           /* iteration aborted, don't bother */
+
+  size = ntohs (smsg->header.size);
   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
-  if (size !=
-      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
-                                      &service, &name))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
-       service, name, GNUNET_ntohll (smsg->value));
+  GNUNET_assert (size ==
+                 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                                 size,
+                                                 2,
+                                                 &service,
+                                                 &name));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received valid statistic on `%s:%s': %llu\n",
+       service, name,
+       GNUNET_ntohll (smsg->value));
   if (GNUNET_OK !=
   if (GNUNET_OK !=
-      h->current->proc (h->current->cls, service, name,
+      h->current->proc (h->current->cls,
+                        service,
+                        name,
                         GNUNET_ntohll (smsg->value),
                         0 !=
                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
                         GNUNET_ntohll (smsg->value),
                         0 !=
                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
@@ -489,185 +489,315 @@ process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
          "Processing of remaining statistics aborted by client.\n");
     h->current->aborted = GNUNET_YES;
   }
          "Processing of remaining statistics aborted by client.\n");
     h->current->aborted = GNUNET_YES;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
-  return GNUNET_OK;
 }
 
 
 /**
  * We have received a watch value from the service.  Process it.
  *
 }
 
 
 /**
  * We have received a watch value from the service.  Process it.
  *
- * @param h statistics handle
+ * @param cls statistics handle
  * @param msg the watch value message
  * @param msg the watch value message
- * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not,
- *         GNUNET_NO if this watch has been cancelled
  */
  */
-static int
-process_watch_value (struct GNUNET_STATISTICS_Handle *h,
-                     const struct GNUNET_MessageHeader *msg)
+static void
+handle_statistics_watch_value (void *cls,
+                               const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
 {
 {
-  const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
+  struct GNUNET_STATISTICS_Handle *h = cls;
   struct GNUNET_STATISTICS_WatchEntry *w;
   uint32_t wid;
 
   struct GNUNET_STATISTICS_WatchEntry *w;
   uint32_t wid;
 
-  if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
   GNUNET_break (0 == ntohl (wvm->reserved));
   wid = ntohl (wvm->wid);
   if (wid >= h->watches_size)
   {
   GNUNET_break (0 == ntohl (wvm->reserved));
   wid = ntohl (wvm->wid);
   if (wid >= h->watches_size)
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
   }
   w = h->watches[wid];
   }
   w = h->watches[wid];
-  if (NULL == w)  
-    return GNUNET_NO;  
-  (void) w->proc (w->proc_cls, w->subsystem, w->name,
+  if (NULL == w)
+    return;
+  (void) w->proc (w->proc_cls,
+                  w->subsystem,
+                  w->name,
                   GNUNET_ntohll (wvm->value),
                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
                   GNUNET_ntohll (wvm->value),
                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
-  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Function called with messages from stats service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
  *
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
+ * @param error error code
  */
 static void
  */
 static void
-receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
-  struct GNUNET_STATISTICS_GetHandle *c;
-  int ret;
 
 
-  if (msg == NULL)
+  if (GNUNET_NO != h->do_destroy)
   {
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-         "Error receiving statistics from service, is the service running?\n");
+    h->do_destroy = GNUNET_NO;
+    if (NULL != h->destroy_task)
+    {
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+      h->destroy_task = NULL;
+    }
+    GNUNET_STATISTICS_destroy (h,
+                               GNUNET_NO);
+    return;
+  }
+  do_disconnect (h);
+  reconnect_later (h);
+}
+
+
+/**
+ * Task used to destroy the statistics handle.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle`
+ */
+static void
+do_destroy (void *cls)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->destroy_task = NULL;
+  h->do_destroy = GNUNET_NO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Running final destruction\n");
+  GNUNET_STATISTICS_destroy (h,
+                             GNUNET_NO);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
+ * message. We receive this message at the end of the shutdown when
+ * the service confirms that all data has been written to disk.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_disconnect_confirm (void *cls,
+                          const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  if (GNUNET_SYSERR != h->do_destroy)
+  {
+    /* not in shutdown, why do we get 'TEST'? */
+    GNUNET_break (0);
     do_disconnect (h);
     reconnect_later (h);
     return;
   }
     do_disconnect (h);
     reconnect_later (h);
     return;
   }
-  switch (ntohs (msg->type))
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
+  if (NULL != h->destroy_task)
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+  h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                              h);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
+ * this message in response to a query to indicate that there are no
+ * further matching results.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_statistics_end (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  struct GNUNET_STATISTICS_GetHandle *c;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received end of statistics marker\n");
+  if (NULL == (c = h->current))
   {
   {
-  case GNUNET_MESSAGE_TYPE_STATISTICS_END:
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
-    if (NULL == (c = h->current))
-    {
-      GNUNET_break (0);
-      do_disconnect (h);
-      reconnect_later (h);
-      return;
-    }
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    if (h->watches_size > 0)
-    {
-      GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                             GNUNET_TIME_UNIT_FOREVER_REL);
-    }
-    else
-    {
-      h->receiving = GNUNET_NO;
-    }    
-    h->current = NULL;
-    schedule_action (h);
-    if (c->cont != NULL)
-      c->cont (c->cls, GNUNET_OK);
-    free_action_item (c);
+    GNUNET_break (0);
+    do_disconnect (h);
+    reconnect_later (h);
     return;
     return;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
-    if (GNUNET_OK != process_statistics_value_message (h, msg))
-    {
-      do_disconnect (h);
-      reconnect_later (h);
-      return;     
-    }
-    /* finally, look for more! */
+  }
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  h->current = NULL;
+  schedule_action (h);
+  if (NULL != c->cont)
+  {
+    c->cont (c->cls,
+             GNUNET_OK);
+    c->cont = NULL;
+  }
+  free_action_item (c);
+}
+
+
+/**
+ * Try to (re)connect to the statistics service.
+ *
+ * @param h statistics handle to reconnect
+ * @return #GNUNET_YES on success, #GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_STATISTICS_Handle *h)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (disconnect_confirm,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_fixed_size (statistics_end,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_END,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_var_size (statistics_value,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
+                           struct GNUNET_STATISTICS_ReplyMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (statistics_watch_value,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
+                             struct GNUNET_STATISTICS_WatchValueMessage,
+                             h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_STATISTICS_GetHandle *gh;
+  struct GNUNET_STATISTICS_GetHandle *gn;
+
+  if (NULL != h->backoff_task)
+    return GNUNET_NO;
+  if (NULL != h->mq)
+    return GNUNET_YES;
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                 "statistics",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+  {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Processing VALUE done, now reading more\n");
-    GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                          GNUNET_TIME_absolute_get_remaining (h->
-                                                              current->timeout));
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    return;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
-    if (GNUNET_OK != 
-       (ret = process_watch_value (h, msg)))
+         "Failed to connect to statistics service!\n");
+    return GNUNET_NO;
+  }
+  gn = h->action_head;
+  while (NULL != (gh = gn))
+  {
+    gn = gh->next;
+    if (gh->type == ACTION_WATCH)
     {
     {
-      do_disconnect (h);
-      if (GNUNET_NO == ret)
-       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 
-      reconnect_later (h);
-      return;
+      GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                   h->action_tail,
+                                   gh);
+      free_action_item (gh);
     }
     }
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    GNUNET_assert (h->watches_size > 0);
-    GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    return;    
-  default:
-    GNUNET_break (0);
-    do_disconnect (h);
-    reconnect_later (h);
+  }
+  for (unsigned int i = 0; i < h->watches_size; i++)
+    if (NULL != h->watches[i])
+      schedule_watch_request (h,
+                              h->watches[i]);
+  return GNUNET_YES;
+}
+
+
+/**
+ * We've waited long enough, reconnect now.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
+ */
+static void
+reconnect_task (void *cls)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->backoff_task = NULL;
+  schedule_action (h);
+}
+
+
+/**
+ * Reconnect at a later time, respecting back-off.
+ *
+ * @param h statistics handle
+ */
+static void
+reconnect_later (struct GNUNET_STATISTICS_Handle *h)
+{
+  int loss;
+  struct GNUNET_STATISTICS_GetHandle *gh;
+
+  GNUNET_assert (NULL == h->backoff_task);
+  if (GNUNET_YES == h->do_destroy)
+  {
+    /* So we are shutting down and the service is not reachable.
+     * Chances are that it's down for good and we are not going to connect to
+     * it anymore.
+     * Give up and don't sync the rest of the data.
+     */
+    loss = GNUNET_NO;
+    for (gh = h->action_head; NULL != gh; gh = gh->next)
+      if ( (gh->make_persistent) &&
+          (ACTION_SET == gh->type) )
+       loss = GNUNET_YES;
+    if (GNUNET_YES == loss)
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Could not save some persistent statistics\n"));
+    if (NULL != h->destroy_task)
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                                h);
     return;
   }
     return;
   }
+  h->backoff_task
+    = GNUNET_SCHEDULER_add_delayed (h->backoff,
+                                    &reconnect_task,
+                                    h);
+  h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
 }
 
 
 }
 
 
+
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
  *
  * @param handle statistics handle
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
  *
  * @param handle statistics handle
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
  */
  */
-static size_t
-transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_get (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_STATISTICS_GetHandle *c;
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_STATISTICS_GetHandle *c;
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
   GNUNET_assert (NULL != (c = handle->current));
 
   GNUNET_assert (NULL != (c = handle->current));
-  if (buf == NULL)
-  {
-    /* timeout / error */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission of request for statistics failed!\n");
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
   slen1 = strlen (c->subsystem) + 1;
   slen2 = strlen (c->name) + 1;
   slen1 = strlen (c->subsystem) + 1;
   slen2 = strlen (c->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_GET);
   GNUNET_assert (slen1 + slen2 ==
   GNUNET_assert (slen1 + slen2 ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
                                              c->subsystem,
                                              c->name));
                                              c->subsystem,
                                              c->name));
-  if (GNUNET_YES != handle->receiving)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission of GET done, now reading response\n");
-    handle->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
-                           GNUNET_TIME_absolute_get_remaining (c->timeout));
-  }
-  return msize;
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 }
 
 
@@ -676,50 +806,38 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
  * the response).
  *
  * @param handle statistics handle
  * the response).
  *
  * @param handle statistics handle
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
  */
  */
-static size_t
-transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
 
-  if (buf == NULL)
-  {
-    /* timeout / error */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission of request for statistics failed!\n");
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmitting watch request for `%s'\n",
        handle->current->name);
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
        handle->current->name);
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
   GNUNET_assert (slen1 + slen2 ==
   GNUNET_assert (slen1 + slen2 ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
                                              handle->current->subsystem,
                                              handle->current->name));
                                              handle->current->subsystem,
                                              handle->current->name));
-  if (GNUNET_YES != handle->receiving)
-  {
-    handle->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
-  return msize;
+  schedule_action (handle);
 }
 
 
 }
 
 
@@ -727,37 +845,20 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
  * Transmit a SET/UPDATE request.
  *
  * @param handle statistics handle
  * Transmit a SET/UPDATE request.
  *
  * @param handle statistics handle
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
  */
  */
-static size_t
-transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_set (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_STATISTICS_SetMessage *r;
 {
   struct GNUNET_STATISTICS_SetMessage *r;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen;
   size_t nlen;
   size_t slen;
   size_t nlen;
-  size_t nsize;
 
 
-  if (NULL == buf)
-  {
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
-  nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
-  if (size < nsize)
-  {
-    GNUNET_break (0);
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
-  r = buf;
-  r->header.size = htons (nsize);
-  r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
+  env = GNUNET_MQ_msg_extra (r,
+                             slen + nlen,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_SET);
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
@@ -765,51 +866,20 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
   if (handle->current->type == ACTION_UPDATE)
     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
   GNUNET_assert (slen + nlen ==
   if (handle->current->type == ACTION_UPDATE)
     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
   GNUNET_assert (slen + nlen ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &r[1],
+                                             slen + nlen,
+                                             2,
                                              handle->current->subsystem,
                                              handle->current->name));
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
                                              handle->current->subsystem,
                                              handle->current->name));
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
-  return nsize;
-}
-
-
-/**
- * Function called when we are ready to transmit a request to the service.
- *
- * @param cls the 'struct GNUNET_STATISTICS_Handle'
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
- */
-static size_t
-transmit_action (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_STATISTICS_Handle *h = cls;
-  size_t ret;
-
-  h->th = NULL;
-  ret = 0;
-  if (NULL != h->current)
-    switch (h->current->type)
-    {
-    case ACTION_GET:
-      ret = transmit_get (h, size, buf);
-      break;
-    case ACTION_SET:
-    case ACTION_UPDATE:
-      ret = transmit_set (h, size, buf);
-      break;
-    case ACTION_WATCH:
-      ret = transmit_watch (h, size, buf);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
-    }
-  schedule_action (h);
-  return ret;
+  update_memory_statistics (handle);
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 }
 
 
@@ -824,15 +894,18 @@ struct GNUNET_STATISTICS_Handle *
 GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
 GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct GNUNET_STATISTICS_Handle *ret;
-
-  GNUNET_assert (subsystem != NULL);
-  GNUNET_assert (cfg != NULL);
-  ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
-  ret->cfg = cfg;
-  ret->subsystem = GNUNET_strdup (subsystem);
-  ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-  return ret;
+  struct GNUNET_STATISTICS_Handle *h;
+
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                            "statistics",
+                                            "DISABLE"))
+    return NULL;
+  h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
+  h->cfg = cfg;
+  h->subsystem = GNUNET_strdup (subsystem);
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  return h;
 }
 
 
 }
 
 
@@ -841,42 +914,32 @@ GNUNET_STATISTICS_create (const char *subsystem,
  * it).
  *
  * @param h statistics handle to destroy
  * it).
  *
  * @param h statistics handle to destroy
- * @param sync_first set to GNUNET_YES if pending SET requests should
+ * @param sync_first set to #GNUNET_YES if pending SET requests should
  *        be completed
  */
 void
  *        be completed
  */
 void
-GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
+GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
+                           int sync_first)
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
-  struct GNUNET_TIME_Relative timeout;
-  int i;
 
 
-  if (h == NULL)
+  if (NULL == h)
     return;
     return;
-  GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
-  if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
+  GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
+  if ( (sync_first) &&
+       (NULL != h->mq) &&
+       (0 != GNUNET_MQ_get_length (h->mq)) )
   {
   {
-    GNUNET_SCHEDULER_cancel (h->backoff_task);
-    h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-  if (sync_first)
-  {
-    if (h->current != NULL)
-    {
-      if (h->current->type == ACTION_GET)
-      {
-        GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-        h->th = NULL;
-        free_action_item (h->current);
-        h->current = NULL;
-      }
-    }
-    next = h->action_head; 
+    if ( (NULL != h->current) &&
+         (ACTION_GET == h->current->type) )
+      h->current->aborted = GNUNET_YES;
+    next = h->action_head;
     while (NULL != (pos = next))
     {
       next = pos->next;
     while (NULL != (pos = next))
     {
       next = pos->next;
-      if (pos->type == ACTION_GET)
+      if ( (ACTION_GET == pos->type) ||
+           (ACTION_WATCH == pos->type) )
       {
        GNUNET_CONTAINER_DLL_remove (h->action_head,
                                     h->action_tail,
       {
        GNUNET_CONTAINER_DLL_remove (h->action_head,
                                     h->action_tail,
@@ -884,32 +947,21 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
         free_action_item (pos);
       }
     }
         free_action_item (pos);
       }
     }
-    if ( (NULL == h->current) &&
-        (NULL != (h->current = h->action_head)) )
-      GNUNET_CONTAINER_DLL_remove (h->action_head,
-                                  h->action_tail,
-                                  h->current);
     h->do_destroy = GNUNET_YES;
     h->do_destroy = GNUNET_YES;
-    if ((h->current != NULL) && (h->th == NULL))
-    {
-      if (NULL == h->client)
-      {
-       /* instant-connect (regardless of back-off) to submit final value */
-       h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);
-      }
-      if (NULL != h->client)
-      {
-       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-       h->th =
-          GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
-                                               timeout, GNUNET_YES,
-                                               &transmit_action, h);
-       GNUNET_assert (NULL != h->th);
-      }
-    }
-    if (h->th != NULL)
-      return; /* do not finish destruction just yet */
+    schedule_action (h);
+    GNUNET_assert (NULL == h->destroy_task);
+    h->destroy_task
+      = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
+                                                                     5),
+                                      &do_destroy,
+                                      h);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Deferring destruction\n");
+    return; /* do not finish destruction just yet */
   }
   }
+  /* do clean up all */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cleaning all up\n");
   while (NULL != (pos = h->action_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->action_head,
   while (NULL != (pos = h->action_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->action_head,
@@ -918,15 +970,28 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
     free_action_item (pos);
   }
   do_disconnect (h);
     free_action_item (pos);
   }
   do_disconnect (h);
-  for (i = 0; i < h->watches_size; i++)
+  if (NULL != h->backoff_task)
+  {
+    GNUNET_SCHEDULER_cancel (h->backoff_task);
+    h->backoff_task = NULL;
+  }
+  if (NULL != h->destroy_task)
+  {
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = NULL;
+  }
+  for (unsigned int i = 0; i < h->watches_size; i++)
   {
     if (NULL == h->watches[i])
   {
     if (NULL == h->watches[i])
-      continue; 
+      continue;
     GNUNET_free (h->watches[i]->subsystem);
     GNUNET_free (h->watches[i]->name);
     GNUNET_free (h->watches[i]);
   }
     GNUNET_free (h->watches[i]->subsystem);
     GNUNET_free (h->watches[i]->name);
     GNUNET_free (h->watches[i]);
   }
-  GNUNET_array_grow (h->watches, h->watches_size, 0);
+  GNUNET_array_grow (h->watches,
+                     h->watches_size,
+                     0);
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
@@ -935,46 +1000,65 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
 /**
  * Schedule the next action to be performed.
  *
 /**
  * Schedule the next action to be performed.
  *
- * @param h statistics handle
+ * @param cls statistics handle
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h)
+schedule_action (void *cls)
 {
 {
-  struct GNUNET_TIME_Relative timeout;
+  struct GNUNET_STATISTICS_Handle *h = cls;
 
 
-  if ( (h->th != NULL) ||
-       (h->backoff_task != GNUNET_SCHEDULER_NO_TASK) )
+  if (NULL != h->backoff_task)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
   {
     reconnect_later (h);
     return;
   }
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
   {
     reconnect_later (h);
     return;
   }
-  if (NULL != h->current)
-    return; /* action already pending */
+  if (0 < GNUNET_MQ_get_length (h->mq))
+    return; /* Wait for queue to be reduced more */    
   /* schedule next action */
   /* schedule next action */
-  h->current = h->action_head;
-  if (NULL == h->current)
+  while (NULL == h->current)
   {
   {
-    if (h->do_destroy)
+    h->current = h->action_head;
+    if (NULL == h->current)
     {
     {
-      h->do_destroy = GNUNET_NO;
-      GNUNET_STATISTICS_destroy (h, GNUNET_YES);
+      struct GNUNET_MessageHeader *hdr;
+      struct GNUNET_MQ_Envelope *env;
+
+      if (GNUNET_YES != h->do_destroy)
+        return; /* nothing to do */
+      /* let service know that we're done */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Notifying service that we are done\n");
+      h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
+      env = GNUNET_MQ_msg (hdr,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
+      GNUNET_MQ_notify_sent (env,
+                             &schedule_action,
+                             h);
+      GNUNET_MQ_send (h->mq,
+                      env);
+      return;
+    }
+    GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                 h->action_tail,
+                                 h->current);
+    switch (h->current->type)
+    {
+    case ACTION_GET:
+      transmit_get (h);
+      break;
+    case ACTION_SET:
+    case ACTION_UPDATE:
+      transmit_set (h);
+      break;
+    case ACTION_WATCH:
+      transmit_watch (h);
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
     }
     }
-    return;
-  }
-  GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
-  timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-  if (NULL ==
-      (h->th =
-       GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
-                                            timeout, GNUNET_YES,
-                                            &transmit_action, h)))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Failed to transmit request to statistics service.\n");
-    do_disconnect (h);
-    reconnect_later (h);
   }
 }
 
   }
 }
 
@@ -985,19 +1069,19 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
- * @param timeout after how long should we give up (and call
- *        cont with an error code)?
  * @param cont continuation to call when done (can be NULL)
  * @param cont continuation to call when done (can be NULL)
+ *        This callback CANNOT destroy the statistics handle in the same call.
  * @param proc function to call on each value
  * @param proc function to call on each value
- * @param cls closure for cont and proc
+ * @param cls closure for @a cont and @a proc
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
-                       const char *subsystem, const char *name,
-                       struct GNUNET_TIME_Relative timeout,
+                       const char *subsystem,
+                       const char *name,
                        GNUNET_STATISTICS_Callback cont,
                        GNUNET_STATISTICS_Callback cont,
-                       GNUNET_STATISTICS_Iterator proc, void *cls)
+                       GNUNET_STATISTICS_Iterator proc,
+                       void *cls)
 {
   size_t slen1;
   size_t slen2;
 {
   size_t slen1;
   size_t slen2;
@@ -1005,27 +1089,27 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
 
   if (NULL == handle)
     return NULL;
 
   if (NULL == handle)
     return NULL;
-  GNUNET_assert (proc != NULL);
+  GNUNET_assert (NULL != proc);
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  if (subsystem == NULL)
+  if (NULL == subsystem)
     subsystem = "";
     subsystem = "";
-  if (name == NULL)
+  if (NULL == name)
     name = "";
   slen1 = strlen (subsystem) + 1;
   slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
     name = "";
   slen1 = strlen (subsystem) + 1;
   slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
-  ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
+                                   handle->action_tail,
                                    ai);
   schedule_action (handle);
   return ai;
                                    ai);
   schedule_action (handle);
   return ai;
@@ -1043,17 +1127,18 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
 {
   if (NULL == gh)
     return;
 {
   if (NULL == gh)
     return;
+  gh->cont = NULL;
   if (gh->sh->current == gh)
   {
     gh->aborted = GNUNET_YES;
   if (gh->sh->current == gh)
   {
     gh->aborted = GNUNET_YES;
+    return;
   }
   }
-  else
-  {
-    GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
-    GNUNET_free (gh->name);
-    GNUNET_free (gh->subsystem);
-    GNUNET_free (gh);
-  }
+  GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
+                               gh->sh->action_tail,
+                               gh);
+  GNUNET_free (gh->name);
+  GNUNET_free (gh->subsystem);
+  GNUNET_free (gh);
 }
 
 
 }
 
 
@@ -1064,69 +1149,78 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
- * @param proc_cls closure for proc
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 int
 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
  */
 int
 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
-                         const char *subsystem, const char *name,
-                         GNUNET_STATISTICS_Iterator proc, void *proc_cls)
+                         const char *subsystem,
+                         const char *name,
+                         GNUNET_STATISTICS_Iterator proc,
+                         void *proc_cls)
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 
-  if (handle == NULL)
+  if (NULL == handle)
     return GNUNET_SYSERR;
     return GNUNET_SYSERR;
-  w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
+  w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
   w->subsystem = GNUNET_strdup (subsystem);
   w->name = GNUNET_strdup (name);
   w->proc = proc;
   w->proc_cls = proc_cls;
   w->subsystem = GNUNET_strdup (subsystem);
   w->name = GNUNET_strdup (name);
   w->proc = proc;
   w->proc_cls = proc_cls;
-  GNUNET_array_append (handle->watches, handle->watches_size, w);
-  schedule_watch_request (handle, w);
+  GNUNET_array_append (handle->watches,
+                       handle->watches_size,
+                       w);
+  schedule_watch_request (handle,
+                          w);
   return GNUNET_OK;
 }
 
 
 /**
   return GNUNET_OK;
 }
 
 
 /**
- * Stop watching statistics from the peer.  
+ * Stop watching statistics from the peer.
  *
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
  *
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
- * @param proc_cls closure for proc
- * @return GNUNET_OK on success, GNUNET_SYSERR on error (no such watch)
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
  */
 int
 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
  */
 int
 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
-                               const char *subsystem, const char *name,
-                               GNUNET_STATISTICS_Iterator proc, void *proc_cls)
+                               const char *subsystem,
+                                const char *name,
+                               GNUNET_STATISTICS_Iterator proc,
+                                void *proc_cls)
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
-  unsigned int i;
 
 
-  if (handle == NULL)
+  if (NULL == handle)
     return GNUNET_SYSERR;
     return GNUNET_SYSERR;
-  for (i=0;i<handle->watches_size;i++)
+  for (unsigned int i=0;i<handle->watches_size;i++)
   {
     w = handle->watches[i];
   {
     w = handle->watches[i];
+    if (NULL == w)
+      continue;
     if ( (w->proc == proc) &&
         (w->proc_cls == proc_cls) &&
     if ( (w->proc == proc) &&
         (w->proc_cls == proc_cls) &&
-        (0 == strcmp (w->name, name)) &&
-        (0 == strcmp (w->subsystem, subsystem)) )
+        (0 == strcmp (w->name,
+                      name)) &&
+        (0 == strcmp (w->subsystem,
+                      subsystem)) )
     {
       GNUNET_free (w->name);
       GNUNET_free (w->subsystem);
       GNUNET_free (w);
     {
       GNUNET_free (w->name);
       GNUNET_free (w->subsystem);
       GNUNET_free (w);
-      handle->watches[i] = NULL;      
+      handle->watches[i] = NULL;
       return GNUNET_OK;
       return GNUNET_OK;
-    }   
+    }
   }
   return GNUNET_SYSERR;
 }
 
 
   }
   return GNUNET_SYSERR;
 }
 
 
-
 /**
  * Queue a request to change a statistic.
  *
 /**
  * Queue a request to change a statistic.
  *
@@ -1134,11 +1228,14 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
  * @param name name of the value
  * @param make_persistent  should the value be kept across restarts?
  * @param value new value or change
  * @param name name of the value
  * @param make_persistent  should the value be kept across restarts?
  * @param value new value or change
- * @param type type of the action (ACTION_SET or ACTION_UPDATE)
+ * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
  */
 static void
  */
 static void
-add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
-                   int make_persistent, uint64_t value, enum ActionType type)
+add_setter_action (struct GNUNET_STATISTICS_Handle *h,
+                   const char *name,
+                   int make_persistent,
+                   uint64_t value,
+                   enum ActionType type)
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
@@ -1146,8 +1243,6 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   size_t nsize;
   int64_t delta;
 
   size_t nsize;
   int64_t delta;
 
-  GNUNET_assert (h != NULL);
-  GNUNET_assert (name != NULL);
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
@@ -1156,16 +1251,18 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
     GNUNET_break (0);
     return;
   }
     GNUNET_break (0);
     return;
   }
-  for (ai = h->action_head; ai != NULL; ai = ai->next)
+  for (ai = h->action_head; NULL != ai; ai = ai->next)
   {
   {
-    if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
-           (0 == strcmp (ai->name, name)) && 
-           ( (ai->type == ACTION_UPDATE) ||
-             (ai->type == ACTION_SET) ) ) )
+    if (! ( (0 == strcmp (ai->subsystem,
+                         h->subsystem)) &&
+           (0 == strcmp (ai->name,
+                         name)) &&
+           ( (ACTION_UPDATE == ai->type) ||
+             (ACTION_SET == ai->type) ) ) )
       continue;
       continue;
-    if (ai->type == ACTION_SET)
+    if (ACTION_SET == ai->type)
     {
     {
-      if (type == ACTION_UPDATE)
+      if (ACTION_UPDATE == type)
       {
        delta = (int64_t) value;
        if (delta > 0)
       {
        delta = (int64_t) value;
        if (delta > 0)
@@ -1190,7 +1287,7 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
     }
     else
     {
     }
     else
     {
-      if (type == ACTION_UPDATE)
+      if (ACTION_UPDATE == type)
       {
        /* make delta cummulative */
        delta = (int64_t) value;
       {
        /* make delta cummulative */
        delta = (int64_t) value;
@@ -1203,12 +1300,14 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
        ai->type = type;
       }
     }
        ai->type = type;
       }
     }
-    ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
-    ai->make_persistent = make_persistent;
-    return;  
+    ai->timeout
+      = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
+    ai->make_persistent
+      = make_persistent;
+    return;
   }
   /* no existing entry matches, create a fresh one */
   }
   /* no existing entry matches, create a fresh one */
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
@@ -1217,7 +1316,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
-  GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
                                    ai);
   schedule_action (h);
 }
                                    ai);
   schedule_action (h);
 }
@@ -1234,12 +1334,18 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
  */
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
-                       const char *name, uint64_t value, int make_persistent)
+                       const char *name,
+                       uint64_t value,
+                       int make_persistent)
 {
 {
-  if (handle == NULL)
+  if (NULL == handle)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, value, ACTION_SET);
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     value,
+                     ACTION_SET);
 }
 
 
 }
 
 
@@ -1254,14 +1360,19 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
-                          const char *name, int64_t delta, int make_persistent)
+                          const char *name,
+                          int64_t delta,
+                          int make_persistent)
 {
 {
-  if (handle == NULL)
+  if (NULL == handle)
     return;
     return;
-  if (delta == 0)
+  if (0 == delta)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, (uint64_t) delta,
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     (uint64_t) delta,
                      ACTION_UPDATE);
 }
 
                      ACTION_UPDATE);
 }