From 291f4c74df104728a0a0bb105650b2f33f9b3d9d Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Tue, 21 Feb 2017 13:30:07 +0100 Subject: [PATCH] Rewrite cadet tests using MQ API --- src/cadet/.gitignore | 1 + src/cadet/Makefile.am | 28 +- src/cadet/cadet_test_lib_new.c | 362 +++++++++++ src/cadet/cadet_test_lib_new.h | 106 ++++ src/cadet/test_cadet.c | 4 +- src/cadet/test_cadet_new.c | 1049 ++++++++++++++++++++++++++++++++ 6 files changed, 1547 insertions(+), 3 deletions(-) create mode 100644 src/cadet/cadet_test_lib_new.c create mode 100644 src/cadet/cadet_test_lib_new.h create mode 100644 src/cadet/test_cadet_new.c diff --git a/src/cadet/.gitignore b/src/cadet/.gitignore index a38b8f495..a73006dae 100644 --- a/src/cadet/.gitignore +++ b/src/cadet/.gitignore @@ -21,3 +21,4 @@ test_cadet_local test_cadet_single gnunet-service-cadet-new test_cadet_local_mq +test_cadet_2_forward_new \ No newline at end of file diff --git a/src/cadet/Makefile.am b/src/cadet/Makefile.am index 1a51453c9..74791d66e 100644 --- a/src/cadet/Makefile.am +++ b/src/cadet/Makefile.am @@ -110,7 +110,7 @@ endif if HAVE_TESTING - noinst_LIBRARIES = libgnunetcadettest.a $(noinst_LIB_EXP) + noinst_LIBRARIES = libgnunetcadettest.a libgnunetcadettestnew.a $(noinst_LIB_EXP) noinst_PROGRAMS = gnunet-cadet-profiler endif @@ -124,6 +124,7 @@ libgnunetcadettest_a_LIBADD = \ if HAVE_TESTING check_PROGRAMS = \ test_cadet_local_mq \ + test_cadet_2_forward_new \ test_cadet_single \ test_cadet_local \ test_cadet_2_forward \ @@ -245,6 +246,31 @@ test_cadet_5_speed_reliable_backwards_SOURCES = \ test_cadet_5_speed_reliable_backwards_LDADD = $(ld_cadet_test_lib) +# NEW TESTS +libgnunetcadettestnew_a_SOURCES = \ + cadet_test_lib_new.c cadet_test_lib_new.h +libgnunetcadettestnew_a_LIBADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testbed/libgnunettestbed.la \ + libgnunetcadetnew.la + +ld_cadet_test_lib_new = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + libgnunetcadetnew.la \ + libgnunetcadettestnew.a \ + $(top_builddir)/src/testbed/libgnunettestbed.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la +dep_cadet_test_lib_new = \ + libgnunetcadetnew.la \ + libgnunetcadettestnew.a \ + $(top_builddir)/src/statistics/libgnunetstatistics.la + +test_cadet_2_forward_new_SOURCES = \ + test_cadet_new.c +test_cadet_2_forward_new_LDADD = $(ld_cadet_test_lib_new) + + if ENABLE_TEST_RUN AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME; TESTS = \ diff --git a/src/cadet/cadet_test_lib_new.c b/src/cadet/cadet_test_lib_new.c new file mode 100644 index 000000000..c3a1540f4 --- /dev/null +++ b/src/cadet/cadet_test_lib_new.c @@ -0,0 +1,362 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012, 2017 GNUnet e.V. + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ +/** + * @file cadet/cadet_test_lib.c + * @author Bartlomiej Polot + * @brief library for writing CADET tests + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "cadet_test_lib_new.h" +#include "gnunet_cadet_service.h" + + +/** + * Test context for a CADET Test. + */ +struct GNUNET_CADET_TEST_Context +{ + /** + * Array of running peers. + */ + struct GNUNET_TESTBED_Peer **peers; + + /** + * Array of handles to the CADET for each peer. + */ + struct GNUNET_CADET_Handle **cadets; + + /** + * Operation associated with the connection to the CADET. + */ + struct GNUNET_TESTBED_Operation **ops; + + /** + * Number of peers running, size of the arrays above. + */ + unsigned int num_peers; + + /** + * Main function of the test to run once all CADETs are available. + */ + GNUNET_CADET_TEST_AppMain app_main; + + /** + * Closure for 'app_main'. + */ + void *app_main_cls; + + /** + * Handler for incoming tunnels. + */ + GNUNET_CADET_ConnectEventHandler connects; + + /** + * Function called when the transmit window size changes. + */ + GNUNET_CADET_WindowSizeEventHandler window_changes; + + /** + * Cleaner for destroyed incoming tunnels. + */ + GNUNET_CADET_DisconnectEventHandler disconnects; + + /** + * Message handlers. + */ + struct GNUNET_MQ_MessageHandler *handlers; + + /** + * Application ports. + */ + const struct GNUNET_HashCode **ports; + + /** + * Number of ports in #ports. + */ + unsigned int port_count; + +}; + + +/** + * Context for a cadet adapter callback. + */ +struct GNUNET_CADET_TEST_AdapterContext +{ + /** + * Peer number for the particular peer. + */ + unsigned int peer; + + /** + * Port handlers for open ports. + */ + struct GNUNET_CADET_Port **ports; + + /** + * General context. + */ + struct GNUNET_CADET_TEST_Context *ctx; +}; + + +/** + * Adapter function called to establish a connection to + * the CADET service. + * + * @param cls closure + * @param cfg configuration of the peer to connect to; will be available until + * GNUNET_TESTBED_operation_done() is called on the operation returned + * from GNUNET_TESTBED_service_connect() + * @return service handle to return in 'op_result', NULL on error + */ +static void * +cadet_connect_adapter (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_CADET_TEST_AdapterContext *actx = cls; + struct GNUNET_CADET_TEST_Context *ctx = actx->ctx; + struct GNUNET_CADET_Handle *h; + unsigned int i; + + h = GNUNET_CADET_connecT (cfg); + if (NULL == ctx->ports) + return h; + + actx->ports = GNUNET_new_array (ctx->port_count, struct GNUNET_CADET_Port *); + for (i = 0; i < ctx->port_count; i++) + { + actx->ports[i] = GNUNET_CADET_open_porT (h, + ctx->ports[i], + ctx->connects, + (void *) (long) actx->peer, + ctx->window_changes, + ctx->disconnects, + ctx->handlers); + } + return h; +} + + +/** + * Adapter function called to destroy a connection to + * the CADET service. + * + * @param cls closure + * @param op_result service handle returned from the connect adapter + */ +static void +cadet_disconnect_adapter (void *cls, + void *op_result) +{ + struct GNUNET_CADET_Handle *cadet = op_result; + struct GNUNET_CADET_TEST_AdapterContext *actx = cls; + + if (NULL != actx->ports) + { + for (int i = 0; i < actx->ctx->port_count; i++) + { + GNUNET_CADET_close_port (actx->ports[i]); + actx->ports[i] = NULL; + } + GNUNET_free (actx->ports); + } + GNUNET_free (actx); + GNUNET_CADET_disconnect (cadet); +} + + +/** + * Callback to be called when a service connect operation is completed. + * + * @param cls The callback closure from functions generating an operation. + * @param op The operation that has been finished. + * @param ca_result The service handle returned from + * GNUNET_TESTBED_ConnectAdapter() (cadet handle). + * @param emsg Error message in case the operation has failed. + * NULL if operation has executed successfully. + */ +static void +cadet_connect_cb (void *cls, + struct GNUNET_TESTBED_Operation *op, + void *ca_result, + const char *emsg) +{ + struct GNUNET_CADET_TEST_Context *ctx = cls; + unsigned int i; + + if (NULL != emsg) + { + fprintf (stderr, "Failed to connect to CADET service: %s\n", + emsg); + GNUNET_SCHEDULER_shutdown (); + return; + } + for (i = 0; i < ctx->num_peers; i++) + if (op == ctx->ops[i]) + { + ctx->cadets[i] = ca_result; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "...cadet %u connected\n", i); + } + for (i = 0; i < ctx->num_peers; i++) + if (NULL == ctx->cadets[i]) + return; /* still some CADET connections missing */ + /* all CADET connections ready! */ + ctx->app_main (ctx->app_main_cls, + ctx, + ctx->num_peers, + ctx->peers, + ctx->cadets); +} + + +void +GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context *ctx) +{ + unsigned int i; + + for (i = 0; i < ctx->num_peers; i++) + { + GNUNET_assert (NULL != ctx->ops[i]); + GNUNET_TESTBED_operation_done (ctx->ops[i]); + ctx->ops[i] = NULL; + } + GNUNET_free (ctx->ops); + GNUNET_free (ctx->cadets); + GNUNET_free (ctx); + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Callback run when the testbed is ready (peers running and connected to + * each other) + * + * @param cls Closure (context). + * @param h the run handle + * @param num_peers Number of peers that are running. + * @param peers Handles to each one of the @c num_peers peers. + * @param links_succeeded the number of overlay link connection attempts that + * succeeded + * @param links_failed the number of overlay link connection attempts that + * failed + */ +static void +cadet_test_run (void *cls, + struct GNUNET_TESTBED_RunHandle *h, + unsigned int num_peers, + struct GNUNET_TESTBED_Peer **peers, + unsigned int links_succeeded, + unsigned int links_failed) +{ + struct GNUNET_CADET_TEST_Context *ctx = cls; + unsigned int i; + + if (0 != links_failed) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Some links failed (%u), ending\n", + links_failed); + exit (2); + } + + if (num_peers != ctx->num_peers) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peers started %u/%u, ending\n", + num_peers, ctx->num_peers); + exit (1); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testbed up, %u peers and %u links\n", + num_peers, links_succeeded); + ctx->peers = peers; + for (i = 0; i < num_peers; i++) + { + struct GNUNET_CADET_TEST_AdapterContext *newctx; + newctx = GNUNET_new (struct GNUNET_CADET_TEST_AdapterContext); + newctx->peer = i; + newctx->ctx = ctx; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connecting to cadet %u\n", i); + ctx->ops[i] = GNUNET_TESTBED_service_connect (ctx, + peers[i], + "cadet", + &cadet_connect_cb, + ctx, + &cadet_connect_adapter, + &cadet_disconnect_adapter, + newctx); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "op handle %p\n", ctx->ops[i]); + } +} + + +/** + * Run a test using the given name, configuration file and number of peers. + * All cadet callbacks will receive the peer number (long) as the closure. + * + * @param testname Name of the test (for logging). + * @param cfgfile Name of the configuration file. + * @param num_peers Number of peers to start. + * @param tmain Main function to run once the testbed is ready. + * @param tmain_cls Closure for @a tmain. + * @param connects Handler for incoming channels. + * @param window_changes Handler for the window size change notification. + * @param disconnects Cleaner for destroyed incoming channels. + * @param handlers Message handlers. + * @param ports Ports the peers offer, NULL-terminated. + */ +void +GNUNET_CADET_TEST_ruN (const char *testname, + const char *cfgfile, + unsigned int num_peers, + GNUNET_CADET_TEST_AppMain tmain, + void *tmain_cls, + GNUNET_CADET_ConnectEventHandler connects, + GNUNET_CADET_WindowSizeEventHandler window_changes, + GNUNET_CADET_DisconnectEventHandler disconnects, + struct GNUNET_MQ_MessageHandler *handlers, + const struct GNUNET_HashCode **ports) +{ + struct GNUNET_CADET_TEST_Context *ctx; + + ctx = GNUNET_new (struct GNUNET_CADET_TEST_Context); + ctx->num_peers = num_peers; + ctx->ops = GNUNET_new_array (num_peers, struct GNUNET_TESTBED_Operation *); + ctx->cadets = GNUNET_new_array (num_peers, struct GNUNET_CADET_Handle *); + ctx->app_main = tmain; + ctx->app_main_cls = tmain_cls; + ctx->connects = connects; + ctx->window_changes = window_changes; + ctx->disconnects = disconnects; + ctx->handlers = GNUNET_MQ_copy_handlers (handlers); + ctx->ports = ports; + ctx->port_count = 0; + while (NULL != ctx->ports[ctx->port_count]) + ctx->port_count++; + + GNUNET_TESTBED_test_run (testname, + cfgfile, + num_peers, + 0LL, NULL, NULL, + &cadet_test_run, ctx); +} + +/* end of cadet_test_lib.c */ diff --git a/src/cadet/cadet_test_lib_new.h b/src/cadet/cadet_test_lib_new.h new file mode 100644 index 000000000..4b3a6b18d --- /dev/null +++ b/src/cadet/cadet_test_lib_new.h @@ -0,0 +1,106 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012,2017 GNUnet e.V. + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ +/** + * @file cadet/cadet_test_lib.h + * @author Bartlomiej Polot + * @brief library for writing CADET tests + */ +#ifndef CADET_TEST_LIB_H +#define CADET_TEST_LIB_H + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + +#include "gnunet_testbed_service.h" +#include "gnunet_cadet_service.h" + +/** + * Test context for a CADET Test. + */ +struct GNUNET_CADET_TEST_Context; + + +/** + * Main function of a CADET test. + * + * @param cls Closure. + * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end. + * @param num_peers Number of peers that are running. + * @param peers Array of peers. + * @param cadets Handle to each of the CADETs of the peers. + */ +typedef void (*GNUNET_CADET_TEST_AppMain) (void *cls, + struct GNUNET_CADET_TEST_Context *ctx, + unsigned int num_peers, + struct GNUNET_TESTBED_Peer **peers, + struct GNUNET_CADET_Handle **cadets); + + +/** + * Run a test using the given name, configuration file and number of peers. + * All cadet callbacks will receive the peer number (long) as the closure. + * + * @param testname Name of the test (for logging). + * @param cfgfile Name of the configuration file. + * @param num_peers Number of peers to start. + * @param tmain Main function to run once the testbed is ready. + * @param tmain_cls Closure for @a tmain. + * @param connects Handler for incoming channels. + * @param window_changes Handler for the window size change notification. + * @param disconnects Cleaner for destroyed incoming channels. + * @param handlers Message handlers. + * @param ports Ports the peers offer, NULL-terminated. + */ +void +GNUNET_CADET_TEST_ruN (const char *testname, + const char *cfgfile, + unsigned int num_peers, + GNUNET_CADET_TEST_AppMain tmain, + void *tmain_cls, + GNUNET_CADET_ConnectEventHandler connects, + GNUNET_CADET_WindowSizeEventHandler window_changes, + GNUNET_CADET_DisconnectEventHandler disconnects, + struct GNUNET_MQ_MessageHandler *handlers, + const struct GNUNET_HashCode **ports); + +/** + * Clean up the testbed. + * + * @param ctx handle for the testbed + */ +void +GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context *ctx); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + + +/* ifndef CADET_TEST_LIB_H */ +#endif diff --git a/src/cadet/test_cadet.c b/src/cadet/test_cadet.c index f2e639e7a..e57c01be2 100644 --- a/src/cadet/test_cadet.c +++ b/src/cadet/test_cadet.c @@ -593,8 +593,8 @@ tmt_rdy (void *cls, size_t size, void *buf) "sending initializer\n"); msg_size = size_payload + 1000; msg->size = htons (msg_size); - if (SPEED_ACK == test) - data_sent++; + if (SPEED_ACK == test) + data_sent++; } else if ( (SPEED == test) || (SPEED_ACK == test) ) diff --git a/src/cadet/test_cadet_new.c b/src/cadet/test_cadet_new.c new file mode 100644 index 000000000..622e87ea1 --- /dev/null +++ b/src/cadet/test_cadet_new.c @@ -0,0 +1,1049 @@ +/* + This file is part of GNUnet. + Copyright (C) 2011, 2017 GNUnet e.V. + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ +/** + * @file cadet/test_cadet_mq.c + * @author Bart Polot + * @author Christian Grothoff + * @brief Test for the cadet service using mq API. + */ +#include +#include "platform.h" +#include "cadet_test_lib_new.h" +#include "gnunet_cadet_service.h" +#include "gnunet_statistics_service.h" +#include + + +/** + * Ugly workaround to unify data handlers on incoming and outgoing channels. + */ +struct CadetTestChannelWrapper +{ + /** + * Channel pointer. + */ + struct GNUNET_CADET_Channel *ch; +}; + +/** + * How many messages to send + */ +#define TOTAL_PACKETS 500 /* Cannot exceed 64k! */ + +/** + * How long until we give up on connecting the peers? + */ +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120) + +/** + * Time to wait for stuff that should be rather fast + */ +#define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20) + +/** + * DIFFERENT TESTS TO RUN + */ +#define SETUP 0 +#define FORWARD 1 +#define KEEPALIVE 2 +#define SPEED 3 +#define SPEED_ACK 4 +#define SPEED_REL 8 +#define P2P_SIGNAL 10 + +/** + * Which test are we running? + */ +static int test; + +/** + * String with test name + */ +static char *test_name; + +/** + * Flag to send traffic leaf->root in speed tests to test BCK_ACK logic. + */ +static int test_backwards = GNUNET_NO; + +/** + * How many events have happened + */ +static int ok; + +/** + * Number of events expected to conclude the test successfully. + */ +static int ok_goal; + +/** + * Size of each test packet's payload + */ +static size_t size_payload = sizeof (uint32_t); + +/** + * Operation to get peer ids. + */ +static struct GNUNET_TESTBED_Operation *t_op[2]; + +/** + * Peer ids. + */ +static struct GNUNET_PeerIdentity *p_id[2]; + +/** + * Port ID + */ +static struct GNUNET_HashCode port; + +/** + * Peer ids counter. + */ +static unsigned int p_ids; + +/** + * Is the setup initialized? + */ +static int initialized; + +/** + * Number of payload packes sent. + */ +static int data_sent; + +/** + * Number of payload packets received. + */ +static int data_received; + +/** + * Number of payload packed acknowledgements sent. + */ +static int ack_sent; + +/** + * Number of payload packed explicitly (app level) acknowledged. + */ +static int ack_received; + +/** + * Total number of peers asked to run. + */ +static unsigned long long peers_requested; + +/** + * Number of currently running peers (should be same as @c peers_requested). + */ +static unsigned long long peers_running; + +/** + * Test context (to shut down). + */ +struct GNUNET_CADET_TEST_Context *test_ctx; + +/** + * Task called to disconnect peers. + */ +static struct GNUNET_SCHEDULER_Task *disconnect_task; + +/** + * Task To perform tests + */ +static struct GNUNET_SCHEDULER_Task *test_task; + +/** + * Task runnining #send_next_msg(). + */ +static struct GNUNET_SCHEDULER_Task *send_next_msg_task; + +/** + * Cadet handle for the root peer + */ +static struct GNUNET_CADET_Handle *h1; + +/** + * Cadet handle for the first leaf peer + */ +static struct GNUNET_CADET_Handle *h2; + +/** + * Channel handle for the root peer + */ +static struct GNUNET_CADET_Channel *outgoing_ch; + +/** + * Channel handle for the dest peer + */ +static struct GNUNET_CADET_Channel *incoming_ch; + +/** + * Time we started the data transmission (after channel has been established + * and initilized). + */ +static struct GNUNET_TIME_Absolute start_time; + +/** + * Peers handle. + */ +static struct GNUNET_TESTBED_Peer **testbed_peers; + +/** + * Statistics operation handle. + */ +static struct GNUNET_TESTBED_Operation *stats_op; + +/** + * Keepalives sent. + */ +static unsigned int ka_sent; + +/** + * Keepalives received. + */ +static unsigned int ka_received; + +/** + * How many messages were dropped by CADET because of full buffers? + */ +static unsigned int msg_dropped; + + +/******************************************************************************/ + + +/******************************************************************************/ + + +/** + * Get the channel considered as the "target" or "receiver", depending on + * the test type and size. + * + * @return Channel handle of the target client, either 0 (for backward tests) + * or the last peer in the line (for other tests). + */ +static struct GNUNET_CADET_Channel * +get_target_channel () +{ + if (SPEED == test && GNUNET_YES == test_backwards) + return outgoing_ch; + else + return incoming_ch; +} + + +/** + * Show the results of the test (banwidth acheived) and log them to GAUGER + */ +static void +show_end_data (void) +{ + static struct GNUNET_TIME_Absolute end_time; + static struct GNUNET_TIME_Relative total_time; + + end_time = GNUNET_TIME_absolute_get (); + total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time); + FPRINTF (stderr, "\nResults of test \"%s\"\n", test_name); + FPRINTF (stderr, "Test time %s\n", + GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES)); + FPRINTF (stderr, "Test bandwidth: %f kb/s\n", 4 * TOTAL_PACKETS * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms + FPRINTF (stderr, "Test throughput: %f packets/s\n\n", TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms + GAUGER ("CADET", test_name, + TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000), + "packets/s"); +} + + +/** + * Disconnect from cadet services af all peers, call shutdown. + * + * @param cls Closure (line number from which termination was requested). + * @param tc Task Context. + */ +static void +disconnect_cadet_peers (void *cls) +{ + long line = (long) cls; + unsigned int i; + + disconnect_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "disconnecting cadet service of peers, called from line %ld\n", + line); + for (i = 0; i < 2; i++) + { + GNUNET_TESTBED_operation_done (t_op[i]); + } + if (NULL != outgoing_ch) + { + GNUNET_CADET_channel_destroy (outgoing_ch); + outgoing_ch = NULL; + } + if (NULL != incoming_ch) + { + GNUNET_CADET_channel_destroy (incoming_ch); + incoming_ch = NULL; + } + GNUNET_CADET_TEST_cleanup (test_ctx); + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Shut down peergroup, clean up. + * + * @param cls Closure (unused). + * @param tc Task Context. + */ +static void +shutdown_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n"); + if (NULL != send_next_msg_task) + { + GNUNET_SCHEDULER_cancel (send_next_msg_task); + send_next_msg_task = NULL; + } + if (NULL != test_task) + { + GNUNET_SCHEDULER_cancel (test_task); + test_task = NULL; + } + if (NULL != disconnect_task) + { + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = + GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, (void *) __LINE__); + } +} + + +/** + * Stats callback. Finish the stats testbed operation and when all stats have + * been iterated, shutdown the test. + * + * @param cls Closure (line number from which termination was requested). + * @param op the operation that has been finished + * @param emsg error message in case the operation has failed; will be NULL if + * operation has executed successfully. + */ +static void +stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " KA sent: %u, KA received: %u\n", + ka_sent, ka_received); + if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1))) + { + GNUNET_break (0); + ok--; + } + GNUNET_TESTBED_operation_done (stats_op); + + if (NULL != disconnect_task) + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, cls); +} + + +/** + * Process statistic values. + * + * @param cls closure (line number, unused) + * @param peer the peer the statistic belong to + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + */ +static int +stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer, + const char *subsystem, const char *name, uint64_t value, + int is_persistent) +{ + static const char *s_sent = "# keepalives sent"; + static const char *s_recv = "# keepalives received"; + static const char *rdrops = "# messages dropped due to full buffer"; + static const char *cdrops = "# messages dropped due to slow client"; + uint32_t i; + + i = GNUNET_TESTBED_get_index (peer); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i, + subsystem, name, (unsigned long long) value); + if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i) + ka_sent = value; + if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i) + ka_received = value; + if (0 == strncmp (rdrops, name, strlen (rdrops))) + msg_dropped += value; + if (0 == strncmp (cdrops, name, strlen (cdrops))) + msg_dropped += value; + + return GNUNET_OK; +} + + +/** + * Task to gather all statistics. + * + * @param cls Closure (line from which the task was scheduled). + */ +static void +gather_stats_and_exit (void *cls) +{ + long l = (long) cls; + + disconnect_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "gathering statistics from line %ld\n", + l); + if (NULL != outgoing_ch) + { + GNUNET_CADET_channel_destroy (outgoing_ch); + outgoing_ch = NULL; + } + stats_op = GNUNET_TESTBED_get_statistics (peers_running, + testbed_peers, + "cadet", + NULL, + &stats_iterator, + stats_cont, + cls); +} + + + +/** + * Abort test: schedule disconnect and shutdown immediately + * + * @param line Line in the code the abort is requested from (__LINE__). + */ +static void +abort_test (long line) +{ + if (NULL != disconnect_task) + { + GNUNET_SCHEDULER_cancel (disconnect_task); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Aborting test from %ld\n", line); + disconnect_task = + GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, (void *) line); + } +} + + +/** + * Send a message on the channel with the appropriate size and payload. + * + * Update the appropriate *_sent counter. + * + * @param channel Channel to send the message on. + */ +static void +send_test_message (struct GNUNET_CADET_Channel *channel) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *msg; + uint32_t *data; + int *counter; + int size; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending test message on channel %p\n", + channel); + size = size_payload; + if (GNUNET_NO == initialized) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n"); + size += 1000; + counter = &data_sent; + if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer + data_sent++; + } + else if (SPEED == test || SPEED_ACK == test) + { + counter = get_target_channel() == channel ? &ack_sent : &data_sent; + size += *counter; + *counter = *counter + 1; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending message %u\n", *counter); + } + else + { + counter = &ack_sent; + } + env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY); + + data = (uint32_t *) &msg[1]; + *data = htonl (*counter); + GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env); +} + +/** + * Task to request a new data transmission in a SPEED test, without waiting + * for previous messages to be sent/arrrive. + * + * @param cls Closure (unused). + */ +static void +send_next_msg (void *cls) +{ + struct GNUNET_CADET_Channel *channel; + + send_next_msg_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending next message: %d\n", data_sent); + + channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch; + GNUNET_assert (NULL != channel); + GNUNET_assert (SPEED == test); + send_test_message (channel); + if (data_sent < TOTAL_PACKETS) + { + /* SPEED test: Send all messages as soon as possible */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling message %d\n", + data_sent + 1); + send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL); + } +} + + +/** + * Every few messages cancel the timeout task and re-schedule it again, to + * avoid timing out when traffic keeps coming. + * + * @param line Code line number to log if a timeout occurs. + */ +static void +reschedule_timeout_task (long line) +{ + if ((ok % 10) == 0) + { + if (NULL != disconnect_task) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " reschedule timeout every 10 messages\n"); + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, + &gather_stats_and_exit, + (void *) line); + } + } +} + + +/** + * Check if payload is sane (size contains payload). + * + * @param cls should match #ch + * @param message The actual message. + * @return #GNUNET_OK to keep the channel open, + * #GNUNET_SYSERR to close it (signal serious error). + */ +static int +check_data (void *cls, const struct GNUNET_MessageHeader *message) +{ + if (sizeof (struct GNUNET_MessageHeader) >= ntohs (message->size)) + return GNUNET_SYSERR; + return GNUNET_OK; /* all is well-formed */ +} + + +/** + * Function is called whenever a message is received. + * + * @param cls closure (set from GNUNET_CADET_connect(), peer number) + * @param message the actual message + */ +static void +handle_data (void *cls, const struct GNUNET_MessageHeader *message) +{ + struct CadetTestChannelWrapper *ch = cls; + struct GNUNET_CADET_Channel *channel = ch->ch; + uint32_t *data; + uint32_t payload; + int *counter; + + ok++; + counter = get_target_channel () == channel ? &data_received : &ack_received; + + reschedule_timeout_task ((long) __LINE__); + + if (channel == outgoing_ch) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n"); + } + else if (channel == incoming_ch) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client got a message.\n"); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unknown channel %p.\n", channel); + GNUNET_assert (0); + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: (%d/%d)\n", ok, ok_goal); + data = (uint32_t *) &message[1]; + payload = ntohl (*data); + if (payload == *counter) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " payload as expected: %u\n", payload); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " payload %u, expected: %u\n", + payload, *counter); + } + + if (GNUNET_NO == initialized) + { + initialized = GNUNET_YES; + start_time = GNUNET_TIME_absolute_get (); + if (SPEED == test) + { + GNUNET_assert (incoming_ch == channel); + send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL); + return; + } + } + + (*counter)++; + if (get_target_channel () == channel) /* Got "data" */ + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received); + if (SPEED != test || (ok_goal - 2) == ok) + { + /* Send ACK */ + send_test_message (channel); + return; + } + else + { + if (data_received < TOTAL_PACKETS) + return; + } + } + else /* Got "ack" */ + { + if (SPEED_ACK == test || SPEED == test) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received); + /* Send more data */ + send_test_message (channel); + if (ack_received < TOTAL_PACKETS && SPEED != test) + return; + if (ok == 2 && SPEED == test) + return; + show_end_data (); + } + if (test == P2P_SIGNAL) + { + GNUNET_CADET_channel_destroy (incoming_ch); + incoming_ch = NULL; + } + else + { + GNUNET_CADET_channel_destroy (outgoing_ch); + outgoing_ch = NULL; + } + } +} + + +/** + * Method called whenever a peer connects to a port in MQ-based CADET. + * + * @param cls Closure from #GNUNET_CADET_open_porT (peer # as long). + * @param channel New handle to the channel. + * @param source Peer that started this channel. + * @return Closure for the incoming @a channel. It's given to: + * - The #GNUNET_CADET_DisconnectEventHandler (given to + * #GNUNET_CADET_open_porT) when the channel dies. + * - Each the #GNUNET_MQ_MessageCallback handlers for each message + * received on the @a channel. + */ +static void * +connect_handler (void *cls, struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct CadetTestChannelWrapper *ch; + long peer = (long) cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Incoming channel from %s to peer %ld\n", + GNUNET_i2s (source), peer); + ok++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok); + if (peer == peers_requested - 1) + { + if (NULL != incoming_ch) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Duplicate incoming channel for client %lu\n", (long) cls); + GNUNET_assert (0); + } + incoming_ch = channel; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Incoming channel for unexpected peer #%lu\n", (long) cls); + GNUNET_assert (0); + } + if (NULL != disconnect_task) + { + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = + GNUNET_SCHEDULER_add_delayed (SHORT_TIME, &gather_stats_and_exit, + (void *) __LINE__); + } + + /* TODO: cannot return channel as-is, in order to unify the data handlers */ + ch = GNUNET_new (struct CadetTestChannelWrapper); + ch->ch = channel; + + return ch; +} + + +/** + * Function called whenever an MQ-channel is destroyed, even if the destruction + * was requested by #GNUNET_CADET_channel_destroy. + * It must NOT call #GNUNET_CADET_channel_destroy on the channel. + * + * It should clean up any associated state, including cancelling any pending + * transmission on this channel. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + */ +static void +disconnect_handler (void *cls, const struct GNUNET_CADET_Channel *channel) +{ + long i = (long) cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Channel disconnected at %p\n", cls); + if (peers_running - 1 == i) + { + ok++; + GNUNET_break (channel == incoming_ch); + incoming_ch = NULL; + } + else if (0L == i) + { + if (P2P_SIGNAL == test) + { + ok++; + } + GNUNET_break (channel == outgoing_ch); + outgoing_ch = NULL; + } + else + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unknown peer! %d\n", (int) i); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok); + + if (NULL != disconnect_task) + { + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = + GNUNET_SCHEDULER_add_now (&gather_stats_and_exit, (void *) __LINE__); + } +} + + +/** + * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES. + * + * Testcase continues when the root receives confirmation of connected peers, + * on callback function ch. + * + * @param cls Closure (unused). + */ +static void +start_test (void *cls) +{ + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end () + }; + struct CadetTestChannelWrapper *ch; + enum GNUNET_CADET_ChannelOption flags; + + test_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test\n"); + if (NULL != disconnect_task) + { + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = NULL; + } + + flags = GNUNET_CADET_OPTION_DEFAULT; + if (SPEED_REL == test) + { + test = SPEED; + flags |= GNUNET_CADET_OPTION_RELIABLE; + } + + ch = GNUNET_new (struct CadetTestChannelWrapper); + outgoing_ch = GNUNET_CADET_channel_creatE (h1, + ch, + p_id[1], + &port, + flags, + NULL, + &disconnect_handler, + handlers); + ch->ch = outgoing_ch; + + disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, + &gather_stats_and_exit, + (void *) __LINE__); + if (KEEPALIVE == test) + return; /* Don't send any data. */ + + + data_received = 0; + data_sent = 0; + ack_received = 0; + ack_sent = 0; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending data initializer...\n"); + send_test_message (outgoing_ch); +} + + +/** + * Callback to be called when the requested peer information is available + * + * @param cls the closure from GNUNET_TESTBED_peer_get_information() + * @param op the operation this callback corresponds to + * @param pinfo the result; will be NULL if the operation has failed + * @param emsg error message if the operation has failed; + * NULL if the operation is successfull + */ +static void +pi_cb (void *cls, struct GNUNET_TESTBED_Operation *op, + const struct GNUNET_TESTBED_PeerInformation *pinfo, const char *emsg) +{ + long i = (long) cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ID callback for %ld\n", i); + + if ((NULL == pinfo) || (NULL != emsg)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg); + abort_test (__LINE__); + return; + } + p_id[i] = pinfo->result.id; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " id: %s\n", GNUNET_i2s (p_id[i])); + p_ids++; + if (p_ids < 2) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n"); + test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL); +} + + +/** + * test main: start test when all peers are connected + * + * @param cls Closure. + * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end. + * @param num_peers Number of peers that are running. + * @param peers Array of peers. + * @param cadets Handle to each of the CADETs of the peers. + */ +static void +tmain (void *cls, + struct GNUNET_CADET_TEST_Context *ctx, + unsigned int num_peers, + struct GNUNET_TESTBED_Peer **peers, + struct GNUNET_CADET_Handle **cadets) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n"); + ok = 0; + test_ctx = ctx; + peers_running = num_peers; + GNUNET_assert (peers_running == peers_requested); + testbed_peers = peers; + h1 = cadets[0]; + h2 = cadets[num_peers - 1]; + disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, + &disconnect_cadet_peers, + (void *) __LINE__); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0], + GNUNET_TESTBED_PIT_IDENTITY, + &pi_cb, + (void *) 0L); + t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1], + GNUNET_TESTBED_PIT_IDENTITY, + &pi_cb, + (void *) 1L); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n"); +} + + +/** + * Main: start test + */ +int +main (int argc, char *argv[]) +{ + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end () + }; + + initialized = GNUNET_NO; + static const struct GNUNET_HashCode *ports[2]; + const char *config_file; + char port_id[] = "test port"; + + GNUNET_CRYPTO_hash (port_id, sizeof (port_id), &port); + + GNUNET_log_setup ("test", "DEBUG", NULL); + config_file = "test_cadet.conf"; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start\n"); + + /* Find out requested size */ + if (strstr (argv[0], "_2_") != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DIRECT CONNECTIONs\n"); + peers_requested = 2; + } + else if (strstr (argv[0], "_5_") != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "5 PEER LINE\n"); + peers_requested = 5; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "SIZE UNKNOWN, USING 2\n"); + peers_requested = 2; + } + + /* Find out requested test */ + if (strstr (argv[0], "_forward") != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "FORWARD\n"); + test = FORWARD; + test_name = "unicast"; + ok_goal = 4; + } + else if (strstr (argv[0], "_signal") != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SIGNAL\n"); + test = P2P_SIGNAL; + test_name = "signal"; + ok_goal = 4; + } + else if (strstr (argv[0], "_speed_ack") != NULL) + { + /* Test is supposed to generate the following callbacks: + * 1 incoming channel (@dest) + * TOTAL_PACKETS received data packet (@dest) + * TOTAL_PACKETS received data packet (@orig) + * 1 received channel destroy (@dest) + */ + ok_goal = TOTAL_PACKETS * 2 + 2; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED_ACK\n"); + test = SPEED_ACK; + test_name = "speed ack"; + } + else if (strstr (argv[0], "_speed") != NULL) + { + /* Test is supposed to generate the following callbacks: + * 1 incoming channel (@dest) + * 1 initial packet (@dest) + * TOTAL_PACKETS received data packet (@dest) + * 1 received data packet (@orig) + * 1 received channel destroy (@dest) + */ + ok_goal = TOTAL_PACKETS + 4; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED\n"); + if (strstr (argv[0], "_reliable") != NULL) + { + test = SPEED_REL; + test_name = "speed reliable"; + config_file = "test_cadet_drop.conf"; + } + else + { + test = SPEED; + test_name = "speed"; + } + } + else if (strstr (argv[0], "_keepalive") != NULL) + { + test = KEEPALIVE; + /* Test is supposed to generate the following callbacks: + * 1 incoming channel (@dest) + * [wait] + * 1 received channel destroy (@dest) + */ + ok_goal = 2; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "UNKNOWN\n"); + test = SETUP; + ok_goal = 0; + } + + if (strstr (argv[0], "backwards") != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "BACKWARDS (LEAF TO ROOT)\n"); + test_backwards = GNUNET_YES; + GNUNET_asprintf (&test_name, "backwards %s", test_name); + } + + p_ids = 0; + ports[0] = &port; + ports[1] = NULL; + GNUNET_CADET_TEST_ruN ("test_cadet_small", + config_file, + peers_requested, + &tmain, + NULL, /* tmain cls */ + &connect_handler, + NULL, + &disconnect_handler, + handlers, + ports); + if (NULL != strstr (argv[0], "_reliable")) + msg_dropped = 0; /* dropped should be retransmitted */ + + if (ok_goal > ok - msg_dropped) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal); + return 1; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n"); + return 0; +} + +/* end of test_cadet.c */ -- 2.25.1