add avg latency and various bugfixes
authorSchanzenbach, Martin <mschanzenbach@posteo.de>
Mon, 23 Dec 2019 06:34:26 +0000 (15:34 +0900)
committerSchanzenbach, Martin <mschanzenbach@posteo.de>
Mon, 23 Dec 2019 06:34:26 +0000 (15:34 +0900)
src/transport/test_communicator_basic.c
src/transport/transport-testing2.c

index ef18d6a81a33e3d07e5f208b2cf0e724be5ca50c..176996b9d6f5ec8fcfee1ca930e332e14190d086 100644 (file)
@@ -80,21 +80,17 @@ static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc;
 
 #define SIZE_CHECK 2
 
-#define MAX_BUF_LEN 10
+#define MAX_BUF_LEN 1
 
 static int buf_len = 0;
 
-static char short_payload[SHORT_MESSAGE_SIZE];
-
-static char long_payload[LONG_MESSAGE_SIZE];
-
 static uint32_t ack = 0;
 
 static int phase;
 
-static size_t long_received = 0;
+static size_t num_received = 0;
 
-static size_t short_received = 0;
+static uint64_t avg_latency = 0;
 
 static void
 communicator_available_cb (void *cls,
@@ -163,20 +159,42 @@ queue_create_reply_cb (void *cls,
 }
 
 
+static char*
+make_payload (size_t payload_size)
+{
+  char *payload = GNUNET_malloc (payload_size);
+  struct GNUNET_TIME_Absolute ts;
+  struct GNUNET_TIME_AbsoluteNBO ts_n;
+  GNUNET_assert (payload_size >= 8); // So that out timestamp fits
+  ts = GNUNET_TIME_absolute_get ();
+  ts_n = GNUNET_TIME_absolute_hton (ts);
+  memset (payload, 0, payload_size);
+  memcpy (payload, &ts_n, sizeof (struct GNUNET_TIME_AbsoluteNBO));
+  return payload;
+}
+
+
 static void
 size_test (void *cls)
 {
-  char payload[ack];
+  char *payload;
   phase = SIZE_CHECK;
 
-  memset (payload, 0, ack);
-  if (ack < 64000) //Leave some room for our protocol.
+  if (ack < 64000) // Leave some room for our protocol.
   {
+    payload = make_payload (ack);
     GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
-                                                          &payload,
-                                                          sizeof(payload));
+                                                          payload,
+                                                          ack);
+    GNUNET_free (payload);
     return;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+              "Size packet test done.\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+              "#packets: %lu -- latency: %lu\n",
+              num_received,
+              avg_latency);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Finished\n");
   GNUNET_SCHEDULER_shutdown ();
@@ -189,23 +207,34 @@ long_test (void *cls)
 {
   struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration (
     start_long);
+  char *payload;
   if (LONG_BURST_WINDOW.rel_value_us > duration.rel_value_us)
   {
+    //FIXME: Not sure how aggressive we should be here, our transport does not
+    //implement congestion control or flow control... (also for the other three
     if (buf_len < MAX_BUF_LEN)
     {
+      payload = make_payload (LONG_MESSAGE_SIZE);
       GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
-                                                            &long_payload,
-                                                            sizeof(long_payload));
+                                                            payload,
+                                                            LONG_MESSAGE_SIZE);
       buf_len++;
+      GNUNET_free (payload);
+      GNUNET_SCHEDULER_add_now (&long_test, NULL);
     }
-    GNUNET_SCHEDULER_add_now (&long_test, NULL);
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
-              "LONG Goodput (bytes/s): %lu - received packets: %lu\n",
-              (LONG_MESSAGE_SIZE * long_received) / LONG_BURST_SECONDS,
-              long_received);
-  ack = 5;
+              "Long size packet test done.\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+              "goodput: %lu b/s -- #packets: %lu -- latency: %lu\n",
+              (LONG_MESSAGE_SIZE * num_received) / LONG_BURST_SECONDS,
+              num_received,
+              avg_latency);
+  ack = 10;
+  num_received = 0;
+  buf_len = 0;
+  avg_latency = 0;
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &size_test, NULL);
 }
 
@@ -215,27 +244,33 @@ short_test (void *cls)
 {
   struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration (
     start_short);
+  char *payload;
   if (SHORT_BURST_WINDOW.rel_value_us > duration.rel_value_us)
   {
     if (buf_len < MAX_BUF_LEN)
     {
+      payload = make_payload (SHORT_MESSAGE_SIZE);
       GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
-                                                            &short_payload,
-                                                            sizeof(short_payload));
+                                                            payload,
+                                                            SHORT_MESSAGE_SIZE);
       buf_len++;
+      GNUNET_free (payload);
+      GNUNET_SCHEDULER_add_now (&short_test, NULL);
     }
-    GNUNET_SCHEDULER_add_now (&short_test, NULL);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Short test done!\n");
   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
-              "SHORT Goodput (bytes/s): %lu - received packets: %lu\n",
-              (SHORT_MESSAGE_SIZE * short_received) / SHORT_BURST_SECONDS,
-              short_received);
+              "Short size packet test done.\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+              "goodput: %lu b/s -- #packets: %lu -- latency: %lu\n",
+              (SHORT_MESSAGE_SIZE * num_received) / SHORT_BURST_SECONDS,
+              num_received,
+              avg_latency);
   start_long = GNUNET_TIME_absolute_get ();
   phase = BURST_LONG;
   buf_len = 0;
+  avg_latency = 0;
+  num_received = 0;
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &long_test, NULL);
 }
 
@@ -269,6 +304,25 @@ add_queue_cb (void *cls,
 }
 
 
+static void
+update_avg_latency (const char*payload)
+{
+  struct GNUNET_TIME_AbsoluteNBO *ts_n;
+  struct GNUNET_TIME_Absolute ts;
+  struct GNUNET_TIME_Relative latency;
+
+  ts_n = (struct GNUNET_TIME_AbsoluteNBO *) payload;
+  ts = GNUNET_TIME_absolute_ntoh (*ts_n);
+  latency = GNUNET_TIME_absolute_get_duration (ts);
+  if (1 == num_received)
+    avg_latency = latency.rel_value_us;
+  else
+    avg_latency = ((avg_latency * (num_received - 1)) + latency.rel_value_us)
+                  / num_received;
+
+}
+
+
 /**
  * @brief Handle an incoming message
  *
@@ -290,14 +344,20 @@ incoming_message_cb (void *cls,
   if (phase == BURST_SHORT)
   {
     GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
-    short_received++;
+    num_received++;
+    update_avg_latency (payload);
+    if (buf_len == MAX_BUF_LEN)
+      GNUNET_SCHEDULER_add_now (&short_test, NULL);
     buf_len--;
   }
   else if (phase == BURST_LONG)
   {
     if (LONG_MESSAGE_SIZE != payload_len)
       return; // Ignore
-    long_received++;
+    num_received++;
+    update_avg_latency (payload);
+    if (buf_len == MAX_BUF_LEN)
+      GNUNET_SCHEDULER_add_now (&long_test, NULL);
     buf_len--;
   }
   else         // if (phase == SIZE_CHECK) {
@@ -312,6 +372,8 @@ incoming_message_cb (void *cls,
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
+    num_received++;
+    update_avg_latency (payload);
     ack += 5; // Next expected message size
     GNUNET_SCHEDULER_add_now (&size_test, NULL);
   }
@@ -327,8 +389,7 @@ static void
 run (void *cls)
 {
   ret = 0;
-  memset (long_payload, 0, LONG_MESSAGE_SIZE);
-  memset (short_payload, 0, SHORT_MESSAGE_SIZE);
+  num_received = 0;
   for (int i = 0; i < NUM_PEERS; i++)
   {
     tc_hs[i] = GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
index 547f8611b229cd6135192dd8e3fa947470fc9bc0..ba58776fbad294f1b79d4916f68076d691fb10f9 100644 (file)
@@ -136,6 +136,11 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
    */
   GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_msg_cb;
 
+  /**
+   * Our service handle
+   */
+  struct GNUNET_SERVICE_Handle *sh;
+
   /**
    * @brief Closure to the callback
    */
@@ -637,21 +642,15 @@ transport_communicator_start (
                              tc_h),
     GNUNET_MQ_handler_end ()
   };
-  struct GNUNET_SERVICE_Handle *h;
 
-  h = GNUNET_SERVICE_start ("transport",
+
+  tc_h->sh = GNUNET_SERVICE_start ("transport",
                             tc_h->cfg,
                             &connect_cb,
                             &disconnect_cb,
                             tc_h,
                             mh);
-  if (NULL == h)
-    LOG (GNUNET_ERROR_TYPE_ERROR, "Failed starting service!\n");
-  else
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Started service\n");
-    /* TODO */ GNUNET_SCHEDULER_add_shutdown (&shutdown_service, h);
-  }
+  GNUNET_assert (NULL != tc_h->sh);
 }
 
 
@@ -665,11 +664,11 @@ shutdown_communicator (void *cls)
 {
   struct GNUNET_OS_Process *proc = cls;
 
-  if (GNUNET_OK != GNUNET_OS_process_kill (proc, SIGTERM))
+  if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Error shutting down communicator with SIGERM, trying SIGKILL\n");
-    if (GNUNET_OK != GNUNET_OS_process_kill (proc, SIGKILL))
+    if (0 != GNUNET_OS_process_kill (proc, SIGKILL))
     {
       LOG (GNUNET_ERROR_TYPE_ERROR,
            "Error shutting down communicator with SIGERM and SIGKILL\n");
@@ -710,8 +709,15 @@ communicator_start (
   }
   LOG (GNUNET_ERROR_TYPE_INFO, "started communicator\n");
   GNUNET_free (binary);
-  /* TODO */ GNUNET_SCHEDULER_add_shutdown (&shutdown_communicator,
-                                            tc_h->c_proc);
+}
+
+
+static void
+do_shutdown (void *cls)
+{
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  shutdown_communicator(tc_h->c_proc);
+  shutdown_service(tc_h->sh);
 }
 
 
@@ -769,10 +775,10 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
   /* Schedule start communicator */
   communicator_start (tc_h,
                       binary_name);
+  GNUNET_SCHEDULER_add_shutdown (&do_shutdown, tc_h);
   return tc_h;
 }
 
-
 /**
  * @brief Instruct communicator to open a queue
  *