Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / cadet / test_cadet.c
index 1b5d2dca373afe02539d7cd53f5c1b06a2d31d63..5187bc504243ff0229610ce562eb9c351854a4a4 100644 (file)
@@ -1,26 +1,25 @@
 /*
      This file is part of GNUnet.
-     (C) 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2011, 2017 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 /**
  * @file cadet/test_cadet.c
- *
- * @brief Test for the cadet service: retransmission of traffic.
+ * @author Bart Polot
+ * @author Christian Grothoff
+ * @brief Test for the cadet service using mq API.
  */
 #include <stdio.h>
 #include "platform.h"
 
 
 /**
- * How namy messages to send
+ * Ugly workaround to unify data handlers on incoming and outgoing channels.
+ */
+struct CadetTestChannelWrapper
+{
+  /**
+   * Channel pointer.
+   */
+  struct GNUNET_CADET_Channel *ch;
+};
+
+/**
+ * How many messages to send by default.
  */
-#define TOTAL_PACKETS 2000
+#define TOTAL_PACKETS 500       /* Cannot exceed 64k! */
 
 /**
  * How long until we give up on connecting the peers?
 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
 
 /**
- * Time to wait for stuff that should be rather fast
+ * Time to wait by default  for stuff that should be rather fast.
+ */
+#define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)
+
+/**
+ * How fast do we send messages?
  */
-#define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+#define SEND_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 10)
 
 /**
  * DIFFERENT TESTS TO RUN
@@ -64,13 +79,23 @@ static int test;
 /**
  * String with test name
  */
-char *test_name;
+static char *test_name;
 
 /**
  * Flag to send traffic leaf->root in speed tests to test BCK_ACK logic.
  */
 static int test_backwards = GNUNET_NO;
 
+/**
+ * How many packets to send.
+ */
+static unsigned int total_packets;
+
+/**
+ * Time to wait for fast operations.
+ */
+static struct GNUNET_TIME_Relative short_time;
+
 /**
  * How many events have happened
  */
@@ -79,27 +104,32 @@ static int ok;
 /**
  * Number of events expected to conclude the test successfully.
  */
-int ok_goal;
+static int ok_goal;
 
 /**
- * Size of each test packet
+ * Size of each test packet's payload
  */
-size_t size_payload = sizeof (struct GNUNET_MessageHeader) + sizeof (uint32_t);
+static size_t size_payload = sizeof (uint32_t);
 
 /**
  * Operation to get peer ids.
  */
-struct GNUNET_TESTBED_Operation *t_op[2];
+static struct GNUNET_TESTBED_Operation *t_op[2];
 
 /**
  * Peer ids.
  */
-struct GNUNET_PeerIdentity *p_id[2];
+static struct GNUNET_PeerIdentity *p_id[2];
+
+/**
+ * Port ID
+ */
+static struct GNUNET_HashCode port;
 
 /**
  * Peer ids counter.
  */
-unsigned int p_ids;
+static unsigned int p_ids;
 
 /**
  * Is the setup initialized?
@@ -107,19 +137,24 @@ unsigned int p_ids;
 static int initialized;
 
 /**
- * Number of payload packes sent
+ * Number of payload packes sent.
  */
 static int data_sent;
 
 /**
- * Number of payload packets received
+ * Number of payload packets received.
  */
 static int data_received;
 
 /**
- * Number of payload packed explicitly (app level) acknowledged
+ * Number of payload packed acknowledgements sent.
  */
-static int data_ack;
+static int ack_sent;
+
+/**
+ * Number of payload packed explicitly (app level) acknowledged.
+ */
+static int ack_received;
 
 /**
  * Total number of peers asked to run.
@@ -139,17 +174,17 @@ struct GNUNET_CADET_TEST_Context *test_ctx;
 /**
  * Task called to disconnect peers.
  */
-static GNUNET_SCHEDULER_TaskIdentifier disconnect_task;
+static struct GNUNET_SCHEDULER_Task *disconnect_task;
 
 /**
  * Task To perform tests
  */
-static GNUNET_SCHEDULER_TaskIdentifier test_task;
+static struct GNUNET_SCHEDULER_Task *test_task;
 
 /**
- * Task called to shutdown test.
+ * Task runnining #send_next_msg().
  */
-static GNUNET_SCHEDULER_TaskIdentifier shutdown_handle;
+static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
 
 /**
  * Cadet handle for the root peer
@@ -164,7 +199,7 @@ static struct GNUNET_CADET_Handle *h2;
 /**
  * Channel handle for the root peer
  */
-static struct GNUNET_CADET_Channel *ch;
+static struct GNUNET_CADET_Channel *outgoing_ch;
 
 /**
  * Channel handle for the dest peer
@@ -197,75 +232,88 @@ static unsigned int ka_sent;
  */
 static unsigned int ka_received;
 
+/**
+ * How many messages were dropped by CADET because of full buffers?
+ */
+static unsigned int msg_dropped;
+
+
+/******************************************************************************/
+
+
+/******************************************************************************/
+
 
 /**
- * Show the results of the test (banwidth acheived) and log them to GAUGER
+ * Get the channel considered as the "target" or "receiver", depending on
+ * the test type and size.
+ *
+ * @return Channel handle of the target client, either 0 (for backward tests)
+ *         or the last peer in the line (for other tests).
  */
-static void
-show_end_data (void)
+static struct GNUNET_CADET_Channel *
+get_target_channel ()
 {
-  static struct GNUNET_TIME_Absolute end_time;
-  static struct GNUNET_TIME_Relative total_time;
-
-  end_time = GNUNET_TIME_absolute_get();
-  total_time = GNUNET_TIME_absolute_get_difference(start_time, end_time);
-  FPRINTF (stderr, "\nResults of test \"%s\"\n", test_name);
-  FPRINTF (stderr, "Test time %s\n",
-          GNUNET_STRINGS_relative_time_to_string (total_time,
-                                                  GNUNET_YES));
-  FPRINTF (stderr, "Test bandwidth: %f kb/s\n",
-          4 * TOTAL_PACKETS * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms
-  FPRINTF (stderr, "Test throughput: %f packets/s\n\n",
-          TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms
-  GAUGER ("CADET", test_name,
-          TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000),
-          "packets/s");
+  if (SPEED == test && GNUNET_YES == test_backwards)
+    return outgoing_ch;
+  else
+    return incoming_ch;
 }
 
 
 /**
- * Shut down peergroup, clean up.
- *
- * @param cls Closure (unused).
- * @param tc Task Context.
+ * Show the results of the test (banwidth acheived) and log them to GAUGER
  */
 static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+show_end_data (void)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n");
-  shutdown_handle = GNUNET_SCHEDULER_NO_TASK;
+  static struct GNUNET_TIME_Absolute end_time;
+  static struct GNUNET_TIME_Relative total_time;
+
+  end_time = GNUNET_TIME_absolute_get ();
+  total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
+  FPRINTF (stderr,
+          "\nResults of test \"%s\"\n",
+          test_name);
+  FPRINTF (stderr,
+          "Test time %s\n",
+           GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
+  FPRINTF (stderr,
+          "Test bandwidth: %f kb/s\n",
+          4 * total_packets * 1.0 / (total_time.rel_value_us / 1000));    // 4bytes * ms
+  FPRINTF (stderr,
+          "Test throughput: %f packets/s\n\n",
+          total_packets * 1000.0 / (total_time.rel_value_us / 1000));     // packets * ms
+  GAUGER ("CADET",
+         test_name,
+          total_packets * 1000.0 / (total_time.rel_value_us / 1000),
+          "packets/s");
 }
 
 
 /**
  * Disconnect from cadet services af all peers, call shutdown.
  *
- * @param cls Closure (unused).
+ * @param cls Closure (line number from which termination was requested).
  * @param tc Task Context.
  */
 static void
-disconnect_cadet_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+disconnect_cadet_peers (void *cls)
 {
   long line = (long) cls;
-  unsigned int i;
 
-  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "disconnecting cadet peers due to SHUTDOWN! called from %ld\n",
-                line);
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "disconnecting cadet service of peers, called from line %ld\n",
-                line);
-  disconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  for (i = 0; i < 2; i++)
+  disconnect_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "disconnecting cadet service of peers, called from line %ld\n",
+              line);
+  for (unsigned int i = 0; i < 2; i++)
   {
     GNUNET_TESTBED_operation_done (t_op[i]);
   }
-  if (NULL != ch)
+  if (NULL != outgoing_ch)
   {
-    GNUNET_CADET_channel_destroy (ch);
-    ch = NULL;
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
   }
   if (NULL != incoming_ch)
   {
@@ -273,207 +321,364 @@ disconnect_cadet_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
     incoming_ch = NULL;
   }
   GNUNET_CADET_TEST_cleanup (test_ctx);
-  if (GNUNET_SCHEDULER_NO_TASK != shutdown_handle)
-  {
-    GNUNET_SCHEDULER_cancel (shutdown_handle);
-  }
-  shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+  GNUNET_SCHEDULER_shutdown ();
 }
 
 
 /**
- * Abort test: schedule disconnect and shutdown immediately
+ * Shut down peergroup, clean up.
  *
- * @param line Line in the code the abort is requested from (__LINE__).
+ * @param cls Closure (unused).
+ * @param tc Task Context.
  */
 static void
-abort_test (long line)
+shutdown_task (void *cls)
 {
-  if (disconnect_task != GNUNET_SCHEDULER_NO_TASK)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Ending test.\n");
+  if (NULL != send_next_msg_task)
+  {
+    GNUNET_SCHEDULER_cancel (send_next_msg_task);
+    send_next_msg_task = NULL;
+  }
+  if (NULL != test_task)
+  {
+    GNUNET_SCHEDULER_cancel (test_task);
+    test_task = NULL;
+  }
+  if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                                (void *) line);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+                                 (void *) __LINE__);
   }
 }
 
+
 /**
- * Transmit ready callback.
+ * Stats callback. Finish the stats testbed operation and when all stats have
+ * been iterated, shutdown the test.
  *
- * @param cls Closure (message type).
- * @param size Size of the tranmist buffer.
- * @param buf Pointer to the beginning of the buffer.
+ * @param cls Closure (line number from which termination was requested).
+ * @param op the operation that has been finished
+ * @param emsg error message in case the operation has failed; will be NULL if
+ *          operation has executed successfully.
+ */
+static void
+stats_cont (void *cls,
+           struct GNUNET_TESTBED_Operation *op,
+           const char *emsg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             "KA sent: %u, KA received: %u\n",
+              ka_sent,
+             ka_received);
+  if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
+  {
+    GNUNET_break (0);
+    ok--;
+  }
+  GNUNET_TESTBED_operation_done (stats_op);
+
+  if (NULL != disconnect_task)
+    GNUNET_SCHEDULER_cancel (disconnect_task);
+  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+                                             cls);
+}
+
+
+/**
+ * Process statistic values.
  *
- * @return Number of bytes written to buf.
+ * @param cls closure (line number, unused)
+ * @param peer the peer the statistic belong to
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
  */
-static size_t
-tmt_rdy (void *cls, size_t size, void *buf);
+static int
+stats_iterator (void *cls,
+                const struct GNUNET_TESTBED_Peer *peer,
+                const char *subsystem,
+                const char *name,
+                uint64_t value,
+                int is_persistent)
+{
+  static const char *s_sent = "# keepalives sent";
+  static const char *s_recv = "# keepalives received";
+  static const char *rdrops = "# messages dropped due to full buffer";
+  static const char *cdrops = "# messages dropped due to slow client";
+  uint32_t i;
+
+  i = GNUNET_TESTBED_get_index (peer);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
+              subsystem, name, (unsigned long long) value);
+  if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
+    ka_sent = value;
+  if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
+    ka_received = value;
+  if (0 == strncmp (rdrops, name, strlen (rdrops)))
+    msg_dropped += value;
+  if (0 == strncmp (cdrops, name, strlen (cdrops)))
+    msg_dropped += value;
+
+  return GNUNET_OK;
+}
 
 
 /**
- * Task to schedule a new data transmission.
+ * Task to gather all statistics.
  *
- * @param cls Closure (peer #).
- * @param tc Task Context.
+ * @param cls Closure (line from which the task was scheduled).
  */
 static void
-data_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+gather_stats_and_exit (void *cls)
 {
-  struct GNUNET_CADET_TransmitHandle *th;
-  struct GNUNET_CADET_Channel *channel;
+  long l = (long) cls;
 
-  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
-    return;
+  disconnect_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "gathering statistics from line %ld\n",
+              l);
+  if (NULL != outgoing_ch)
+  {
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
+  }
+  stats_op = GNUNET_TESTBED_get_statistics (peers_running,
+                                            testbed_peers,
+                                            "cadet",
+                                            NULL,
+                                            &stats_iterator,
+                                            stats_cont,
+                                            cls);
+}
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Data task\n");
-  if (GNUNET_YES == test_backwards)
+
+/**
+ * Abort test: schedule disconnect and shutdown immediately
+ *
+ * @param line Line in the code the abort is requested from (__LINE__).
+ */
+static void
+abort_test (long line)
+{
+  if (NULL != disconnect_task)
   {
-    channel = incoming_ch;
+    GNUNET_SCHEDULER_cancel (disconnect_task);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               "Aborting test from %ld\n",
+               line);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+                                 (void *) line);
   }
-  else
+}
+
+
+/**
+ * Send a message on the channel with the appropriate size and payload.
+ *
+ * Update the appropriate *_sent counter.
+ *
+ * @param channel Channel to send the message on.
+ */
+static void
+send_test_message (struct GNUNET_CADET_Channel *channel)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_MessageHeader *msg;
+  uint32_t *data;
+  int payload;
+  int size;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending test message on channel %p\n",
+              channel);
+  size = size_payload;
+  if (GNUNET_NO == initialized)
   {
-    channel = ch;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
+    size += 1000;
+    payload = data_sent;
+    if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
+        data_sent++;
   }
-  th = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           size_payload + data_sent,
-                                           &tmt_rdy, (void *) 1L);
-  if (NULL == th)
+  else if (SPEED == test || SPEED_ACK == test)
   {
-    unsigned long i = (unsigned long) cls;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Retransmission\n");
-    if (0 == i)
+    if (get_target_channel() == channel)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "  in 1 ms\n");
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
-                                    &data_task, (void *)1UL);
+      payload = ack_sent;
+      size += ack_sent;
+      ack_sent++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Sending ACK %u [%d bytes]\n",
+                  payload, size);
     }
     else
     {
-      i++;
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "in %u ms\n", i);
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
-                                      GNUNET_TIME_UNIT_MILLISECONDS,
-                                      i),
-                                    &data_task, (void *)i);
+      payload = data_sent;
+      size += data_sent;
+      data_sent++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Sending DATA %u [%d bytes]\n",
+                  data_sent, size);
     }
   }
+  else if (FORWARD == test)
+  {
+    payload = ack_sent;
+  }
+  else if (P2P_SIGNAL == test)
+  {
+    payload = data_sent;
+  }
+  else
+  {
+    GNUNET_assert (0);
+  }
+  env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
+
+  data = (uint32_t *) &msg[1];
+  *data = htonl (payload);
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
 }
 
 
 /**
- * Transmit ready callback
+ * Task to request a new data transmission in a SPEED test, without waiting
+ * for previous messages to be sent/arrrive.
  *
  * @param cls Closure (unused).
- * @param size Size of the buffer we have.
- * @param buf Buffer to copy data to.
  */
-size_t
-tmt_rdy (void *cls, size_t size, void *buf)
+static void
+send_next_msg (void *cls)
 {
-  struct GNUNET_MessageHeader *msg = buf;
-  size_t msg_size;
-  uint32_t *data;
+  struct GNUNET_CADET_Channel *channel;
 
+  send_next_msg_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "tmt_rdy called, filling buffer\n");
-  msg_size = size_payload + data_sent;
-  if (size < msg_size || NULL == buf)
+             "Sending next message: %d\n",
+             data_sent);
+
+  channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
+  GNUNET_assert (NULL != channel);
+  GNUNET_assert (SPEED == test);
+  send_test_message (channel);
+  if (data_sent < total_packets)
   {
+    /* SPEED test: Send all messages as soon as possible */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "size %u, buf %p, data_sent %u, data_received %u\n",
-                size, buf, data_sent, data_received);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ok %u, ok goal %u\n", ok, ok_goal);
-    GNUNET_break (ok >= ok_goal - 2);
-
-    return 0;
-  }
-  msg->size = htons (size);
-  msg->type = htons ((long) cls);
-  data = (uint32_t *) &msg[1];
-  *data = htonl (data_sent);
-  if (GNUNET_NO == initialized)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending initializer\n");
+                "Scheduling message %d\n",
+                data_sent + 1);
+    send_next_msg_task =
+      GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
+                                     &send_next_msg,
+                                     NULL);
   }
-  else if (SPEED == test)
+}
+
+
+/**
+ * Every few messages cancel the timeout task and re-schedule it again, to
+ * avoid timing out when traffic keeps coming.
+ *
+ * @param line Code line number to log if a timeout occurs.
+ */
+static void
+reschedule_timeout_task (long line)
+{
+  if ((ok % 10) == 0)
   {
-    data_sent++;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, " Sent message %d size %u\n",
-                data_sent, msg_size);
-    if (data_sent < TOTAL_PACKETS)
+    if (NULL != disconnect_task)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Scheduling message %d\n",
-                  data_sent + 1);
-      GNUNET_SCHEDULER_add_now (&data_task, NULL);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "reschedule timeout every 10 messages\n");
+      GNUNET_SCHEDULER_cancel (disconnect_task);
+      disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
+                                                      &gather_stats_and_exit,
+                                                      (void *) line);
     }
   }
+}
+
 
-  return msg_size;
+/**
+ * Check if payload is sane (size contains payload).
+ *
+ * @param cls should match #ch
+ * @param message The actual message.
+ * @return #GNUNET_OK to keep the channel open,
+ *         #GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+check_data (void *cls,
+            const struct GNUNET_MessageHeader *message)
+{
+  return GNUNET_OK;             /* all is well-formed */
 }
 
 
 /**
  * Function is called whenever a message is received.
  *
- * @param cls closure (set from GNUNET_CADET_connect)
- * @param channel connection to the other end
- * @param channel_ctx place to store local state associated with the channel
+ * @param cls closure (set from GNUNET_CADET_connect(), peer number)
  * @param message the actual message
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
  */
-int
-data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
-               void **channel_ctx,
-               const struct GNUNET_MessageHeader *message)
+static void
+handle_data (void *cls,
+            const struct GNUNET_MessageHeader *message)
 {
-  long client = (long) cls;
-  long expected_target_client;
+  struct CadetTestChannelWrapper *ch = cls;
+  struct GNUNET_CADET_Channel *channel = ch->ch;
   uint32_t *data;
+  uint32_t payload;
+  int *counter;
 
   ok++;
-
   GNUNET_CADET_receive_done (channel);
+  counter = get_target_channel () == channel ? &data_received : &ack_received;
+
+  reschedule_timeout_task ((long) __LINE__);
 
-  if ((ok % 20) == 0)
+  if (channel == outgoing_ch)
   {
-    if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, " reschedule timeout\n");
-      GNUNET_SCHEDULER_cancel (disconnect_task);
-      disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                      &disconnect_cadet_peers,
-                                                      (void *) __LINE__);
-    }
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Root client got a message.\n");
   }
-
-  switch (client)
+  else if (channel == incoming_ch)
   {
-  case 0L:
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n");
-    break;
-  case 1L:
-  case 4L:
-    GNUNET_assert (client == peers_requested - 1);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client %li got a message.\n",
-                client);
-    break;
-  default:
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Client %li not valid.\n", client);
-    GNUNET_abort ();
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Leaf client got a message.\n");
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Unknown channel %p.\n",
+                channel);
+    GNUNET_assert (0);
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: (%d/%d)\n", ok, ok_goal);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              " ok: (%d/%d)\n",
+              ok,
+              ok_goal);
   data = (uint32_t *) &message[1];
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " payload: (%u)\n", ntohl (*data));
-  if (SPEED == test && GNUNET_YES == test_backwards)
+  payload = ntohl (*data);
+  if (payload == *counter)
   {
-    expected_target_client = 0L;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                " payload as expected: %u\n",
+                payload);
   }
   else
   {
-    expected_target_client = peers_requested - 1;
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                " payload %u, expected: %u\n",
+                payload, *counter);
   }
 
   if (GNUNET_NO == initialized)
@@ -482,45 +687,41 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
     start_time = GNUNET_TIME_absolute_get ();
     if (SPEED == test)
     {
-      GNUNET_assert (peers_requested - 1 == client);
-      GNUNET_SCHEDULER_add_now (&data_task, NULL);
-      return GNUNET_OK;
+      GNUNET_assert (incoming_ch == channel);
+      send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg,
+                                                     NULL);
+      return;
     }
   }
 
-  if (client == expected_target_client) // Normally 4
+  (*counter)++;
+  if (get_target_channel () == channel) /* Got "data" */
   {
-    data_received++;
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received);
     if (SPEED != test || (ok_goal - 2) == ok)
     {
-      GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          size_payload + data_sent,
-                                          &tmt_rdy, (void *) 1L);
-      return GNUNET_OK;
+      /* Send ACK */
+      send_test_message (channel);
+      return;
     }
     else
     {
-      if (data_received < TOTAL_PACKETS)
-        return GNUNET_OK;
+      if (data_received < total_packets)
+        return;
     }
   }
-  else // Normally 0
+  else /* Got "ack" */
   {
-    if (test == SPEED_ACK || test == SPEED)
+    if (SPEED_ACK == test || SPEED == test)
     {
-      data_ack++;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", data_ack);
-      GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          size_payload + data_sent,
-                                          &tmt_rdy, (void *) 1L);
-      if (data_ack < TOTAL_PACKETS && SPEED != test)
-        return GNUNET_OK;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
+      /* Send more data */
+      send_test_message (channel);
+      if (ack_received < total_packets && SPEED != test)
+        return;
       if (ok == 2 && SPEED == test)
-        return GNUNET_OK;
-      show_end_data();
+        return;
+      show_end_data ();
     }
     if (test == P2P_SIGNAL)
     {
@@ -529,207 +730,122 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
     }
     else
     {
-      GNUNET_CADET_channel_destroy (ch);
-      ch = NULL;
+      GNUNET_CADET_channel_destroy (outgoing_ch);
+      outgoing_ch = NULL;
     }
   }
-
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
-  {
-    GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                    &disconnect_cadet_peers,
-                                                    (void *) __LINE__);
-  }
-
-  return GNUNET_OK;
 }
 
 
 /**
- * Stats callback. Finish the stats testbed operation and when all stats have
- * been iterated, shutdown the test.
+ * Method called whenever a peer connects to a port in MQ-based CADET.
  *
- * @param cls closure
- * @param op the operation that has been finished
- * @param emsg error message in case the operation has failed; will be NULL if
- *          operation has executed successfully.
- */
-static void
-stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "stats_cont for peer %u\n", cls);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " KA sent: %u, KA received: %u\n",
-              ka_sent, ka_received);
-  if (ka_sent < 2 || ka_sent > ka_received + 1)
-    ok--;
-  GNUNET_TESTBED_operation_done (stats_op);
-
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
-    GNUNET_SCHEDULER_cancel (disconnect_task);
-  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                              (void *) __LINE__);
-
-}
-
-
-/**
- * Process statistic values.
- *
- * @param cls closure
- * @param peer the peer the statistic belong to
- * @param subsystem name of subsystem that created the statistic
- * @param name the name of the datum
- * @param value the current value
- * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
- * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
- */
-static int
-stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer,
-                const char *subsystem, const char *name,
-                uint64_t value, int is_persistent)
-{
-  static const char *s_sent = "# keepalives sent";
-  static const char *s_recv = "# keepalives received";
-  uint32_t i;
-
-  i = GNUNET_TESTBED_get_index (peer);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  %u - %s [%s]: %llu\n",
-              i, subsystem, name, value);
-  if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
-    ka_sent = value;
-
-  if (0 == strncmp(s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
-    ka_received = value;
-
-  return GNUNET_OK;
-}
-
-
-/**
- * Task check that keepalives were sent and received.
- *
- * @param cls Closure (NULL).
- * @param tc Task Context.
- */
-static void
-check_keepalives (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
-    return;
-
-  disconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "check keepalives\n");
-  GNUNET_CADET_channel_destroy (ch);
-  stats_op = GNUNET_TESTBED_get_statistics (peers_running, testbed_peers,
-                                            "cadet", NULL,
-                                            stats_iterator, stats_cont, NULL);
-}
-
-
-/**
- * Handlers, for diverse services
- */
-static struct GNUNET_CADET_MessageHandler handlers[] = {
-  {&data_callback, 1, sizeof (struct GNUNET_MessageHeader)},
-  {NULL, 0, 0}
-};
-
-
-/**
- * Method called whenever another peer has added us to a channel
- * the other peer initiated.
- *
- * @param cls Closure.
+ * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
  * @param channel New handle to the channel.
- * @param initiator Peer that started the channel.
- * @param port Port this channel is connected to.
- * @param options channel option flags
- * @return Initial channel context for the channel
- *         (can be NULL -- that's not an error).
+ * @param source Peer that started this channel.
+ * @return Closure for the incoming @a channel. It's given to:
+ *         - The #GNUNET_CADET_DisconnectEventHandler (given to
+ *           #GNUNET_CADET_open_port) when the channel dies.
+ *         - Each the #GNUNET_MQ_MessageCallback handlers for each message
+ *           received on the @a channel.
  */
 static void *
-incoming_channel (void *cls, struct GNUNET_CADET_Channel *channel,
-                 const struct GNUNET_PeerIdentity *initiator,
-                 uint32_t port, enum GNUNET_CADET_ChannelOption options)
+connect_handler (void *cls,
+                 struct GNUNET_CADET_Channel *channel,
+                 const struct GNUNET_PeerIdentity *source)
 {
+  struct CadetTestChannelWrapper *ch;
+  long peer = (long) cls;
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel from %s to peer %d\n",
-              GNUNET_i2s (initiator), (long) cls);
+              "Incoming channel from %s to %ld: %p\n",
+              GNUNET_i2s (source),
+              peer,
+              channel);
   ok++;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
-  if ((long) cls == peers_requested - 1)
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              " ok: %d\n",
+              ok);
+  if (peer == peers_requested - 1)
+  {
+    if (NULL != incoming_ch)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Duplicate incoming channel for client %lu\n",
+                  (long) cls);
+      GNUNET_assert (0);
+    }
     incoming_ch = channel;
+  }
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Incoming channel for unknown client %lu\n", (long) cls);
-    GNUNET_break(0);
+                "Incoming channel for unexpected peer #%lu\n",
+                (long) cls);
+    GNUNET_assert (0);
   }
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
+  if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    if (KEEPALIVE == test)
-    {
-      struct GNUNET_TIME_Relative delay;
-      delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS , 5);
-      disconnect_task =
-        GNUNET_SCHEDULER_add_delayed (delay, &check_keepalives, NULL);
-    }
-    else
-      disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                      &disconnect_cadet_peers,
-                                                      (void *) __LINE__);
+    disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
+                                                    &gather_stats_and_exit,
+                                                    (void *) __LINE__);
   }
 
-  return NULL;
+  /* TODO: cannot return channel as-is, in order to unify the data handlers */
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  ch->ch = channel;
+
+  return ch;
 }
 
+
 /**
- * Function called whenever an inbound channel is destroyed.  Should clean up
- * any associated state.
+ * Function called whenever an MQ-channel is destroyed, even if the destruction
+ * was requested by #GNUNET_CADET_channel_destroy.
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
  *
- * @param cls closure (set from GNUNET_CADET_connect)
- * @param channel connection to the other end (henceforth invalid)
- * @param channel_ctx place where local state associated
- *                   with the channel is stored
+ * It should clean up any associated state, including cancelling any pending
+ * transmission on this channel.
+ *
+ * @param cls Channel closure (channel wrapper).
+ * @param channel Connection to the other end (henceforth invalid).
  */
 static void
-channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel,
-                 void *channel_ctx)
+disconnect_handler (void *cls,
+                   const struct GNUNET_CADET_Channel *channel)
 {
-  long i = (long) cls;
+  struct CadetTestChannelWrapper *ch_w = cls;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel disconnected at peer %ld\n", i);
-  if (peers_running - 1 == i)
+             "Channel disconnected at %d\n",
+             ok);
+  GNUNET_assert (ch_w->ch == channel);
+  if (channel == incoming_ch)
   {
     ok++;
-    GNUNET_break (channel == incoming_ch);
     incoming_ch = NULL;
   }
-  else if (0L == i)
+  else if (outgoing_ch == channel)
   {
     if (P2P_SIGNAL == test)
     {
-      ok ++;
+      ok++;
     }
-    GNUNET_break (channel == ch);
-    ch = NULL;
+    outgoing_ch = NULL;
   }
   else
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Unknown peer! %d\n", i);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
-
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
+               "Unknown channel! %p\n",
+               channel);
+  if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                                (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
+                                 (void *) __LINE__);
   }
-
-  return;
+  GNUNET_free (ch_w);
 }
 
 
@@ -737,24 +853,29 @@ channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel,
  * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
  *
  * Testcase continues when the root receives confirmation of connected peers,
- * on callback funtion ch.
+ * on callback function ch.
  *
- * @param cls Closure (unsued).
- * @param tc Task Context.
+ * @param cls Closure (unused).
  */
 static void
-do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+start_test (void *cls)
 {
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+  struct CadetTestChannelWrapper *ch;
   enum GNUNET_CADET_ChannelOption flags;
 
-  if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
-    return;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\n");
-
-  if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
+  test_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test\n");
+  if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
+    disconnect_task = NULL;
   }
 
   flags = GNUNET_CADET_OPTION_DEFAULT;
@@ -763,24 +884,37 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     test = SPEED;
     flags |= GNUNET_CADET_OPTION_RELIABLE;
   }
-  ch = GNUNET_CADET_channel_create (h1, NULL, p_id[1], 1, flags);
 
-  disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                  &disconnect_cadet_peers,
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  outgoing_ch = GNUNET_CADET_channel_create (h1,
+                                             ch,
+                                             p_id[1],
+                                             &port,
+                                             flags,
+                                             NULL,
+                                             &disconnect_handler,
+                                             handlers);
+
+  ch->ch = outgoing_ch;
+
+  disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
+                                                  &gather_stats_and_exit,
                                                   (void *) __LINE__);
   if (KEEPALIVE == test)
-    return; /* Don't send any data. */
+    return;                     /* Don't send any data. */
+
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending data initializer...\n");
-  data_ack = 0;
   data_received = 0;
   data_sent = 0;
-  GNUNET_CADET_notify_transmit_ready (ch, GNUNET_NO,
-                                     GNUNET_TIME_UNIT_FOREVER_REL,
-                                     size_payload, &tmt_rdy, (void *) 1L);
+  ack_received = 0;
+  ack_sent = 0;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending data initializer on channel %p...\n",
+              outgoing_ch);
+  send_test_message (outgoing_ch);
 }
 
+
 /**
  * Callback to be called when the requested peer information is available
  *
@@ -798,24 +932,31 @@ pi_cb (void *cls,
 {
   long i = (long) cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "id callback for %ld\n", i);
-
-  if (NULL == pinfo || NULL != emsg)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "ID callback for %ld\n",
+             i);
+  if ( (NULL == pinfo) ||
+       (NULL != emsg) )
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               "pi_cb: %s\n",
+               emsg);
     abort_test (__LINE__);
     return;
   }
   p_id[i] = pinfo->result.id;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  id: %s\n", GNUNET_i2s (p_id[i]));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "id: %s\n",
+             GNUNET_i2s (p_id[i]));
   p_ids++;
   if (p_ids < 2)
     return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n");
-  test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                            &do_test, NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Got all IDs, starting test\n");
+  test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL);
 }
 
+
 /**
  * test main: start test when all peers are connected
  *
@@ -823,7 +964,7 @@ pi_cb (void *cls,
  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
  * @param num_peers Number of peers that are running.
  * @param peers Array of peers.
- * @param cadetes Handle to each of the CADETs of the peers.
+ * @param cadets Handle to each of the CADETs of the peers.
  */
 static void
 tmain (void *cls,
@@ -840,17 +981,19 @@ tmain (void *cls,
   testbed_peers = peers;
   h1 = cadets[0];
   h2 = cadets[num_peers - 1];
-  disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
+  disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
                                                   &disconnect_cadet_peers,
                                                   (void *) __LINE__);
-  shutdown_handle = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
-                                                  &shutdown_task, NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
   t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
                                                  GNUNET_TESTBED_PIT_IDENTITY,
-                                                 &pi_cb, (void *) 0L);
+                                                 &pi_cb,
+                                                 (void *) 0L);
   t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
                                                  GNUNET_TESTBED_PIT_IDENTITY,
-                                                 &pi_cb, (void *) 1L);
+                                                 &pi_cb,
+                                                 (void *) 1L);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
 }
 
@@ -861,14 +1004,45 @@ tmain (void *cls,
 int
 main (int argc, char *argv[])
 {
-  initialized = GNUNET_NO;
-  static uint32_t ports[2];
+  static const struct GNUNET_HashCode *ports[2];
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
   const char *config_file;
+  char port_id[] = "test port";
+  struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_relative_time ('t',
+                                       "time",
+                                       "short_time",
+                                       gettext_noop ("set short timeout"),
+                                       &short_time),
+    GNUNET_GETOPT_option_uint ('m',
+                              "messages",
+                              "NUM_MESSAGES",
+                              gettext_noop ("set number of messages to send"),
+                              &total_packets),
+
+    GNUNET_GETOPT_OPTION_END
+  };
 
+
+  initialized = GNUNET_NO;
   GNUNET_log_setup ("test", "DEBUG", NULL);
-  config_file = "test_cadet.conf";
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start\n");
+  total_packets = TOTAL_PACKETS;
+  short_time = SHORT_TIME;
+  if (-1 == GNUNET_GETOPT_run (argv[0], options, argc, argv))
+  {
+    FPRINTF (stderr, "test failed: problem with CLI parameters\n");
+    exit (1);
+  }
+
+  config_file = "test_cadet.conf";
+  GNUNET_CRYPTO_hash (port_id, sizeof (port_id), &port);
 
   /* Find out requested size */
   if (strstr (argv[0], "_2_") != NULL)
@@ -906,11 +1080,11 @@ main (int argc, char *argv[])
   {
     /* Test is supposed to generate the following callbacks:
      * 1 incoming channel (@dest)
-     * TOTAL_PACKETS received data packet (@dest)
-     * TOTAL_PACKETS received data packet (@orig)
+     * total_packets received data packet (@dest)
+     * total_packets received data packet (@orig)
      * 1 received channel destroy (@dest)
      */
-    ok_goal = TOTAL_PACKETS * 2 + 2;
+    ok_goal = total_packets * 2 + 2;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED_ACK\n");
     test = SPEED_ACK;
     test_name = "speed ack";
@@ -920,11 +1094,11 @@ main (int argc, char *argv[])
     /* Test is supposed to generate the following callbacks:
      * 1 incoming channel (@dest)
      * 1 initial packet (@dest)
-     * TOTAL_PACKETS received data packet (@dest)
+     * total_packets received data packet (@dest)
      * 1 received data packet (@orig)
      * 1 received channel destroy (@dest)
      */
-    ok_goal = TOTAL_PACKETS + 4;
+    ok_goal = total_packets + 4;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED\n");
     if (strstr (argv[0], "_reliable") != NULL)
     {
@@ -963,22 +1137,24 @@ main (int argc, char *argv[])
   }
 
   p_ids = 0;
-  ports[0] = 1;
-  ports[1] = 0;
-  GNUNET_CADET_TEST_run ("test_cadet_small",
-                        config_file,
-                        peers_requested,
-                        &tmain,
-                        NULL, /* tmain cls */
-                        &incoming_channel,
-                        &channel_cleaner,
-                        handlers,
-                        ports);
-
-  if (ok_goal > ok)
+  ports[0] = &port;
+  ports[1] = NULL;
+  GNUNET_CADET_TEST_ruN ("test_cadet_small",
+                         config_file,
+                         peers_requested,
+                         &tmain,
+                         NULL,        /* tmain cls */
+                         &connect_handler,
+                         NULL,
+                         &disconnect_handler,
+                         handlers,
+                         ports);
+  if (NULL != strstr (argv[0], "_reliable"))
+    msg_dropped = 0;            /* dropped should be retransmitted */
+
+  if (ok_goal > ok - msg_dropped)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "FAILED! (%d/%d)\n", ok, ok_goal);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal);
     return 1;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n");
@@ -986,4 +1162,3 @@ main (int argc, char *argv[])
 }
 
 /* end of test_cadet.c */
-