spacing
[oweals/gnunet.git] / src / statistics / statistics_api.c
index 26e2425e60298d9332d08498c5d3b65a47d5921b..290a7b93fe3afdcd96b4397f8b663fc6816a04d5 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
  */
 #include "platform.h"
 #include "gnunet_client_lib.h"
+#include "gnunet_constants.h"
+#include "gnunet_container_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_server_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_strings_lib.h"
 #include "statistics.h"
 
-#define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+/**
+ * How long do we wait until a statistics request for setting
+ * a value times out?  (The update will be lost if the
+ * service does not react within this timeframe).  
+ */
+#define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
 
 
 /**
@@ -41,20 +48,61 @@ enum ActionType
 {
   ACTION_GET,
   ACTION_SET,
-  ACTION_UPDATE
+  ACTION_UPDATE,
+  ACTION_WATCH
+};
+
+
+/**
+ * Entry kept for each value we are watching.
+ */
+struct GNUNET_STATISTICS_WatchEntry
+{
+  /**
+   * What subsystem is this action about? (never NULL)
+   */
+  char *subsystem;
+
+  /**
+   * What value is this action about? (never NULL)
+   */
+  char *name;
+
+  /**
+   * Function to call
+   */
+  GNUNET_STATISTICS_Iterator proc;
+
+  /**
+   * Closure for proc
+   */
+  void *proc_cls;
+
 };
 
 
 /**
  * Linked list of things we still need to do.
  */
-struct ActionItem
+struct GNUNET_STATISTICS_GetHandle
 {
+
+  /**
+   * This is a doubly linked list.
+   */
+  struct GNUNET_STATISTICS_GetHandle *next;
+
   /**
-   * This is a linked list.
+   * This is a doubly linked list.
    */
-  struct ActionItem *next;
+  struct GNUNET_STATISTICS_GetHandle *prev;
 
+  /**
+   * Main statistics handle.
+   */
+  struct GNUNET_STATISTICS_Handle *sh;
   /**
    * What subsystem is this action about? (can be NULL)
    */
@@ -88,7 +136,7 @@ struct ActionItem
   /**
    * Associated value.
    */
-  unsigned long long value;
+  uint64_t value;
 
   /**
    * Flag for SET/UPDATE actions.
@@ -101,7 +149,7 @@ struct ActionItem
   int aborted;
 
   /**
-   * Is this a GET, SET or UPDATE?
+   * Is this a GET, SET, UPDATE or WATCH?
    */
   enum ActionType type;
 
@@ -118,11 +166,6 @@ struct ActionItem
  */
 struct GNUNET_STATISTICS_Handle
 {
-  /**
-   * Our scheduler.
-   */
-  struct GNUNET_SCHEDULER_Handle *sched;
-
   /**
    * Name of our subsystem.
    */
@@ -138,33 +181,127 @@ struct GNUNET_STATISTICS_Handle
    */
   struct GNUNET_CLIENT_Connection *client;
 
+  /**
+   * Currently pending transmission request.
+   */
+  struct GNUNET_CLIENT_TransmitHandle *th;
+
   /**
    * Head of the linked list of pending actions (first action
    * to be performed).
    */
-  struct ActionItem *action_head;
+  struct GNUNET_STATISTICS_GetHandle *action_head;
 
   /**
    * Tail of the linked list of actions (for fast append).
    */
-  struct ActionItem *action_tail;
+  struct GNUNET_STATISTICS_GetHandle *action_tail;
 
   /**
    * Action we are currently busy with (action request has been
    * transmitted, we're now receiving the response from the
    * service).
    */
-  struct ActionItem *current;
+  struct GNUNET_STATISTICS_GetHandle *current;
+
+  /**
+   * Array of watch entries.
+   */
+  struct GNUNET_STATISTICS_WatchEntry **watches;
+
+  /**
+   * Task doing exponential back-off trying to reconnect.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier backoff_task;
 
   /**
-   * Should this handle be destroyed once we've processed
-   * all actions?
+   * Time for next connect retry.
+   */
+  struct GNUNET_TIME_Relative backoff;
+
+  /**
+   * Size of the 'watches' array.
+   */
+  unsigned int watches_size;
+
+  /**
+   * Should this handle auto-destruct once all actions have
+   * been processed?
    */
   int do_destroy;
 
+  /**
+   * Are we currently receiving from the service?
+   */
+  int receiving;
+
 };
 
 
+
+/**
+ * Schedule the next action to be performed.
+ */
+static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
+
+/**
+ * Try to (re)connect to the statistics service.
+ *
+ * @return GNUNET_YES on success, GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_STATISTICS_Handle *ret);
+
+
+static void
+insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
+{
+  GNUNET_CONTAINER_DLL_insert_after (h->action_head,
+                                    h->action_tail,
+                                    h->action_tail,
+                                    ai);                                    
+  if (h->action_head == ai)
+    schedule_action (h);
+}
+
+
+static void
+schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
+                       struct GNUNET_STATISTICS_WatchEntry *watch)
+{
+
+  struct GNUNET_STATISTICS_GetHandle *ai;
+  size_t slen;
+  size_t nlen;
+  size_t nsize;
+  
+  GNUNET_assert (h != NULL);
+  if (GNUNET_YES != try_connect (h))
+    {
+      schedule_action (h);
+      return;
+    }
+  slen = strlen (watch->subsystem) + 1;
+  nlen = strlen (watch->name) + 1;
+  nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
+  if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai->sh = h;
+  ai->subsystem = GNUNET_strdup (watch->subsystem);
+  ai->name = GNUNET_strdup (watch->name);
+  ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+  ai->msize = nsize;
+  ai->type = ACTION_WATCH;
+  ai->proc = watch->proc;
+  ai->cls = watch->proc_cls;
+  insert_ai (h, ai);
+}
+
+
 /**
  * Try to (re)connect to the statistics service.
  *
@@ -173,11 +310,16 @@ struct GNUNET_STATISTICS_Handle
 static int
 try_connect (struct GNUNET_STATISTICS_Handle *ret)
 {
-  if (ret->client != NULL)
-    return GNUNET_OK;
-  ret->client = GNUNET_CLIENT_connect (ret->sched, "statistics", ret->cfg);
+  unsigned int i;
   if (ret->client != NULL)
     return GNUNET_YES;
+  ret->client = GNUNET_CLIENT_connect ("statistics", ret->cfg);
+  if (ret->client != NULL)
+    {
+      for (i=0;i<ret->watches_size;i++)
+       schedule_watch_request (ret, ret->watches[i]);
+      return GNUNET_YES;
+    }
 #if DEBUG_STATISTICS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               _("Failed to connect to statistics service!\n"));
@@ -190,7 +332,7 @@ try_connect (struct GNUNET_STATISTICS_Handle *ret)
  * Free memory associated with the given action item.
  */
 static void
-free_action_item (struct ActionItem *ai)
+free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
 {
   GNUNET_free_non_null (ai->subsystem);
   GNUNET_free_non_null (ai->name);
@@ -199,63 +341,20 @@ free_action_item (struct ActionItem *ai)
 
 
 /**
- * Get handle for the statistics service.
- *
- * @param subsystem name of subsystem using the service
- * @param cfg services configuration in use
- * @return handle to use
- */
-struct GNUNET_STATISTICS_Handle *
-GNUNET_STATISTICS_create (struct GNUNET_SCHEDULER_Handle *sched,
-                          const char *subsystem,
-                          const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-  struct GNUNET_STATISTICS_Handle *ret;
-
-  GNUNET_assert (subsystem != NULL);
-  GNUNET_assert (sched != NULL);
-  GNUNET_assert (cfg != NULL);
-  ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
-  ret->sched = sched;
-  ret->cfg = cfg;
-  ret->subsystem = GNUNET_strdup (subsystem);
-  try_connect (ret);
-  return ret;
-}
-
-
-/**
- * Actually free the handle.
+ * GET processing is complete, tell client about it.
  */
 static void
-do_destroy (struct GNUNET_STATISTICS_Handle *h)
-{
-  GNUNET_assert (h->action_head == NULL);
-  GNUNET_assert (h->current == NULL);
-  if (h->client != NULL)
-    {
-      GNUNET_CLIENT_disconnect (h->client);
-      h->client = NULL;
-    }
-  GNUNET_free (h->subsystem);
-  GNUNET_free (h);
-}
-
-
-/**
- * Destroy a handle (free all state associated with
- * it).
- */
-void
-GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *handle)
+finish (struct GNUNET_STATISTICS_Handle *h, int code)
 {
-  GNUNET_assert (handle->do_destroy == GNUNET_NO);
-  if ((handle->action_head != NULL) || (handle->current != NULL))
+  struct GNUNET_STATISTICS_GetHandle *pos = h->current;
+  h->current = NULL;
+  schedule_action (h);
+  if (pos != NULL)
     {
-      handle->do_destroy = GNUNET_YES;
-      return;
+      if (pos->cont != NULL)
+       pos->cont (pos->cls, code);
+      free_action_item (pos);
     }
-  do_destroy (handle);
 }
 
 
@@ -274,7 +373,13 @@ process_message (struct GNUNET_STATISTICS_Handle *h,
   uint16_t size;
 
   if (h->current->aborted)
-    return GNUNET_OK;           /* don't bother */
+    {
+#if DEBUG_STATISTICS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Iteration was aborted, ignoring VALUE\n");
+#endif      
+      return GNUNET_OK;           /* don't bother */
+    }
   size = ntohs (msg->size);
   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
     {
@@ -296,41 +401,56 @@ process_message (struct GNUNET_STATISTICS_Handle *h,
 #endif
   if (GNUNET_OK !=
       h->current->proc (h->current->cls,
-                        service,
-                        name,
-                        GNUNET_ntohll (smsg->value),
-                        0 !=
-                        (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
+                       service,
+                       name,
+                       GNUNET_ntohll (smsg->value),
+                       0 !=
+                       (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
     {
 #if DEBUG_STATISTICS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Processing of remaining statistics aborted by client.\n");
+                 "Processing of remaining statistics aborted by client.\n");
 #endif
-      h->current->aborted = GNUNET_YES;
+      h->current->aborted = GNUNET_YES;    
     }
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "VALUE processed successfully\n");
+#endif      
   return GNUNET_OK;
 }
 
 
-
-/**
- * Schedule the next action to be performed.
- */
-static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
-
-
-/**
- * GET processing is complete, tell client about it.
- */
-static void
-finish (struct GNUNET_STATISTICS_Handle *h, int code)
+static int
+process_watch_value (struct GNUNET_STATISTICS_Handle *h,
+                    const struct GNUNET_MessageHeader *msg)
 {
-  struct ActionItem *pos = h->current;
-  h->current = NULL;
-  schedule_action (h);
-  if (pos->cont != NULL)
-    pos->cont (pos->cls, code);
-  free_action_item (pos);
+  const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
+  struct GNUNET_STATISTICS_WatchEntry *w;
+  uint32_t wid;
+
+  if (sizeof(struct GNUNET_STATISTICS_WatchValueMessage) !=
+      ntohs (msg->size))
+    {
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
+  wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *)msg;
+  GNUNET_break (0 == ntohl (wvm->reserved));
+  wid = ntohl (wvm->wid);
+  if (wid >= h->watches_size)
+    {
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
+  w = h->watches[wid];
+  (void) w->proc (w->proc_cls,
+                 w->subsystem,
+                 w->name,
+                 GNUNET_ntohll (wvm->value),
+                 0 !=
+                 (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
+  return GNUNET_OK;
 }
 
 
@@ -347,8 +467,11 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
 
   if (msg == NULL)
     {
-      GNUNET_CLIENT_disconnect (h->client);
-      h->client = NULL;
+      if (NULL != h->client)
+       {
+         GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+         h->client = NULL;
+       }
 #if DEBUG_STATISTICS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
                  "Error receiving statistics from service, is the service running?\n" );
@@ -363,27 +486,62 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Received end of statistics marker\n");
 #endif
+      h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+      if (h->watches_size > 0)
+       {
+         GNUNET_CLIENT_receive (h->client,
+                                &receive_stats,
+                                h,
+                                GNUNET_TIME_UNIT_FOREVER_REL);
+       }
+      else
+       {
+         h->receiving = GNUNET_NO;
+       }
       finish (h, GNUNET_OK);
       return;
     case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
       if (GNUNET_OK == process_message (h, msg))
         {
           /* finally, look for more! */
+#if DEBUG_STATISTICS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Processing VALUE done, now reading more\n");
+#endif      
           GNUNET_CLIENT_receive (h->client,
                                  &receive_stats,
                                  h,
                                  GNUNET_TIME_absolute_get_remaining
                                  (h->current->timeout));
+         h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
           return;
         }
       GNUNET_break (0);
       break;
+    case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
+      if (GNUNET_OK ==
+         process_watch_value (h, 
+                              msg))
+       {
+         h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+         GNUNET_assert (h->watches_size > 0);
+         GNUNET_CLIENT_receive (h->client,
+                                &receive_stats,
+                                h,
+                                GNUNET_TIME_UNIT_FOREVER_REL);
+         return;
+       }
+      GNUNET_break (0);
+      break;
     default:
       GNUNET_break (0);
       break;
     }
-  GNUNET_CLIENT_disconnect (h->client);
-  h->client = NULL;
+  if (NULL != h->client)
+    {
+      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+      h->client = NULL;
+    }
   finish (h, GNUNET_SYSERR);
 }
 
@@ -423,15 +581,75 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
                                              2,
                                              handle->current->subsystem,
                                              handle->current->name));
-  GNUNET_CLIENT_receive (handle->client,
-                         &receive_stats,
-                         handle,
-                         GNUNET_TIME_absolute_get_remaining (handle->
-                                                             current->timeout));
+  if (! handle->receiving)
+    {
+#if DEBUG_STATISTICS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Transmission of GET done, now reading response\n");
+#endif      
+      handle->receiving = GNUNET_YES;
+      GNUNET_CLIENT_receive (handle->client,
+                            &receive_stats,
+                            handle,
+                            GNUNET_TIME_absolute_get_remaining (handle->
+                                                                current->timeout));
+    }
   return msize;
 }
 
 
+/**
+ * Transmit a WATCH request (and if successful, start to receive
+ * the response).
+ */
+static size_t
+transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
+{
+  struct GNUNET_MessageHeader *hdr;
+  size_t slen1;
+  size_t slen2;
+  uint16_t msize;
+
+  if (buf == NULL)
+    {
+      /* timeout / error */
+#if DEBUG_STATISTICS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Transmission of request for statistics failed!\n");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return 0;
+    }
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitting watch request for `%s'\n",
+             handle->current->name);
+#endif
+  slen1 = strlen (handle->current->subsystem) + 1;
+  slen2 = strlen (handle->current->name) + 1;
+  msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
+  GNUNET_assert (msize <= size);
+  hdr = (struct GNUNET_MessageHeader *) buf;
+  hdr->size = htons (msize);
+  hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
+  GNUNET_assert (slen1 + slen2 ==
+                 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
+                                             slen1 + slen2,
+                                             2,
+                                             handle->current->subsystem,
+                                             handle->current->name));
+  if (GNUNET_YES != handle->receiving)
+    {
+      handle->receiving = GNUNET_YES;
+      GNUNET_CLIENT_receive (handle->client,
+                            &receive_stats,
+                            handle,
+                            GNUNET_TIME_UNIT_FOREVER_REL);
+    }
+  finish (handle, GNUNET_OK);
+  return msize;
+}
+
 
 /**
  * Transmit a SET/UPDATE request.
@@ -485,6 +703,7 @@ transmit_action (void *cls, size_t size, void *buf)
   struct GNUNET_STATISTICS_Handle *handle = cls;
   size_t ret;
 
+  handle->th = NULL;
   switch (handle->current->type)
     {
     case ACTION_GET:
@@ -494,6 +713,9 @@ transmit_action (void *cls, size_t size, void *buf)
     case ACTION_UPDATE:
       ret = transmit_set (handle, size, buf);
       break;
+    case ACTION_WATCH:
+      ret = transmit_watch (handle, size, buf);
+      break;
     default:
       ret = 0;
       GNUNET_break (0);
@@ -503,6 +725,155 @@ transmit_action (void *cls, size_t size, void *buf)
 }
 
 
+/**
+ * Get handle for the statistics service.
+ *
+ * @param subsystem name of subsystem using the service
+ * @param cfg services configuration in use
+ * @return handle to use
+ */
+struct GNUNET_STATISTICS_Handle *
+GNUNET_STATISTICS_create (const char *subsystem,
+                          const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_STATISTICS_Handle *ret;
+
+  GNUNET_assert (subsystem != NULL);
+  GNUNET_assert (cfg != NULL);
+  ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
+  ret->cfg = cfg;
+  ret->subsystem = GNUNET_strdup (subsystem);
+  ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  if (GNUNET_YES != try_connect (ret))
+    {
+      GNUNET_free (ret->subsystem);
+      GNUNET_free (ret);
+      return NULL;
+    }
+  return ret;
+}
+
+
+/**
+ * Destroy a handle (free all state associated with
+ * it).
+ *
+ * @param h statistics handle to destroy
+ * @param sync_first set to GNUNET_YES if pending SET requests should
+ *        be completed
+ */
+void
+GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
+                          int sync_first)
+{
+  struct GNUNET_STATISTICS_GetHandle *pos;
+  struct GNUNET_STATISTICS_GetHandle *next;
+  struct GNUNET_STATISTICS_GetHandle *prev;
+  struct GNUNET_TIME_Relative timeout;
+  int i;
+
+  if (h == NULL) return;
+  if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
+    GNUNET_SCHEDULER_cancel (h->backoff_task);
+  if (sync_first)
+    {
+      if (h->current != NULL)
+       {
+         if (h->current->type == ACTION_GET)
+           {
+             GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
+             h->th = NULL;
+             free_action_item (h->current);
+             h->current = NULL;
+           }
+       }
+      pos = h->action_head;
+      prev = NULL;
+      while (pos != NULL)
+       {
+         next = pos->next;
+         if (pos->type == ACTION_GET)
+           {
+             if (prev == NULL)
+               h->action_head = next;
+             else
+               prev->next = next;
+             free_action_item (pos);
+           }
+         else
+           {
+             prev = pos;
+           }
+         pos = next;
+       }
+      h->action_tail = prev;
+      if (h->current == NULL)
+       {
+         h->current = h->action_head;
+         if (h->action_head != NULL)
+           {
+             h->action_head = h->action_head->next;
+             if (h->action_head == NULL)
+               h->action_tail = NULL;
+           }
+       }
+      h->do_destroy = GNUNET_YES;
+      if ( (h->current != NULL) &&
+          (h->th == NULL) )
+       {                                       
+         timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
+         h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
+                                                      h->current->msize,
+                                                      timeout,
+                                                      GNUNET_YES,
+                                                      &transmit_action, h);
+         GNUNET_assert (NULL != h->th);
+       }
+      if (h->th != NULL)
+       return;
+    }
+  if (NULL != h->th)
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
+      h->th = NULL;
+    }
+  if (h->current != NULL)
+    free_action_item (h->current);
+  while (NULL != (pos = h->action_head))
+    {
+      h->action_head = pos->next;
+      free_action_item (pos);
+    }
+  if (h->client != NULL)
+    {
+      GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
+      h->client = NULL;
+    }
+  for (i=0;i<h->watches_size;i++)
+    {
+      GNUNET_free (h->watches[i]->subsystem);
+      GNUNET_free (h->watches[i]->name);
+      GNUNET_free (h->watches[i]);
+    }
+  GNUNET_array_grow (h->watches,
+                    h->watches_size,
+                    0);
+  GNUNET_free (h->subsystem);
+  GNUNET_free (h);
+}
+
+
+static void
+finish_task (void *cls,
+            const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STATISTICS_Handle *h = cls;
+
+  h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
+  finish (h, GNUNET_SYSERR);
+}
+
+
 /**
  * Schedule the next action to be performed.
  */
@@ -515,7 +886,12 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (h))
     {
-      finish (h, GNUNET_SYSERR);
+      h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->backoff,
+                                                     &finish_task,
+                                                     h);
+      h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
+      h->backoff = GNUNET_TIME_relative_min (h->backoff,
+                                            GNUNET_CONSTANTS_SERVICE_TIMEOUT);
       return;
     }
 
@@ -523,22 +899,23 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
   h->current = h->action_head;
   if (NULL == h->current)
     {
-      /* no pending network action, check destroy! */
-      if (h->do_destroy != GNUNET_YES)
-        return;
-      do_destroy (h);
+      if (h->do_destroy)
+       {
+         h->do_destroy = GNUNET_NO;
+         GNUNET_STATISTICS_destroy (h, GNUNET_YES);
+       }
       return;
     }
-  h->action_head = h->action_head->next;
-  if (NULL == h->action_head)
-    h->action_tail = NULL;
-  h->current->next = NULL;
-
+  GNUNET_CONTAINER_DLL_remove (h->action_head,
+                              h->action_tail,
+                              h->current);
   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
   if (NULL ==
-      GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                           h->current->msize,
-                                           timeout, &transmit_action, h))
+      (h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
+                                                   h->current->msize,
+                                                   timeout,
+                                                   GNUNET_YES,
+                                                   &transmit_action, h)))
     {
 #if DEBUG_STATISTICS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -549,23 +926,6 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
 }
 
 
-static void
-insert_ai (struct GNUNET_STATISTICS_Handle *h, struct ActionItem *ai)
-{
-  if (h->action_tail == NULL)
-    {
-      h->action_head = ai;
-      h->action_tail = ai;
-      schedule_action (h);
-    }
-  else
-    {
-      h->action_tail->next = ai;
-      h->action_tail = ai;
-    }
-}
-
-
 /**
  * Get statistic from the peer.
  *
@@ -577,8 +937,9 @@ insert_ai (struct GNUNET_STATISTICS_Handle *h, struct ActionItem *ai)
  * @param cont continuation to call when done (can be NULL)
  * @param proc function to call on each value
  * @param cls closure for cont and proc
+ * @return NULL on error
  */
-void
+struct GNUNET_STATISTICS_GetHandle *
 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
                        const char *subsystem,
                        const char *name,
@@ -588,10 +949,11 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
 {
   size_t slen1;
   size_t slen2;
-  struct ActionItem *ai;
+  struct GNUNET_STATISTICS_GetHandle *ai;
 
   GNUNET_assert (handle != NULL);
   GNUNET_assert (proc != NULL);
+  GNUNET_assert (GNUNET_NO == handle->do_destroy);
   if (GNUNET_YES != try_connect (handle))
     {
 #if DEBUG_STATISTICS
@@ -600,19 +962,18 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
                   strlen (subsystem) ? subsystem : "*",
                   strlen (name) ? name : "*");
 #endif
-      if (cont != NULL)
-       cont (cls, GNUNET_SYSERR);
-      return;
+      return NULL;
     }
   if (subsystem == NULL)
     subsystem = "";
   if (name == NULL)
     name = "";
-  slen1 = strlen (subsystem);
-  slen2 = strlen (name);
+  slen1 = strlen (subsystem) + 1;
+  slen2 = strlen (name) + 1;
   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  ai = GNUNET_malloc (sizeof (struct ActionItem));
+  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai->sh = handle;
   ai->subsystem = GNUNET_strdup (subsystem);
   ai->name = GNUNET_strdup (name);
   ai->cont = cont;
@@ -622,6 +983,68 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
   ai->type = ACTION_GET;
   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
   insert_ai (handle, ai);
+  return ai;
+}
+
+
+/**
+ * Cancel a 'get' request.  Must be called before the 'cont' 
+ * function is called.
+ *
+ * @param gh handle of the request to cancel
+ */
+void
+GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
+{
+  if (gh->sh->current == gh)
+    {
+      gh->aborted = GNUNET_YES;
+    }
+  else
+    {
+      GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
+                                  gh->sh->action_tail,
+                                  gh);
+      GNUNET_free (gh->name);
+      GNUNET_free (gh->subsystem);
+      GNUNET_free (gh);
+    }
+}
+
+
+/**
+ * Watch statistics from the peer (be notified whenever they change).
+ * Note that the only way to cancel a "watch" request is to destroy
+ * the statistics handle given as the first argument to this call.
+ *
+ * @param handle identification of the statistics service
+ * @param subsystem limit to the specified subsystem, never NULL
+ * @param name name of the statistic value, never NULL
+ * @param proc function to call on each value
+ * @param proc_cls closure for proc
+ * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ */
+int
+GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
+                        const char *subsystem,
+                        const char *name,
+                        GNUNET_STATISTICS_Iterator proc, 
+                        void *proc_cls)
+{
+  struct GNUNET_STATISTICS_WatchEntry *w;
+
+  if (handle == NULL) 
+    return GNUNET_SYSERR;
+  w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
+  w->subsystem = GNUNET_strdup (subsystem);
+  w->name = GNUNET_strdup (name);
+  w->proc = proc;
+  w->proc_cls = proc_cls;
+  GNUNET_array_append (handle->watches,
+                      handle->watches_size,
+                      w);
+  schedule_watch_request (handle, w);
+  return GNUNET_OK;
 }
 
 
@@ -629,13 +1052,14 @@ static void
 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
                    const char *name,
                    int make_persistent,
-                   unsigned long long value, enum ActionType type)
+                   uint64_t value, enum ActionType type)
 {
-  struct ActionItem *ai;
+  struct GNUNET_STATISTICS_GetHandle *ai;
   size_t slen;
   size_t nlen;
   size_t nsize;
-
+  int64_t delta;
+  
   GNUNET_assert (h != NULL);
   GNUNET_assert (name != NULL);
   if (GNUNET_YES != try_connect (h))
@@ -648,7 +1072,57 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
       GNUNET_break (0);
       return;
     }
-  ai = GNUNET_malloc (sizeof (struct ActionItem));
+  ai = h->action_head;
+  while (ai != NULL)
+    {
+      if ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
+          (0 == strcmp (ai->name, name)) &&
+          ( (ai->type == ACTION_UPDATE) ||
+            (ai->type == ACTION_SET) ) )
+       {
+         if (ai->type == ACTION_SET)
+           {
+             if (type == ACTION_UPDATE)
+               {
+                 delta = (int64_t) value;
+                 if (delta > 0) 
+                   {
+                     ai->value += delta;
+                   }
+                 else
+                   {
+                     if (ai->value < -delta)
+                       ai->value = 0;
+                     else
+                       ai->value += delta;
+                   }
+               }
+             else
+               {
+                 ai->value = value;
+               }
+           }
+         else
+           {
+             if (type == ACTION_UPDATE)
+               {
+                 delta = (int64_t) value;
+                 ai->value += delta;
+               }
+             else
+               {
+                 ai->value = value;
+                 ai->type = type;
+               }
+           }
+         ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
+         ai->make_persistent = make_persistent;
+         return;
+       }
+      ai = ai->next;
+    }
+  ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
+  ai->sh = h;
   ai->subsystem = GNUNET_strdup (h->subsystem);
   ai->name = GNUNET_strdup (name);
   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
@@ -657,7 +1131,6 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
   ai->value = value;
   ai->type = type;
   insert_ai (h, ai);
-  schedule_action (h);
 }
 
 
@@ -673,8 +1146,11 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
 void
 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
                        const char *name,
-                       unsigned long long value, int make_persistent)
+                       uint64_t value, int make_persistent)
 {
+  if (handle == NULL)
+    return;
+  GNUNET_assert (GNUNET_NO == handle->do_destroy);
   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
 }
 
@@ -691,10 +1167,15 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
 void
 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
                           const char *name,
-                          long long delta, int make_persistent)
+                          int64_t delta, int make_persistent)
 {
+  if (handle == NULL)
+    return;
+  if (delta == 0)
+    return;
+  GNUNET_assert (GNUNET_NO == handle->do_destroy);
   add_setter_action (handle, name, make_persistent,
-                     (unsigned long long) delta, ACTION_UPDATE);
+                     (uint64_t) delta, ACTION_UPDATE);
 }