- don-t re-scheduler, destroy immediately (comes from scheduler)
[oweals/gnunet.git] / src / mesh / gnunet-mesh-profiler.c
index 14316c5dd33e3359310477c65f50f6edf73ed450..5e2b08b28ca2760a4d352dc7aa675a7025498683 100644 (file)
 #define PING 1
 #define PONG 2
 
-/**
- * How many peers to run
- */
-#define TOTAL_PEERS 20
-
-/**
- * How many peers do pinging
- */
-#define PING_PEERS 3
-
-/**
- * Duration of each round.
- */
-#define ROUND_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
 
 /**
  * Paximum ping period in milliseconds. Real period = rand (0, PING_PERIOD)
 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
 
 /**
- * Ratio of peers active. First round always is 1.0.
+ * Total number of rounds.
  */
-static float rounds[] = {0.8, 0.7, 0.6, 0.5, 0.0};
+#define number_rounds sizeof(rounds)/sizeof(rounds[0])
 
 /**
- * Total number of rounds.
+ * Ratio of peers active. First round always is 1.0.
  */
-static const unsigned int number_rounds = sizeof(rounds)/sizeof(rounds[0]);
+static float rounds[] = {0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.0};
 
 /**
  * Message type for pings.
@@ -128,6 +114,11 @@ struct MeshPeer
    */
   struct GNUNET_MESH_Channel *incoming_ch;
 
+  /**
+   * Channel handle for a warmup channel.
+   */
+  struct GNUNET_MESH_Channel *warmup_ch;
+
   /**
    * Number of payload packes sent
    */
@@ -165,6 +156,11 @@ struct MeshPeer
 
 };
 
+/**
+ * Duration of each round.
+ */
+static struct GNUNET_TIME_Relative round_time;
+
 /**
  * GNUNET_PeerIdentity -> MeshPeer
  */
@@ -183,7 +179,7 @@ static struct GNUNET_TESTBED_Operation *stats_op;
 /**
  * Operation to get peer ids.
  */
-struct MeshPeer peers[TOTAL_PEERS];
+struct MeshPeer *peers;
 
 /**
  * Peer ids counter.
@@ -191,10 +187,20 @@ struct MeshPeer peers[TOTAL_PEERS];
 static unsigned int p_ids;
 
 /**
- * Total number of currently running peers.
+ * Total number of peers.
+ */
+static unsigned long long peers_total;
+
+/**
+ * Number of currently running peers.
  */
 static unsigned long long peers_running;
 
+/**
+ * Number of peers doing pings.
+ */
+static unsigned long long peers_pinging;
+
 /**
  * Test context (to shut down).
  */
@@ -220,11 +226,36 @@ static GNUNET_SCHEDULER_TaskIdentifier test_task;
  */
 static unsigned int current_round;
 
+
+/**
+ * Do preconnect? (Each peer creates a tunnel to one other peer).
+ */
+static int do_warmup;
+
+/**
+ * Warmup progress.
+ */
+static unsigned int peers_warmup;
+
 /**
  * Flag to notify callbacks not to generate any new traffic anymore.
  */
 static int test_finished;
 
+
+/**
+ * START THE TEST ITSELF, AS WE ARE CONNECTED TO THE MESH SERVICES.
+ *
+ * Testcase continues when the root receives confirmation of connected peers,
+ * on callback funtion ch.
+ *
+ * @param cls Closure (unsued).
+ * @param tc Task Context.
+ */
+static void
+start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /**
  * Calculate a random delay.
  *
@@ -268,11 +299,12 @@ show_end_data (void)
 
   for (i = 0; i < number_rounds; i++)
   {
-    for (j = 0; j < PING_PEERS; j++)
+    for (j = 0; j < peers_pinging; j++)
     {
       peer = &peers[j];
-      FPRINTF (stdout, "ROUND %u PEER %u: %f, PINGS: %u, PONGS: %u\n",
-               i, j, ((float)peer->sum_delay[i])/peer->pongs[i],
+      FPRINTF (stdout,
+               "ROUND %3u PEER %3u: %10.2f / %10.2f, PINGS: %3u, PONGS: %3u\n",
+               i, j, peer->mean[i], sqrt (peer->var[i] / (peer->pongs[i] - 1)),
                peer->pings[i], peer->pongs[i]);
     }
   }
@@ -308,7 +340,7 @@ disconnect_mesh_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "disconnecting mesh service, called from line %ld\n", line);
   disconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  for (i = 0; i < TOTAL_PEERS; i++)
+  for (i = 0; i < peers_total; i++)
   {
     if (NULL != peers[i].op)
       GNUNET_TESTBED_operation_done (peers[i].op);
@@ -321,6 +353,12 @@ disconnect_mesh_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: channel %p\n", i, peers[i].ch);
       GNUNET_MESH_channel_destroy (peers[i].ch);
     }
+    if (NULL != peers[i].warmup_ch)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: warmup channel %p\n",
+                  i, peers[i].warmup_ch);
+      GNUNET_MESH_channel_destroy (peers[i].warmup_ch);
+    }
     if (NULL != peers[i].incoming_ch)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: incoming channel %p\n",
@@ -415,7 +453,7 @@ collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     return;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start collecting statistics...\n");
-  stats_op = GNUNET_TESTBED_get_statistics (TOTAL_PEERS, testbed_handles,
+  stats_op = GNUNET_TESTBED_get_statistics (peers_total, testbed_handles,
                                             NULL, NULL,
                                             stats_iterator, stats_cont, NULL);
 }
@@ -452,7 +490,7 @@ adjust_running_peers (unsigned int target)
   unsigned int i;
   unsigned int r;
 
-  GNUNET_assert (target <= TOTAL_PEERS);
+  GNUNET_assert (target <= peers_total);
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adjust peers to %u\n", target);
   if (target > peers_running)
@@ -470,8 +508,8 @@ adjust_running_peers (unsigned int target)
   {
     do {
       r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                    TOTAL_PEERS - PING_PEERS);
-      r += PING_PEERS;
+                                    peers_total - peers_pinging);
+      r += peers_pinging;
     } while (peers[r].up == run || NULL != peers[r].incoming);
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "St%s peer %u: %s\n",
                 run ? "arting" : "opping", r, GNUNET_i2s (&peers[r].id));
@@ -520,10 +558,10 @@ next_rnd (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     GNUNET_SCHEDULER_add_now (&finish_profiler, NULL);
     return;
   }
-  adjust_running_peers (rounds[current_round] * TOTAL_PEERS);
+  adjust_running_peers (rounds[current_round] * peers_total);
   current_round++;
 
-  GNUNET_SCHEDULER_add_delayed (ROUND_TIME, &next_rnd, NULL);
+  GNUNET_SCHEDULER_add_delayed (round_time, &next_rnd, NULL);
 }
 
 
@@ -696,7 +734,8 @@ pong_handler (void *cls, struct GNUNET_MESH_Channel *channel,
   struct MeshPingMessage *msg;
   struct GNUNET_TIME_Absolute send_time;
   struct GNUNET_TIME_Relative latency;
-  unsigned int ping_round;
+  unsigned int r /* Ping round */;
+  float delta;
 
   GNUNET_MESH_receive_done (channel);
   peer = &peers[n];
@@ -705,12 +744,16 @@ pong_handler (void *cls, struct GNUNET_MESH_Channel *channel,
 
   send_time = GNUNET_TIME_absolute_ntoh (msg->timestamp);
   latency = GNUNET_TIME_absolute_get_duration (send_time);
-  ping_round = ntohl (msg->round_number);
+  r = ntohl (msg->round_number);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <- %u (%u) latency: %s\n",
               get_index (peer), get_index (peer->dest), ntohl (msg->counter),
               GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_NO));
-  peer->sum_delay[ping_round] += latency.rel_value_us;
-  peer->pongs[ping_round]++;
+
+  /* Online variance calculation */
+  peer->pongs[r]++;
+  delta = latency.rel_value_us - peer->mean[r];
+  peer->mean[r] = peer->mean[r] + delta/peer->pongs[r];
+  peer->var[r] += delta * (latency.rel_value_us - peer->mean[r]);
 
   return GNUNET_OK;
 }
@@ -748,6 +791,17 @@ incoming_channel (void *cls, struct GNUNET_MESH_Channel *channel,
 
   peer = GNUNET_CONTAINER_multipeermap_get (ids, initiator);
   GNUNET_assert (NULL != peer);
+  if (NULL == peers[n].incoming)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "WARMUP %3u: %u <= %u\n",
+                peers_warmup, n, get_index (peer));
+    peers_warmup++;
+    if (peers_warmup < peers_total)
+      return NULL;
+    test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                              &start_test, NULL);
+    return NULL;
+  }
   GNUNET_assert (peer == peers[n].incoming);
   GNUNET_assert (peer->dest == &peers[n]);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <= %u %p\n",
@@ -780,6 +834,13 @@ channel_cleaner (void *cls, const struct GNUNET_MESH_Channel *channel,
 }
 
 
+/**
+ * Select a random peer that has no incoming channel
+ *
+ * @param peer ID of the peer connecting. NULL if irrelevant (warmup).
+ *
+ * @return Random peer not yet connected to.
+ */
 static struct MeshPeer *
 select_random_peer (struct MeshPeer *peer)
 {
@@ -787,7 +848,7 @@ select_random_peer (struct MeshPeer *peer)
 
   do
   {
-    r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, TOTAL_PEERS);
+    r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, peers_total);
   } while (NULL != peers[r].incoming);
   peers[r].incoming = peer;
 
@@ -815,9 +876,8 @@ start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start profiler\n");
 
   flags = GNUNET_MESH_OPTION_DEFAULT;
-  for (i = 0; i < PING_PEERS; i++)
+  for (i = 0; i < peers_pinging; i++)
   {
-
     peers[i].dest = select_random_peer (&peers[i]);
     peers[i].ch = GNUNET_MESH_channel_create (peers[i].mesh, NULL,
                                               &peers[i].dest->id,
@@ -825,7 +885,7 @@ start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     if (NULL == peers[i].ch)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Channel %lu failed\n", i);
-      GNUNET_SCHEDULER_shutdown ();
+      GNUNET_MESH_TEST_cleanup (test_ctx);
       return;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u => %u %p\n",
@@ -833,18 +893,44 @@ start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     peers[i].ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (2000),
                                                        &ping, &peers[i]);
   }
-  peers_running = TOTAL_PEERS;
+  peers_running = peers_total;
   if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
     GNUNET_SCHEDULER_cancel (disconnect_task);
   disconnect_task =
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(ROUND_TIME,
+    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(round_time,
                                                                 number_rounds + 1),
                                   &disconnect_mesh_peers,
                                   (void *) __LINE__);
-  GNUNET_SCHEDULER_add_delayed (ROUND_TIME, &next_rnd, NULL);
+  GNUNET_SCHEDULER_add_delayed (round_time, &next_rnd, NULL);
 }
 
 
+/**
+ * Do warmup: create some channels to spread information about the topology.
+ */
+static void
+warmup (void)
+{
+  struct MeshPeer *peer;
+  unsigned int i;
+
+  for (i = 0; i < peers_total; i++)
+  {
+    peer = select_random_peer (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "WARMUP %u => %u\n",
+                i, get_index (peer));
+    peers[i].warmup_ch =
+      GNUNET_MESH_channel_create (peers[i].mesh, NULL, &peer->id,
+                                  1, GNUNET_MESH_OPTION_DEFAULT);
+    if (NULL == peers[i].warmup_ch)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Warmup %u failed\n", i);
+      GNUNET_MESH_TEST_cleanup (test_ctx);
+      return;
+    }
+  }
+}
+
 /**
  * Callback to be called when the requested peer information is available
  *
@@ -879,9 +965,14 @@ peer_id_cb (void *cls,
   peers[n].op = NULL;
 
   p_ids++;
-  if (p_ids < TOTAL_PEERS)
+  if (p_ids < peers_total)
     return;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got all IDs, starting profiler\n");
+  if (do_warmup)
+  {
+    warmup();
+    return; /* start_test from incoming_channel */
+  }
   test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
                                             &start_test, NULL);
 }
@@ -906,8 +997,7 @@ tmain (void *cls,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n");
   test_ctx = ctx;
-  GNUNET_assert (TOTAL_PEERS > 2 * PING_PEERS);
-  GNUNET_assert (TOTAL_PEERS == num_peers);
+  GNUNET_assert (peers_total == num_peers);
   peers_running = num_peers;
   testbed_handles = testbed_peers;
   disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
@@ -915,7 +1005,7 @@ tmain (void *cls,
                                                   (void *) __LINE__);
   shutdown_handle = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                                   &shutdown_task, NULL);
-  for (i = 0; i < TOTAL_PEERS; i++)
+  for (i = 0; i < peers_total; i++)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requesting id %ld\n", i);
     peers[i].up = GNUNET_YES;
@@ -939,18 +1029,51 @@ main (int argc, char *argv[])
   static uint32_t ports[2];
   const char *config_file;
 
-  config_file = "profiler.conf";
+  config_file = ".profiler.conf";
+
+  if (4 > argc)
+  {
+    fprintf (stderr, "usage: %s ROUND_TIME PEERS PINGS [DO_WARMUP]\n", argv[0]);
+    fprintf (stderr, "example: %s 30s 16 1 Y\n", argv[0]);
+    return 1;
+  }
+
+  if (GNUNET_OK != GNUNET_STRINGS_fancy_time_to_relative (argv[1], &round_time))
+  {
+    fprintf (stderr, "%s is not a valid time\n", argv[1]);
+    return 1;
+  }
+
+  peers_total = atoll (argv[2]);
+  if (2 > peers_total)
+  {
+    fprintf (stderr, "%s peers is not valid (> 2)\n", argv[1]);
+    return 1;
+  }
+  peers = GNUNET_malloc (sizeof (struct MeshPeer) * peers_total);
+
+  peers_pinging = atoll (argv[3]);
+
+  if (peers_total < 2 * peers_pinging)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "not enough peers, total should be > 2 * peers_pinging\n");
+    return 1;
+  }
+
+  do_warmup = (5 > argc || argv[4][0] == 'N');
 
-  ids = GNUNET_CONTAINER_multipeermap_create (2 * TOTAL_PEERS, GNUNET_YES);
+  ids = GNUNET_CONTAINER_multipeermap_create (2 * peers_total, GNUNET_YES);
   GNUNET_assert (NULL != ids);
   p_ids = 0;
   test_finished = GNUNET_NO;
   ports[0] = 1;
   ports[1] = 0;
-  GNUNET_MESH_TEST_run ("mesh-profiler", config_file, TOTAL_PEERS,
+  GNUNET_MESH_TEST_run ("mesh-profiler", config_file, peers_total,
                         &tmain, NULL, /* tmain cls */
                         &incoming_channel, &channel_cleaner,
                         handlers, ports);
+  GNUNET_free (peers);
 
   return 0;
 }