- fix disconnect task scheduling
[oweals/gnunet.git] / src / mesh / gnunet-mesh-profiler.c
index b95b6f4abd3633da18aba555021fa1b77bc36912..4ae8848461fc4a6d3d82856cae4e5ab5ffb7b669 100644 (file)
  */
 #define TOTAL_PEERS 10
 
+/**
+ * Duration of each round.
+ */
+#define ROUND_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
+/**
+ * Paximum ping period in milliseconds. Real period = rand (0, PING_PERIOD)
+ */
+#define PING_PERIOD 2000
+
 /**
  * How long until we give up on connecting the peers?
  */
@@ -47,6 +57,7 @@
  */
 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
 
+static float rounds[] = {0.8, 0.7, 0.6, 0.5, 0.0};
 
 struct MeshPeer
 {
@@ -85,6 +96,8 @@ struct MeshPeer
    */
   int data_received;
 
+  int up;
+
   struct MeshPeer *dest;
   struct MeshPeer *incoming;
   GNUNET_SCHEDULER_TaskIdentifier ping_task;
@@ -229,14 +242,27 @@ disconnect_mesh_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   unsigned int i;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "disconnecting mesh service of peers, called from line %ld\n",
-              line);
+              "disconnecting mesh service, called from line %ld\n", line);
   disconnect_task = GNUNET_SCHEDULER_NO_TASK;
   for (i = 0; i < TOTAL_PEERS; i++)
   {
-    GNUNET_TESTBED_operation_done (peers[i].op);
-    GNUNET_MESH_channel_destroy (peers[i].ch);
-    GNUNET_MESH_channel_destroy (peers[i].incoming_ch);
+    if (NULL != peers[i].op)
+      GNUNET_TESTBED_operation_done (peers[i].op);
+
+    if (peers[i].up != GNUNET_YES)
+      continue;
+
+    if (NULL != peers[i].ch)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: channel %p\n", i, peers[i].ch);
+      GNUNET_MESH_channel_destroy (peers[i].ch);
+    }
+    if (NULL != peers[i].incoming_ch)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: incoming channel %p\n",
+                  i, peers[i].incoming_ch);
+      GNUNET_MESH_channel_destroy (peers[i].incoming_ch);
+    }
   }
   GNUNET_MESH_TEST_cleanup (test_ctx);
   if (GNUNET_SCHEDULER_NO_TASK != shutdown_handle)
@@ -324,7 +350,6 @@ collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
     return;
 
-  disconnect_task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start collecting statistics...\n");
   stats_op = GNUNET_TESTBED_get_statistics (TOTAL_PEERS, testbed_handles,
                                             NULL, NULL,
@@ -333,7 +358,7 @@ collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
 
 /**
- * @brief Finish profiler normally.
+ * @brief Finish profiler normally. Signal finish and start collecting stats.
  *
  * @param cls Closure (unused).
  * @param tc Task context.
@@ -349,31 +374,94 @@ finish_profiler (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GNUNET_SCHEDULER_add_now (&collect_stats, NULL);
 }
 
+/**
+ * Set the total number of running peers.
+ *
+ * @param target Desired number of running peers.
+ */
+static void
+adjust_running_peers (unsigned int target)
+{
+  struct GNUNET_TESTBED_Operation *op;
+  unsigned int delta;
+  unsigned int run;
+  unsigned int i;
+  unsigned int r;
+
+  GNUNET_assert (target <= TOTAL_PEERS);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adjust peers to %u\n", target);
+  if (target > peers_running)
+  {
+    delta = target - peers_running;
+    run = GNUNET_YES;
+  }
+  else
+  {
+    delta = peers_running - target;
+    run = GNUNET_NO;
+  }
+
+  for (i = 0; i < delta; i++)
+  {
+    do {
+      r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, TOTAL_PEERS);
+    } while (peers[r].up == run);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "St%s peer %u: %s\n",
+                run ? "arting" : "opping", r, GNUNET_i2s (&peers[r].id));
+
+    if (GNUNET_SCHEDULER_NO_TASK != peers[r].ping_task)
+      GNUNET_SCHEDULER_cancel (peers[r].ping_task);
+    peers[r].ping_task = GNUNET_SCHEDULER_NO_TASK;
+
+    peers[r].up = run;
+    op = GNUNET_TESTBED_peer_manage_service (&peers[r], testbed_handles[r],
+                                             "mesh", NULL, NULL, run);
+    GNUNET_break (NULL != op);
+    peers_running += run ? 1 : -1;
+    GNUNET_assert (peers_running > 0);
+  }
+}
 
 
 /**
- * Transmit ready callback.
+ * @brief Move to next round.
  *
- * @param cls Closure (unused).
- * @param size Size of the tranmist buffer.
- * @param buf Pointer to the beginning of the buffer.
- *
- * @return Number of bytes written to buf.
+ * @param cls Closure (round #).
+ * @param tc Task context.
  */
-static size_t
-tmt_ping_rdy (void *cls, size_t size, void *buf);
+static void
+next_rnd (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  long round = (long) cls;
+
+  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ROUND %ld\n", round);
+  if (0.0 == rounds[round])
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Finishing\n");
+    GNUNET_SCHEDULER_add_now (&finish_profiler, NULL);
+    return;
+  }
+  adjust_running_peers (rounds[round] * TOTAL_PEERS);
+
+  GNUNET_SCHEDULER_add_delayed (ROUND_TIME, &next_rnd, (void *) (round + 1));
+}
+
 
 /**
  * Transmit ready callback.
  *
- * @param cls Closure (unused).
+ * @param cls Closure (peer for PING, NULL for PONG).
  * @param size Size of the tranmist buffer.
  * @param buf Pointer to the beginning of the buffer.
  *
  * @return Number of bytes written to buf.
  */
 static size_t
-tmt_pong_rdy (void *cls, size_t size, void *buf);
+tmt_rdy (void *cls, size_t size, void *buf);
 
 
 /**
@@ -388,7 +476,10 @@ ping (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct MeshPeer *peer = (struct MeshPeer *) cls;
 
   peer->ping_task = GNUNET_SCHEDULER_NO_TASK;
-  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
+
+  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0
+      || GNUNET_YES == test_finished
+      || 0 != peer->timestamp.abs_value_us)
     return;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u -> %u\n",
@@ -396,7 +487,7 @@ ping (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   GNUNET_MESH_notify_transmit_ready (peer->ch, GNUNET_NO,
                                      GNUNET_TIME_UNIT_FOREVER_REL,
-                                     size_payload, &tmt_ping_rdy, peer);
+                                     size_payload, &tmt_rdy, peer);
 }
 
 /**
@@ -410,84 +501,54 @@ pong (struct GNUNET_MESH_Channel *channel)
 {
   GNUNET_MESH_notify_transmit_ready (channel, GNUNET_NO,
                                      GNUNET_TIME_UNIT_FOREVER_REL,
-                                     size_payload, &tmt_pong_rdy, NULL);
+                                     size_payload, &tmt_rdy, NULL);
 }
 
 
 /**
  * Transmit ready callback
  *
- * @param cls Closure (unused).
+ * @param cls Closure (peer for PING, NULL for PONG).
  * @param size Size of the buffer we have.
  * @param buf Buffer to copy data to.
  */
-size_t
-tmt_ping_rdy (void *cls, size_t size, void *buf)
+static size_t
+tmt_rdy (void *cls, size_t size, void *buf)
 {
   struct MeshPeer *peer = (struct MeshPeer *) cls;
   struct GNUNET_MessageHeader *msg = buf;
   uint32_t *data;
-  unsigned int s;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmt_rdy called, filling buffer\n");
   if (size < size_payload || NULL == buf)
   {
-    GNUNET_break (ok >= ok_goal - 2);
+    GNUNET_break (0);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "size %u, buf %p, data_sent %u, data_received %u\n",
                 size, buf, peer->data_sent, peer->data_received);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ok %u, ok goal %u\n", ok, ok_goal);
 
     return 0;
   }
   msg->size = htons (size);
+  if (NULL == peer)
+  {
+    msg->type = htons (PONG);
+    return sizeof (*msg);
+  }
+
   msg->type = htons (PING);
   data = (uint32_t *) &msg[1];
   *data = htonl (peer->data_sent);
-  if (0 == peer->data_sent)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent: initializer\n");
-    s = 5;
-  }
-  else
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent: msg %d\n", peer->data_sent);
-    s = 60;
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent: msg %d\n", peer->data_sent);
   peer->data_sent++;
   peer->timestamp = GNUNET_TIME_absolute_get ();
-  peer->ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (s * 1000),
+  peer->ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (PING_PERIOD),
                                                   &ping, peer);
 
   return size_payload;
 }
 
 
-/**
- * Transmit ready callback
- *
- * @param cls Closure (unused).
- * @param size Size of the buffer we have.
- * @param buf Buffer to copy data to.
- */
-size_t
-tmt_pong_rdy (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_MessageHeader *msg = buf;
-  size_t size_payload = sizeof (struct GNUNET_MessageHeader);
-
-  if (size < size_payload || NULL == buf)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Cannot send PONG\n");
-    return 0;
-  }
-  msg->size = htons (size_payload);
-  msg->type = htons (PONG);
-
-  return size_payload;
-}
-
-
 /**
  * Function is called whenever a PING message is received.
  *
@@ -536,12 +597,22 @@ pong_handler (void *cls, struct GNUNET_MESH_Channel *channel,
   GNUNET_MESH_receive_done (channel);
   peer = &peers[n];
 
-  GNUNET_assert (0 != peer->timestamp.abs_value_us);
-  latency = GNUNET_TIME_absolute_get_duration (peer->incoming->timestamp);
-  FPRINTF (stderr, "%u -> %ld latency: %s\n",
-           get_index (peer->incoming), n,
-           GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_NO));
-  peer->timestamp.abs_value_us = 0;
+  GNUNET_break (0 != peer->timestamp.abs_value_us);
+  latency = GNUNET_TIME_absolute_get_duration (peer->timestamp);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <- %u latency: %s\n",
+              get_index (peer), get_index (peer->dest),
+              GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_NO));
+
+  if (GNUNET_SCHEDULER_NO_TASK == peer->ping_task)
+  {
+    peer->timestamp = GNUNET_TIME_absolute_get ();
+    peer->ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (60 * 1000),
+                                                    &ping, peer);
+  }
+  else
+  {
+    peer->timestamp.abs_value_us = 0;
+  }
 
   return GNUNET_OK;
 }
@@ -579,17 +650,12 @@ incoming_channel (void *cls, struct GNUNET_MESH_Channel *channel,
 
   peer = GNUNET_CONTAINER_multipeermap_get (ids, initiator);
   GNUNET_assert (NULL != peer);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <= %u\n", n, get_index (peer));
+  GNUNET_assert (peer == peers[n].incoming);
+  GNUNET_assert (peer->dest == &peers[n]);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <= %u %p\n",
+              n, get_index (peer), channel);
   peers[n].incoming_ch = channel;
 
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
-  {
-    GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                    &disconnect_mesh_peers,
-                                                    (void *) __LINE__);
-  }
-
   return NULL;
 }
 
@@ -607,17 +673,12 @@ channel_cleaner (void *cls, const struct GNUNET_MESH_Channel *channel,
                  void *channel_ctx)
 {
   long n = (long) cls;
+  struct MeshPeer *peer = &peers[n];
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel disconnected at peer %ld\n", n);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
-
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
-  {
-    GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_mesh_peers,
-                                                (void *) __LINE__);
-  }
+              "Channel %p disconnected at peer %ld\n", channel, n);
+  if (peer->ch == channel)
+    peer->ch = NULL;
 }
 
 
@@ -636,7 +697,7 @@ select_random_peer (struct MeshPeer *peer)
 }
 
 /**
- * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE MESH SERVICES.
+ * START THE TEST ITSELF, AS WE ARE CONNECTED TO THE MESH SERVICES.
  *
  * Testcase continues when the root receives confirmation of connected peers,
  * on callback funtion ch.
@@ -645,7 +706,7 @@ select_random_peer (struct MeshPeer *peer)
  * @param tc Task Context.
  */
 static void
-do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   enum GNUNET_MESH_ChannelOption flags;
   unsigned long i;
@@ -655,12 +716,6 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start profiler\n");
 
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
-    GNUNET_SCHEDULER_cancel (disconnect_task);
-  disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                  &disconnect_mesh_peers,
-                                                  (void *) __LINE__);
-
   flags = GNUNET_MESH_OPTION_DEFAULT;
   for (i = 0; i < TOTAL_PEERS; i++)
   {
@@ -669,11 +724,13 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     peers[i].ch = GNUNET_MESH_channel_create (peers[i].mesh, NULL,
                                               &peers[i].dest->id,
                                               1, flags);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u => %u\n",
-                i, get_index (peers[i].dest));
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u => %u %p\n",
+                i, get_index (peers[i].dest), peers[i].ch);
     peers[i].ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (2000),
                                                        &ping, &peers[i]);
   }
+  peers_running = TOTAL_PEERS;
+  GNUNET_SCHEDULER_add_delayed (ROUND_TIME, &next_rnd, NULL);
 }
 
 
@@ -706,12 +763,16 @@ peer_id_cb (void *cls,
   GNUNET_break (GNUNET_OK ==
                 GNUNET_CONTAINER_multipeermap_put (ids, &peers[n].id, &peers[n],
                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+  GNUNET_TESTBED_operation_done (peers[n].op);
+  peers[n].op = NULL;
+
   p_ids++;
   if (p_ids < TOTAL_PEERS)
     return;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got all IDs, starting profiler\n");
   test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                            &do_test, NULL);
+                                            &start_test, NULL);
 }
 
 /**
@@ -738,8 +799,6 @@ tmain (void *cls,
   GNUNET_assert (TOTAL_PEERS == num_peers);
   peers_running = num_peers;
   testbed_handles = testbed_peers;
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
-                                &finish_profiler, NULL);
   disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
                                                   &disconnect_mesh_peers,
                                                   (void *) __LINE__);
@@ -748,6 +807,7 @@ tmain (void *cls,
   for (i = 0; i < TOTAL_PEERS; i++)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requesting id %ld\n", i);
+    peers[i].up = GNUNET_YES;
     peers[i].mesh = meshes[i];
     peers[i].op =
       GNUNET_TESTBED_peer_get_information (testbed_handles[i],