migrate secretsharing to new service API
[oweals/gnunet.git] / src / statistics / statistics_api.c
index 305c6f3a5c038ebcf48971a703ce1ef916b8b78a..ad4453b2a62a0e8515317e0d972bdd1c740aabab 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
 */
 
 /**
  * @author Christian Grothoff
  */
 #include "platform.h"
  * @author Christian Grothoff
  */
 #include "platform.h"
-#include "gnunet_client_lib.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
 #include "gnunet_constants.h"
-#include "gnunet_container_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_protocols.h"
-#include "gnunet_server_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_statistics_service.h"
-#include "gnunet_strings_lib.h"
 #include "statistics.h"
 
 /**
 #include "statistics.h"
 
 /**
@@ -91,7 +88,7 @@ struct GNUNET_STATISTICS_WatchEntry
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc
+   * Closure for @e proc
    */
   void *proc_cls;
 
    */
   void *proc_cls;
 
@@ -140,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc and cont.
+   * Closure for @e proc and @e cont.
    */
   void *cls;
 
    */
   void *cls;
 
@@ -165,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle
   int aborted;
 
   /**
   int aborted;
 
   /**
-   * Is this a GET, SET, UPDATE or WATCH?
+   * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
    */
   enum ActionType type;
 
    */
   enum ActionType type;
 
@@ -193,14 +190,9 @@ struct GNUNET_STATISTICS_Handle
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Message queue to the service.
    */
    */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * Currently pending transmission request.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Head of the linked list of pending actions (first action
 
   /**
    * Head of the linked list of pending actions (first action
@@ -228,7 +220,12 @@ struct GNUNET_STATISTICS_Handle
   /**
    * Task doing exponential back-off trying to reconnect.
    */
   /**
    * Task doing exponential back-off trying to reconnect.
    */
-  GNUNET_SCHEDULER_TaskIdentifier backoff_task;
+  struct GNUNET_SCHEDULER_Task *backoff_task;
+
+  /**
+   * Task for running #do_destroy().
+   */
+  struct GNUNET_SCHEDULER_Task *destroy_task;
 
   /**
    * Time for next connect retry.
 
   /**
    * Time for next connect retry.
@@ -246,7 +243,7 @@ struct GNUNET_STATISTICS_Handle
   uint64_t peak_rss;
 
   /**
   uint64_t peak_rss;
 
   /**
-   * Size of the 'watches' array.
+   * Size of the @e watches array.
    */
   unsigned int watches_size;
 
    */
   unsigned int watches_size;
 
@@ -280,11 +277,11 @@ update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
 #if HAVE_MALLINFO
   {
     struct mallinfo mi;
 #if HAVE_MALLINFO
   {
     struct mallinfo mi;
-    
+
     mi = mallinfo();
     mi = mallinfo();
-    current_heap_size = mi.uordblks + mi.fordblks;  
+    current_heap_size = mi.uordblks + mi.fordblks;
   }
   }
-#endif  
+#endif
 #if HAVE_GETRUSAGE
   {
     struct rusage ru;
 #if HAVE_GETRUSAGE
   {
     struct rusage ru;
@@ -292,30 +289,45 @@ update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
     if (0 == getrusage (RUSAGE_SELF, &ru))
     {
       current_rss = 1024LL * ru.ru_maxrss;
     if (0 == getrusage (RUSAGE_SELF, &ru))
     {
       current_rss = 1024LL * ru.ru_maxrss;
-    }    
+    }
   }
 #endif
   if (current_heap_size > h->peak_heap_size)
   {
     h->peak_heap_size = current_heap_size;
   }
 #endif
   if (current_heap_size > h->peak_heap_size)
   {
     h->peak_heap_size = current_heap_size;
-    GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
+    GNUNET_STATISTICS_set (h,
+                          "# peak heap size",
+                          current_heap_size,
+                          GNUNET_NO);
   }
   if (current_rss > h->peak_rss)
   {
     h->peak_rss = current_rss;
   }
   if (current_rss > h->peak_rss)
   {
     h->peak_rss = current_rss;
-    GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
+    GNUNET_STATISTICS_set (h,
+                          "# peak resident set size",
+                          current_rss,
+                          GNUNET_NO);
   }
 #endif
 }
 
 
   }
 #endif
 }
 
 
+/**
+ * Reconnect at a later time, respecting back-off.
+ *
+ * @param h statistics handle
+ */
+static void
+reconnect_later (struct GNUNET_STATISTICS_Handle *h);
+
+
 /**
  * Schedule the next action to be performed.
  *
 /**
  * Schedule the next action to be performed.
  *
- * @param h statistics handle to reconnect
+ * @param cls statistics handle to reconnect
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h);
+schedule_action (void *cls);
 
 
 /**
 
 
 /**
@@ -329,15 +341,11 @@ static void
 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
                         struct GNUNET_STATISTICS_WatchEntry *watch)
 {
 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
                         struct GNUNET_STATISTICS_WatchEntry *watch)
 {
-
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 
-  GNUNET_assert (NULL != h);
-  GNUNET_assert (NULL != watch);
-
   slen = strlen (watch->subsystem) + 1;
   nlen = strlen (watch->name) + 1;
   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
   slen = strlen (watch->subsystem) + 1;
   nlen = strlen (watch->name) + 1;
   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
@@ -346,7 +354,7 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
     GNUNET_break (0);
     return;
   }
     GNUNET_break (0);
     return;
   }
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (watch->subsystem);
   ai->name = GNUNET_strdup (watch->name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (watch->subsystem);
   ai->name = GNUNET_strdup (watch->name);
@@ -355,7 +363,8 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
   ai->type = ACTION_WATCH;
   ai->proc = watch->proc;
   ai->cls = watch->proc_cls;
   ai->type = ACTION_WATCH;
   ai->proc = watch->proc;
   ai->cls = watch->proc_cls;
-  GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
                                    ai);
   schedule_action (h);
 }
                                    ai);
   schedule_action (h);
 }
@@ -384,392 +393,411 @@ static void
 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
 {
   struct GNUNET_STATISTICS_GetHandle *c;
 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
 {
   struct GNUNET_STATISTICS_GetHandle *c;
-  
-  if (NULL != h->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
-  } 
-  if (NULL != h->client)
-  {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
-  }
+
   h->receiving = GNUNET_NO;
   if (NULL != (c = h->current))
   {
     h->current = NULL;
   h->receiving = GNUNET_NO;
   if (NULL != (c = h->current))
   {
     h->current = NULL;
-    if (NULL != c->cont)
-      c->cont (c->cls, GNUNET_SYSERR);
+    if ( (NULL != c->cont) &&
+        (GNUNET_YES != c->aborted) )
+    {
+      c->cont (c->cls,
+               GNUNET_SYSERR);
+      c->cont = NULL;
+    }
     free_action_item (c);
   }
     free_action_item (c);
   }
+  if (NULL != h->mq)
+  {
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
+  }
 }
 
 
 /**
 }
 
 
 /**
- * Try to (re)connect to the statistics service.
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
  *
  *
- * @param h statistics handle to reconnect
- * @return GNUNET_YES on success, GNUNET_NO on failure.
+ * @param cls statistics handle
+ * @param smsg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
  */
 static int
  */
 static int
-try_connect (struct GNUNET_STATISTICS_Handle *h)
+check_statistics_value (void *cls,
+                        const struct GNUNET_STATISTICS_ReplyMessage *smsg)
 {
 {
-  struct GNUNET_STATISTICS_GetHandle *gh;
-  struct GNUNET_STATISTICS_GetHandle *gn;
-  unsigned int i;
+  const char *service;
+  const char *name;
+  uint16_t size;
 
 
-  if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
-    return GNUNET_NO;
-  if (NULL != h->client)
-    return GNUNET_YES;
-  h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
-  if (NULL != h->client)
+  size = ntohs (smsg->header.size);
+  size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
+  if (size !=
+      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                      size,
+                                      2,
+                                      &service,
+                                      &name))
   {
   {
-    gn = h->action_head; 
-    while (NULL != (gh = gn))
-    {
-      gn = gh->next;
-      if (gh->type == ACTION_WATCH)
-      {
-       GNUNET_CONTAINER_DLL_remove (h->action_head,
-                                    h->action_tail,
-                                    gh);
-       free_action_item (gh);  
-      }
-    }
-    for (i = 0; i < h->watches_size; i++)
-    {
-      if (NULL != h->watches[i])
-        schedule_watch_request (h, h->watches[i]);
-    }
-    return GNUNET_YES;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Failed to connect to statistics service!\n");
-  return GNUNET_NO;
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * We've waited long enough, reconnect now.
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
  *
  *
- * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
- * @param tc scheduler context (unused)
+ * @param cls statistics handle
+ * @param msg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
  */
 static void
  */
 static void
-reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+handle_statistics_value (void *cls,
+                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
+  const char *service;
+  const char *name;
+  uint16_t size;
 
 
-  h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
-  schedule_action (h);
+  if (h->current->aborted)
+    return;           /* iteration aborted, don't bother */
+
+  size = ntohs (smsg->header.size);
+  size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
+  GNUNET_assert (size ==
+                 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                                 size,
+                                                 2,
+                                                 &service,
+                                                 &name));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received valid statistic on `%s:%s': %llu\n",
+       service, name,
+       GNUNET_ntohll (smsg->value));
+  if (GNUNET_OK !=
+      h->current->proc (h->current->cls,
+                        service,
+                        name,
+                        GNUNET_ntohll (smsg->value),
+                        0 !=
+                        (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Processing of remaining statistics aborted by client.\n");
+    h->current->aborted = GNUNET_YES;
+  }
 }
 
 
 /**
 }
 
 
 /**
- * Task used by 'reconnect_later' to shutdown the handle
+ * We have received a watch value from the service.  Process it.
  *
  *
- * @param cls the statistics handle
- * @param tc scheduler context
+ * @param cls statistics handle
+ * @param msg the watch value message
  */
 static void
  */
 static void
-do_destroy (void *cls,
-              const struct GNUNET_SCHEDULER_TaskContext *tc)
+handle_statistics_watch_value (void *cls,
+                               const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
+  struct GNUNET_STATISTICS_WatchEntry *w;
+  uint32_t wid;
 
 
-  GNUNET_STATISTICS_destroy (h, GNUNET_NO);
+  GNUNET_break (0 == ntohl (wvm->reserved));
+  wid = ntohl (wvm->wid);
+  if (wid >= h->watches_size)
+  {
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
+  }
+  w = h->watches[wid];
+  if (NULL == w)
+    return;
+  (void) w->proc (w->proc_cls,
+                  w->subsystem,
+                  w->name,
+                  GNUNET_ntohll (wvm->value),
+                  0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
 }
 
 
 /**
 }
 
 
 /**
- * Reconnect at a later time, respecting back-off.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
  *
  *
- * @param h statistics handle
+ * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
+ * @param error error code
  */
 static void
  */
 static void
-reconnect_later (struct GNUNET_STATISTICS_Handle *h)
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
 {
 {
-  int loss;
-  struct GNUNET_STATISTICS_GetHandle *gh;
+  struct GNUNET_STATISTICS_Handle *h = cls;
 
 
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
-  if (GNUNET_YES == h->do_destroy)
+  if (GNUNET_NO != h->do_destroy)
   {
   {
-    /* So we are shutting down and the service is not reachable.
-     * Chances are that it's down for good and we are not going to connect to
-     * it anymore.
-     * Give up and don't sync the rest of the data.
-     */
-    loss = GNUNET_NO;
-    for (gh = h->action_head; NULL != gh; gh = gh->next)
-      if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
-       loss = GNUNET_YES;
-    if (GNUNET_YES == loss)
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Could not save some persistent statistics\n"));
     h->do_destroy = GNUNET_NO;
     h->do_destroy = GNUNET_NO;
-    GNUNET_SCHEDULER_add_continuation (&do_destroy, h,
-                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    if (NULL != h->destroy_task)
+    {
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+      h->destroy_task = NULL;
+    }
+    GNUNET_STATISTICS_destroy (h,
+                               GNUNET_NO);
     return;
   }
     return;
   }
-  h->backoff_task =
-    GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
-  h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
+  do_disconnect (h);
+  reconnect_later (h);
 }
 
 
 /**
 }
 
 
 /**
- * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
+ * Task used to destroy the statistics handle.
  *
  *
- * @param h statistics handle
- * @param msg message received from the service, never NULL
- * @return GNUNET_OK if the message was well-formed
+ * @param cls the `struct GNUNET_STATISTICS_Handle`
  */
  */
-static int
-process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
-                                 const struct GNUNET_MessageHeader *msg)
+static void
+do_destroy (void *cls)
 {
 {
-  char *service;
-  char *name;
-  const struct GNUNET_STATISTICS_ReplyMessage *smsg;
-  uint16_t size;
+  struct GNUNET_STATISTICS_Handle *h = cls;
 
 
-  if (h->current->aborted)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
-    return GNUNET_OK;           /* don't bother */
-  }
-  size = ntohs (msg->size);
-  if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
+  h->destroy_task = NULL;
+  h->do_destroy = GNUNET_NO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Running final destruction\n");
+  GNUNET_STATISTICS_destroy (h,
+                             GNUNET_NO);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
+ * message. We receive this message at the end of the shutdown when
+ * the service confirms that all data has been written to disk.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_disconnect_confirm (void *cls,
+                          const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  if (GNUNET_SYSERR != h->do_destroy)
   {
   {
+    /* not in shutdown, why do we get 'TEST'? */
     GNUNET_break (0);
     GNUNET_break (0);
-    return GNUNET_SYSERR;
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
   }
   }
-  smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
-  size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
-  if (size !=
-      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
-                                      &service, &name))
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
+  if (NULL != h->destroy_task)
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+  h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                              h);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
+ * this message in response to a query to indicate that there are no
+ * further matching results.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_statistics_end (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  struct GNUNET_STATISTICS_GetHandle *c;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received end of statistics marker\n");
+  if (NULL == (c = h->current))
   {
     GNUNET_break (0);
   {
     GNUNET_break (0);
-    return GNUNET_SYSERR;
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
   }
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
-       service, name, GNUNET_ntohll (smsg->value));
-  if (GNUNET_OK !=
-      h->current->proc (h->current->cls, service, name,
-                        GNUNET_ntohll (smsg->value),
-                        0 !=
-                        (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  h->current = NULL;
+  schedule_action (h);
+  if (NULL != c->cont)
   {
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Processing of remaining statistics aborted by client.\n");
-    h->current->aborted = GNUNET_YES;
+    c->cont (c->cls,
+             GNUNET_OK);
+    c->cont = NULL;
   }
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
-  return GNUNET_OK;
+  free_action_item (c);
 }
 
 
 /**
 }
 
 
 /**
- * We have received a watch value from the service.  Process it.
+ * Try to (re)connect to the statistics service.
  *
  *
- * @param h statistics handle
- * @param msg the watch value message
- * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not,
- *         GNUNET_NO if this watch has been cancelled
+ * @param h statistics handle to reconnect
+ * @return #GNUNET_YES on success, #GNUNET_NO on failure.
  */
 static int
  */
 static int
-process_watch_value (struct GNUNET_STATISTICS_Handle *h,
-                     const struct GNUNET_MessageHeader *msg)
+try_connect (struct GNUNET_STATISTICS_Handle *h)
 {
 {
-  const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
-  struct GNUNET_STATISTICS_WatchEntry *w;
-  uint32_t wid;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (disconnect_confirm,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_fixed_size (statistics_end,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_END,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_var_size (statistics_value,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
+                           struct GNUNET_STATISTICS_ReplyMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (statistics_watch_value,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
+                             struct GNUNET_STATISTICS_WatchValueMessage,
+                             h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_STATISTICS_GetHandle *gh;
+  struct GNUNET_STATISTICS_GetHandle *gn;
 
 
-  if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
+  if (NULL != h->backoff_task)
+    return GNUNET_NO;
+  if (NULL != h->mq)
+    return GNUNET_YES;
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                 "statistics",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
   {
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to connect to statistics service!\n");
+    return GNUNET_NO;
   }
   }
-  wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
-  GNUNET_break (0 == ntohl (wvm->reserved));
-  wid = ntohl (wvm->wid);
-  if (wid >= h->watches_size)
+  gn = h->action_head;
+  while (NULL != (gh = gn))
   {
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    gn = gh->next;
+    if (gh->type == ACTION_WATCH)
+    {
+      GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                   h->action_tail,
+                                   gh);
+      free_action_item (gh);
+    }
   }
   }
-  w = h->watches[wid];
-  if (NULL == w)  
-    return GNUNET_NO;  
-  (void) w->proc (w->proc_cls, w->subsystem, w->name,
-                  GNUNET_ntohll (wvm->value),
-                  0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
-  return GNUNET_OK;
+  for (unsigned int i = 0; i < h->watches_size; i++)
+    if (NULL != h->watches[i])
+      schedule_watch_request (h,
+                              h->watches[i]);
+  return GNUNET_YES;
 }
 
 
 }
 
 
+/**
+ * We've waited long enough, reconnect now.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
+ */
 static void
 static void
-destroy_task (void *cls,
-             const struct GNUNET_SCHEDULER_TaskContext *tc)
+reconnect_task (void *cls)
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 
-  GNUNET_STATISTICS_destroy (h, GNUNET_NO);
+  h->backoff_task = NULL;
+  schedule_action (h);
 }
 
 
 /**
 }
 
 
 /**
- * Function called with messages from stats service.
+ * Reconnect at a later time, respecting back-off.
  *
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param h statistics handle
  */
 static void
  */
 static void
-receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
+reconnect_later (struct GNUNET_STATISTICS_Handle *h)
 {
 {
-  struct GNUNET_STATISTICS_Handle *h = cls;
-  struct GNUNET_STATISTICS_GetHandle *c;
-  int ret;
+  int loss;
+  struct GNUNET_STATISTICS_GetHandle *gh;
 
 
-  if (NULL == msg)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-         "Error receiving statistics from service, is the service running?\n");
-    do_disconnect (h);
-    reconnect_later (h);
-    return;
-  }
-  switch (ntohs (msg->type))
+  GNUNET_assert (NULL == h->backoff_task);
+  if (GNUNET_YES == h->do_destroy)
   {
   {
-  case GNUNET_MESSAGE_TYPE_TEST:
-    if (GNUNET_SYSERR != h->do_destroy)
-    {
-      /* not in shutdown, why do we get 'TEST'? */
-      GNUNET_break (0);
-      do_disconnect (h);
-      reconnect_later (h);
-      return;
-    }
-    h->do_destroy = GNUNET_NO;
-    GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
-                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-    break;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_END:
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
-    if (NULL == (c = h->current))
-    {
-      GNUNET_break (0);
-      do_disconnect (h);
-      reconnect_later (h);
-      return;
-    }
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    if (h->watches_size > 0)
-    {
-      GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                             GNUNET_TIME_UNIT_FOREVER_REL);
-    }
-    else
-    {
-      h->receiving = GNUNET_NO;
-    }    
-    h->current = NULL;
-    schedule_action (h);
-    if (NULL != c->cont)
-      c->cont (c->cls, GNUNET_OK);
-    free_action_item (c);
-    return;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
-    if (GNUNET_OK != process_statistics_value_message (h, msg))
-    {
-      do_disconnect (h);
-      reconnect_later (h);
-      return;     
-    }
-    /* finally, look for more! */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Processing VALUE done, now reading more\n");
-    GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                          GNUNET_TIME_absolute_get_remaining (h->
-                                                              current->timeout));
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    return;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
-    if (GNUNET_OK != 
-       (ret = process_watch_value (h, msg)))
-    {
-      do_disconnect (h);
-      if (GNUNET_NO == ret)
-       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 
-      reconnect_later (h);
-      return;
-    }
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    GNUNET_assert (h->watches_size > 0);
-    GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    return;    
-  default:
-    GNUNET_break (0);
-    do_disconnect (h);
-    reconnect_later (h);
+    /* So we are shutting down and the service is not reachable.
+     * Chances are that it's down for good and we are not going to connect to
+     * it anymore.
+     * Give up and don't sync the rest of the data.
+     */
+    loss = GNUNET_NO;
+    for (gh = h->action_head; NULL != gh; gh = gh->next)
+      if ( (gh->make_persistent) &&
+          (ACTION_SET == gh->type) )
+       loss = GNUNET_YES;
+    if (GNUNET_YES == loss)
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Could not save some persistent statistics\n"));
+    if (NULL != h->destroy_task)
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                                h);
     return;
   }
     return;
   }
+  h->backoff_task
+    = GNUNET_SCHEDULER_add_delayed (h->backoff,
+                                    &reconnect_task,
+                                    h);
+  h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
 }
 
 
 }
 
 
+
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
  *
  * @param handle statistics handle
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
  *
  * @param handle statistics handle
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
  */
  */
-static size_t
-transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_get (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_STATISTICS_GetHandle *c;
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_STATISTICS_GetHandle *c;
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
   GNUNET_assert (NULL != (c = handle->current));
 
   GNUNET_assert (NULL != (c = handle->current));
-  if (NULL == buf)
-  {
-    /* timeout / error */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission of request for statistics failed!\n");
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
   slen1 = strlen (c->subsystem) + 1;
   slen2 = strlen (c->name) + 1;
   slen1 = strlen (c->subsystem) + 1;
   slen2 = strlen (c->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_GET);
   GNUNET_assert (slen1 + slen2 ==
   GNUNET_assert (slen1 + slen2 ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
                                              c->subsystem,
                                              c->name));
                                              c->subsystem,
                                              c->name));
-  if (GNUNET_YES != handle->receiving)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission of GET done, now reading response\n");
-    handle->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
-                           GNUNET_TIME_absolute_get_remaining (c->timeout));
-  }
-  return msize;
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 }
 
 
@@ -778,50 +806,38 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
  * the response).
  *
  * @param handle statistics handle
  * the response).
  *
  * @param handle statistics handle
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
  */
  */
-static size_t
-transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
 
-  if (NULL == buf)
-  {
-    /* timeout / error */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission of request for statistics failed!\n");
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmitting watch request for `%s'\n",
        handle->current->name);
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
        handle->current->name);
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
   GNUNET_assert (slen1 + slen2 ==
   GNUNET_assert (slen1 + slen2 ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
                                              handle->current->subsystem,
                                              handle->current->name));
                                              handle->current->subsystem,
                                              handle->current->name));
-  if (GNUNET_YES != handle->receiving)
-  {
-    handle->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
-  return msize;
+  schedule_action (handle);
 }
 
 
 }
 
 
@@ -829,37 +845,20 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
  * Transmit a SET/UPDATE request.
  *
  * @param handle statistics handle
  * Transmit a SET/UPDATE request.
  *
  * @param handle statistics handle
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
  */
  */
-static size_t
-transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_set (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_STATISTICS_SetMessage *r;
 {
   struct GNUNET_STATISTICS_SetMessage *r;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen;
   size_t nlen;
   size_t slen;
   size_t nlen;
-  size_t nsize;
 
 
-  if (NULL == buf)
-  {
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
-  nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
-  if (size < nsize)
-  {
-    GNUNET_break (0);
-    do_disconnect (handle);
-    reconnect_later (handle);
-    return 0;
-  }
-  r = buf;
-  r->header.size = htons (nsize);
-  r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
+  env = GNUNET_MQ_msg_extra (r,
+                             slen + nlen,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_SET);
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
@@ -867,52 +866,20 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
   if (handle->current->type == ACTION_UPDATE)
     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
   GNUNET_assert (slen + nlen ==
   if (handle->current->type == ACTION_UPDATE)
     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
   GNUNET_assert (slen + nlen ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &r[1],
+                                             slen + nlen,
+                                             2,
                                              handle->current->subsystem,
                                              handle->current->name));
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
   update_memory_statistics (handle);
                                              handle->current->subsystem,
                                              handle->current->name));
   GNUNET_assert (NULL == handle->current->cont);
   free_action_item (handle->current);
   handle->current = NULL;
   update_memory_statistics (handle);
-  return nsize;
-}
-
-
-/**
- * Function called when we are ready to transmit a request to the service.
- *
- * @param cls the 'struct GNUNET_STATISTICS_Handle'
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
- */
-static size_t
-transmit_action (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_STATISTICS_Handle *h = cls;
-  size_t ret;
-
-  h->th = NULL;
-  ret = 0;
-  if (NULL != h->current)
-    switch (h->current->type)
-    {
-    case ACTION_GET:
-      ret = transmit_get (h, size, buf);
-      break;
-    case ACTION_SET:
-    case ACTION_UPDATE:
-      ret = transmit_set (h, size, buf);
-      break;
-    case ACTION_WATCH:
-      ret = transmit_watch (h, size, buf);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
-    }
-  schedule_action (h);
-  return ret;
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 }
 
 
@@ -927,15 +894,18 @@ struct GNUNET_STATISTICS_Handle *
 GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
 GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct GNUNET_STATISTICS_Handle *ret;
-
-  GNUNET_assert (NULL != subsystem);
-  GNUNET_assert (NULL != cfg);
-  ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
-  ret->cfg = cfg;
-  ret->subsystem = GNUNET_strdup (subsystem);
-  ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-  return ret;
+  struct GNUNET_STATISTICS_Handle *h;
+
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                            "statistics",
+                                            "DISABLE"))
+    return NULL;
+  h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
+  h->cfg = cfg;
+  h->subsystem = GNUNET_strdup (subsystem);
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  return h;
 }
 
 
 }
 
 
@@ -944,42 +914,32 @@ GNUNET_STATISTICS_create (const char *subsystem,
  * it).
  *
  * @param h statistics handle to destroy
  * it).
  *
  * @param h statistics handle to destroy
- * @param sync_first set to GNUNET_YES if pending SET requests should
+ * @param sync_first set to #GNUNET_YES if pending SET requests should
  *        be completed
  */
 void
  *        be completed
  */
 void
-GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
+GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
+                           int sync_first)
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
-  struct GNUNET_TIME_Relative timeout;
-  int i;
 
   if (NULL == h)
     return;
 
   if (NULL == h)
     return;
-  GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
-  if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
+  GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
+  if ( (sync_first) &&
+       (NULL != h->mq) &&
+       (0 != GNUNET_MQ_get_length (h->mq)) )
   {
   {
-    GNUNET_SCHEDULER_cancel (h->backoff_task);
-    h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-  if (sync_first)
-  {
-    if (NULL != h->current)
-    {
-      if (ACTION_GET == h->current->type)
-      {
-        GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-        h->th = NULL;
-        free_action_item (h->current);
-        h->current = NULL;
-      }
-    }
-    next = h->action_head; 
+    if ( (NULL != h->current) &&
+         (ACTION_GET == h->current->type) )
+      h->current->aborted = GNUNET_YES;
+    next = h->action_head;
     while (NULL != (pos = next))
     {
       next = pos->next;
     while (NULL != (pos = next))
     {
       next = pos->next;
-      if (ACTION_GET == pos->type)
+      if ( (ACTION_GET == pos->type) ||
+           (ACTION_WATCH == pos->type) )
       {
        GNUNET_CONTAINER_DLL_remove (h->action_head,
                                     h->action_tail,
       {
        GNUNET_CONTAINER_DLL_remove (h->action_head,
                                     h->action_tail,
@@ -987,25 +947,21 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
         free_action_item (pos);
       }
     }
         free_action_item (pos);
       }
     }
-    if ( (NULL == h->current) &&
-        (NULL != (h->current = h->action_head)) )
-      GNUNET_CONTAINER_DLL_remove (h->action_head,
-                                  h->action_tail,
-                                  h->current);
     h->do_destroy = GNUNET_YES;
     h->do_destroy = GNUNET_YES;
-    if ((NULL != h->current) && (NULL == h->th) &&
-       (NULL != h->client))
-    {
-      timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-      h->th =
-       GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
-                                            timeout, GNUNET_YES,
-                                            &transmit_action, h);
-      GNUNET_assert (NULL != h->th);
-    }
-    if (NULL != h->th)
-      return; /* do not finish destruction just yet */
+    schedule_action (h);
+    GNUNET_assert (NULL == h->destroy_task);
+    h->destroy_task
+      = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
+                                                                     5),
+                                      &do_destroy,
+                                      h);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Deferring destruction\n");
+    return; /* do not finish destruction just yet */
   }
   }
+  /* do clean up all */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cleaning all up\n");
   while (NULL != (pos = h->action_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->action_head,
   while (NULL != (pos = h->action_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->action_head,
@@ -1014,108 +970,95 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
     free_action_item (pos);
   }
   do_disconnect (h);
     free_action_item (pos);
   }
   do_disconnect (h);
-  for (i = 0; i < h->watches_size; i++)
+  if (NULL != h->backoff_task)
+  {
+    GNUNET_SCHEDULER_cancel (h->backoff_task);
+    h->backoff_task = NULL;
+  }
+  if (NULL != h->destroy_task)
+  {
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = NULL;
+  }
+  for (unsigned int i = 0; i < h->watches_size; i++)
   {
     if (NULL == h->watches[i])
   {
     if (NULL == h->watches[i])
-      continue; 
+      continue;
     GNUNET_free (h->watches[i]->subsystem);
     GNUNET_free (h->watches[i]->name);
     GNUNET_free (h->watches[i]);
   }
     GNUNET_free (h->watches[i]->subsystem);
     GNUNET_free (h->watches[i]->name);
     GNUNET_free (h->watches[i]);
   }
-  GNUNET_array_grow (h->watches, h->watches_size, 0);
+  GNUNET_array_grow (h->watches,
+                     h->watches_size,
+                     0);
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
 
 
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
 
 
-/**
- * Function called to transmit TEST message to service to
- * confirm that the service has received all of our 'SET'
- * messages (during statistics disconnect/shutdown).
- *
- * @param cls the 'struct GNUNET_STATISTICS_Handle'
- * @param size how many bytes can we write to buf
- * @param buf where to write requests to the service
- * @return number of bytes written to buf
- */
-static size_t
-transmit_test_on_shutdown (void *cls,
-                          size_t size,
-                          void *buf)
-{
-  struct GNUNET_STATISTICS_Handle *h = cls;
-  struct GNUNET_MessageHeader hdr;
-
-  h->th = NULL;
-  if (NULL == buf)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-               _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n"));
-    h->do_destroy = GNUNET_NO;
-    GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
-                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-    return 0;
-  }
-  hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST);
-  hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
-  memcpy (buf, &hdr, sizeof (hdr));
-  if (GNUNET_YES != h->receiving)
-  {
-    h->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
-  return sizeof (struct GNUNET_MessageHeader);
-}
-
-
 /**
  * Schedule the next action to be performed.
  *
 /**
  * Schedule the next action to be performed.
  *
- * @param h statistics handle
+ * @param cls statistics handle
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h)
+schedule_action (void *cls)
 {
 {
-  struct GNUNET_TIME_Relative timeout;
+  struct GNUNET_STATISTICS_Handle *h = cls;
 
 
-  if ( (NULL != h->th) ||
-       (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) )
+  if (NULL != h->backoff_task)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
   {
     reconnect_later (h);
     return;
   }
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
   {
     reconnect_later (h);
     return;
   }
-  if (NULL != h->current)
-    return; /* action already pending */
+  if (0 < GNUNET_MQ_get_length (h->mq))
+    return; /* Wait for queue to be reduced more */    
   /* schedule next action */
   /* schedule next action */
-  h->current = h->action_head;
-  if (NULL == h->current)
+  while (NULL == h->current)
   {
   {
-    if (GNUNET_YES == h->do_destroy)
+    h->current = h->action_head;
+    if (NULL == h->current)
     {
     {
+      struct GNUNET_MessageHeader *hdr;
+      struct GNUNET_MQ_Envelope *env;
+
+      if (GNUNET_YES != h->do_destroy)
+        return; /* nothing to do */
+      /* let service know that we're done */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Notifying service that we are done\n");
       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
-      h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                                  sizeof (struct GNUNET_MessageHeader),
-                                                  SET_TRANSMIT_TIMEOUT,
-                                                  GNUNET_NO,
-                                                  &transmit_test_on_shutdown, h);
+      env = GNUNET_MQ_msg (hdr,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
+      GNUNET_MQ_notify_sent (env,
+                             &schedule_action,
+                             h);
+      GNUNET_MQ_send (h->mq,
+                      env);
+      return;
+    }
+    GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                 h->action_tail,
+                                 h->current);
+    switch (h->current->type)
+    {
+    case ACTION_GET:
+      transmit_get (h);
+      break;
+    case ACTION_SET:
+    case ACTION_UPDATE:
+      transmit_set (h);
+      break;
+    case ACTION_WATCH:
+      transmit_watch (h);
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
     }
     }
-    return;
-  }
-  GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
-  timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-  if (NULL ==
-      (h->th =
-       GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
-                                            timeout, GNUNET_YES,
-                                            &transmit_action, h)))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Failed to transmit request to statistics service.\n");
-    do_disconnect (h);
-    reconnect_later (h);
   }
 }
 
   }
 }
 
@@ -1126,20 +1069,19 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
- * @param timeout after how long should we give up (and call
- *        cont with an error code)?
  * @param cont continuation to call when done (can be NULL)
  *        This callback CANNOT destroy the statistics handle in the same call.
  * @param proc function to call on each value
  * @param cont continuation to call when done (can be NULL)
  *        This callback CANNOT destroy the statistics handle in the same call.
  * @param proc function to call on each value
- * @param cls closure for cont and proc
+ * @param cls closure for @a cont and @a proc
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
-                       const char *subsystem, const char *name,
-                       struct GNUNET_TIME_Relative timeout,
+                       const char *subsystem,
+                       const char *name,
                        GNUNET_STATISTICS_Callback cont,
                        GNUNET_STATISTICS_Callback cont,
-                       GNUNET_STATISTICS_Iterator proc, void *cls)
+                       GNUNET_STATISTICS_Iterator proc,
+                       void *cls)
 {
   size_t slen1;
   size_t slen2;
 {
   size_t slen1;
   size_t slen2;
@@ -1157,17 +1099,17 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
   slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
   slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
-  ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
+                                   handle->action_tail,
                                    ai);
   schedule_action (handle);
   return ai;
                                    ai);
   schedule_action (handle);
   return ai;
@@ -1185,17 +1127,18 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
 {
   if (NULL == gh)
     return;
 {
   if (NULL == gh)
     return;
+  gh->cont = NULL;
   if (gh->sh->current == gh)
   {
     gh->aborted = GNUNET_YES;
   if (gh->sh->current == gh)
   {
     gh->aborted = GNUNET_YES;
+    return;
   }
   }
-  else
-  {
-    GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
-    GNUNET_free (gh->name);
-    GNUNET_free (gh->subsystem);
-    GNUNET_free (gh);
-  }
+  GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
+                               gh->sh->action_tail,
+                               gh);
+  GNUNET_free (gh->name);
+  GNUNET_free (gh->subsystem);
+  GNUNET_free (gh);
 }
 
 
 }
 
 
@@ -1206,71 +1149,78 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
- * @param proc_cls closure for proc
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 int
 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
  */
 int
 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
-                         const char *subsystem, const char *name,
-                         GNUNET_STATISTICS_Iterator proc, void *proc_cls)
+                         const char *subsystem,
+                         const char *name,
+                         GNUNET_STATISTICS_Iterator proc,
+                         void *proc_cls)
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 
   if (NULL == handle)
     return GNUNET_SYSERR;
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 
   if (NULL == handle)
     return GNUNET_SYSERR;
-  w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
+  w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
   w->subsystem = GNUNET_strdup (subsystem);
   w->name = GNUNET_strdup (name);
   w->proc = proc;
   w->proc_cls = proc_cls;
   w->subsystem = GNUNET_strdup (subsystem);
   w->name = GNUNET_strdup (name);
   w->proc = proc;
   w->proc_cls = proc_cls;
-  GNUNET_array_append (handle->watches, handle->watches_size, w);
-  schedule_watch_request (handle, w);
+  GNUNET_array_append (handle->watches,
+                       handle->watches_size,
+                       w);
+  schedule_watch_request (handle,
+                          w);
   return GNUNET_OK;
 }
 
 
 /**
   return GNUNET_OK;
 }
 
 
 /**
- * Stop watching statistics from the peer.  
+ * Stop watching statistics from the peer.
  *
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
  *
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
- * @param proc_cls closure for proc
- * @return GNUNET_OK on success, GNUNET_SYSERR on error (no such watch)
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
  */
 int
 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
  */
 int
 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
-                               const char *subsystem, const char *name,
-                               GNUNET_STATISTICS_Iterator proc, void *proc_cls)
+                               const char *subsystem,
+                                const char *name,
+                               GNUNET_STATISTICS_Iterator proc,
+                                void *proc_cls)
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
-  unsigned int i;
 
   if (NULL == handle)
     return GNUNET_SYSERR;
 
   if (NULL == handle)
     return GNUNET_SYSERR;
-  for (i=0;i<handle->watches_size;i++)
+  for (unsigned int i=0;i<handle->watches_size;i++)
   {
     w = handle->watches[i];
     if (NULL == w)
       continue;
     if ( (w->proc == proc) &&
         (w->proc_cls == proc_cls) &&
   {
     w = handle->watches[i];
     if (NULL == w)
       continue;
     if ( (w->proc == proc) &&
         (w->proc_cls == proc_cls) &&
-        (0 == strcmp (w->name, name)) &&
-        (0 == strcmp (w->subsystem, subsystem)) )
+        (0 == strcmp (w->name,
+                      name)) &&
+        (0 == strcmp (w->subsystem,
+                      subsystem)) )
     {
       GNUNET_free (w->name);
       GNUNET_free (w->subsystem);
       GNUNET_free (w);
     {
       GNUNET_free (w->name);
       GNUNET_free (w->subsystem);
       GNUNET_free (w);
-      handle->watches[i] = NULL;      
+      handle->watches[i] = NULL;
       return GNUNET_OK;
       return GNUNET_OK;
-    }   
+    }
   }
   return GNUNET_SYSERR;
 }
 
 
   }
   return GNUNET_SYSERR;
 }
 
 
-
 /**
  * Queue a request to change a statistic.
  *
 /**
  * Queue a request to change a statistic.
  *
@@ -1278,11 +1228,14 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
  * @param name name of the value
  * @param make_persistent  should the value be kept across restarts?
  * @param value new value or change
  * @param name name of the value
  * @param make_persistent  should the value be kept across restarts?
  * @param value new value or change
- * @param type type of the action (ACTION_SET or ACTION_UPDATE)
+ * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
  */
 static void
  */
 static void
-add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
-                   int make_persistent, uint64_t value, enum ActionType type)
+add_setter_action (struct GNUNET_STATISTICS_Handle *h,
+                   const char *name,
+                   int make_persistent,
+                   uint64_t value,
+                   enum ActionType type)
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
@@ -1290,8 +1243,6 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   size_t nsize;
   int64_t delta;
 
   size_t nsize;
   int64_t delta;
 
-  GNUNET_assert (NULL != h);
-  GNUNET_assert (NULL != name);
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
@@ -1302,8 +1253,10 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   }
   for (ai = h->action_head; NULL != ai; ai = ai->next)
   {
   }
   for (ai = h->action_head; NULL != ai; ai = ai->next)
   {
-    if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
-           (0 == strcmp (ai->name, name)) && 
+    if (! ( (0 == strcmp (ai->subsystem,
+                         h->subsystem)) &&
+           (0 == strcmp (ai->name,
+                         name)) &&
            ( (ACTION_UPDATE == ai->type) ||
              (ACTION_SET == ai->type) ) ) )
       continue;
            ( (ACTION_UPDATE == ai->type) ||
              (ACTION_SET == ai->type) ) ) )
       continue;
@@ -1347,12 +1300,14 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
        ai->type = type;
       }
     }
        ai->type = type;
       }
     }
-    ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
-    ai->make_persistent = make_persistent;
-    return;  
+    ai->timeout
+      = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
+    ai->make_persistent
+      = make_persistent;
+    return;
   }
   /* no existing entry matches, create a fresh one */
   }
   /* no existing entry matches, create a fresh one */
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
@@ -1361,7 +1316,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
-  GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
                                    ai);
   schedule_action (h);
 }
                                    ai);
   schedule_action (h);
 }
@@ -1378,12 +1334,18 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
  */
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
-                       const char *name, uint64_t value, int make_persistent)
+                       const char *name,
+                       uint64_t value,
+                       int make_persistent)
 {
   if (NULL == handle)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
 {
   if (NULL == handle)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, value, ACTION_SET);
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     value,
+                     ACTION_SET);
 }
 
 
 }
 
 
@@ -1398,14 +1360,19 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
-                          const char *name, int64_t delta, int make_persistent)
+                          const char *name,
+                          int64_t delta,
+                          int make_persistent)
 {
   if (NULL == handle)
     return;
   if (0 == delta)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
 {
   if (NULL == handle)
     return;
   if (0 == delta)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, (uint64_t) delta,
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     (uint64_t) delta,
                      ACTION_UPDATE);
 }
 
                      ACTION_UPDATE);
 }