spacing
[oweals/gnunet.git] / src / statistics / gnunet-service-statistics.c
index 42ff8d927b8b69a8cfb9bdf60098b4b89198428e..c506dee7b9aa44408be58b387a12f0a617da195a 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @file statistics/gnunet-service-statistics.c
  * @brief program that tracks statistics
  * @author Christian Grothoff
+ * 
+ * TODO:
+ * - use BIO for IO operations
  */
 #include "platform.h"
+#include "gnunet_container_lib.h"
 #include "gnunet_disk_lib.h"
 #include "gnunet_getopt_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_time_lib.h"
 #include "statistics.h"
 
+/**
+ * Watch entry.
+ */
+struct WatchEntry
+{
+
+  struct WatchEntry *next;
+
+  struct WatchEntry *prev;
+
+  struct GNUNET_SERVER_Client *client;
+
+  uint64_t last_value;
+  
+  uint32_t wid;
+
+};
+
+
+/**
+ * Client entry.
+ */
+struct ClientEntry
+{
+
+  struct ClientEntry *next;
+
+  struct ClientEntry *prev;
+
+  struct GNUNET_SERVER_Client *client;
+  
+  uint32_t max_wid;
+
+};
+
 /**
  * Entry in the statistics list.
  */
@@ -62,6 +101,18 @@ struct StatsEntry
    */
   struct GNUNET_STATISTICS_SetMessage *msg;
 
+  /**
+   * Watch context for changes to this
+   * value, or NULL for none.
+   */
+  struct WatchEntry *we_head;
+
+  /**
+   * Watch context for changes to this
+   * value, or NULL for none.
+   */
+  struct WatchEntry *we_tail;
+
   /**
    * Our value.
    */
@@ -79,31 +130,58 @@ struct StatsEntry
 
 };
 
+/**
+ * Our configuration.
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
+
 /**
  * Linked list of our active statistics.
  */
 static struct StatsEntry *start;
 
+static struct ClientEntry *client_head;
+
+static struct ClientEntry *client_tail;
+
+/**
+ * Our notification context.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
 /**
  * Counter used to generate unique values.
  */
 static uint32_t uidgen;
 
+
+static void
+inject_message (void *cls,
+               void *client,
+               const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_SERVER_Handle *server = cls;
+
+  GNUNET_break (GNUNET_OK == GNUNET_SERVER_inject (server, NULL, msg));
+}
+
+
 /**
  * Load persistent values from disk.  Disk format is
  * exactly the same format that we also use for
  * setting the values over the network.
+ *
+ * @param server handle to the server context
  */
 static void
-load (struct GNUNET_SERVER_Handle *server,
-      struct GNUNET_CONFIGURATION_Handle *cfg)
+load (struct GNUNET_SERVER_Handle *server)
 {
   char *fn;
-  int fd;
+  struct GNUNET_DISK_FileHandle *fh;
+  struct GNUNET_DISK_MapHandle *mh;
   struct stat sb;
   char *buf;
-  size_t off;
-  const struct GNUNET_MessageHeader *msg;
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
 
   fn = GNUNET_DISK_get_home_filename (cfg,
                                       "statistics", "statistics.data", NULL);
@@ -114,85 +192,80 @@ load (struct GNUNET_SERVER_Handle *server,
       GNUNET_free (fn);
       return;
     }
-  fd = GNUNET_DISK_file_open (fn, O_RDONLY);
-  if (fd == -1)
+  fh = GNUNET_DISK_file_open (fn, GNUNET_DISK_OPEN_READ,
+                             GNUNET_DISK_PERM_NONE);
+  if (!fh)
     {
       GNUNET_free (fn);
       return;
     }
-  buf = MMAP (NULL, sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
-  if (MAP_FAILED == buf)
+  buf = GNUNET_DISK_file_map (fh, &mh, GNUNET_DISK_MAP_TYPE_READ, sb.st_size);
+  if (NULL == buf)
     {
       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, "mmap", fn);
-      GNUNET_break (0 == CLOSE (fd));
+      GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh));
       GNUNET_free (fn);
       return;
     }
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               _("Loading %llu bytes of statistics from `%s'\n"),
               (unsigned long long) sb.st_size, fn);
-  off = 0;
-  while (off + sizeof (struct GNUNET_MessageHeader) < sb.st_size)
-    {
-      msg = (const struct GNUNET_MessageHeader *) &buf[off];
-      if ((ntohs (msg->size) + off > sb.st_size) ||
-          (GNUNET_OK != GNUNET_SERVER_inject (server, NULL, msg)))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      off += ntohs (msg->size);
-    }
-  GNUNET_break (0 == MUNMAP (buf, sb.st_size));
-  GNUNET_break (0 == CLOSE (fd));
+  mst = GNUNET_SERVER_mst_create (&inject_message,
+                                 server);
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_SERVER_mst_receive (mst,
+                                          NULL,
+                                          buf,
+                                          sb.st_size,
+                                          GNUNET_YES,
+                                          GNUNET_NO));
+  GNUNET_SERVER_mst_destroy (mst);
+  GNUNET_break (GNUNET_OK == GNUNET_DISK_file_unmap (mh));
+  GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh));
   GNUNET_free (fn);
 }
 
-
 /**
  * Write persistent statistics to disk.
- *
- * @param cls closure
- * @param cfg configuration to use
  */
 static void
-save (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
+save ()       
 {
   struct StatsEntry *pos;
   char *fn;
-  int fd;
+  struct GNUNET_DISK_FileHandle *fh;
   uint16_t size;
   unsigned long long total;
 
-  fd = -1;
+  fh = NULL;
   fn = GNUNET_DISK_get_home_filename (cfg,
                                       "statistics", "statistics.data", NULL);
   if (fn != NULL)
-    fd =
-      GNUNET_DISK_file_open (fn, O_WRONLY | O_CREAT | O_TRUNC,
-                             S_IRUSR | S_IWUSR);
+    fh = GNUNET_DISK_file_open (fn, GNUNET_DISK_OPEN_WRITE
+        | GNUNET_DISK_OPEN_CREATE | GNUNET_DISK_OPEN_TRUNCATE,
+        GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE);
   total = 0;
   while (NULL != (pos = start))
     {
       start = pos->next;
-      if ((pos->persistent) && (fd != -1))
+      if ((pos->persistent) && (NULL != fh))
         {
           size = htons (pos->msg->header.size);
-          if (size != WRITE (fd, pos->msg, size))
+          if (size != GNUNET_DISK_file_write (fh, pos->msg, size))
             {
               GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
                                         "write", fn);
-              GNUNET_DISK_file_close (fn, fd);
-              fd = -1;
+              GNUNET_DISK_file_close (fh);
+             fh = NULL;
             }
           else
             total += size;
         }
       GNUNET_free (pos);
     }
-  if (fd != -1)
+  if (NULL != fh)
     {
-      GNUNET_DISK_file_close (fn, fd);
+      GNUNET_DISK_file_close (fh);
       if (total == 0)
         GNUNET_break (0 == UNLINK (fn));
       else
@@ -207,20 +280,19 @@ save (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
  * Transmit the given stats value.
  */
 static void
-transmit (struct GNUNET_SERVER_TransmitContext *tc,
+transmit (struct GNUNET_SERVER_Client *client,
           const struct StatsEntry *e)
 {
   struct GNUNET_STATISTICS_ReplyMessage *m;
-  struct GNUNET_MessageHeader *h;
   size_t size;
-  uint16_t msize;
 
   size =
     sizeof (struct GNUNET_STATISTICS_ReplyMessage) + strlen (e->service) + 1 +
     strlen (e->name) + 1;
   GNUNET_assert (size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  msize = size - sizeof (struct GNUNET_MessageHeader);
   m = GNUNET_malloc (size);
+  m->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_VALUE);
+  m->header.size = htons (size);
   m->uid = htonl (e->uid);
   if (e->persistent)
     m->uid |= htonl (GNUNET_STATISTICS_PERSIST_BIT);
@@ -231,14 +303,11 @@ transmit (struct GNUNET_SERVER_TransmitContext *tc,
                                                      2, e->service, e->name));
 #if DEBUG_STATISTICS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting value for `%s:%s': %llu\n",
-              e->service, e->name, e->value);
+              "Transmitting value for `%s:%s' (%d): %llu\n",
+              e->service, e->name,
+             e->persistent, e->value);
 #endif
-  h = &m->header;
-  GNUNET_SERVER_transmit_context_append (tc,
-                                         &h[1],
-                                         msize,
-                                         GNUNET_MESSAGE_TYPE_STATISTICS_VALUE);
+  GNUNET_SERVER_notification_context_unicast (nc, client, &m->header, GNUNET_NO);
   GNUNET_free (m);
 }
 
@@ -255,6 +324,31 @@ matches (const struct StatsEntry *e, const char *service, const char *name)
 }
 
 
+static struct ClientEntry *
+make_client_entry (struct GNUNET_SERVER_Client *client)
+{
+  struct ClientEntry *ce;
+
+  GNUNET_assert (client != NULL);
+  ce = client_head;
+  while (ce != NULL)
+    {
+      if (ce->client == client)
+       return ce;
+      ce = ce->next;
+    }
+  ce = GNUNET_malloc (sizeof (struct ClientEntry));
+  ce->client = client;
+  GNUNET_SERVER_client_keep (client);
+  GNUNET_CONTAINER_DLL_insert (client_head,
+                              client_tail,
+                              ce);
+  GNUNET_SERVER_notification_context_add (nc,
+                                         client);
+  return ce;
+}
+
+
 /**
  * Handle GET-message.
  *
@@ -269,12 +363,14 @@ handle_get (void *cls,
             struct GNUNET_SERVER_Client *client,
             const struct GNUNET_MessageHeader *message)
 {
+  struct GNUNET_MessageHeader end;
   char *service;
   char *name;
   struct StatsEntry *pos;
-  struct GNUNET_SERVER_TransmitContext *tc;
   size_t size;
 
+  if (client != NULL)
+    make_client_entry (client);
   size = ntohs (message->size) - sizeof (struct GNUNET_MessageHeader);
   if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1],
                                               size, 2, &service, &name))
@@ -288,20 +384,51 @@ handle_get (void *cls,
               "Received request for statistics on `%s:%s'\n",
               strlen (service) ? service : "*", strlen (name) ? name : "*");
 #endif
-  tc = GNUNET_SERVER_transmit_context_create (client);
   pos = start;
   while (pos != NULL)
     {
       if (matches (pos, service, name))
-        transmit (tc, pos);
+        transmit (client, pos);
       pos = pos->next;
     }
-  GNUNET_SERVER_transmit_context_append (tc, NULL, 0,
-                                         GNUNET_MESSAGE_TYPE_STATISTICS_END);
-  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+  end.size = htons (sizeof (struct GNUNET_MessageHeader));
+  end.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_END);
+  GNUNET_SERVER_notification_context_unicast (nc,
+                                             client,
+                                             &end,
+                                             GNUNET_NO);
+  GNUNET_SERVER_receive_done (client,
+                             GNUNET_OK);
 }
 
 
+static void
+notify_change (struct StatsEntry *se)
+{
+  struct GNUNET_STATISTICS_WatchValueMessage wvm;
+  struct WatchEntry *pos;
+
+  pos = se->we_head;
+  while (pos != NULL)
+    {
+      if (pos->last_value != se->value)
+       {
+         wvm.header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE);
+         wvm.header.size = htons (sizeof (struct GNUNET_STATISTICS_WatchValueMessage));
+         wvm.flags = htonl (se->persistent ? GNUNET_STATISTICS_PERSIST_BIT : 0);
+         wvm.wid = htonl (pos->wid);
+         wvm.reserved = htonl (0);
+         wvm.value = GNUNET_htonll (se->value);
+         GNUNET_SERVER_notification_context_unicast (nc,
+                                                     pos->client,
+                                                     &wvm.header,
+                                                     GNUNET_NO);
+         pos->last_value = se->value;
+       }
+      pos = pos->next;
+    }
+}
+
 /**
  * Handle SET-message.
  *
@@ -324,7 +451,10 @@ handle_set (void *cls,
   uint32_t flags;
   uint64_t value;
   int64_t delta;
+  int changed;
 
+  if (client != NULL)
+    make_client_entry (client);
   msize = ntohs (message->size);
   if (msize < sizeof (struct GNUNET_STATISTICS_SetMessage))
     {
@@ -342,13 +472,15 @@ handle_set (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+  flags = ntohl (msg->flags);
+  value = GNUNET_ntohll (msg->value);
 #if DEBUG_STATISTICS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received request to update statistic on `%s:%s'\n",
-              service, name);
+              "Received request to update statistic on `%s:%s' (%u) to/by %llu\n",
+              service, name,
+             (unsigned int) flags,
+             (unsigned long long) value);
 #endif
-  flags = ntohl (msg->flags);
-  value = GNUNET_ntohll (msg->value);
   pos = start;
   prev = NULL;
   while (pos != NULL)
@@ -357,17 +489,20 @@ handle_set (void *cls,
         {
           if ((flags & GNUNET_STATISTICS_SETFLAG_RELATIVE) == 0)
             {
+             changed = (pos->value != value);
               pos->value = value;
             }
           else
             {
               delta = (int64_t) value;
               if ((delta < 0) && (pos->value < -delta))
-                {
-                  pos->value = 0;
+               {
+                 changed = (pos->value != 0);
+                 pos->value = 0;
                 }
               else
                 {
+                 changed = (delta != 0);
                   GNUNET_break ((delta <= 0) ||
                                 (pos->value + delta > pos->value));
                   pos->value += delta;
@@ -389,6 +524,8 @@ handle_set (void *cls,
                       "Statistic `%s:%s' updated to value %llu.\n",
                       service, name, pos->value);
 #endif
+         if (changed) 
+           notify_change (pos);
           GNUNET_SERVER_receive_done (client, GNUNET_OK);
           return;
         }
@@ -418,32 +555,205 @@ handle_set (void *cls,
 
 
 /**
- * List of handlers for the messages understood by this
- * service.
+ * Handle WATCH-message.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
  */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&handle_set, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_SET, 0},
-  {&handle_get, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_GET, 0},
-  {NULL, NULL, 0, 0}
-};
+static void
+handle_watch (void *cls,
+             struct GNUNET_SERVER_Client *client,
+             const struct GNUNET_MessageHeader *message)
+{
+  char *service;
+  char *name;
+  uint16_t msize;
+  uint16_t size;
+  struct StatsEntry *pos;
+  struct ClientEntry *ce;
+  struct WatchEntry *we;
+  size_t slen;
+
+  ce = make_client_entry (client);
+  msize = ntohs (message->size);
+  if (msize < sizeof (struct GNUNET_MessageHeader))
+    {
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+  size = msize - sizeof (struct GNUNET_MessageHeader);
+  if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1],
+                                              size, 2, &service, &name))
+    {
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received request to watch statistic on `%s:%s'\n",
+              service, name);
+#endif
+  pos = start;
+  while (pos != NULL)
+    {
+      if (matches (pos, service, name))
+        break;
+      pos = pos->next;
+    }
+  if (pos == NULL)
+    {
+      pos = GNUNET_malloc (sizeof (struct StatsEntry) + 
+                          sizeof (struct GNUNET_STATISTICS_SetMessage) + 
+                          size);
+      pos->next = start;
+      pos->uid = uidgen++;
+      pos->msg = (void *) &pos[1];
+      pos->msg->header.size = htons (sizeof (struct GNUNET_STATISTICS_SetMessage) + 
+                                    size);
+      pos->msg->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
+      pos->service = (const char *) &pos->msg[1];
+      slen = strlen (service) + 1;
+      memcpy ((void*) pos->service, service, slen);
+      pos->name = &pos->service[slen];
+      memcpy ((void*) pos->name, name, strlen (name)+1);
+      start = pos;
+    }
+  we = GNUNET_malloc (sizeof (struct WatchEntry));
+  we->client = client;
+  GNUNET_SERVER_client_keep (client);
+  we->wid = ce->max_wid++;
+  GNUNET_CONTAINER_DLL_insert (pos->we_head,
+                              pos->we_tail,
+                              we);
+  if (pos->value != 0)
+    notify_change (pos);
+  GNUNET_SERVER_receive_done (client,
+                             GNUNET_OK);
+}
+
+
+/**
+ * Task run during shutdown.
+ *
+ * @param cls unused
+ * @param tc unused
+ */
+static void
+shutdown_task (void *cls,
+              const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct ClientEntry *ce;
+  struct WatchEntry *we;
+  struct StatsEntry *se;
+
+  save ();
+  GNUNET_SERVER_notification_context_destroy (nc);
+  nc = NULL;
+  while (NULL != (ce = client_head))
+    {
+      GNUNET_SERVER_client_drop (ce->client);
+      GNUNET_CONTAINER_DLL_remove (client_head,
+                                  client_tail,
+                                  ce);
+      GNUNET_free (ce);
+    }
+  while (NULL != (se = start))
+    {
+      start = se->next;
+      while (NULL != (we = se->we_head))
+       {
+         GNUNET_SERVER_client_drop (we->client);
+         GNUNET_CONTAINER_DLL_remove (se->we_head,
+                                      se->we_tail,
+                                      we);
+         GNUNET_free (we);
+       }
+      GNUNET_free (se);
+    }
+}
+
+
+/**
+ * A client disconnected.  Remove all of its data structure entries.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ */
+static void
+handle_client_disconnect (void *cls,
+                         struct GNUNET_SERVER_Client
+                         * client)
+{
+  struct ClientEntry *ce;
+  struct WatchEntry *we;
+  struct WatchEntry *wen;
+  struct StatsEntry *se;
+  
+  ce = client_head;
+  while (NULL != ce)
+    {
+      if (ce->client == client)
+       {
+         GNUNET_SERVER_client_drop (ce->client);
+         GNUNET_CONTAINER_DLL_remove (client_head,
+                                      client_tail,
+                                      ce);
+         GNUNET_free (ce);
+         break;
+       }
+      ce = ce->next;
+    }
+  se = start;
+  while (NULL != se)
+    {
+      wen = se->we_head;
+      while (NULL != (we = wen))
+       {
+         wen = we->next;
+         if (we->client != client)
+           continue;
+         GNUNET_SERVER_client_drop (we->client);
+         GNUNET_CONTAINER_DLL_remove (se->we_head,
+                                      se->we_tail,
+                                      we);
+         GNUNET_free (we);
+       }
+      se = se->next;
+    }
+}
 
 
 /**
  * Process statistics requests.
  *
  * @param cls closure
- * @param sched scheduler to use
  * @param server the initialized server
- * @param cfg configuration to use
+ * @param c configuration to use
  */
 static void
 run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *sched,
      struct GNUNET_SERVER_Handle *server,
-     struct GNUNET_CONFIGURATION_Handle *cfg)
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&handle_set, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_SET, 0},
+    {&handle_get, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_GET, 0},
+    {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_WATCH, 0},
+    {NULL, NULL, 0, 0}
+  };
+  cfg = c;
   GNUNET_SERVER_add_handlers (server, handlers);
-  load (server, cfg);
+  nc = GNUNET_SERVER_notification_context_create (server, 16);
+  GNUNET_SERVER_disconnect_notify (server, 
+                                  &handle_client_disconnect,
+                                  NULL);
+  load (server);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                               &shutdown_task,
+                               NULL);
 }
 
 
@@ -460,7 +770,9 @@ main (int argc, char *const *argv)
   return (GNUNET_OK ==
           GNUNET_SERVICE_run (argc,
                               argv,
-                              "statistics", &run, NULL, &save, NULL)) ? 0 : 1;
+                              "statistics",
+                             GNUNET_SERVICE_OPTION_NONE,
+                             &run, NULL)) ? 0 : 1;
 }
 
 /* end of gnunet-service-statistics.c */