From 58b8c9ffcbfb433fb764e6259a73c9bc971f1545 Mon Sep 17 00:00:00 2001 From: "Schanzenbach, Martin" Date: Mon, 23 Dec 2019 15:34:26 +0900 Subject: [PATCH] add avg latency and various bugfixes --- src/transport/test_communicator_basic.c | 123 ++++++++++++++++++------ src/transport/transport-testing2.c | 34 ++++--- 2 files changed, 112 insertions(+), 45 deletions(-) diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index ef18d6a81..176996b9d 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c @@ -80,21 +80,17 @@ static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc; #define SIZE_CHECK 2 -#define MAX_BUF_LEN 10 +#define MAX_BUF_LEN 1 static int buf_len = 0; -static char short_payload[SHORT_MESSAGE_SIZE]; - -static char long_payload[LONG_MESSAGE_SIZE]; - static uint32_t ack = 0; static int phase; -static size_t long_received = 0; +static size_t num_received = 0; -static size_t short_received = 0; +static uint64_t avg_latency = 0; static void communicator_available_cb (void *cls, @@ -163,20 +159,42 @@ queue_create_reply_cb (void *cls, } +static char* +make_payload (size_t payload_size) +{ + char *payload = GNUNET_malloc (payload_size); + struct GNUNET_TIME_Absolute ts; + struct GNUNET_TIME_AbsoluteNBO ts_n; + GNUNET_assert (payload_size >= 8); // So that out timestamp fits + ts = GNUNET_TIME_absolute_get (); + ts_n = GNUNET_TIME_absolute_hton (ts); + memset (payload, 0, payload_size); + memcpy (payload, &ts_n, sizeof (struct GNUNET_TIME_AbsoluteNBO)); + return payload; +} + + static void size_test (void *cls) { - char payload[ack]; + char *payload; phase = SIZE_CHECK; - memset (payload, 0, ack); - if (ack < 64000) //Leave some room for our protocol. + if (ack < 64000) // Leave some room for our protocol. { + payload = make_payload (ack); GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, - &payload, - sizeof(payload)); + payload, + ack); + GNUNET_free (payload); return; } + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Size packet test done.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "#packets: %lu -- latency: %lu\n", + num_received, + avg_latency); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Finished\n"); GNUNET_SCHEDULER_shutdown (); @@ -189,23 +207,34 @@ long_test (void *cls) { struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration ( start_long); + char *payload; if (LONG_BURST_WINDOW.rel_value_us > duration.rel_value_us) { + //FIXME: Not sure how aggressive we should be here, our transport does not + //implement congestion control or flow control... (also for the other three if (buf_len < MAX_BUF_LEN) { + payload = make_payload (LONG_MESSAGE_SIZE); GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, - &long_payload, - sizeof(long_payload)); + payload, + LONG_MESSAGE_SIZE); buf_len++; + GNUNET_free (payload); + GNUNET_SCHEDULER_add_now (&long_test, NULL); } - GNUNET_SCHEDULER_add_now (&long_test, NULL); return; } GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - "LONG Goodput (bytes/s): %lu - received packets: %lu\n", - (LONG_MESSAGE_SIZE * long_received) / LONG_BURST_SECONDS, - long_received); - ack = 5; + "Long size packet test done.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "goodput: %lu b/s -- #packets: %lu -- latency: %lu\n", + (LONG_MESSAGE_SIZE * num_received) / LONG_BURST_SECONDS, + num_received, + avg_latency); + ack = 10; + num_received = 0; + buf_len = 0; + avg_latency = 0; GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &size_test, NULL); } @@ -215,27 +244,33 @@ short_test (void *cls) { struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration ( start_short); + char *payload; if (SHORT_BURST_WINDOW.rel_value_us > duration.rel_value_us) { if (buf_len < MAX_BUF_LEN) { + payload = make_payload (SHORT_MESSAGE_SIZE); GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, - &short_payload, - sizeof(short_payload)); + payload, + SHORT_MESSAGE_SIZE); buf_len++; + GNUNET_free (payload); + GNUNET_SCHEDULER_add_now (&short_test, NULL); } - GNUNET_SCHEDULER_add_now (&short_test, NULL); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Short test done!\n"); GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - "SHORT Goodput (bytes/s): %lu - received packets: %lu\n", - (SHORT_MESSAGE_SIZE * short_received) / SHORT_BURST_SECONDS, - short_received); + "Short size packet test done.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "goodput: %lu b/s -- #packets: %lu -- latency: %lu\n", + (SHORT_MESSAGE_SIZE * num_received) / SHORT_BURST_SECONDS, + num_received, + avg_latency); start_long = GNUNET_TIME_absolute_get (); phase = BURST_LONG; buf_len = 0; + avg_latency = 0; + num_received = 0; GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &long_test, NULL); } @@ -269,6 +304,25 @@ add_queue_cb (void *cls, } +static void +update_avg_latency (const char*payload) +{ + struct GNUNET_TIME_AbsoluteNBO *ts_n; + struct GNUNET_TIME_Absolute ts; + struct GNUNET_TIME_Relative latency; + + ts_n = (struct GNUNET_TIME_AbsoluteNBO *) payload; + ts = GNUNET_TIME_absolute_ntoh (*ts_n); + latency = GNUNET_TIME_absolute_get_duration (ts); + if (1 == num_received) + avg_latency = latency.rel_value_us; + else + avg_latency = ((avg_latency * (num_received - 1)) + latency.rel_value_us) + / num_received; + +} + + /** * @brief Handle an incoming message * @@ -290,14 +344,20 @@ incoming_message_cb (void *cls, if (phase == BURST_SHORT) { GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len); - short_received++; + num_received++; + update_avg_latency (payload); + if (buf_len == MAX_BUF_LEN) + GNUNET_SCHEDULER_add_now (&short_test, NULL); buf_len--; } else if (phase == BURST_LONG) { if (LONG_MESSAGE_SIZE != payload_len) return; // Ignore - long_received++; + num_received++; + update_avg_latency (payload); + if (buf_len == MAX_BUF_LEN) + GNUNET_SCHEDULER_add_now (&long_test, NULL); buf_len--; } else // if (phase == SIZE_CHECK) { @@ -312,6 +372,8 @@ incoming_message_cb (void *cls, GNUNET_SCHEDULER_shutdown (); return; } + num_received++; + update_avg_latency (payload); ack += 5; // Next expected message size GNUNET_SCHEDULER_add_now (&size_test, NULL); } @@ -327,8 +389,7 @@ static void run (void *cls) { ret = 0; - memset (long_payload, 0, LONG_MESSAGE_SIZE); - memset (short_payload, 0, SHORT_MESSAGE_SIZE); + num_received = 0; for (int i = 0; i < NUM_PEERS; i++) { tc_hs[i] = GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index 547f8611b..ba58776fb 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -136,6 +136,11 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle */ GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_msg_cb; + /** + * Our service handle + */ + struct GNUNET_SERVICE_Handle *sh; + /** * @brief Closure to the callback */ @@ -637,21 +642,15 @@ transport_communicator_start ( tc_h), GNUNET_MQ_handler_end () }; - struct GNUNET_SERVICE_Handle *h; - h = GNUNET_SERVICE_start ("transport", + + tc_h->sh = GNUNET_SERVICE_start ("transport", tc_h->cfg, &connect_cb, &disconnect_cb, tc_h, mh); - if (NULL == h) - LOG (GNUNET_ERROR_TYPE_ERROR, "Failed starting service!\n"); - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Started service\n"); - /* TODO */ GNUNET_SCHEDULER_add_shutdown (&shutdown_service, h); - } + GNUNET_assert (NULL != tc_h->sh); } @@ -665,11 +664,11 @@ shutdown_communicator (void *cls) { struct GNUNET_OS_Process *proc = cls; - if (GNUNET_OK != GNUNET_OS_process_kill (proc, SIGTERM)) + if (0 != GNUNET_OS_process_kill (proc, SIGTERM)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Error shutting down communicator with SIGERM, trying SIGKILL\n"); - if (GNUNET_OK != GNUNET_OS_process_kill (proc, SIGKILL)) + if (0 != GNUNET_OS_process_kill (proc, SIGKILL)) { LOG (GNUNET_ERROR_TYPE_ERROR, "Error shutting down communicator with SIGERM and SIGKILL\n"); @@ -710,8 +709,15 @@ communicator_start ( } LOG (GNUNET_ERROR_TYPE_INFO, "started communicator\n"); GNUNET_free (binary); - /* TODO */ GNUNET_SCHEDULER_add_shutdown (&shutdown_communicator, - tc_h->c_proc); +} + + +static void +do_shutdown (void *cls) +{ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + shutdown_communicator(tc_h->c_proc); + shutdown_service(tc_h->sh); } @@ -769,10 +775,10 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( /* Schedule start communicator */ communicator_start (tc_h, binary_name); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, tc_h); return tc_h; } - /** * @brief Instruct communicator to open a queue * -- 2.25.1