From 5441f24ab5327f576b941e8e1e14a00270690759 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Mon, 15 Jan 2018 18:13:39 +0100 Subject: [PATCH] rps: start using statistics service --- src/rps/gnunet-service-rps.c | 21 ++++ src/rps/test_rps.c | 204 ++++++++++++++++++++++++++++++++++- 2 files changed, 223 insertions(+), 2 deletions(-) diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index ec70075cf..877893ee8 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -29,6 +29,7 @@ #include "gnunet_cadet_service.h" #include "gnunet_peerinfo_service.h" #include "gnunet_nse_service.h" +#include "gnunet_statistics_service.h" #include "rps.h" #include "rps-test_util.h" #include "gnunet-service-rps_sampler.h" @@ -59,6 +60,11 @@ */ static const struct GNUNET_CONFIGURATION_Handle *cfg; +/** + * Handle to the statistics service. + */ +static struct GNUNET_STATISTICS_Handle *stats; + /** * Our own identity. */ @@ -2390,6 +2396,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, send_size * sizeof (struct GNUNET_PeerIdentity)); Peers_send_message (peer_id, ev, "PULL REPLY"); + GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO); } @@ -2961,6 +2968,7 @@ handle_peer_push (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PUSH (%s)\n", GNUNET_i2s (peer)); + GNUNET_STATISTICS_update(stats, "# push messages received", 1, GNUNET_NO); #ifdef ENABLE_MALICIOUS struct AttackedPeer *tmp_att_peer; @@ -3013,6 +3021,7 @@ handle_peer_pull_request (void *cls, const struct GNUNET_PeerIdentity *view_array; LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer)); + GNUNET_STATISTICS_update(stats, "# pull request messages received", 1, GNUNET_NO); #ifdef ENABLE_MALICIOUS if (1 == mal_type @@ -3096,6 +3105,7 @@ handle_peer_pull_reply (void *cls, #endif /* ENABLE_MALICIOUS */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender)); + GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO); #ifdef ENABLE_MALICIOUS // We shouldn't even receive pull replies as we're not sending @@ -3234,6 +3244,7 @@ send_pull_request (const struct GNUNET_PeerIdentity *peer) ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); Peers_send_message (peer, ev, "PULL REQUEST"); + GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO); } @@ -3253,6 +3264,7 @@ send_push (const struct GNUNET_PeerIdentity *peer_id) ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); Peers_send_message (peer_id, ev, "PUSH"); + GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO); } @@ -3554,6 +3566,7 @@ do_round (void *cls) LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round.\n"); + GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); do_round_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Printing view:\n"); @@ -3706,6 +3719,7 @@ do_round (void *cls) else { LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); + GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); } // TODO independent of that also get some peers from CADET_get_peers()? @@ -3881,6 +3895,11 @@ shutdown_task (void *cls) View_destroy (); CustomPeerMap_destroy (push_map); CustomPeerMap_destroy (pull_map); + if (NULL != stats) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + stats = NULL; + } #ifdef ENABLE_MALICIOUS struct AttackedPeer *tmp_att_peer; GNUNET_free (file_name_view_log); @@ -4129,6 +4148,8 @@ run (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + stats = GNUNET_STATISTICS_create ("rps", cfg); + } diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c index 7d61ae6a1..c958194a8 100644 --- a/src/rps/test_rps.c +++ b/src/rps/test_rps.c @@ -40,6 +40,11 @@ */ static uint32_t num_peers; +/** + * How many peers are ready to shutdown? + */ +static uint32_t num_shutdown_ready; + /** * How long do we run the test? */ @@ -236,6 +241,16 @@ struct RPSPeer * Pending operation on that peer */ const struct OpListEntry *entry_op_manage; + + /** + * Testbed operation to connect to statistics service + */ + struct GNUNET_TESTBED_Operation *stat_op; + + /** + * Handle to the statistics service + */ + struct GNUNET_STATISTICS_Handle *stats_h; }; @@ -318,7 +333,7 @@ typedef void (*ReplyHandle) (void *cls, /** * Called directly before disconnecting from the service */ -typedef void (*PostTest) (void *cls, struct GNUNET_RPS_Handle *h); +typedef void (*PostTest) (const struct RPSPeer *peer); /** * Function called after disconnect to evaluate test success @@ -354,6 +369,21 @@ enum OPTION_QUICK_QUIT { HAVE_NO_QUICK_QUIT, }; +/** + * @brief Do we collect statistics at the end? + */ +enum OPTION_COLLECT_STATISTICS { + /** + * @brief We collect statistics at the end + */ + COLLECT_STATISTICS, + + /** + * @brief We do not collect statistics at the end + */ + NO_COLLECT_STATISTICS, +}; + /** * Structure to define a single test */ @@ -413,6 +443,11 @@ struct SingleTestRun * Quit test before timeout? */ enum OPTION_QUICK_QUIT have_quick_quit; + + /** + * Collect statistics at the end? + */ + enum OPTION_COLLECT_STATISTICS have_collect_statistics; } cur_test_run; /** @@ -575,9 +610,21 @@ shutdown_op (void *cls) churn_task = NULL; } for (i = 0; i < num_peers; i++) + { if (NULL != rps_peers[i].op) GNUNET_TESTBED_operation_done (rps_peers[i].op); - GNUNET_SCHEDULER_shutdown (); + if (NULL != cur_test_run.post_test) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i); + cur_test_run.post_test (&rps_peers[i]); + } + } + /* If we do not collect statistics, shut down directly */ + if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics || + num_peers <= num_shutdown_ready) + { + GNUNET_SCHEDULER_shutdown (); + } } @@ -753,6 +800,76 @@ rps_connect_adapter (void *cls, return h; } +/** + * Called to open a connection to the peer's statistics + * + * @param cls peer context + * @param cfg configuration of the peer to connect to; will be available until + * GNUNET_TESTBED_operation_done() is called on the operation returned + * from GNUNET_TESTBED_service_connect() + * @return service handle to return in 'op_result', NULL on error + */ +static void * +stat_connect_adapter (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct RPSPeer *peer = cls; + + peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg); + return peer->stats_h; +} + +/** + * Called to disconnect from peer's statistics service + * + * @param cls peer context + * @param op_result service handle returned from the connect adapter + */ +static void +stat_disconnect_adapter (void *cls, void *op_result) +{ + struct RPSPeer *peer = cls; + + //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel + // (peer->stats_h, "core", "# peers connected", + // stat_iterator, peer)); + //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel + // (peer->stats_h, "nse", "# peers connected", + // stat_iterator, peer)); + GNUNET_STATISTICS_destroy (op_result, GNUNET_NO); + peer->stats_h = NULL; +} + +/** + * Called after successfully opening a connection to a peer's statistics + * service; we register statistics monitoring for CORE and NSE here. + * + * @param cls the callback closure from functions generating an operation + * @param op the operation that has been finished + * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter() + * @param emsg error message in case the operation has failed; will be NULL if + * operation has executed successfully. + */ +static void +stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op, + void *ca_result, const char *emsg ) +{ + //struct GNUNET_STATISTICS_Handle *sh = ca_result; + //struct RPSPeer *peer = (struct RPSPeer *) cls; + + if (NULL != emsg) + { + GNUNET_break (0); + return; + } + //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch + // (sh, "core", "# peers connected", + // stat_iterator, peer)); + //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch + // (sh, "nse", "# peers connected", + // stat_iterator, peer)); +} + /** * Adapter function called to destroy connection to @@ -1541,6 +1658,71 @@ profiler_eval (void) return evaluate (); } +/** + * Continuation called by #GNUNET_STATISTICS_get() functions. + * + * Checks whether all peers received their statistics yet. + * Issues the shutdown. + * + * @param cls closure + * @param success #GNUNET_OK if statistics were + * successfully obtained, #GNUNET_SYSERR if not. + */ +void +post_test_shutdown_ready_cb (void *cls, + int success) +{ + const struct RPSPeer *rps_peer = (const struct RPSPeer *) cls; + if (NULL != rps_peer->stat_op) + { + GNUNET_TESTBED_operation_done (rps_peer->stat_op); + } + num_shutdown_ready++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%" PRIu32 " of %" PRIu32 " Peers are ready to shut down\n", + num_shutdown_ready, + num_peers); + if (num_peers <= num_shutdown_ready) + { + GNUNET_SCHEDULER_shutdown (); + } +} + +/** + * Callback function to process statistic values. + * + * @param cls closure + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + */ +int +stat_iterator (void *cls, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + //const struct RPSPeer *rps_peer = (const struct RPSPeer *) cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %" PRIu64 "\n", value); + return GNUNET_OK; +} + +void post_profiler (const struct RPSPeer *rps_peer) +{ + if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics) + { + GNUNET_STATISTICS_get (rps_peer->stats_h, + "rps", + "# rounds", + post_test_shutdown_ready_cb, + stat_iterator, + (struct RPSPeer *) rps_peer); + } +} + /*********************************************************************** * /Definition of tests @@ -1623,6 +1805,19 @@ run (void *cls, &rps_disconnect_adapter, &rps_peers[i]); } + /* Connect all peers to statistics service */ + if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics) + { + rps_peers[i].stat_op = + GNUNET_TESTBED_service_connect (NULL, + peers[i], + "statistics", + stat_complete_cb, + &rps_peers[i], + &stat_connect_adapter, + &stat_disconnect_adapter, + &rps_peers[i]); + } } if (NULL != churn_task) @@ -1644,12 +1839,15 @@ main (int argc, char *argv[]) int ret_value; num_peers = 5; + num_shutdown_ready = 0; cur_test_run.name = "test-rps-default"; cur_test_run.init_peer = default_init_peer; cur_test_run.pre_test = NULL; cur_test_run.reply_handle = default_reply_handle; cur_test_run.eval_cb = default_eval_cb; + cur_test_run.post_test = NULL; cur_test_run.have_churn = HAVE_CHURN; + cur_test_run.have_collect_statistics = NO_COLLECT_STATISTICS; churn_task = NULL; timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); @@ -1767,10 +1965,12 @@ main (int argc, char *argv[]) cur_test_run.main_test = profiler_cb; cur_test_run.reply_handle = profiler_reply_handle; cur_test_run.eval_cb = profiler_eval; + cur_test_run.post_test = post_profiler; cur_test_run.request_interval = 2; cur_test_run.num_requests = 5; cur_test_run.have_churn = HAVE_CHURN; cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT; + cur_test_run.have_collect_statistics = COLLECT_STATISTICS; timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300); /* 'Clean' directory */ -- 2.25.1