spacing
[oweals/gnunet.git] / src / statistics / gnunet-service-statistics.c
index c34a00ddc2611f60f9082cbdec37d3be4debfdf3..c506dee7b9aa44408be58b387a12f0a617da195a 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -27,6 +27,7 @@
  * - use BIO for IO operations
  */
 #include "platform.h"
+#include "gnunet_container_lib.h"
 #include "gnunet_disk_lib.h"
 #include "gnunet_getopt_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_time_lib.h"
 #include "statistics.h"
 
+/**
+ * Watch entry.
+ */
+struct WatchEntry
+{
+
+  struct WatchEntry *next;
+
+  struct WatchEntry *prev;
+
+  struct GNUNET_SERVER_Client *client;
+
+  uint64_t last_value;
+  
+  uint32_t wid;
+
+};
+
+
+/**
+ * Client entry.
+ */
+struct ClientEntry
+{
+
+  struct ClientEntry *next;
+
+  struct ClientEntry *prev;
+
+  struct GNUNET_SERVER_Client *client;
+  
+  uint32_t max_wid;
+
+};
+
 /**
  * Entry in the statistics list.
  */
@@ -65,6 +101,18 @@ struct StatsEntry
    */
   struct GNUNET_STATISTICS_SetMessage *msg;
 
+  /**
+   * Watch context for changes to this
+   * value, or NULL for none.
+   */
+  struct WatchEntry *we_head;
+
+  /**
+   * Watch context for changes to this
+   * value, or NULL for none.
+   */
+  struct WatchEntry *we_tail;
+
   /**
    * Our value.
    */
@@ -92,6 +140,32 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
  */
 static struct StatsEntry *start;
 
+static struct ClientEntry *client_head;
+
+static struct ClientEntry *client_tail;
+
+/**
+ * Our notification context.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
+/**
+ * Counter used to generate unique values.
+ */
+static uint32_t uidgen;
+
+
+static void
+inject_message (void *cls,
+               void *client,
+               const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_SERVER_Handle *server = cls;
+
+  GNUNET_break (GNUNET_OK == GNUNET_SERVER_inject (server, NULL, msg));
+}
+
+
 /**
  * Load persistent values from disk.  Disk format is
  * exactly the same format that we also use for
@@ -107,8 +181,7 @@ load (struct GNUNET_SERVER_Handle *server)
   struct GNUNET_DISK_MapHandle *mh;
   struct stat sb;
   char *buf;
-  size_t off;
-  const struct GNUNET_MessageHeader *msg;
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
 
   fn = GNUNET_DISK_get_home_filename (cfg,
                                       "statistics", "statistics.data", NULL);
@@ -137,18 +210,16 @@ load (struct GNUNET_SERVER_Handle *server)
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               _("Loading %llu bytes of statistics from `%s'\n"),
               (unsigned long long) sb.st_size, fn);
-  off = 0;
-  while (off + sizeof (struct GNUNET_MessageHeader) < sb.st_size)
-    {
-      msg = (const struct GNUNET_MessageHeader *) &buf[off];
-      if ((ntohs (msg->size) + off > sb.st_size) ||
-          (GNUNET_OK != GNUNET_SERVER_inject (server, NULL, msg)))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      off += ntohs (msg->size);
-    }
+  mst = GNUNET_SERVER_mst_create (&inject_message,
+                                 server);
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_SERVER_mst_receive (mst,
+                                          NULL,
+                                          buf,
+                                          sb.st_size,
+                                          GNUNET_YES,
+                                          GNUNET_NO));
+  GNUNET_SERVER_mst_destroy (mst);
   GNUNET_break (GNUNET_OK == GNUNET_DISK_file_unmap (mh));
   GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh));
   GNUNET_free (fn);
@@ -209,7 +280,7 @@ save ()
  * Transmit the given stats value.
  */
 static void
-transmit (struct GNUNET_SERVER_TransmitContext *tc,
+transmit (struct GNUNET_SERVER_Client *client,
           const struct StatsEntry *e)
 {
   struct GNUNET_STATISTICS_ReplyMessage *m;
@@ -232,10 +303,11 @@ transmit (struct GNUNET_SERVER_TransmitContext *tc,
                                                      2, e->service, e->name));
 #if DEBUG_STATISTICS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting value for `%s:%s': %llu\n",
-              e->service, e->name, e->value);
+              "Transmitting value for `%s:%s' (%d): %llu\n",
+              e->service, e->name,
+             e->persistent, e->value);
 #endif
-  GNUNET_SERVER_transmit_context_append_message (tc, &m->header);
+  GNUNET_SERVER_notification_context_unicast (nc, client, &m->header, GNUNET_NO);
   GNUNET_free (m);
 }
 
@@ -252,6 +324,31 @@ matches (const struct StatsEntry *e, const char *service, const char *name)
 }
 
 
+static struct ClientEntry *
+make_client_entry (struct GNUNET_SERVER_Client *client)
+{
+  struct ClientEntry *ce;
+
+  GNUNET_assert (client != NULL);
+  ce = client_head;
+  while (ce != NULL)
+    {
+      if (ce->client == client)
+       return ce;
+      ce = ce->next;
+    }
+  ce = GNUNET_malloc (sizeof (struct ClientEntry));
+  ce->client = client;
+  GNUNET_SERVER_client_keep (client);
+  GNUNET_CONTAINER_DLL_insert (client_head,
+                              client_tail,
+                              ce);
+  GNUNET_SERVER_notification_context_add (nc,
+                                         client);
+  return ce;
+}
+
+
 /**
  * Handle GET-message.
  *
@@ -266,12 +363,14 @@ handle_get (void *cls,
             struct GNUNET_SERVER_Client *client,
             const struct GNUNET_MessageHeader *message)
 {
+  struct GNUNET_MessageHeader end;
   char *service;
   char *name;
   struct StatsEntry *pos;
-  struct GNUNET_SERVER_TransmitContext *tc;
   size_t size;
 
+  if (client != NULL)
+    make_client_entry (client);
   size = ntohs (message->size) - sizeof (struct GNUNET_MessageHeader);
   if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1],
                                               size, 2, &service, &name))
@@ -285,20 +384,51 @@ handle_get (void *cls,
               "Received request for statistics on `%s:%s'\n",
               strlen (service) ? service : "*", strlen (name) ? name : "*");
 #endif
-  tc = GNUNET_SERVER_transmit_context_create (client);
   pos = start;
   while (pos != NULL)
     {
       if (matches (pos, service, name))
-        transmit (tc, pos);
+        transmit (client, pos);
       pos = pos->next;
     }
-  GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0,
-                                             GNUNET_MESSAGE_TYPE_STATISTICS_END);
-  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+  end.size = htons (sizeof (struct GNUNET_MessageHeader));
+  end.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_END);
+  GNUNET_SERVER_notification_context_unicast (nc,
+                                             client,
+                                             &end,
+                                             GNUNET_NO);
+  GNUNET_SERVER_receive_done (client,
+                             GNUNET_OK);
 }
 
 
+static void
+notify_change (struct StatsEntry *se)
+{
+  struct GNUNET_STATISTICS_WatchValueMessage wvm;
+  struct WatchEntry *pos;
+
+  pos = se->we_head;
+  while (pos != NULL)
+    {
+      if (pos->last_value != se->value)
+       {
+         wvm.header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE);
+         wvm.header.size = htons (sizeof (struct GNUNET_STATISTICS_WatchValueMessage));
+         wvm.flags = htonl (se->persistent ? GNUNET_STATISTICS_PERSIST_BIT : 0);
+         wvm.wid = htonl (pos->wid);
+         wvm.reserved = htonl (0);
+         wvm.value = GNUNET_htonll (se->value);
+         GNUNET_SERVER_notification_context_unicast (nc,
+                                                     pos->client,
+                                                     &wvm.header,
+                                                     GNUNET_NO);
+         pos->last_value = se->value;
+       }
+      pos = pos->next;
+    }
+}
+
 /**
  * Handle SET-message.
  *
@@ -311,11 +441,6 @@ handle_set (void *cls,
             struct GNUNET_SERVER_Client *client,
             const struct GNUNET_MessageHeader *message)
 {
-  /**
-   * Counter used to generate unique values.
-   */
-  static uint32_t uidgen;
-
   char *service;
   char *name;
   uint16_t msize;
@@ -326,7 +451,10 @@ handle_set (void *cls,
   uint32_t flags;
   uint64_t value;
   int64_t delta;
+  int changed;
 
+  if (client != NULL)
+    make_client_entry (client);
   msize = ntohs (message->size);
   if (msize < sizeof (struct GNUNET_STATISTICS_SetMessage))
     {
@@ -344,13 +472,15 @@ handle_set (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+  flags = ntohl (msg->flags);
+  value = GNUNET_ntohll (msg->value);
 #if DEBUG_STATISTICS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received request to update statistic on `%s:%s'\n",
-              service, name);
+              "Received request to update statistic on `%s:%s' (%u) to/by %llu\n",
+              service, name,
+             (unsigned int) flags,
+             (unsigned long long) value);
 #endif
-  flags = ntohl (msg->flags);
-  value = GNUNET_ntohll (msg->value);
   pos = start;
   prev = NULL;
   while (pos != NULL)
@@ -359,17 +489,20 @@ handle_set (void *cls,
         {
           if ((flags & GNUNET_STATISTICS_SETFLAG_RELATIVE) == 0)
             {
+             changed = (pos->value != value);
               pos->value = value;
             }
           else
             {
               delta = (int64_t) value;
               if ((delta < 0) && (pos->value < -delta))
-                {
-                  pos->value = 0;
+               {
+                 changed = (pos->value != 0);
+                 pos->value = 0;
                 }
               else
                 {
+                 changed = (delta != 0);
                   GNUNET_break ((delta <= 0) ||
                                 (pos->value + delta > pos->value));
                   pos->value += delta;
@@ -391,6 +524,8 @@ handle_set (void *cls,
                       "Statistic `%s:%s' updated to value %llu.\n",
                       service, name, pos->value);
 #endif
+         if (changed) 
+           notify_change (pos);
           GNUNET_SERVER_receive_done (client, GNUNET_OK);
           return;
         }
@@ -419,6 +554,87 @@ handle_set (void *cls,
 }
 
 
+/**
+ * Handle WATCH-message.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ */
+static void
+handle_watch (void *cls,
+             struct GNUNET_SERVER_Client *client,
+             const struct GNUNET_MessageHeader *message)
+{
+  char *service;
+  char *name;
+  uint16_t msize;
+  uint16_t size;
+  struct StatsEntry *pos;
+  struct ClientEntry *ce;
+  struct WatchEntry *we;
+  size_t slen;
+
+  ce = make_client_entry (client);
+  msize = ntohs (message->size);
+  if (msize < sizeof (struct GNUNET_MessageHeader))
+    {
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+  size = msize - sizeof (struct GNUNET_MessageHeader);
+  if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1],
+                                              size, 2, &service, &name))
+    {
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received request to watch statistic on `%s:%s'\n",
+              service, name);
+#endif
+  pos = start;
+  while (pos != NULL)
+    {
+      if (matches (pos, service, name))
+        break;
+      pos = pos->next;
+    }
+  if (pos == NULL)
+    {
+      pos = GNUNET_malloc (sizeof (struct StatsEntry) + 
+                          sizeof (struct GNUNET_STATISTICS_SetMessage) + 
+                          size);
+      pos->next = start;
+      pos->uid = uidgen++;
+      pos->msg = (void *) &pos[1];
+      pos->msg->header.size = htons (sizeof (struct GNUNET_STATISTICS_SetMessage) + 
+                                    size);
+      pos->msg->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
+      pos->service = (const char *) &pos->msg[1];
+      slen = strlen (service) + 1;
+      memcpy ((void*) pos->service, service, slen);
+      pos->name = &pos->service[slen];
+      memcpy ((void*) pos->name, name, strlen (name)+1);
+      start = pos;
+    }
+  we = GNUNET_malloc (sizeof (struct WatchEntry));
+  we->client = client;
+  GNUNET_SERVER_client_keep (client);
+  we->wid = ce->max_wid++;
+  GNUNET_CONTAINER_DLL_insert (pos->we_head,
+                              pos->we_tail,
+                              we);
+  if (pos->value != 0)
+    notify_change (pos);
+  GNUNET_SERVER_receive_done (client,
+                             GNUNET_OK);
+}
+
+
 /**
  * Task run during shutdown.
  *
@@ -429,7 +645,84 @@ static void
 shutdown_task (void *cls,
               const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct ClientEntry *ce;
+  struct WatchEntry *we;
+  struct StatsEntry *se;
+
   save ();
+  GNUNET_SERVER_notification_context_destroy (nc);
+  nc = NULL;
+  while (NULL != (ce = client_head))
+    {
+      GNUNET_SERVER_client_drop (ce->client);
+      GNUNET_CONTAINER_DLL_remove (client_head,
+                                  client_tail,
+                                  ce);
+      GNUNET_free (ce);
+    }
+  while (NULL != (se = start))
+    {
+      start = se->next;
+      while (NULL != (we = se->we_head))
+       {
+         GNUNET_SERVER_client_drop (we->client);
+         GNUNET_CONTAINER_DLL_remove (se->we_head,
+                                      se->we_tail,
+                                      we);
+         GNUNET_free (we);
+       }
+      GNUNET_free (se);
+    }
+}
+
+
+/**
+ * A client disconnected.  Remove all of its data structure entries.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ */
+static void
+handle_client_disconnect (void *cls,
+                         struct GNUNET_SERVER_Client
+                         * client)
+{
+  struct ClientEntry *ce;
+  struct WatchEntry *we;
+  struct WatchEntry *wen;
+  struct StatsEntry *se;
+  
+  ce = client_head;
+  while (NULL != ce)
+    {
+      if (ce->client == client)
+       {
+         GNUNET_SERVER_client_drop (ce->client);
+         GNUNET_CONTAINER_DLL_remove (client_head,
+                                      client_tail,
+                                      ce);
+         GNUNET_free (ce);
+         break;
+       }
+      ce = ce->next;
+    }
+  se = start;
+  while (NULL != se)
+    {
+      wen = se->we_head;
+      while (NULL != (we = wen))
+       {
+         wen = we->next;
+         if (we->client != client)
+           continue;
+         GNUNET_SERVER_client_drop (we->client);
+         GNUNET_CONTAINER_DLL_remove (se->we_head,
+                                      se->we_tail,
+                                      we);
+         GNUNET_free (we);
+       }
+      se = se->next;
+    }
 }
 
 
@@ -437,26 +730,28 @@ shutdown_task (void *cls,
  * Process statistics requests.
  *
  * @param cls closure
- * @param sched scheduler to use
  * @param server the initialized server
  * @param c configuration to use
  */
 static void
 run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *sched,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
     {&handle_set, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_SET, 0},
     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_GET, 0},
+    {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_WATCH, 0},
     {NULL, NULL, 0, 0}
   };
   cfg = c;
   GNUNET_SERVER_add_handlers (server, handlers);
+  nc = GNUNET_SERVER_notification_context_create (server, 16);
+  GNUNET_SERVER_disconnect_notify (server, 
+                                  &handle_client_disconnect,
+                                  NULL);
   load (server);
-  GNUNET_SCHEDULER_add_delayed (sched,
-                               GNUNET_TIME_UNIT_FOREVER_REL,
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                &shutdown_task,
                                NULL);
 }