rps: start using statistics service
authorJulius Bünger <buenger@mytum.de>
Mon, 15 Jan 2018 17:13:39 +0000 (18:13 +0100)
committerJulius Bünger <buenger@mytum.de>
Tue, 16 Jan 2018 12:18:02 +0000 (13:18 +0100)
src/rps/gnunet-service-rps.c
src/rps/test_rps.c

index ec70075cf9e37649234e8490ae7004deca7f744a..877893ee8bcf4eaa9b787646c2eed4242eee871b 100644 (file)
@@ -29,6 +29,7 @@
 #include "gnunet_cadet_service.h"
 #include "gnunet_peerinfo_service.h"
 #include "gnunet_nse_service.h"
+#include "gnunet_statistics_service.h"
 #include "rps.h"
 #include "rps-test_util.h"
 #include "gnunet-service-rps_sampler.h"
  */
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
+/**
+ * Handle to the statistics service.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
 /**
  * Our own identity.
  */
@@ -2390,6 +2396,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
          send_size * sizeof (struct GNUNET_PeerIdentity));
 
   Peers_send_message (peer_id, ev, "PULL REPLY");
+  GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
 }
 
 
@@ -2961,6 +2968,7 @@ handle_peer_push (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received PUSH (%s)\n",
        GNUNET_i2s (peer));
+  GNUNET_STATISTICS_update(stats, "# push messages received", 1, GNUNET_NO);
 
   #ifdef ENABLE_MALICIOUS
   struct AttackedPeer *tmp_att_peer;
@@ -3013,6 +3021,7 @@ handle_peer_pull_request (void *cls,
   const struct GNUNET_PeerIdentity *view_array;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
+  GNUNET_STATISTICS_update(stats, "# pull request messages received", 1, GNUNET_NO);
 
   #ifdef ENABLE_MALICIOUS
   if (1 == mal_type
@@ -3096,6 +3105,7 @@ handle_peer_pull_reply (void *cls,
 #endif /* ENABLE_MALICIOUS */
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
+  GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
 
   #ifdef ENABLE_MALICIOUS
   // We shouldn't even receive pull replies as we're not sending
@@ -3234,6 +3244,7 @@ send_pull_request (const struct GNUNET_PeerIdentity *peer)
 
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
   Peers_send_message (peer, ev, "PULL REQUEST");
+  GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
 }
 
 
@@ -3253,6 +3264,7 @@ send_push (const struct GNUNET_PeerIdentity *peer_id)
 
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
   Peers_send_message (peer_id, ev, "PUSH");
+  GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
 }
 
 
@@ -3554,6 +3566,7 @@ do_round (void *cls)
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Going to execute next round.\n");
+  GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
   do_round_task = NULL;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Printing view:\n");
@@ -3706,6 +3719,7 @@ do_round (void *cls)
   else
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
+    GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
   }
   // TODO independent of that also get some peers from CADET_get_peers()?
 
@@ -3881,6 +3895,11 @@ shutdown_task (void *cls)
   View_destroy ();
   CustomPeerMap_destroy (push_map);
   CustomPeerMap_destroy (pull_map);
+  if (NULL != stats)
+  {
+    GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+    stats = NULL;
+  }
   #ifdef ENABLE_MALICIOUS
   struct AttackedPeer *tmp_att_peer;
   GNUNET_free (file_name_view_log);
@@ -4129,6 +4148,8 @@ run (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
 
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
+  stats = GNUNET_STATISTICS_create ("rps", cfg);
+
 }
 
 
index 7d61ae6a14268a00250e0cee5aab667674a26736..c958194a8e271c2c66a14ea736ee9987e77918d6 100644 (file)
  */
 static uint32_t num_peers;
 
+/**
+ * How many peers are ready to shutdown?
+ */
+static uint32_t num_shutdown_ready;
+
 /**
  * How long do we run the test?
  */
@@ -236,6 +241,16 @@ struct RPSPeer
    * Pending operation on that peer
    */
   const struct OpListEntry *entry_op_manage;
+
+  /**
+   * Testbed operation to connect to statistics service
+   */
+  struct GNUNET_TESTBED_Operation *stat_op;
+
+  /**
+   * Handle to the statistics service
+   */
+  struct GNUNET_STATISTICS_Handle *stats_h;
 };
 
 
@@ -318,7 +333,7 @@ typedef void (*ReplyHandle) (void *cls,
 /**
  * Called directly before disconnecting from the service
  */
-typedef void (*PostTest) (void *cls, struct GNUNET_RPS_Handle *h);
+typedef void (*PostTest) (const struct RPSPeer *peer);
 
 /**
  * Function called after disconnect to evaluate test success
@@ -354,6 +369,21 @@ enum OPTION_QUICK_QUIT {
   HAVE_NO_QUICK_QUIT,
 };
 
+/**
+ * @brief Do we collect statistics at the end?
+ */
+enum OPTION_COLLECT_STATISTICS {
+  /**
+   * @brief We collect statistics at the end
+   */
+  COLLECT_STATISTICS,
+
+  /**
+   * @brief We do not collect statistics at the end
+   */
+  NO_COLLECT_STATISTICS,
+};
+
 /**
  * Structure to define a single test
  */
@@ -413,6 +443,11 @@ struct SingleTestRun
    * Quit test before timeout?
    */
   enum OPTION_QUICK_QUIT have_quick_quit;
+
+  /**
+   * Collect statistics at the end?
+   */
+  enum OPTION_COLLECT_STATISTICS have_collect_statistics;
 } cur_test_run;
 
 /**
@@ -575,9 +610,21 @@ shutdown_op (void *cls)
     churn_task = NULL;
   }
   for (i = 0; i < num_peers; i++)
+  {
     if (NULL != rps_peers[i].op)
       GNUNET_TESTBED_operation_done (rps_peers[i].op);
-  GNUNET_SCHEDULER_shutdown ();
+    if (NULL != cur_test_run.post_test)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i);
+      cur_test_run.post_test (&rps_peers[i]);
+    }
+  }
+  /* If we do not collect statistics, shut down directly */
+  if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
+      num_peers <= num_shutdown_ready)
+  {
+    GNUNET_SCHEDULER_shutdown ();
+  }
 }
 
 
@@ -753,6 +800,76 @@ rps_connect_adapter (void *cls,
   return h;
 }
 
+/**
+ * Called to open a connection to the peer's statistics
+ *
+ * @param cls peer context
+ * @param cfg configuration of the peer to connect to; will be available until
+ *          GNUNET_TESTBED_operation_done() is called on the operation returned
+ *          from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+stat_connect_adapter (void *cls,
+                      const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct RPSPeer *peer = cls;
+
+  peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg);
+  return peer->stats_h;
+}
+
+/**
+ * Called to disconnect from peer's statistics service
+ *
+ * @param cls peer context
+ * @param op_result service handle returned from the connect adapter
+ */
+static void
+stat_disconnect_adapter (void *cls, void *op_result)
+{
+  struct RPSPeer *peer = cls;
+
+  //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
+  //              (peer->stats_h, "core", "# peers connected",
+  //               stat_iterator, peer));
+  //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
+  //              (peer->stats_h, "nse", "# peers connected",
+  //               stat_iterator, peer));
+  GNUNET_STATISTICS_destroy (op_result, GNUNET_NO);
+  peer->stats_h = NULL;
+}
+
+/**
+ * Called after successfully opening a connection to a peer's statistics
+ * service; we register statistics monitoring for CORE and NSE here.
+ *
+ * @param cls the callback closure from functions generating an operation
+ * @param op the operation that has been finished
+ * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
+ * @param emsg error message in case the operation has failed; will be NULL if
+ *          operation has executed successfully.
+ */
+static void
+stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
+                  void *ca_result, const char *emsg )
+{
+  //struct GNUNET_STATISTICS_Handle *sh = ca_result;
+  //struct RPSPeer *peer = (struct RPSPeer *) cls;
+
+  if (NULL != emsg)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
+  //              (sh, "core", "# peers connected",
+  //               stat_iterator, peer));
+  //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
+  //              (sh, "nse", "# peers connected",
+  //               stat_iterator, peer));
+}
+
 
 /**
  * Adapter function called to destroy connection to
@@ -1541,6 +1658,71 @@ profiler_eval (void)
   return evaluate ();
 }
 
+/**
+ * Continuation called by #GNUNET_STATISTICS_get() functions.
+ *
+ * Checks whether all peers received their statistics yet.
+ * Issues the shutdown.
+ *
+ * @param cls closure
+ * @param success #GNUNET_OK if statistics were
+ *        successfully obtained, #GNUNET_SYSERR if not.
+ */
+void
+post_test_shutdown_ready_cb (void *cls,
+                             int success)
+{
+  const struct RPSPeer *rps_peer = (const struct RPSPeer *) cls;
+  if (NULL != rps_peer->stat_op)
+  {
+    GNUNET_TESTBED_operation_done (rps_peer->stat_op);
+  }
+  num_shutdown_ready++;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      "%" PRIu32 " of %" PRIu32 " Peers are ready to shut down\n",
+      num_shutdown_ready,
+      num_peers);
+  if (num_peers <= num_shutdown_ready)
+  {
+    GNUNET_SCHEDULER_shutdown ();
+  }
+}
+
+/**
+ * Callback function to process statistic values.
+ *
+ * @param cls closure
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
+ */
+int
+stat_iterator (void *cls,
+               const char *subsystem,
+               const char *name,
+               uint64_t value,
+               int is_persistent)
+{
+  //const struct RPSPeer *rps_peer = (const struct RPSPeer *) cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %" PRIu64 "\n", value);
+  return GNUNET_OK;
+}
+
+void post_profiler (const struct RPSPeer *rps_peer)
+{
+  if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
+  {
+    GNUNET_STATISTICS_get (rps_peer->stats_h,
+                           "rps",
+                           "# rounds",
+                           post_test_shutdown_ready_cb,
+                           stat_iterator,
+                           (struct RPSPeer *) rps_peer);
+  }
+}
+
 
 /***********************************************************************
  * /Definition of tests
@@ -1623,6 +1805,19 @@ run (void *cls,
                                         &rps_disconnect_adapter,
                                         &rps_peers[i]);
     }
+    /* Connect all peers to statistics service */
+    if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
+    {
+      rps_peers[i].stat_op =
+        GNUNET_TESTBED_service_connect (NULL,
+                                        peers[i],
+                                        "statistics",
+                                        stat_complete_cb,
+                                        &rps_peers[i],
+                                        &stat_connect_adapter,
+                                        &stat_disconnect_adapter,
+                                        &rps_peers[i]);
+    }
   }
 
   if (NULL != churn_task)
@@ -1644,12 +1839,15 @@ main (int argc, char *argv[])
   int ret_value;
 
   num_peers = 5;
+  num_shutdown_ready = 0;
   cur_test_run.name = "test-rps-default";
   cur_test_run.init_peer = default_init_peer;
   cur_test_run.pre_test = NULL;
   cur_test_run.reply_handle = default_reply_handle;
   cur_test_run.eval_cb = default_eval_cb;
+  cur_test_run.post_test = NULL;
   cur_test_run.have_churn = HAVE_CHURN;
+  cur_test_run.have_collect_statistics = NO_COLLECT_STATISTICS;
   churn_task = NULL;
   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
 
@@ -1767,10 +1965,12 @@ main (int argc, char *argv[])
     cur_test_run.main_test = profiler_cb;
     cur_test_run.reply_handle = profiler_reply_handle;
     cur_test_run.eval_cb = profiler_eval;
+    cur_test_run.post_test = post_profiler;
     cur_test_run.request_interval = 2;
     cur_test_run.num_requests = 5;
     cur_test_run.have_churn = HAVE_CHURN;
     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
+    cur_test_run.have_collect_statistics = COLLECT_STATISTICS;
     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300);
 
     /* 'Clean' directory */