migrate secretsharing to new service API
[oweals/gnunet.git] / src / statistics / statistics_api.c
index e117e971c00f3f8cc688dba8940553bfef58193a..ad4453b2a62a0e8515317e0d972bdd1c740aabab 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
 */
 
 /**
  * @author Christian Grothoff
  */
 #include "platform.h"
  * @author Christian Grothoff
  */
 #include "platform.h"
-#include "gnunet_client_lib.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
 #include "gnunet_constants.h"
-#include "gnunet_container_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_protocols.h"
-#include "gnunet_server_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_statistics_service.h"
-#include "gnunet_strings_lib.h"
 #include "statistics.h"
 
 /**
  * How long do we wait until a statistics request for setting
  * a value times out?  (The update will be lost if the
 #include "statistics.h"
 
 /**
  * How long do we wait until a statistics request for setting
  * a value times out?  (The update will be lost if the
- * service does not react within this timeframe).  
+ * service does not react within this timeframe).
  */
 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
 
  */
 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
 
+#define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
 
 /**
  * Types of actions.
  */
 enum ActionType
 {
 
 /**
  * Types of actions.
  */
 enum ActionType
 {
+  /**
+   * Get a value.
+   */
   ACTION_GET,
   ACTION_GET,
+
+  /**
+   * Set a value.
+   */
   ACTION_SET,
   ACTION_SET,
+
+  /**
+   * Update a value.
+   */
   ACTION_UPDATE,
   ACTION_UPDATE,
+
+  /**
+   * Watch a value.
+   */
   ACTION_WATCH
 };
 
   ACTION_WATCH
 };
 
@@ -75,7 +88,7 @@ struct GNUNET_STATISTICS_WatchEntry
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc
+   * Closure for @e proc
    */
   void *proc_cls;
 
    */
   void *proc_cls;
 
@@ -124,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc and cont.
+   * Closure for @e proc and @e cont.
    */
   void *cls;
 
    */
   void *cls;
 
@@ -149,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle
   int aborted;
 
   /**
   int aborted;
 
   /**
-   * Is this a GET, SET, UPDATE or WATCH?
+   * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
    */
   enum ActionType type;
 
    */
   enum ActionType type;
 
@@ -177,14 +190,9 @@ struct GNUNET_STATISTICS_Handle
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Message queue to the service.
    */
    */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * Currently pending transmission request.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Head of the linked list of pending actions (first action
 
   /**
    * Head of the linked list of pending actions (first action
@@ -212,7 +220,12 @@ struct GNUNET_STATISTICS_Handle
   /**
    * Task doing exponential back-off trying to reconnect.
    */
   /**
    * Task doing exponential back-off trying to reconnect.
    */
-  GNUNET_SCHEDULER_TaskIdentifier backoff_task;
+  struct GNUNET_SCHEDULER_Task *backoff_task;
+
+  /**
+   * Task for running #do_destroy().
+   */
+  struct GNUNET_SCHEDULER_Task *destroy_task;
 
   /**
    * Time for next connect retry.
 
   /**
    * Time for next connect retry.
@@ -220,7 +233,17 @@ struct GNUNET_STATISTICS_Handle
   struct GNUNET_TIME_Relative backoff;
 
   /**
   struct GNUNET_TIME_Relative backoff;
 
   /**
-   * Size of the 'watches' array.
+   * Maximum heap size observed so far (if available).
+   */
+  uint64_t peak_heap_size;
+
+  /**
+   * Maximum resident set side observed so far (if available).
+   */
+  uint64_t peak_rss;
+
+  /**
+   * Size of the @e watches array.
    */
   unsigned int watches_size;
 
    */
   unsigned int watches_size;
 
@@ -238,49 +261,91 @@ struct GNUNET_STATISTICS_Handle
 };
 
 
 };
 
 
-
 /**
 /**
- * Schedule the next action to be performed.
+ * Obtain statistics about this process's memory consumption and
+ * report those as well (if they changed).
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h);
+update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
+{
+#if ENABLE_HEAP_STATISTICS
+  uint64_t current_heap_size = 0;
+  uint64_t current_rss = 0;
+
+  if (GNUNET_NO != h->do_destroy)
+    return;
+#if HAVE_MALLINFO
+  {
+    struct mallinfo mi;
+
+    mi = mallinfo();
+    current_heap_size = mi.uordblks + mi.fordblks;
+  }
+#endif
+#if HAVE_GETRUSAGE
+  {
+    struct rusage ru;
+
+    if (0 == getrusage (RUSAGE_SELF, &ru))
+    {
+      current_rss = 1024LL * ru.ru_maxrss;
+    }
+  }
+#endif
+  if (current_heap_size > h->peak_heap_size)
+  {
+    h->peak_heap_size = current_heap_size;
+    GNUNET_STATISTICS_set (h,
+                          "# peak heap size",
+                          current_heap_size,
+                          GNUNET_NO);
+  }
+  if (current_rss > h->peak_rss)
+  {
+    h->peak_rss = current_rss;
+    GNUNET_STATISTICS_set (h,
+                          "# peak resident set size",
+                          current_rss,
+                          GNUNET_NO);
+  }
+#endif
+}
+
 
 /**
 
 /**
- * Try to (re)connect to the statistics service.
+ * Reconnect at a later time, respecting back-off.
  *
  *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
+ * @param h statistics handle
  */
  */
-static int
-try_connect (struct GNUNET_STATISTICS_Handle *ret);
+static void
+reconnect_later (struct GNUNET_STATISTICS_Handle *h);
 
 
 
 
+/**
+ * Schedule the next action to be performed.
+ *
+ * @param cls statistics handle to reconnect
+ */
 static void
 static void
-insert_ai (struct GNUNET_STATISTICS_Handle *h,
-           struct GNUNET_STATISTICS_GetHandle *ai)
-{
-  GNUNET_CONTAINER_DLL_insert_after (h->action_head, h->action_tail,
-                                     h->action_tail, ai);
-  if (h->action_head == ai)
-    schedule_action (h);
-}
+schedule_action (void *cls);
 
 
 
 
+/**
+ * Transmit request to service that we want to watch
+ * the development of a particular value.
+ *
+ * @param h statistics handle
+ * @param watch watch entry of the value to watch
+ */
 static void
 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
                         struct GNUNET_STATISTICS_WatchEntry *watch)
 {
 static void
 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
                         struct GNUNET_STATISTICS_WatchEntry *watch)
 {
-
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 
-  GNUNET_assert (h != NULL);
-  if (GNUNET_YES != try_connect (h))
-  {
-    schedule_action (h);
-    return;
-  }
   slen = strlen (watch->subsystem) + 1;
   nlen = strlen (watch->name) + 1;
   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
   slen = strlen (watch->subsystem) + 1;
   nlen = strlen (watch->name) + 1;
   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
@@ -289,7 +354,7 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
     GNUNET_break (0);
     return;
   }
     GNUNET_break (0);
     return;
   }
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (watch->subsystem);
   ai->name = GNUNET_strdup (watch->name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (watch->subsystem);
   ai->name = GNUNET_strdup (watch->name);
@@ -298,366 +363,502 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
   ai->type = ACTION_WATCH;
   ai->proc = watch->proc;
   ai->cls = watch->proc_cls;
   ai->type = ACTION_WATCH;
   ai->proc = watch->proc;
   ai->cls = watch->proc_cls;
-  insert_ai (h, ai);
-}
-
-
-/**
- * Try to (re)connect to the statistics service.
- *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
- */
-static int
-try_connect (struct GNUNET_STATISTICS_Handle *ret)
-{
-  unsigned int i;
-
-  if (ret->client != NULL)
-    return GNUNET_YES;
-  ret->client = GNUNET_CLIENT_connect ("statistics", ret->cfg);
-  if (ret->client != NULL)
-  {
-    for (i = 0; i < ret->watches_size; i++)
-      schedule_watch_request (ret, ret->watches[i]);
-    return GNUNET_YES;
-  }
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to statistics service!\n"));
-#endif
-  return GNUNET_NO;
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
+                                   ai);
+  schedule_action (h);
 }
 
 
 /**
  * Free memory associated with the given action item.
 }
 
 
 /**
  * Free memory associated with the given action item.
+ *
+ * @param gh action item to free
  */
 static void
  */
 static void
-free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
+free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
 {
 {
-  GNUNET_free_non_null (ai->subsystem);
-  GNUNET_free_non_null (ai->name);
-  GNUNET_free (ai);
+  GNUNET_free_non_null (gh->subsystem);
+  GNUNET_free_non_null (gh->name);
+  GNUNET_free (gh);
 }
 
 
 /**
 }
 
 
 /**
- * GET processing is complete, tell client about it.
+ * Disconnect from the statistics service.
+ *
+ * @param h statistics handle to disconnect from
  */
 static void
  */
 static void
-finish (struct GNUNET_STATISTICS_Handle *h, int code)
+do_disconnect (struct GNUNET_STATISTICS_Handle *h)
 {
 {
-  struct GNUNET_STATISTICS_GetHandle *pos = h->current;
+  struct GNUNET_STATISTICS_GetHandle *c;
 
 
-  h->current = NULL;
-  schedule_action (h);
-  if (pos != NULL)
+  h->receiving = GNUNET_NO;
+  if (NULL != (c = h->current))
   {
   {
-    if (pos->cont != NULL)
-      pos->cont (pos->cls, code);
-    free_action_item (pos);
+    h->current = NULL;
+    if ( (NULL != c->cont) &&
+        (GNUNET_YES != c->aborted) )
+    {
+      c->cont (c->cls,
+               GNUNET_SYSERR);
+      c->cont = NULL;
+    }
+    free_action_item (c);
+  }
+  if (NULL != h->mq)
+  {
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
 }
 
 
 /**
   }
 }
 
 
 /**
- * Process the message.
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
  *
  *
- * @return GNUNET_OK if the message was well-formed
+ * @param cls statistics handle
+ * @param smsg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
  */
 static int
  */
 static int
-process_message (struct GNUNET_STATISTICS_Handle *h,
-                 const struct GNUNET_MessageHeader *msg)
+check_statistics_value (void *cls,
+                        const struct GNUNET_STATISTICS_ReplyMessage *smsg)
 {
 {
-  char *service;
-  char *name;
-  const struct GNUNET_STATISTICS_ReplyMessage *smsg;
+  const char *service;
+  const char *name;
   uint16_t size;
 
   uint16_t size;
 
-  if (h->current->aborted)
-  {
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Iteration was aborted, ignoring VALUE\n");
-#endif
-    return GNUNET_OK;           /* don't bother */
-  }
-  size = ntohs (msg->size);
-  if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
+  size = ntohs (smsg->header.size);
   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
   if (size !=
   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
   if (size !=
-      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
-                                      &service, &name))
+      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                      size,
+                                      2,
+                                      &service,
+                                      &name))
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received valid statistic on `%s:%s': %llu\n", service, name,
-              GNUNET_ntohll (smsg->value));
-#endif
+  return GNUNET_OK;
+}
+
+
+/**
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
+ *
+ * @param cls statistics handle
+ * @param msg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
+ */
+static void
+handle_statistics_value (void *cls,
+                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  const char *service;
+  const char *name;
+  uint16_t size;
+
+  if (h->current->aborted)
+    return;           /* iteration aborted, don't bother */
+
+  size = ntohs (smsg->header.size);
+  size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
+  GNUNET_assert (size ==
+                 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                                 size,
+                                                 2,
+                                                 &service,
+                                                 &name));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received valid statistic on `%s:%s': %llu\n",
+       service, name,
+       GNUNET_ntohll (smsg->value));
   if (GNUNET_OK !=
   if (GNUNET_OK !=
-      h->current->proc (h->current->cls, service, name,
+      h->current->proc (h->current->cls,
+                        service,
+                        name,
                         GNUNET_ntohll (smsg->value),
                         0 !=
                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
   {
                         GNUNET_ntohll (smsg->value),
                         0 !=
                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
   {
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Processing of remaining statistics aborted by client.\n");
-#endif
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Processing of remaining statistics aborted by client.\n");
     h->current->aborted = GNUNET_YES;
   }
     h->current->aborted = GNUNET_YES;
   }
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
-#endif
-  return GNUNET_OK;
 }
 
 
 }
 
 
-static int
-process_watch_value (struct GNUNET_STATISTICS_Handle *h,
-                     const struct GNUNET_MessageHeader *msg)
+/**
+ * We have received a watch value from the service.  Process it.
+ *
+ * @param cls statistics handle
+ * @param msg the watch value message
+ */
+static void
+handle_statistics_watch_value (void *cls,
+                               const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
 {
 {
-  const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
+  struct GNUNET_STATISTICS_Handle *h = cls;
   struct GNUNET_STATISTICS_WatchEntry *w;
   uint32_t wid;
 
   struct GNUNET_STATISTICS_WatchEntry *w;
   uint32_t wid;
 
-  if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
   GNUNET_break (0 == ntohl (wvm->reserved));
   wid = ntohl (wvm->wid);
   if (wid >= h->watches_size)
   {
   GNUNET_break (0 == ntohl (wvm->reserved));
   wid = ntohl (wvm->wid);
   if (wid >= h->watches_size)
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
   }
   w = h->watches[wid];
   }
   w = h->watches[wid];
-  (void) w->proc (w->proc_cls, w->subsystem, w->name,
+  if (NULL == w)
+    return;
+  (void) w->proc (w->proc_cls,
+                  w->subsystem,
+                  w->name,
                   GNUNET_ntohll (wvm->value),
                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
                   GNUNET_ntohll (wvm->value),
                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
-  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Function called with messages from stats service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
  *
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
+ * @param error error code
  */
 static void
  */
 static void
-receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 
-  if (msg == NULL)
+  if (GNUNET_NO != h->do_destroy)
   {
   {
-    if (NULL != h->client)
+    h->do_destroy = GNUNET_NO;
+    if (NULL != h->destroy_task)
     {
     {
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = NULL;
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+      h->destroy_task = NULL;
     }
     }
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                "Error receiving statistics from service, is the service running?\n");
-#endif
-    finish (h, GNUNET_SYSERR);
+    GNUNET_STATISTICS_destroy (h,
+                               GNUNET_NO);
     return;
   }
     return;
   }
-  switch (ntohs (msg->type))
+  do_disconnect (h);
+  reconnect_later (h);
+}
+
+
+/**
+ * Task used to destroy the statistics handle.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle`
+ */
+static void
+do_destroy (void *cls)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->destroy_task = NULL;
+  h->do_destroy = GNUNET_NO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Running final destruction\n");
+  GNUNET_STATISTICS_destroy (h,
+                             GNUNET_NO);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
+ * message. We receive this message at the end of the shutdown when
+ * the service confirms that all data has been written to disk.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_disconnect_confirm (void *cls,
+                          const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  if (GNUNET_SYSERR != h->do_destroy)
   {
   {
-  case GNUNET_MESSAGE_TYPE_STATISTICS_END:
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
-#endif
-    h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    if (h->watches_size > 0)
-    {
-      GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                             GNUNET_TIME_UNIT_FOREVER_REL);
-    }
-    else
-    {
-      h->receiving = GNUNET_NO;
-    }
-    finish (h, GNUNET_OK);
+    /* not in shutdown, why do we get 'TEST'? */
+    GNUNET_break (0);
+    do_disconnect (h);
+    reconnect_later (h);
     return;
     return;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
-    if (GNUNET_OK == process_message (h, msg))
-    {
-      /* finally, look for more! */
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Processing VALUE done, now reading more\n");
-#endif
-      GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                             GNUNET_TIME_absolute_get_remaining (h->
-                                                                 current->timeout));
-      h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-      return;
-    }
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
+  if (NULL != h->destroy_task)
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+  h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                              h);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
+ * this message in response to a query to indicate that there are no
+ * further matching results.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_statistics_end (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  struct GNUNET_STATISTICS_GetHandle *c;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received end of statistics marker\n");
+  if (NULL == (c = h->current))
+  {
     GNUNET_break (0);
     GNUNET_break (0);
-    break;
-  case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
-    if (GNUNET_OK == process_watch_value (h, msg))
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
+  }
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  h->current = NULL;
+  schedule_action (h);
+  if (NULL != c->cont)
+  {
+    c->cont (c->cls,
+             GNUNET_OK);
+    c->cont = NULL;
+  }
+  free_action_item (c);
+}
+
+
+/**
+ * Try to (re)connect to the statistics service.
+ *
+ * @param h statistics handle to reconnect
+ * @return #GNUNET_YES on success, #GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_STATISTICS_Handle *h)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (disconnect_confirm,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_fixed_size (statistics_end,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_END,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_var_size (statistics_value,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
+                           struct GNUNET_STATISTICS_ReplyMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (statistics_watch_value,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
+                             struct GNUNET_STATISTICS_WatchValueMessage,
+                             h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_STATISTICS_GetHandle *gh;
+  struct GNUNET_STATISTICS_GetHandle *gn;
+
+  if (NULL != h->backoff_task)
+    return GNUNET_NO;
+  if (NULL != h->mq)
+    return GNUNET_YES;
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                 "statistics",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to connect to statistics service!\n");
+    return GNUNET_NO;
+  }
+  gn = h->action_head;
+  while (NULL != (gh = gn))
+  {
+    gn = gh->next;
+    if (gh->type == ACTION_WATCH)
     {
     {
-      h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-      GNUNET_assert (h->watches_size > 0);
-      GNUNET_CLIENT_receive (h->client, &receive_stats, h,
-                             GNUNET_TIME_UNIT_FOREVER_REL);
-      return;
+      GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                   h->action_tail,
+                                   gh);
+      free_action_item (gh);
     }
     }
-    GNUNET_break (0);
-    break;
-  default:
-    GNUNET_break (0);
-    break;
   }
   }
-  if (NULL != h->client)
+  for (unsigned int i = 0; i < h->watches_size; i++)
+    if (NULL != h->watches[i])
+      schedule_watch_request (h,
+                              h->watches[i]);
+  return GNUNET_YES;
+}
+
+
+/**
+ * We've waited long enough, reconnect now.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
+ */
+static void
+reconnect_task (void *cls)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->backoff_task = NULL;
+  schedule_action (h);
+}
+
+
+/**
+ * Reconnect at a later time, respecting back-off.
+ *
+ * @param h statistics handle
+ */
+static void
+reconnect_later (struct GNUNET_STATISTICS_Handle *h)
+{
+  int loss;
+  struct GNUNET_STATISTICS_GetHandle *gh;
+
+  GNUNET_assert (NULL == h->backoff_task);
+  if (GNUNET_YES == h->do_destroy)
   {
   {
-    GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-    h->client = NULL;
+    /* So we are shutting down and the service is not reachable.
+     * Chances are that it's down for good and we are not going to connect to
+     * it anymore.
+     * Give up and don't sync the rest of the data.
+     */
+    loss = GNUNET_NO;
+    for (gh = h->action_head; NULL != gh; gh = gh->next)
+      if ( (gh->make_persistent) &&
+          (ACTION_SET == gh->type) )
+       loss = GNUNET_YES;
+    if (GNUNET_YES == loss)
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Could not save some persistent statistics\n"));
+    if (NULL != h->destroy_task)
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                                h);
+    return;
   }
   }
-  finish (h, GNUNET_SYSERR);
+  h->backoff_task
+    = GNUNET_SCHEDULER_add_delayed (h->backoff,
+                                    &reconnect_task,
+                                    h);
+  h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
 }
 
 
 }
 
 
+
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
+ *
+ * @param handle statistics handle
  */
  */
-static size_t
-transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_get (struct GNUNET_STATISTICS_Handle *handle)
 {
 {
+  struct GNUNET_STATISTICS_GetHandle *c;
   struct GNUNET_MessageHeader *hdr;
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
 
-  if (buf == NULL)
-  {
-    /* timeout / error */
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Transmission of request for statistics failed!\n");
-#endif
-    finish (handle, GNUNET_SYSERR);
-    return 0;
-  }
-  slen1 = strlen (handle->current->subsystem) + 1;
-  slen2 = strlen (handle->current->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
+  GNUNET_assert (NULL != (c = handle->current));
+  slen1 = strlen (c->subsystem) + 1;
+  slen2 = strlen (c->name) + 1;
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_GET);
   GNUNET_assert (slen1 + slen2 ==
   GNUNET_assert (slen1 + slen2 ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
-                                             handle->current->subsystem,
-                                             handle->current->name));
-  if (!handle->receiving)
-  {
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Transmission of GET done, now reading response\n");
-#endif
-    handle->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
-                           GNUNET_TIME_absolute_get_remaining (handle->
-                                                               current->timeout));
-  }
-  return msize;
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
+                                             c->subsystem,
+                                             c->name));
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 /**
  * Transmit a WATCH request (and if successful, start to receive
  * the response).
 }
 
 
 /**
  * Transmit a WATCH request (and if successful, start to receive
  * the response).
+ *
+ * @param handle statistics handle
  */
  */
-static size_t
-transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
 
-  if (buf == NULL)
-  {
-    /* timeout / error */
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Transmission of request for statistics failed!\n");
-#endif
-    finish (handle, GNUNET_SYSERR);
-    return 0;
-  }
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
-              handle->current->name);
-#endif
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmitting watch request for `%s'\n",
+       handle->current->name);
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
   GNUNET_assert (slen1 + slen2 ==
   GNUNET_assert (slen1 + slen2 ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
                                              handle->current->subsystem,
                                              handle->current->name));
                                              handle->current->subsystem,
                                              handle->current->name));
-  if (GNUNET_YES != handle->receiving)
-  {
-    handle->receiving = GNUNET_YES;
-    GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
-  finish (handle, GNUNET_OK);
-  return msize;
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
+  GNUNET_assert (NULL == handle->current->cont);
+  free_action_item (handle->current);
+  handle->current = NULL;
+  schedule_action (handle);
 }
 
 
 /**
  * Transmit a SET/UPDATE request.
 }
 
 
 /**
  * Transmit a SET/UPDATE request.
+ *
+ * @param handle statistics handle
  */
  */
-static size_t
-transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_set (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_STATISTICS_SetMessage *r;
 {
   struct GNUNET_STATISTICS_SetMessage *r;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen;
   size_t nlen;
   size_t slen;
   size_t nlen;
-  size_t nsize;
-
-  if (NULL == buf)
-  {
-    finish (handle, GNUNET_SYSERR);
-    return 0;
-  }
 
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
 
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
-  nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
-  if (size < nsize)
-  {
-    GNUNET_break (0);
-    finish (handle, GNUNET_SYSERR);
-    return 0;
-  }
-  r = buf;
-  r->header.size = htons (nsize);
-  r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
+  env = GNUNET_MQ_msg_extra (r,
+                             slen + nlen,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_SET);
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
@@ -665,39 +866,20 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
   if (handle->current->type == ACTION_UPDATE)
     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
   GNUNET_assert (slen + nlen ==
   if (handle->current->type == ACTION_UPDATE)
     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
   GNUNET_assert (slen + nlen ==
-                 GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
+                 GNUNET_STRINGS_buffer_fill ((char *) &r[1],
+                                             slen + nlen,
+                                             2,
                                              handle->current->subsystem,
                                              handle->current->name));
                                              handle->current->subsystem,
                                              handle->current->name));
-  finish (handle, GNUNET_OK);
-  return nsize;
-}
-
-
-static size_t
-transmit_action (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_STATISTICS_Handle *handle = cls;
-  size_t ret;
-
-  handle->th = NULL;
-  switch (handle->current->type)
-  {
-  case ACTION_GET:
-    ret = transmit_get (handle, size, buf);
-    break;
-  case ACTION_SET:
-  case ACTION_UPDATE:
-    ret = transmit_set (handle, size, buf);
-    break;
-  case ACTION_WATCH:
-    ret = transmit_watch (handle, size, buf);
-    break;
-  default:
-    ret = 0;
-    GNUNET_break (0);
-    break;
-  }
-  return ret;
+  GNUNET_assert (NULL == handle->current->cont);
+  free_action_item (handle->current);
+  handle->current = NULL;
+  update_memory_statistics (handle);
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 }
 
 
@@ -712,21 +894,18 @@ struct GNUNET_STATISTICS_Handle *
 GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
 GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct GNUNET_STATISTICS_Handle *ret;
-
-  GNUNET_assert (subsystem != NULL);
-  GNUNET_assert (cfg != NULL);
-  ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
-  ret->cfg = cfg;
-  ret->subsystem = GNUNET_strdup (subsystem);
-  ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-  if (GNUNET_YES != try_connect (ret))
-  {
-    GNUNET_free (ret->subsystem);
-    GNUNET_free (ret);
+  struct GNUNET_STATISTICS_Handle *h;
+
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                            "statistics",
+                                            "DISABLE"))
     return NULL;
     return NULL;
-  }
-  return ret;
+  h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
+  h->cfg = cfg;
+  h->subsystem = GNUNET_strdup (subsystem);
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  return h;
 }
 
 
 }
 
 
@@ -735,160 +914,151 @@ GNUNET_STATISTICS_create (const char *subsystem,
  * it).
  *
  * @param h statistics handle to destroy
  * it).
  *
  * @param h statistics handle to destroy
- * @param sync_first set to GNUNET_YES if pending SET requests should
+ * @param sync_first set to #GNUNET_YES if pending SET requests should
  *        be completed
  */
 void
  *        be completed
  */
 void
-GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
+GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
+                           int sync_first)
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
-  struct GNUNET_STATISTICS_GetHandle *prev;
-  struct GNUNET_TIME_Relative timeout;
-  int i;
 
 
-  if (h == NULL)
+  if (NULL == h)
     return;
     return;
-  if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
-    GNUNET_SCHEDULER_cancel (h->backoff_task);
-  if (sync_first)
+  GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
+  if ( (sync_first) &&
+       (NULL != h->mq) &&
+       (0 != GNUNET_MQ_get_length (h->mq)) )
   {
   {
-    if (h->current != NULL)
-    {
-      if (h->current->type == ACTION_GET)
-      {
-        GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-        h->th = NULL;
-        free_action_item (h->current);
-        h->current = NULL;
-      }
-    }
-    pos = h->action_head;
-    prev = NULL;
-    while (pos != NULL)
+    if ( (NULL != h->current) &&
+         (ACTION_GET == h->current->type) )
+      h->current->aborted = GNUNET_YES;
+    next = h->action_head;
+    while (NULL != (pos = next))
     {
       next = pos->next;
     {
       next = pos->next;
-      if (pos->type == ACTION_GET)
+      if ( (ACTION_GET == pos->type) ||
+           (ACTION_WATCH == pos->type) )
       {
       {
-        if (prev == NULL)
-          h->action_head = next;
-        else
-          prev->next = next;
+       GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                    h->action_tail,
+                                    pos);
         free_action_item (pos);
       }
         free_action_item (pos);
       }
-      else
-      {
-        prev = pos;
-      }
-      pos = next;
-    }
-    h->action_tail = prev;
-    if (h->current == NULL)
-    {
-      h->current = h->action_head;
-      if (h->action_head != NULL)
-      {
-        h->action_head = h->action_head->next;
-        if (h->action_head == NULL)
-          h->action_tail = NULL;
-      }
     }
     h->do_destroy = GNUNET_YES;
     }
     h->do_destroy = GNUNET_YES;
-    if ((h->current != NULL) && (h->th == NULL))
-    {
-      timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-      h->th =
-          GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
-                                               timeout, GNUNET_YES,
-                                               &transmit_action, h);
-      GNUNET_assert (NULL != h->th);
-    }
-    if (h->th != NULL)
-      return;
-  }
-  if (NULL != h->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
+    schedule_action (h);
+    GNUNET_assert (NULL == h->destroy_task);
+    h->destroy_task
+      = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
+                                                                     5),
+                                      &do_destroy,
+                                      h);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Deferring destruction\n");
+    return; /* do not finish destruction just yet */
   }
   }
-  if (h->current != NULL)
-    free_action_item (h->current);
+  /* do clean up all */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cleaning all up\n");
   while (NULL != (pos = h->action_head))
   {
   while (NULL != (pos = h->action_head))
   {
-    h->action_head = pos->next;
+    GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                h->action_tail,
+                                pos);
     free_action_item (pos);
   }
     free_action_item (pos);
   }
-  if (h->client != NULL)
+  do_disconnect (h);
+  if (NULL != h->backoff_task)
   {
   {
-    GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
-    h->client = NULL;
+    GNUNET_SCHEDULER_cancel (h->backoff_task);
+    h->backoff_task = NULL;
+  }
+  if (NULL != h->destroy_task)
+  {
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = NULL;
   }
   }
-  for (i = 0; i < h->watches_size; i++)
+  for (unsigned int i = 0; i < h->watches_size; i++)
   {
   {
+    if (NULL == h->watches[i])
+      continue;
     GNUNET_free (h->watches[i]->subsystem);
     GNUNET_free (h->watches[i]->name);
     GNUNET_free (h->watches[i]);
   }
     GNUNET_free (h->watches[i]->subsystem);
     GNUNET_free (h->watches[i]->name);
     GNUNET_free (h->watches[i]);
   }
-  GNUNET_array_grow (h->watches, h->watches_size, 0);
+  GNUNET_array_grow (h->watches,
+                     h->watches_size,
+                     0);
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
 
 
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
 
 
-static void
-finish_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct GNUNET_STATISTICS_Handle *h = cls;
-
-  h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
-  finish (h, GNUNET_SYSERR);
-}
-
-
 /**
  * Schedule the next action to be performed.
 /**
  * Schedule the next action to be performed.
+ *
+ * @param cls statistics handle
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h)
+schedule_action (void *cls)
 {
 {
-  struct GNUNET_TIME_Relative timeout;
+  struct GNUNET_STATISTICS_Handle *h = cls;
 
 
-  if (h->current != NULL)
+  if (NULL != h->backoff_task)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
   {
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
   {
-    h->backoff_task =
-        GNUNET_SCHEDULER_add_delayed (h->backoff, &finish_task, h);
-    h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
-    h->backoff =
-        GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
+    reconnect_later (h);
     return;
   }
     return;
   }
-
+  if (0 < GNUNET_MQ_get_length (h->mq))
+    return; /* Wait for queue to be reduced more */    
   /* schedule next action */
   /* schedule next action */
-  h->current = h->action_head;
-  if (NULL == h->current)
+  while (NULL == h->current)
   {
   {
-    if (h->do_destroy)
+    h->current = h->action_head;
+    if (NULL == h->current)
+    {
+      struct GNUNET_MessageHeader *hdr;
+      struct GNUNET_MQ_Envelope *env;
+
+      if (GNUNET_YES != h->do_destroy)
+        return; /* nothing to do */
+      /* let service know that we're done */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Notifying service that we are done\n");
+      h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
+      env = GNUNET_MQ_msg (hdr,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
+      GNUNET_MQ_notify_sent (env,
+                             &schedule_action,
+                             h);
+      GNUNET_MQ_send (h->mq,
+                      env);
+      return;
+    }
+    GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                 h->action_tail,
+                                 h->current);
+    switch (h->current->type)
     {
     {
-      h->do_destroy = GNUNET_NO;
-      GNUNET_STATISTICS_destroy (h, GNUNET_YES);
+    case ACTION_GET:
+      transmit_get (h);
+      break;
+    case ACTION_SET:
+    case ACTION_UPDATE:
+      transmit_set (h);
+      break;
+    case ACTION_WATCH:
+      transmit_watch (h);
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
     }
     }
-    return;
-  }
-  GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
-  timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-  if (NULL ==
-      (h->th =
-       GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
-                                            timeout, GNUNET_YES,
-                                            &transmit_action, h)))
-  {
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Failed to transmit request to statistics service.\n");
-#endif
-    finish (h, GNUNET_SYSERR);
   }
 }
 
   }
 }
 
@@ -899,62 +1069,55 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
- * @param timeout after how long should we give up (and call
- *        cont with an error code)?
  * @param cont continuation to call when done (can be NULL)
  * @param cont continuation to call when done (can be NULL)
+ *        This callback CANNOT destroy the statistics handle in the same call.
  * @param proc function to call on each value
  * @param proc function to call on each value
- * @param cls closure for cont and proc
+ * @param cls closure for @a cont and @a proc
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
-                       const char *subsystem, const char *name,
-                       struct GNUNET_TIME_Relative timeout,
+                       const char *subsystem,
+                       const char *name,
                        GNUNET_STATISTICS_Callback cont,
                        GNUNET_STATISTICS_Callback cont,
-                       GNUNET_STATISTICS_Iterator proc, void *cls)
+                       GNUNET_STATISTICS_Iterator proc,
+                       void *cls)
 {
   size_t slen1;
   size_t slen2;
   struct GNUNET_STATISTICS_GetHandle *ai;
 
 {
   size_t slen1;
   size_t slen2;
   struct GNUNET_STATISTICS_GetHandle *ai;
 
-  GNUNET_assert (handle != NULL);
-  GNUNET_assert (proc != NULL);
-  GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  if (GNUNET_YES != try_connect (handle))
-  {
-#if DEBUG_STATISTICS
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Failed to connect to statistics service, can not get value `%s:%s'.\n",
-                strlen (subsystem) ? subsystem : "*",
-                strlen (name) ? name : "*");
-#endif
+  if (NULL == handle)
     return NULL;
     return NULL;
-  }
-  if (subsystem == NULL)
+  GNUNET_assert (NULL != proc);
+  GNUNET_assert (GNUNET_NO == handle->do_destroy);
+  if (NULL == subsystem)
     subsystem = "";
     subsystem = "";
-  if (name == NULL)
+  if (NULL == name)
     name = "";
   slen1 = strlen (subsystem) + 1;
   slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
     name = "";
   slen1 = strlen (subsystem) + 1;
   slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
-  ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  insert_ai (handle, ai);
+  GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
+                                   handle->action_tail,
+                                   ai);
+  schedule_action (handle);
   return ai;
 }
 
 
 /**
   return ai;
 }
 
 
 /**
- * Cancel a 'get' request.  Must be called before the 'cont' 
+ * Cancel a 'get' request.  Must be called before the 'cont'
  * function is called.
  *
  * @param gh handle of the request to cancel
  * function is called.
  *
  * @param gh handle of the request to cancel
@@ -962,55 +1125,117 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
 void
 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
 {
 void
 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
 {
+  if (NULL == gh)
+    return;
+  gh->cont = NULL;
   if (gh->sh->current == gh)
   {
     gh->aborted = GNUNET_YES;
   if (gh->sh->current == gh)
   {
     gh->aborted = GNUNET_YES;
+    return;
   }
   }
-  else
-  {
-    GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
-    GNUNET_free (gh->name);
-    GNUNET_free (gh->subsystem);
-    GNUNET_free (gh);
-  }
+  GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
+                               gh->sh->action_tail,
+                               gh);
+  GNUNET_free (gh->name);
+  GNUNET_free (gh->subsystem);
+  GNUNET_free (gh);
 }
 
 
 /**
  * Watch statistics from the peer (be notified whenever they change).
 }
 
 
 /**
  * Watch statistics from the peer (be notified whenever they change).
- * Note that the only way to cancel a "watch" request is to destroy
- * the statistics handle given as the first argument to this call.
  *
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
  *
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, never NULL
  * @param name name of the statistic value, never NULL
  * @param proc function to call on each value
- * @param proc_cls closure for proc
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 int
 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
  */
 int
 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
-                         const char *subsystem, const char *name,
-                         GNUNET_STATISTICS_Iterator proc, void *proc_cls)
+                         const char *subsystem,
+                         const char *name,
+                         GNUNET_STATISTICS_Iterator proc,
+                         void *proc_cls)
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 
 {
   struct GNUNET_STATISTICS_WatchEntry *w;
 
-  if (handle == NULL)
+  if (NULL == handle)
     return GNUNET_SYSERR;
     return GNUNET_SYSERR;
-  w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
+  w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
   w->subsystem = GNUNET_strdup (subsystem);
   w->name = GNUNET_strdup (name);
   w->proc = proc;
   w->proc_cls = proc_cls;
   w->subsystem = GNUNET_strdup (subsystem);
   w->name = GNUNET_strdup (name);
   w->proc = proc;
   w->proc_cls = proc_cls;
-  GNUNET_array_append (handle->watches, handle->watches_size, w);
-  schedule_watch_request (handle, w);
+  GNUNET_array_append (handle->watches,
+                       handle->watches_size,
+                       w);
+  schedule_watch_request (handle,
+                          w);
   return GNUNET_OK;
 }
 
 
   return GNUNET_OK;
 }
 
 
+/**
+ * Stop watching statistics from the peer.
+ *
+ * @param handle identification of the statistics service
+ * @param subsystem limit to the specified subsystem, never NULL
+ * @param name name of the statistic value, never NULL
+ * @param proc function to call on each value
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
+ */
+int
+GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
+                               const char *subsystem,
+                                const char *name,
+                               GNUNET_STATISTICS_Iterator proc,
+                                void *proc_cls)
+{
+  struct GNUNET_STATISTICS_WatchEntry *w;
+
+  if (NULL == handle)
+    return GNUNET_SYSERR;
+  for (unsigned int i=0;i<handle->watches_size;i++)
+  {
+    w = handle->watches[i];
+    if (NULL == w)
+      continue;
+    if ( (w->proc == proc) &&
+        (w->proc_cls == proc_cls) &&
+        (0 == strcmp (w->name,
+                      name)) &&
+        (0 == strcmp (w->subsystem,
+                      subsystem)) )
+    {
+      GNUNET_free (w->name);
+      GNUNET_free (w->subsystem);
+      GNUNET_free (w);
+      handle->watches[i] = NULL;
+      return GNUNET_OK;
+    }
+  }
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Queue a request to change a statistic.
+ *
+ * @param h statistics handle
+ * @param name name of the value
+ * @param make_persistent  should the value be kept across restarts?
+ * @param value new value or change
+ * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
+ */
 static void
 static void
-add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
-                   int make_persistent, uint64_t value, enum ActionType type)
+add_setter_action (struct GNUNET_STATISTICS_Handle *h,
+                   const char *name,
+                   int make_persistent,
+                   uint64_t value,
+                   enum ActionType type)
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
@@ -1018,10 +1243,6 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   size_t nsize;
   int64_t delta;
 
   size_t nsize;
   int64_t delta;
 
-  GNUNET_assert (h != NULL);
-  GNUNET_assert (name != NULL);
-  if (GNUNET_YES != try_connect (h))
-    return;
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
@@ -1030,55 +1251,63 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
     GNUNET_break (0);
     return;
   }
     GNUNET_break (0);
     return;
   }
-  ai = h->action_head;
-  while (ai != NULL)
+  for (ai = h->action_head; NULL != ai; ai = ai->next)
   {
   {
-    if ((0 == strcmp (ai->subsystem, h->subsystem)) &&
-        (0 == strcmp (ai->name, name)) && ((ai->type == ACTION_UPDATE) ||
-                                           (ai->type == ACTION_SET)))
+    if (! ( (0 == strcmp (ai->subsystem,
+                         h->subsystem)) &&
+           (0 == strcmp (ai->name,
+                         name)) &&
+           ( (ACTION_UPDATE == ai->type) ||
+             (ACTION_SET == ai->type) ) ) )
+      continue;
+    if (ACTION_SET == ai->type)
     {
     {
-      if (ai->type == ACTION_SET)
+      if (ACTION_UPDATE == type)
       {
       {
-        if (type == ACTION_UPDATE)
+       delta = (int64_t) value;
+       if (delta > 0)
         {
         {
-          delta = (int64_t) value;
-          if (delta > 0)
-          {
-            ai->value += delta;
-          }
-          else
-          {
-            if (ai->value < -delta)
-              ai->value = 0;
-            else
-              ai->value += delta;
-          }
-        }
-        else
+         /* update old set by new delta */
+         ai->value += delta;
+       }
+       else
         {
         {
-          ai->value = value;
-        }
+         /* update old set by new delta, but never go negative */
+         if (ai->value < -delta)
+           ai->value = 0;
+         else
+           ai->value += delta;
+       }
       }
       else
       {
       }
       else
       {
-        if (type == ACTION_UPDATE)
-        {
-          delta = (int64_t) value;
-          ai->value += delta;
-        }
-        else
-        {
-          ai->value = value;
-          ai->type = type;
-        }
+       /* new set overrides old set */
+       ai->value = value;
       }
       }
-      ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
-      ai->make_persistent = make_persistent;
-      return;
     }
     }
-    ai = ai->next;
+    else
+    {
+      if (ACTION_UPDATE == type)
+      {
+       /* make delta cummulative */
+       delta = (int64_t) value;
+       ai->value += delta;
+      }
+      else
+      {
+       /* drop old 'update', use new 'set' instead */
+       ai->value = value;
+       ai->type = type;
+      }
+    }
+    ai->timeout
+      = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
+    ai->make_persistent
+      = make_persistent;
+    return;
   }
   }
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  /* no existing entry matches, create a fresh one */
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
@@ -1087,7 +1316,10 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
-  insert_ai (h, ai);
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
+                                   ai);
+  schedule_action (h);
 }
 
 
 }
 
 
@@ -1102,12 +1334,18 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
  */
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
-                       const char *name, uint64_t value, int make_persistent)
+                       const char *name,
+                       uint64_t value,
+                       int make_persistent)
 {
 {
-  if (handle == NULL)
+  if (NULL == handle)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, value, ACTION_SET);
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     value,
+                     ACTION_SET);
 }
 
 
 }
 
 
@@ -1122,14 +1360,19 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
  */
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
-                          const char *name, int64_t delta, int make_persistent)
+                          const char *name,
+                          int64_t delta,
+                          int make_persistent)
 {
 {
-  if (handle == NULL)
+  if (NULL == handle)
     return;
     return;
-  if (delta == 0)
+  if (0 == delta)
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
     return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, (uint64_t) delta,
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     (uint64_t) delta,
                      ACTION_UPDATE);
 }
 
                      ACTION_UPDATE);
 }