-fix warning
[oweals/gnunet.git] / src / sensor / gnunet-sensor-profiler.c
index 848fdf9c6e21f9df33a0b6584506129ede1854f7..21574774d881cf98ff798e79f59834885520c553 100644 (file)
      Boston, MA 02111-1307, USA.
 */
 
-/**
- * TODO:
- * - Run X peers
- * - Rewrite interval time (optional)
- * - Run 1 dashboard
- * - Monitor dashboard records
- * - Prompt for anomalies when ready:
- *  -- Cut Y peers (remove their connections to other X-Y peers but not the connections among themselves)
- */
-
 /**
  * @file sensor/gnunet-sensor-profiler.c
  * @brief Profiler for the sensor service
 #include "gnunet_testbed_service.h"
 #include "gnunet_peerstore_service.h"
 #include "gnunet_sensor_service.h"
+#include "gnunet_sensor_util_lib.h"
+#include "gnunet_transport_service.h"
+
+/**
+ * Time to wait for the peer to startup completely
+ */
+#define PEER_STARTUP_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
 
 /**
  * Information about a single peer
@@ -55,8 +52,22 @@ struct PeerInfo
    */
   struct GNUNET_TESTBED_Peer *testbed_peer;
 
-};
+  /**
+   * Index of this peer within our list
+   */
+  int index;
+
+  /**
+   * TESTBED operation used to connect to statistics service
+   */
+  struct GNUNET_TESTBED_Operation *statistics_op;
+
+  /**
+   * Handle to the peer's statistics service
+   */
+  struct GNUNET_STATISTICS_Handle *statistics;
 
+};
 
 /**
  * Name of the configuration file used
@@ -74,9 +85,14 @@ static const char *sensor_src_dir = "sensors";
 static const char *sensor_dst_dir = "/tmp/gnunet-sensor-profiler";
 
 /**
- * Return value of the program
+ * Scheduled task to shutdown
  */
-static int ok = 1;
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+
+/**
+ * GNUnet configuration
+ */
+static struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
  * Number of peers to run (Option -p)
@@ -88,6 +104,16 @@ static unsigned int num_peers = 0;
  */
 static unsigned int sensors_interval = 0;
 
+/**
+ * Path to topology file (Option -t)
+ */
+static char *topology_file;
+
+/**
+ * Number of peers to simulate anomalies on (Option -a)
+ */
+static unsigned int anomalous_peers = 0;
+
 /**
  * Array of peer info for all peers
  */
@@ -108,6 +134,31 @@ static struct GNUNET_TESTBED_Operation *peerstore_op;
  */
 static struct GNUNET_PEERSTORE_Handle *peerstore;
 
+/**
+ * Dashboard service on collection point started?
+ */
+static int dashboard_service_started = GNUNET_NO;
+
+/**
+ * Number of peers started the sensor service successfully
+ */
+static int sensor_services_started = 0;
+
+/**
+ * Array of sensor names to be used for watching peerstore records
+ */
+static char **sensor_names;
+
+/**
+ * Size of 'sensor_names' array
+ */
+static unsigned int sensor_names_size = 0;
+
+/**
+ * Task run after any waiting period
+ */
+static GNUNET_SCHEDULER_TaskIdentifier delayed_task = GNUNET_SCHEDULER_NO_TASK;
+
 
 /**
  * Copy directory recursively
@@ -124,9 +175,24 @@ copy_dir (const char *src, const char *dst);
  * Do clean up and shutdown scheduler
  */
 static void
-do_shutdown ()                  // TODO: schedule timeout shutdown
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  int i;
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down.\n");
+  if (GNUNET_SCHEDULER_NO_TASK != delayed_task)
+  {
+    GNUNET_SCHEDULER_cancel (delayed_task);
+    delayed_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  for (i = 0; i < num_peers; i++)
+  {
+    if (NULL != all_peers_info[i].statistics_op)
+    {
+      GNUNET_TESTBED_operation_done (all_peers_info[i].statistics_op);
+      all_peers_info[i].statistics_op = NULL;
+    }
+  }
   if (NULL != peerstore_op)
   {
     GNUNET_TESTBED_operation_done (peerstore_op);
@@ -137,6 +203,17 @@ do_shutdown ()                  // TODO: schedule timeout shutdown
     GNUNET_free (all_peers_info);
     all_peers_info = NULL;
   }
+  if (NULL != cfg)
+  {
+    GNUNET_CONFIGURATION_destroy (cfg);
+    cfg = NULL;
+  }
+  if (NULL != sensor_names)
+  {
+    for (i = 0; i < sensor_names_size; i++)
+      GNUNET_free (sensor_names[i]);
+    GNUNET_array_grow (sensor_names, sensor_names_size, 0);
+  }
   GNUNET_SCHEDULER_shutdown ();
 }
 
@@ -158,8 +235,6 @@ copy_dir_scanner (void *cls, const char *filename)
 
   GNUNET_asprintf (&dst, "%s%s%s", dst_dir, DIR_SEPARATOR_STR,
                    GNUNET_STRINGS_get_short_name (filename));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Copying `%s' to `%s'.\n", filename,
-              dst);
   if (GNUNET_YES == GNUNET_DISK_directory_test (filename, GNUNET_YES))
     copy_result = copy_dir (filename, dst);
   else
@@ -167,6 +242,8 @@ copy_dir_scanner (void *cls, const char *filename)
     if (GNUNET_YES == GNUNET_DISK_file_test (dst))
       GNUNET_DISK_directory_remove (dst);
     copy_result = GNUNET_DISK_file_copy (filename, dst);
+    if (GNUNET_OK == copy_result)
+      GNUNET_DISK_fix_permissions (dst, GNUNET_NO, GNUNET_NO);
   }
   GNUNET_free (dst);
   return copy_result;
@@ -183,8 +260,6 @@ copy_dir_scanner (void *cls, const char *filename)
 static int
 copy_dir (const char *src, const char *dst)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Copying directory `%s' to `%s'.\n", src,
-              dst);
   if (GNUNET_YES != GNUNET_DISK_directory_test (src, GNUNET_YES))
     return GNUNET_SYSERR;
   if (GNUNET_OK != GNUNET_DISK_directory_create (dst))
@@ -209,6 +284,7 @@ sensor_dir_scanner (void *cls, const char *filename)
   const char *file_basename;
   char *dst_path;
   struct GNUNET_CONFIGURATION_Handle *sensor_cfg;
+  char *sensor_name;
 
   file_basename = GNUNET_STRINGS_get_short_name (filename);
   GNUNET_asprintf (&dst_path, "%s%s%s", sensor_dst_dir, DIR_SEPARATOR_STR,
@@ -219,13 +295,15 @@ sensor_dir_scanner (void *cls, const char *filename)
   }
   else
   {
+    sensor_name = GNUNET_strdup (file_basename);
+    GNUNET_array_append (sensor_names, sensor_names_size, sensor_name);
     sensor_cfg = GNUNET_CONFIGURATION_create ();
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONFIGURATION_parse (sensor_cfg, filename));
     GNUNET_CONFIGURATION_set_value_string (sensor_cfg, file_basename,
                                            "COLLECTION_POINT",
-                                           GNUNET_i2s_full (&all_peers_info[0].
-                                                            peer_id));
+                                           GNUNET_i2s_full (&all_peers_info
+                                                            [0].peer_id));
     if (sensors_interval > 0)
     {
       GNUNET_CONFIGURATION_set_value_number (sensor_cfg, file_basename,
@@ -273,8 +351,44 @@ dashboard_started (void *cls, struct GNUNET_TESTBED_Operation *op,
     GNUNET_assert (0);
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dashboard service started.\n");
-  //TODO:
   GNUNET_TESTBED_operation_done (op);
+  dashboard_service_started = GNUNET_YES;
+}
+
+
+/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop
+ */
+static int
+peerstore_watch_cb (void *cls,
+                    const struct GNUNET_PEERSTORE_Record *record,
+                    const char *emsg)
+{
+  struct PeerInfo *peer = cls;
+  struct GNUNET_SENSOR_DashboardAnomalyEntry *anomaly;
+
+  if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "ERROR: %s.\n", emsg);
+    GNUNET_assert (0);
+  }
+  GNUNET_assert (record->value_size ==
+                 sizeof (struct GNUNET_SENSOR_DashboardAnomalyEntry));
+  anomaly = record->value;
+  GNUNET_assert (0 ==
+                 GNUNET_CRYPTO_cmp_peer_identity (&peer->peer_id,
+                                                  record->peer));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Anomaly report:{'peerid': '%s'," "'peer': %d," "'sensor': '%s',"
+              "'anomalous': %d," "'neighbors': %f}\n",
+              GNUNET_i2s (&peer->peer_id), peer->index, record->key,
+              anomaly->anomalous, anomaly->anomalous_neighbors);
+  return GNUNET_YES;
 }
 
 
@@ -291,13 +405,27 @@ static void
 peerstore_connect_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
                       void *ca_result, const char *emsg)
 {
+  int i;
+  int j;
+  struct PeerInfo *peer;
+
   if (NULL != emsg)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "ERROR: %s.\n", emsg);
     GNUNET_assert (0);
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peerstore service.\n");
-  //TODO
+  /* Watch for anomaly reports from other peers */
+  for (i = 0; i < num_peers; i++)
+  {
+    peer = &all_peers_info[i];
+    for (j = 0; j < sensor_names_size; j++)
+    {
+      GNUNET_PEERSTORE_watch (peerstore, "sensordashboard-anomalies",
+                              &peer->peer_id, sensor_names[j],
+                              &peerstore_watch_cb, peer);
+    }
+  }
 }
 
 
@@ -335,6 +463,137 @@ peerstore_disconnect_adapter (void *cls, void *op_result)
 }
 
 
+/**
+ * Callback to be called when statistics service connect operation is completed
+ *
+ * @param cls the callback closure from functions generating an operation
+ * @param op the operation that has been finished
+ * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
+ * @param emsg error message in case the operation has failed; will be NULL if
+ *          operation has executed successfully.
+ */
+static void
+statistics_connect_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
+                      void *ca_result, const char *emsg)
+{
+  struct PeerInfo *peer = cls;
+
+  if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "ERROR: %s.\n", emsg);
+    GNUNET_assert (0);
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      "Connected to statistics service on peer `%s'.\n", GNUNET_i2s (&peer->peer_id));
+  GNUNET_STATISTICS_set (peer->statistics, "# peers connected", 0, GNUNET_NO);
+}
+
+
+/**
+ * Adapter function called to establish a connection to statistics service.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer to connect to; will be available until
+ *          GNUNET_TESTBED_operation_done() is called on the operation returned
+ *          from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+statistics_connect_adapter (void *cls,
+                           const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct PeerInfo *peer = cls;
+
+  peer->statistics = GNUNET_STATISTICS_create ("core", cfg);
+  GNUNET_assert (NULL != peer->statistics);
+  return peer->statistics;
+}
+
+
+/**
+ * Adapter function called to destroy a connection to statistics service.
+ *
+ * @param cls closure
+ * @param op_result service handle returned from the connect adapter
+ */
+static void
+statistics_disconnect_adapter (void *cls, void *op_result)
+{
+  struct PeerInfo *peer = cls;
+
+  GNUNET_STATISTICS_destroy (peer->statistics, GNUNET_NO);
+  peer->statistics = NULL;
+}
+
+
+/**
+ * This function is called after the estimated training period is over.
+ */
+static void
+simulate_anomalies (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  int i;
+  uint32_t an_peer;
+  struct GNUNET_TIME_Relative shutdown_delay;
+
+  delayed_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Training period over, simulating anomalies now.\n");
+  GNUNET_assert (anomalous_peers <= num_peers);
+  for (i = 0; i < anomalous_peers; i++)
+  {
+    an_peer = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+    if (NULL != all_peers_info[an_peer].statistics_op)
+    {
+      i--;
+      continue;
+    }
+    all_peers_info[an_peer].statistics_op =
+    GNUNET_TESTBED_service_connect (NULL, all_peers_info[an_peer].testbed_peer,
+                                    "statistics", &statistics_connect_cb,
+                                    &all_peers_info[an_peer], &statistics_connect_adapter,
+                                    &statistics_disconnect_adapter, &all_peers_info[an_peer]);
+  }
+  shutdown_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, num_peers * 6);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down in %s\n",
+      GNUNET_STRINGS_relative_time_to_string (shutdown_delay, GNUNET_NO));
+  GNUNET_SCHEDULER_cancel (shutdown_task);
+  shutdown_task = GNUNET_SCHEDULER_add_delayed (shutdown_delay, &do_shutdown, NULL);
+}
+
+
+/**
+ * This function is called after a delay which ensures that all peers are
+ * properly initialized
+ */
+static void
+peers_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  unsigned long long int training_points;
+  struct GNUNET_TIME_Relative training_period;
+
+  delayed_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "All peers are ready.\n");
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONFIGURATION_get_value_number (cfg,
+                                                        "sensor-model-gaussian",
+                                                        "TRAINING_WINDOW",
+                                                        &training_points));
+  training_period =
+      GNUNET_TIME_relative_multiply (GNUNET_TIME_relative_multiply
+                                     (GNUNET_TIME_UNIT_SECONDS,
+                                      (sensors_interval ==
+                                       0) ? 60 : sensors_interval),
+                                     training_points);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sleeping for a training period of %s.\n",
+              GNUNET_STRINGS_relative_time_to_string (training_period,
+                                                      GNUNET_NO));
+  delayed_task =
+      GNUNET_SCHEDULER_add_delayed (training_period, &simulate_anomalies, NULL);
+}
+
+
 /**
  * Callback to be called when sensor service is started
  *
@@ -356,8 +615,15 @@ sensor_service_started (void *cls, struct GNUNET_TESTBED_Operation *op,
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sensor service started on peer `%s'.\n",
               GNUNET_i2s (&peer->peer_id));
-  //TODO:
   GNUNET_TESTBED_operation_done (op);
+  sensor_services_started++;
+  if (sensor_services_started == num_peers)
+  {
+    delayed_task =
+        GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+                                      (PEER_STARTUP_TIME, num_peers),
+                                      &peers_ready, NULL);
+  }
 }
 
 
@@ -385,6 +651,7 @@ peer_info_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op,
   }
   peer->testbed_peer = testbed_peer;
   GNUNET_CRYPTO_get_peer_identity (pinfo->result.cfg, &peer->peer_id);
+  peer->index = peers_known;
   peers_known++;
   if (1 == peers_known)         /* First peer is collection point */
   {
@@ -435,7 +702,6 @@ test_master (void *cls, struct GNUNET_TESTBED_RunHandle *h, unsigned int num,
               "%d peers started. %d links succeeded. %d links failed.\n",
               num_peers, links_succeeded, links_failed);
   GNUNET_assert (num == num_peers);
-  GNUNET_assert (0 == links_failed);
   /* Collect peer information */
   all_peers_info = GNUNET_new_array (num_peers, struct PeerInfo);
 
@@ -456,11 +722,18 @@ test_master (void *cls, struct GNUNET_TESTBED_RunHandle *h, unsigned int num,
 static int
 verify_args ()
 {
-  if (num_peers < 3)
+  if (num_peers < 2)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _
-                ("Invalid or missing number of peers. Set at least 3 peers.\n"));
+                ("Invalid or missing number of peers. Set at least 2 peers.\n"));
+    return GNUNET_SYSERR;
+  }
+  if (NULL == topology_file ||
+      GNUNET_YES != GNUNET_DISK_file_test (topology_file))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("Missing or invalid topology file.\n"));
     return GNUNET_SYSERR;
   }
   return GNUNET_OK;
@@ -479,22 +752,21 @@ static void
 run (void *cls, char *const *args, const char *cf,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  struct GNUNET_CONFIGURATION_Handle *cfg;
-  double links;
-
   if (GNUNET_OK != verify_args ())
   {
-    do_shutdown ();
+    do_shutdown (NULL, NULL);
     return;
   }
   cfg = GNUNET_CONFIGURATION_create ();
   GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_load (cfg, cfg_filename));
-  links = log (num_peers) * log (num_peers) * num_peers / 2;
-  GNUNET_CONFIGURATION_set_value_number ((struct GNUNET_CONFIGURATION_Handle *)
-                                         cfg, "TESTBED", "OVERLAY_RANDOM_LINKS",
-                                         (unsigned long long int) links);
+  GNUNET_CONFIGURATION_set_value_string ((struct GNUNET_CONFIGURATION_Handle *)
+                                         cfg, "TESTBED",
+                                         "OVERLAY_TOPOLOGY_FILE",
+                                         topology_file);
+  shutdown_task =
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &do_shutdown,
+                                    NULL);
   GNUNET_TESTBED_run (NULL, cfg, num_peers, 0, NULL, NULL, &test_master, NULL);
-  GNUNET_CONFIGURATION_destroy (cfg);
 }
 
 
@@ -509,16 +781,21 @@ main (int argc, char *const *argv)
   static struct GNUNET_GETOPT_CommandLineOption options[] = {
     {'p', "peers", "COUNT", gettext_noop ("Number of peers to run"), GNUNET_YES,
      &GNUNET_GETOPT_set_uint, &num_peers},
+    {'t', "topology-file", "FILEPATH", gettext_noop ("Path to topology file"),
+     GNUNET_YES, &GNUNET_GETOPT_set_filename, &topology_file},
     {'i', "sensors-interval", "INTERVAL",
-     gettext_noop ("Change the interval or running sensors to given value"),
+     gettext_noop ("Change the interval of running sensors to given value"),
      GNUNET_YES, &GNUNET_GETOPT_set_uint, &sensors_interval},
+    {'a', "anomalous-peers", "COUNT",
+     gettext_noop ("Number of peers to simulate anomalies on"), GNUNET_YES,
+     &GNUNET_GETOPT_set_uint, &anomalous_peers},
     GNUNET_GETOPT_OPTION_END
   };
 
   return (GNUNET_OK ==
           GNUNET_PROGRAM_run (argc, argv, "gnunet-sensor-profiler",
                               gettext_noop ("Profiler for sensor service"),
-                              options, &run, NULL)) ? ok : 1;
+                              options, &run, NULL)) ? 0 : 1;
 }
 
 /* end of gnunet-sensor-profiler.c */