migrate secretsharing to new service API
[oweals/gnunet.git] / src / statistics / statistics_api.c
index 5c5588e918491c37475beeb3b775314be8befb10..ad4453b2a62a0e8515317e0d972bdd1c740aabab 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
 */
 
 /**
  * @author Christian Grothoff
  */
 #include "platform.h"
  * @author Christian Grothoff
  */
 #include "platform.h"
-#include "gnunet_client_lib.h"
-#include "gnunet_container_lib.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_constants.h"
 #include "gnunet_protocols.h"
 #include "gnunet_protocols.h"
-#include "gnunet_server_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_statistics_service.h"
-#include "gnunet_strings_lib.h"
 #include "statistics.h"
 
 /**
  * How long do we wait until a statistics request for setting
  * a value times out?  (The update will be lost if the
 #include "statistics.h"
 
 /**
  * How long do we wait until a statistics request for setting
  * a value times out?  (The update will be lost if the
- * service does not react within this timeframe).  
+ * service does not react within this timeframe).
  */
  */
-#define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
+#define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
 
 
+#define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
 
 /**
  * Types of actions.
  */
 enum ActionType
 {
 
 /**
  * Types of actions.
  */
 enum ActionType
 {
+  /**
+   * Get a value.
+   */
   ACTION_GET,
   ACTION_GET,
+
+  /**
+   * Set a value.
+   */
   ACTION_SET,
   ACTION_SET,
-  ACTION_UPDATE
+
+  /**
+   * Update a value.
+   */
+  ACTION_UPDATE,
+
+  /**
+   * Watch a value.
+   */
+  ACTION_WATCH
+};
+
+
+/**
+ * Entry kept for each value we are watching.
+ */
+struct GNUNET_STATISTICS_WatchEntry
+{
+
+  /**
+   * What subsystem is this action about? (never NULL)
+   */
+  char *subsystem;
+
+  /**
+   * What value is this action about? (never NULL)
+   */
+  char *name;
+
+  /**
+   * Function to call
+   */
+  GNUNET_STATISTICS_Iterator proc;
+
+  /**
+   * Closure for @e proc
+   */
+  void *proc_cls;
+
 };
 
 
 };
 
 
@@ -71,7 +115,7 @@ struct GNUNET_STATISTICS_GetHandle
    * Main statistics handle.
    */
   struct GNUNET_STATISTICS_Handle *sh;
    * Main statistics handle.
    */
   struct GNUNET_STATISTICS_Handle *sh;
+
   /**
    * What subsystem is this action about? (can be NULL)
    */
   /**
    * What subsystem is this action about? (can be NULL)
    */
@@ -93,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle
   GNUNET_STATISTICS_Iterator proc;
 
   /**
   GNUNET_STATISTICS_Iterator proc;
 
   /**
-   * Closure for proc and cont.
+   * Closure for @e proc and @e cont.
    */
   void *cls;
 
    */
   void *cls;
 
@@ -118,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle
   int aborted;
 
   /**
   int aborted;
 
   /**
-   * Is this a GET, SET or UPDATE?
+   * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
    */
   enum ActionType type;
 
    */
   enum ActionType type;
 
@@ -135,11 +179,6 @@ struct GNUNET_STATISTICS_GetHandle
  */
 struct GNUNET_STATISTICS_Handle
 {
  */
 struct GNUNET_STATISTICS_Handle
 {
-  /**
-   * Our scheduler.
-   */
-  struct GNUNET_SCHEDULER_Handle *sched;
-
   /**
    * Name of our subsystem.
    */
   /**
    * Name of our subsystem.
    */
@@ -151,14 +190,9 @@ struct GNUNET_STATISTICS_Handle
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
-   */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * Currently pending transmission request.
+   * Message queue to the service.
    */
    */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Head of the linked list of pending actions (first action
 
   /**
    * Head of the linked list of pending actions (first action
@@ -178,256 +212,653 @@ struct GNUNET_STATISTICS_Handle
    */
   struct GNUNET_STATISTICS_GetHandle *current;
 
    */
   struct GNUNET_STATISTICS_GetHandle *current;
 
+  /**
+   * Array of watch entries.
+   */
+  struct GNUNET_STATISTICS_WatchEntry **watches;
+
+  /**
+   * Task doing exponential back-off trying to reconnect.
+   */
+  struct GNUNET_SCHEDULER_Task *backoff_task;
+
+  /**
+   * Task for running #do_destroy().
+   */
+  struct GNUNET_SCHEDULER_Task *destroy_task;
+
+  /**
+   * Time for next connect retry.
+   */
+  struct GNUNET_TIME_Relative backoff;
+
+  /**
+   * Maximum heap size observed so far (if available).
+   */
+  uint64_t peak_heap_size;
+
+  /**
+   * Maximum resident set side observed so far (if available).
+   */
+  uint64_t peak_rss;
+
+  /**
+   * Size of the @e watches array.
+   */
+  unsigned int watches_size;
+
   /**
    * Should this handle auto-destruct once all actions have
    * been processed?
    */
   int do_destroy;
 
   /**
    * Should this handle auto-destruct once all actions have
    * been processed?
    */
   int do_destroy;
 
+  /**
+   * Are we currently receiving from the service?
+   */
+  int receiving;
+
 };
 
 
 /**
 };
 
 
 /**
- * Try to (re)connect to the statistics service.
- *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
+ * Obtain statistics about this process's memory consumption and
+ * report those as well (if they changed).
  */
  */
-static int
-try_connect (struct GNUNET_STATISTICS_Handle *ret)
+static void
+update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
 {
 {
-  if (ret->client != NULL)
-    return GNUNET_OK;
-  ret->client = GNUNET_CLIENT_connect (ret->sched, "statistics", ret->cfg);
-  if (ret->client != NULL)
-    return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to statistics service!\n"));
+#if ENABLE_HEAP_STATISTICS
+  uint64_t current_heap_size = 0;
+  uint64_t current_rss = 0;
+
+  if (GNUNET_NO != h->do_destroy)
+    return;
+#if HAVE_MALLINFO
+  {
+    struct mallinfo mi;
+
+    mi = mallinfo();
+    current_heap_size = mi.uordblks + mi.fordblks;
+  }
+#endif
+#if HAVE_GETRUSAGE
+  {
+    struct rusage ru;
+
+    if (0 == getrusage (RUSAGE_SELF, &ru))
+    {
+      current_rss = 1024LL * ru.ru_maxrss;
+    }
+  }
+#endif
+  if (current_heap_size > h->peak_heap_size)
+  {
+    h->peak_heap_size = current_heap_size;
+    GNUNET_STATISTICS_set (h,
+                          "# peak heap size",
+                          current_heap_size,
+                          GNUNET_NO);
+  }
+  if (current_rss > h->peak_rss)
+  {
+    h->peak_rss = current_rss;
+    GNUNET_STATISTICS_set (h,
+                          "# peak resident set size",
+                          current_rss,
+                          GNUNET_NO);
+  }
 #endif
 #endif
-  return GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * Free memory associated with the given action item.
+ * Reconnect at a later time, respecting back-off.
+ *
+ * @param h statistics handle
  */
 static void
  */
 static void
-free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
-{
-  GNUNET_free_non_null (ai->subsystem);
-  GNUNET_free_non_null (ai->name);
-  GNUNET_free (ai);
-}
+reconnect_later (struct GNUNET_STATISTICS_Handle *h);
 
 
 /**
  * Schedule the next action to be performed.
 
 
 /**
  * Schedule the next action to be performed.
+ *
+ * @param cls statistics handle to reconnect
  */
  */
-static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
+static void
+schedule_action (void *cls);
 
 
 /**
 
 
 /**
- * GET processing is complete, tell client about it.
+ * Transmit request to service that we want to watch
+ * the development of a particular value.
+ *
+ * @param h statistics handle
+ * @param watch watch entry of the value to watch
  */
 static void
  */
 static void
-finish (struct GNUNET_STATISTICS_Handle *h, int code)
+schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
+                        struct GNUNET_STATISTICS_WatchEntry *watch)
 {
 {
-  struct GNUNET_STATISTICS_GetHandle *pos = h->current;
-  h->current = NULL;
+  struct GNUNET_STATISTICS_GetHandle *ai;
+  size_t slen;
+  size_t nlen;
+  size_t nsize;
+
+  slen = strlen (watch->subsystem) + 1;
+  nlen = strlen (watch->name) + 1;
+  nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
+  if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
+  ai->sh = h;
+  ai->subsystem = GNUNET_strdup (watch->subsystem);
+  ai->name = GNUNET_strdup (watch->name);
+  ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+  ai->msize = nsize;
+  ai->type = ACTION_WATCH;
+  ai->proc = watch->proc;
+  ai->cls = watch->proc_cls;
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
+                                   ai);
   schedule_action (h);
   schedule_action (h);
-  if (pos->cont != NULL)
-    pos->cont (pos->cls, code);
-  free_action_item (pos);
 }
 
 
 /**
 }
 
 
 /**
- * Process the message.
+ * Free memory associated with the given action item.
+ *
+ * @param gh action item to free
+ */
+static void
+free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
+{
+  GNUNET_free_non_null (gh->subsystem);
+  GNUNET_free_non_null (gh->name);
+  GNUNET_free (gh);
+}
+
+
+/**
+ * Disconnect from the statistics service.
  *
  *
- * @return GNUNET_OK if the message was well-formed
+ * @param h statistics handle to disconnect from
+ */
+static void
+do_disconnect (struct GNUNET_STATISTICS_Handle *h)
+{
+  struct GNUNET_STATISTICS_GetHandle *c;
+
+  h->receiving = GNUNET_NO;
+  if (NULL != (c = h->current))
+  {
+    h->current = NULL;
+    if ( (NULL != c->cont) &&
+        (GNUNET_YES != c->aborted) )
+    {
+      c->cont (c->cls,
+               GNUNET_SYSERR);
+      c->cont = NULL;
+    }
+    free_action_item (c);
+  }
+  if (NULL != h->mq)
+  {
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
+  }
+}
+
+
+/**
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
+ *
+ * @param cls statistics handle
+ * @param smsg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
  */
 static int
  */
 static int
-process_message (struct GNUNET_STATISTICS_Handle *h,
-                 const struct GNUNET_MessageHeader *msg)
+check_statistics_value (void *cls,
+                        const struct GNUNET_STATISTICS_ReplyMessage *smsg)
 {
 {
-  char *service;
-  char *name;
-  const struct GNUNET_STATISTICS_ReplyMessage *smsg;
+  const char *service;
+  const char *name;
+  uint16_t size;
+
+  size = ntohs (smsg->header.size);
+  size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
+  if (size !=
+      GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                      size,
+                                      2,
+                                      &service,
+                                      &name))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
+ *
+ * @param cls statistics handle
+ * @param msg message received from the service, never NULL
+ * @return #GNUNET_OK if the message was well-formed
+ */
+static void
+handle_statistics_value (void *cls,
+                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  const char *service;
+  const char *name;
   uint16_t size;
 
   if (h->current->aborted)
   uint16_t size;
 
   if (h->current->aborted)
-    return GNUNET_OK;           /* don't bother */
-  size = ntohs (msg->size);
-  if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
+    return;           /* iteration aborted, don't bother */
+
+  size = ntohs (smsg->header.size);
   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
-  if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
-                                              size, 2, &service, &name))
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received valid statistic on `%s:%s': %llu\n",
-              service, name, GNUNET_ntohll (smsg->value));
-#endif
+  GNUNET_assert (size ==
+                 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
+                                                 size,
+                                                 2,
+                                                 &service,
+                                                 &name));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received valid statistic on `%s:%s': %llu\n",
+       service, name,
+       GNUNET_ntohll (smsg->value));
   if (GNUNET_OK !=
       h->current->proc (h->current->cls,
   if (GNUNET_OK !=
       h->current->proc (h->current->cls,
-                       service,
-                       name,
-                       GNUNET_ntohll (smsg->value),
-                       0 !=
-                       (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
-    {
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Processing of remaining statistics aborted by client.\n");
-#endif
-      h->current->aborted = GNUNET_YES;    
-    }
-  return GNUNET_OK;
+                        service,
+                        name,
+                        GNUNET_ntohll (smsg->value),
+                        0 !=
+                        (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Processing of remaining statistics aborted by client.\n");
+    h->current->aborted = GNUNET_YES;
+  }
 }
 
 
 /**
 }
 
 
 /**
- * Function called with messages from stats service.
+ * We have received a watch value from the service.  Process it.
  *
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param cls statistics handle
+ * @param msg the watch value message
  */
 static void
  */
 static void
-receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
+handle_statistics_watch_value (void *cls,
+                               const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
 {
   struct GNUNET_STATISTICS_Handle *h = cls;
+  struct GNUNET_STATISTICS_WatchEntry *w;
+  uint32_t wid;
+
+  GNUNET_break (0 == ntohl (wvm->reserved));
+  wid = ntohl (wvm->wid);
+  if (wid >= h->watches_size)
+  {
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
+  }
+  w = h->watches[wid];
+  if (NULL == w)
+    return;
+  (void) w->proc (w->proc_cls,
+                  w->subsystem,
+                  w->name,
+                  GNUNET_ntohll (wvm->value),
+                  0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
+}
+
 
 
-  if (msg == NULL)
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  if (GNUNET_NO != h->do_destroy)
+  {
+    h->do_destroy = GNUNET_NO;
+    if (NULL != h->destroy_task)
     {
     {
-      if (NULL != h->client)
-       {
-         GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-         h->client = NULL;
-       }
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                 "Error receiving statistics from service, is the service running?\n" );
-#endif
-      finish (h, GNUNET_SYSERR);
-      return;
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+      h->destroy_task = NULL;
     }
     }
-  switch (ntohs (msg->type))
-    {
-    case GNUNET_MESSAGE_TYPE_STATISTICS_END:
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received end of statistics marker\n");
-#endif
-      finish (h, GNUNET_OK);
-      return;
-    case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
-      if (GNUNET_OK == process_message (h, msg))
-        {
-          /* finally, look for more! */
-          GNUNET_CLIENT_receive (h->client,
-                                 &receive_stats,
-                                 h,
-                                 GNUNET_TIME_absolute_get_remaining
-                                 (h->current->timeout));
-          return;
-        }
-      GNUNET_break (0);
-      break;
-    default:
-      GNUNET_break (0);
-      break;
-    }
-  if (NULL != h->client)
+    GNUNET_STATISTICS_destroy (h,
+                               GNUNET_NO);
+    return;
+  }
+  do_disconnect (h);
+  reconnect_later (h);
+}
+
+
+/**
+ * Task used to destroy the statistics handle.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle`
+ */
+static void
+do_destroy (void *cls)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->destroy_task = NULL;
+  h->do_destroy = GNUNET_NO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Running final destruction\n");
+  GNUNET_STATISTICS_destroy (h,
+                             GNUNET_NO);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
+ * message. We receive this message at the end of the shutdown when
+ * the service confirms that all data has been written to disk.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_disconnect_confirm (void *cls,
+                          const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  if (GNUNET_SYSERR != h->do_destroy)
+  {
+    /* not in shutdown, why do we get 'TEST'? */
+    GNUNET_break (0);
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
+  if (NULL != h->destroy_task)
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+  h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                              h);
+}
+
+
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
+ * this message in response to a query to indicate that there are no
+ * further matching results.
+ *
+ * @param cls our `struct GNUNET_STATISTICS_Handle *`
+ * @param msg the message
+ */
+static void
+handle_statistics_end (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+  struct GNUNET_STATISTICS_GetHandle *c;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received end of statistics marker\n");
+  if (NULL == (c = h->current))
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    reconnect_later (h);
+    return;
+  }
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  h->current = NULL;
+  schedule_action (h);
+  if (NULL != c->cont)
+  {
+    c->cont (c->cls,
+             GNUNET_OK);
+    c->cont = NULL;
+  }
+  free_action_item (c);
+}
+
+
+/**
+ * Try to (re)connect to the statistics service.
+ *
+ * @param h statistics handle to reconnect
+ * @return #GNUNET_YES on success, #GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_STATISTICS_Handle *h)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (disconnect_confirm,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_fixed_size (statistics_end,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_END,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_hd_var_size (statistics_value,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
+                           struct GNUNET_STATISTICS_ReplyMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (statistics_watch_value,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
+                             struct GNUNET_STATISTICS_WatchValueMessage,
+                             h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_STATISTICS_GetHandle *gh;
+  struct GNUNET_STATISTICS_GetHandle *gn;
+
+  if (NULL != h->backoff_task)
+    return GNUNET_NO;
+  if (NULL != h->mq)
+    return GNUNET_YES;
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                 "statistics",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to connect to statistics service!\n");
+    return GNUNET_NO;
+  }
+  gn = h->action_head;
+  while (NULL != (gh = gn))
+  {
+    gn = gh->next;
+    if (gh->type == ACTION_WATCH)
     {
     {
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = NULL;
+      GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                   h->action_tail,
+                                   gh);
+      free_action_item (gh);
     }
     }
-  finish (h, GNUNET_SYSERR);
+  }
+  for (unsigned int i = 0; i < h->watches_size; i++)
+    if (NULL != h->watches[i])
+      schedule_watch_request (h,
+                              h->watches[i]);
+  return GNUNET_YES;
+}
+
+
+/**
+ * We've waited long enough, reconnect now.
+ *
+ * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
+ */
+static void
+reconnect_task (void *cls)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->backoff_task = NULL;
+  schedule_action (h);
 }
 
 
 }
 
 
+/**
+ * Reconnect at a later time, respecting back-off.
+ *
+ * @param h statistics handle
+ */
+static void
+reconnect_later (struct GNUNET_STATISTICS_Handle *h)
+{
+  int loss;
+  struct GNUNET_STATISTICS_GetHandle *gh;
+
+  GNUNET_assert (NULL == h->backoff_task);
+  if (GNUNET_YES == h->do_destroy)
+  {
+    /* So we are shutting down and the service is not reachable.
+     * Chances are that it's down for good and we are not going to connect to
+     * it anymore.
+     * Give up and don't sync the rest of the data.
+     */
+    loss = GNUNET_NO;
+    for (gh = h->action_head; NULL != gh; gh = gh->next)
+      if ( (gh->make_persistent) &&
+          (ACTION_SET == gh->type) )
+       loss = GNUNET_YES;
+    if (GNUNET_YES == loss)
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Could not save some persistent statistics\n"));
+    if (NULL != h->destroy_task)
+      GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
+                                                h);
+    return;
+  }
+  h->backoff_task
+    = GNUNET_SCHEDULER_add_delayed (h->backoff,
+                                    &reconnect_task,
+                                    h);
+  h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
+}
+
+
+
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
 /**
  * Transmit a GET request (and if successful, start to receive
  * the response).
+ *
+ * @param handle statistics handle
+ */
+static void
+transmit_get (struct GNUNET_STATISTICS_Handle *handle)
+{
+  struct GNUNET_STATISTICS_GetHandle *c;
+  struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
+  size_t slen1;
+  size_t slen2;
+
+  GNUNET_assert (NULL != (c = handle->current));
+  slen1 = strlen (c->subsystem) + 1;
+  slen2 = strlen (c->name) + 1;
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_GET);
+  GNUNET_assert (slen1 + slen2 ==
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
+                                             c->subsystem,
+                                             c->name));
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
+}
+
+
+/**
+ * Transmit a WATCH request (and if successful, start to receive
+ * the response).
+ *
+ * @param handle statistics handle
  */
  */
-static size_t
-transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_MessageHeader *hdr;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen1;
   size_t slen2;
   size_t slen1;
   size_t slen2;
-  uint16_t msize;
 
 
-  if (buf == NULL)
-    {
-      /* timeout / error */
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Transmission of request for statistics failed!\n");
-#endif
-      finish (handle, GNUNET_SYSERR);
-      return 0;
-    }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmitting watch request for `%s'\n",
+       handle->current->name);
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
   slen1 = strlen (handle->current->subsystem) + 1;
   slen2 = strlen (handle->current->name) + 1;
-  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  GNUNET_assert (msize <= size);
-  hdr = (struct GNUNET_MessageHeader *) buf;
-  hdr->size = htons (msize);
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
+  env = GNUNET_MQ_msg_extra (hdr,
+                             slen1 + slen2,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
   GNUNET_assert (slen1 + slen2 ==
                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
                                              slen1 + slen2,
                                              2,
                                              handle->current->subsystem,
                                              handle->current->name));
   GNUNET_assert (slen1 + slen2 ==
                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
                                              slen1 + slen2,
                                              2,
                                              handle->current->subsystem,
                                              handle->current->name));
-  GNUNET_CLIENT_receive (handle->client,
-                         &receive_stats,
-                         handle,
-                         GNUNET_TIME_absolute_get_remaining (handle->
-                                                             current->timeout));
-  return msize;
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
+  GNUNET_assert (NULL == handle->current->cont);
+  free_action_item (handle->current);
+  handle->current = NULL;
+  schedule_action (handle);
 }
 
 
 }
 
 
-
 /**
  * Transmit a SET/UPDATE request.
 /**
  * Transmit a SET/UPDATE request.
+ *
+ * @param handle statistics handle
  */
  */
-static size_t
-transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+static void
+transmit_set (struct GNUNET_STATISTICS_Handle *handle)
 {
   struct GNUNET_STATISTICS_SetMessage *r;
 {
   struct GNUNET_STATISTICS_SetMessage *r;
+  struct GNUNET_MQ_Envelope *env;
   size_t slen;
   size_t nlen;
   size_t slen;
   size_t nlen;
-  size_t nsize;
-
-  if (NULL == buf)
-    {
-      finish (handle, GNUNET_SYSERR);
-      return 0;
-    }
 
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
 
   slen = strlen (handle->current->subsystem) + 1;
   nlen = strlen (handle->current->name) + 1;
-  nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
-  if (size < nsize)
-    {
-      GNUNET_break (0);
-      finish (handle, GNUNET_SYSERR);
-      return 0;
-    }
-  r = buf;
-  r->header.size = htons (nsize);
-  r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
+  env = GNUNET_MQ_msg_extra (r,
+                             slen + nlen,
+                             GNUNET_MESSAGE_TYPE_STATISTICS_SET);
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
   r->flags = 0;
   r->value = GNUNET_htonll (handle->current->value);
   if (handle->current->make_persistent)
@@ -440,60 +871,41 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
                                              2,
                                              handle->current->subsystem,
                                              handle->current->name));
                                              2,
                                              handle->current->subsystem,
                                              handle->current->name));
-  finish (handle, GNUNET_OK);
-  return nsize;
-}
-
-
-static size_t
-transmit_action (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_STATISTICS_Handle *handle = cls;
-  size_t ret;
-
-  handle->th = NULL;
-  switch (handle->current->type)
-    {
-    case ACTION_GET:
-      ret = transmit_get (handle, size, buf);
-      break;
-    case ACTION_SET:
-    case ACTION_UPDATE:
-      ret = transmit_set (handle, size, buf);
-      break;
-    default:
-      ret = 0;
-      GNUNET_break (0);
-      break; 
-    }
-  return ret;
+  GNUNET_assert (NULL == handle->current->cont);
+  free_action_item (handle->current);
+  handle->current = NULL;
+  update_memory_statistics (handle);
+  GNUNET_MQ_notify_sent (env,
+                         &schedule_action,
+                         handle);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
 /**
  * Get handle for the statistics service.
  *
 }
 
 
 /**
  * Get handle for the statistics service.
  *
- * @param sched scheduler to use
  * @param subsystem name of subsystem using the service
  * @param cfg services configuration in use
  * @return handle to use
  */
 struct GNUNET_STATISTICS_Handle *
  * @param subsystem name of subsystem using the service
  * @param cfg services configuration in use
  * @return handle to use
  */
 struct GNUNET_STATISTICS_Handle *
-GNUNET_STATISTICS_create (struct GNUNET_SCHEDULER_Handle *sched,
-                          const char *subsystem,
+GNUNET_STATISTICS_create (const char *subsystem,
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
                           const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct GNUNET_STATISTICS_Handle *ret;
-
-  GNUNET_assert (subsystem != NULL);
-  GNUNET_assert (sched != NULL);
-  GNUNET_assert (cfg != NULL);
-  ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
-  ret->sched = sched;
-  ret->cfg = cfg;
-  ret->subsystem = GNUNET_strdup (subsystem);
-  try_connect (ret);
-  return ret;
+  struct GNUNET_STATISTICS_Handle *h;
+
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                            "statistics",
+                                            "DISABLE"))
+    return NULL;
+  h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
+  h->cfg = cfg;
+  h->subsystem = GNUNET_strdup (subsystem);
+  h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  return h;
 }
 
 
 }
 
 
@@ -502,153 +914,152 @@ GNUNET_STATISTICS_create (struct GNUNET_SCHEDULER_Handle *sched,
  * it).
  *
  * @param h statistics handle to destroy
  * it).
  *
  * @param h statistics handle to destroy
- * @param sync_first set to GNUNET_YES if pending SET requests should
+ * @param sync_first set to #GNUNET_YES if pending SET requests should
  *        be completed
  */
 void
 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
  *        be completed
  */
 void
 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
-                          int sync_first)
+                           int sync_first)
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
 {
   struct GNUNET_STATISTICS_GetHandle *pos;
   struct GNUNET_STATISTICS_GetHandle *next;
-  struct GNUNET_STATISTICS_GetHandle *prev;
-  struct GNUNET_TIME_Relative timeout;
 
 
-  if (sync_first)
-    {
-      if (h->current != NULL)
-       {
-         if (h->current->type == ACTION_GET)
-           {
-             GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-             h->th = NULL;
-             free_action_item (h->current);
-             h->current = NULL;
-           }
-       }
-      pos = h->action_head;
-      prev = NULL;
-      while (pos != NULL)
-       {
-         next = pos->next;
-         if (pos->type == ACTION_GET)
-           {
-             if (prev == NULL)
-               h->action_head = next;
-             else
-               prev->next = next;
-             free_action_item (pos);
-           }
-         else
-           {
-             prev = pos;
-           }
-         pos = next;
-       }
-      h->action_tail = prev;
-      if (h->current == NULL)
-       {
-         h->current = h->action_head;
-         if (h->action_head != NULL)
-           {
-             h->action_head = h->action_head->next;
-             if (h->action_head == NULL)
-               h->action_tail = NULL;
-           }
-       }
-      if ( (h->current != NULL) &&
-          (h->th == NULL) )
-       {                                       
-         timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-         h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                                      h->current->msize,
-                                                      timeout,
-                                                      GNUNET_YES,
-                                                      &transmit_action, h);
-         GNUNET_assert (NULL != h->th);
-       }
-      h->do_destroy = GNUNET_YES;
-      return;
-    }
-  if (NULL != h->th)
+  if (NULL == h)
+    return;
+  GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
+  if ( (sync_first) &&
+       (NULL != h->mq) &&
+       (0 != GNUNET_MQ_get_length (h->mq)) )
+  {
+    if ( (NULL != h->current) &&
+         (ACTION_GET == h->current->type) )
+      h->current->aborted = GNUNET_YES;
+    next = h->action_head;
+    while (NULL != (pos = next))
     {
     {
-      GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-      h->th = NULL;
+      next = pos->next;
+      if ( (ACTION_GET == pos->type) ||
+           (ACTION_WATCH == pos->type) )
+      {
+       GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                    h->action_tail,
+                                    pos);
+        free_action_item (pos);
+      }
     }
     }
-  if (h->current != NULL)
-    free_action_item (h->current);
+    h->do_destroy = GNUNET_YES;
+    schedule_action (h);
+    GNUNET_assert (NULL == h->destroy_task);
+    h->destroy_task
+      = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
+                                                                     5),
+                                      &do_destroy,
+                                      h);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Deferring destruction\n");
+    return; /* do not finish destruction just yet */
+  }
+  /* do clean up all */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cleaning all up\n");
   while (NULL != (pos = h->action_head))
   while (NULL != (pos = h->action_head))
-    {
-      h->action_head = pos->next;
-      free_action_item (pos);
-    }
-  if (h->client != NULL)
-    {
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
-      h->client = NULL;
-    }
+  {
+    GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                h->action_tail,
+                                pos);
+    free_action_item (pos);
+  }
+  do_disconnect (h);
+  if (NULL != h->backoff_task)
+  {
+    GNUNET_SCHEDULER_cancel (h->backoff_task);
+    h->backoff_task = NULL;
+  }
+  if (NULL != h->destroy_task)
+  {
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_cancel (h->destroy_task);
+    h->destroy_task = NULL;
+  }
+  for (unsigned int i = 0; i < h->watches_size; i++)
+  {
+    if (NULL == h->watches[i])
+      continue;
+    GNUNET_free (h->watches[i]->subsystem);
+    GNUNET_free (h->watches[i]->name);
+    GNUNET_free (h->watches[i]);
+  }
+  GNUNET_array_grow (h->watches,
+                     h->watches_size,
+                     0);
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
 
 
   GNUNET_free (h->subsystem);
   GNUNET_free (h);
 }
 
 
-
 /**
  * Schedule the next action to be performed.
 /**
  * Schedule the next action to be performed.
+ *
+ * @param cls statistics handle
  */
 static void
  */
 static void
-schedule_action (struct GNUNET_STATISTICS_Handle *h)
+schedule_action (void *cls)
 {
 {
-  struct GNUNET_TIME_Relative timeout;
+  struct GNUNET_STATISTICS_Handle *h = cls;
 
 
-  if (h->current != NULL)
+  if (NULL != h->backoff_task)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
-    {
-      finish (h, GNUNET_SYSERR);
-      return;
-    }
-
+  {
+    reconnect_later (h);
+    return;
+  }
+  if (0 < GNUNET_MQ_get_length (h->mq))
+    return; /* Wait for queue to be reduced more */    
   /* schedule next action */
   /* schedule next action */
-  h->current = h->action_head;
-  if (NULL == h->current)
+  while (NULL == h->current)
+  {
+    h->current = h->action_head;
+    if (NULL == h->current)
     {
     {
-      if (h->do_destroy)
-       {
-         h->do_destroy = GNUNET_NO;
-         GNUNET_STATISTICS_destroy (h, GNUNET_YES);
-       }
+      struct GNUNET_MessageHeader *hdr;
+      struct GNUNET_MQ_Envelope *env;
+
+      if (GNUNET_YES != h->do_destroy)
+        return; /* nothing to do */
+      /* let service know that we're done */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Notifying service that we are done\n");
+      h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
+      env = GNUNET_MQ_msg (hdr,
+                           GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
+      GNUNET_MQ_notify_sent (env,
+                             &schedule_action,
+                             h);
+      GNUNET_MQ_send (h->mq,
+                      env);
       return;
     }
       return;
     }
-  GNUNET_CONTAINER_DLL_remove (h->action_head,
-                              h->action_tail,
-                              h->current);
-  timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
-  if (NULL ==
-      (h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                                   h->current->msize,
-                                                   timeout,
-                                                   GNUNET_YES,
-                                                   &transmit_action, h)))
+    GNUNET_CONTAINER_DLL_remove (h->action_head,
+                                 h->action_tail,
+                                 h->current);
+    switch (h->current->type)
     {
     {
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to statistics service.\n");
-#endif
-      finish (h, GNUNET_SYSERR);
+    case ACTION_GET:
+      transmit_get (h);
+      break;
+    case ACTION_SET:
+    case ACTION_UPDATE:
+      transmit_set (h);
+      break;
+    case ACTION_WATCH:
+      transmit_watch (h);
+      break;
+    default:
+      GNUNET_assert (0);
+      break;
     }
     }
-}
-
-
-static void
-insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
-{
-  GNUNET_CONTAINER_DLL_insert_after (h->action_head,
-                                    h->action_tail,
-                                    h->action_tail,
-                                    ai);                                    
-  if (h->action_head == ai)
-    schedule_action (h);
+  }
 }
 
 
 }
 
 
@@ -658,63 +1069,55 @@ insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandl
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
  * @param handle identification of the statistics service
  * @param subsystem limit to the specified subsystem, NULL for our subsystem
  * @param name name of the statistic value, NULL for all values
- * @param timeout after how long should we give up (and call
- *        cont with an error code)?
  * @param cont continuation to call when done (can be NULL)
  * @param cont continuation to call when done (can be NULL)
+ *        This callback CANNOT destroy the statistics handle in the same call.
  * @param proc function to call on each value
  * @param proc function to call on each value
- * @param cls closure for cont and proc
+ * @param cls closure for @a cont and @a proc
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
                        const char *subsystem,
                        const char *name,
  * @return NULL on error
  */
 struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
                        const char *subsystem,
                        const char *name,
-                       struct GNUNET_TIME_Relative timeout,
                        GNUNET_STATISTICS_Callback cont,
                        GNUNET_STATISTICS_Callback cont,
-                       GNUNET_STATISTICS_Iterator proc, void *cls)
+                       GNUNET_STATISTICS_Iterator proc,
+                       void *cls)
 {
   size_t slen1;
   size_t slen2;
   struct GNUNET_STATISTICS_GetHandle *ai;
 
 {
   size_t slen1;
   size_t slen2;
   struct GNUNET_STATISTICS_GetHandle *ai;
 
-  GNUNET_assert (handle != NULL);
-  GNUNET_assert (proc != NULL);
+  if (NULL == handle)
+    return NULL;
+  GNUNET_assert (NULL != proc);
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  if (GNUNET_YES != try_connect (handle))
-    {
-#if DEBUG_STATISTICS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to connect to statistics service, can not get value `%s:%s'.\n",
-                  strlen (subsystem) ? subsystem : "*",
-                  strlen (name) ? name : "*");
-#endif
-      return NULL;
-    }
-  if (subsystem == NULL)
+  if (NULL == subsystem)
     subsystem = "";
     subsystem = "";
-  if (name == NULL)
+  if (NULL == name)
     name = "";
     name = "";
-  slen1 = strlen (subsystem);
-  slen2 = strlen (name);
+  slen1 = strlen (subsystem) + 1;
+  slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
   ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
   ai->proc = proc;
   ai->cls = cls;
-  ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
-  insert_ai (handle, ai);
+  GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
+                                   handle->action_tail,
+                                   ai);
+  schedule_action (handle);
   return ai;
 }
 
 
 /**
   return ai;
 }
 
 
 /**
- * Cancel a 'get' request.  Must be called before the 'cont' 
+ * Cancel a 'get' request.  Must be called before the 'cont'
  * function is called.
  *
  * @param gh handle of the request to cancel
  * function is called.
  *
  * @param gh handle of the request to cancel
@@ -722,46 +1125,189 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
 void
 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
 {
 void
 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
 {
+  if (NULL == gh)
+    return;
+  gh->cont = NULL;
   if (gh->sh->current == gh)
   if (gh->sh->current == gh)
+  {
+    gh->aborted = GNUNET_YES;
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
+                               gh->sh->action_tail,
+                               gh);
+  GNUNET_free (gh->name);
+  GNUNET_free (gh->subsystem);
+  GNUNET_free (gh);
+}
+
+
+/**
+ * Watch statistics from the peer (be notified whenever they change).
+ *
+ * @param handle identification of the statistics service
+ * @param subsystem limit to the specified subsystem, never NULL
+ * @param name name of the statistic value, never NULL
+ * @param proc function to call on each value
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
+ */
+int
+GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
+                         const char *subsystem,
+                         const char *name,
+                         GNUNET_STATISTICS_Iterator proc,
+                         void *proc_cls)
+{
+  struct GNUNET_STATISTICS_WatchEntry *w;
+
+  if (NULL == handle)
+    return GNUNET_SYSERR;
+  w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
+  w->subsystem = GNUNET_strdup (subsystem);
+  w->name = GNUNET_strdup (name);
+  w->proc = proc;
+  w->proc_cls = proc_cls;
+  GNUNET_array_append (handle->watches,
+                       handle->watches_size,
+                       w);
+  schedule_watch_request (handle,
+                          w);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Stop watching statistics from the peer.
+ *
+ * @param handle identification of the statistics service
+ * @param subsystem limit to the specified subsystem, never NULL
+ * @param name name of the statistic value, never NULL
+ * @param proc function to call on each value
+ * @param proc_cls closure for @a proc
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
+ */
+int
+GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
+                               const char *subsystem,
+                                const char *name,
+                               GNUNET_STATISTICS_Iterator proc,
+                                void *proc_cls)
+{
+  struct GNUNET_STATISTICS_WatchEntry *w;
+
+  if (NULL == handle)
+    return GNUNET_SYSERR;
+  for (unsigned int i=0;i<handle->watches_size;i++)
+  {
+    w = handle->watches[i];
+    if (NULL == w)
+      continue;
+    if ( (w->proc == proc) &&
+        (w->proc_cls == proc_cls) &&
+        (0 == strcmp (w->name,
+                      name)) &&
+        (0 == strcmp (w->subsystem,
+                      subsystem)) )
     {
     {
-      gh->aborted = GNUNET_YES;
-    }
-  else
-    {
-      GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
-                                  gh->sh->action_tail,
-                                  gh);
-      GNUNET_free (gh->name);
-      GNUNET_free (gh->subsystem);
-      GNUNET_free (gh);
+      GNUNET_free (w->name);
+      GNUNET_free (w->subsystem);
+      GNUNET_free (w);
+      handle->watches[i] = NULL;
+      return GNUNET_OK;
     }
     }
+  }
+  return GNUNET_SYSERR;
 }
 
 
 }
 
 
+/**
+ * Queue a request to change a statistic.
+ *
+ * @param h statistics handle
+ * @param name name of the value
+ * @param make_persistent  should the value be kept across restarts?
+ * @param value new value or change
+ * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
+ */
 static void
 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
                    const char *name,
                    int make_persistent,
 static void
 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
                    const char *name,
                    int make_persistent,
-                   uint64_t value, enum ActionType type)
+                   uint64_t value,
+                   enum ActionType type)
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
 {
   struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
+  int64_t delta;
 
 
-  GNUNET_assert (h != NULL);
-  GNUNET_assert (name != NULL);
-  if (GNUNET_YES != try_connect (h))
-    return;
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
   slen = strlen (h->subsystem) + 1;
   nlen = strlen (name) + 1;
   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  for (ai = h->action_head; NULL != ai; ai = ai->next)
+  {
+    if (! ( (0 == strcmp (ai->subsystem,
+                         h->subsystem)) &&
+           (0 == strcmp (ai->name,
+                         name)) &&
+           ( (ACTION_UPDATE == ai->type) ||
+             (ACTION_SET == ai->type) ) ) )
+      continue;
+    if (ACTION_SET == ai->type)
     {
     {
-      GNUNET_break (0);
-      return;
+      if (ACTION_UPDATE == type)
+      {
+       delta = (int64_t) value;
+       if (delta > 0)
+        {
+         /* update old set by new delta */
+         ai->value += delta;
+       }
+       else
+        {
+         /* update old set by new delta, but never go negative */
+         if (ai->value < -delta)
+           ai->value = 0;
+         else
+           ai->value += delta;
+       }
+      }
+      else
+      {
+       /* new set overrides old set */
+       ai->value = value;
+      }
     }
     }
-  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+    else
+    {
+      if (ACTION_UPDATE == type)
+      {
+       /* make delta cummulative */
+       delta = (int64_t) value;
+       ai->value += delta;
+      }
+      else
+      {
+       /* drop old 'update', use new 'set' instead */
+       ai->value = value;
+       ai->type = type;
+      }
+    }
+    ai->timeout
+      = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
+    ai->make_persistent
+      = make_persistent;
+    return;
+  }
+  /* no existing entry matches, create a fresh one */
+  ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
   ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
@@ -770,7 +1316,9 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
   ai->msize = nsize;
   ai->value = value;
   ai->type = type;
-  insert_ai (h, ai);
+  GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
+                                    h->action_tail,
+                                   ai);
   schedule_action (h);
 }
 
   schedule_action (h);
 }
 
@@ -787,10 +1335,17 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
                        const char *name,
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
                        const char *name,
-                       uint64_t value, int make_persistent)
+                       uint64_t value,
+                       int make_persistent)
 {
 {
+  if (NULL == handle)
+    return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent, value, ACTION_SET);
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     value,
+                     ACTION_SET);
 }
 
 
 }
 
 
@@ -806,11 +1361,19 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
                           const char *name,
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
                           const char *name,
-                          int64_t delta, int make_persistent)
+                          int64_t delta,
+                          int make_persistent)
 {
 {
+  if (NULL == handle)
+    return;
+  if (0 == delta)
+    return;
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
   GNUNET_assert (GNUNET_NO == handle->do_destroy);
-  add_setter_action (handle, name, make_persistent,
-                     (unsigned long long) delta, ACTION_UPDATE);
+  add_setter_action (handle,
+                     name,
+                     make_persistent,
+                     (uint64_t) delta,
+                     ACTION_UPDATE);
 }
 
 
 }