X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcadet%2Ftest_cadet.c;h=b9f177652c07e3055f3a0869afa17b1cdd47e0b8;hb=a3e88087d09186d847ee8bf042ad665ab6cb0850;hp=9f6bee5387a74d5ee87dc51bdb377ce5583b6318;hpb=5105deda52a78e0aa44b4f86feba98b60a59208b;p=oweals%2Fgnunet.git diff --git a/src/cadet/test_cadet.c b/src/cadet/test_cadet.c index 9f6bee538..b9f177652 100644 --- a/src/cadet/test_cadet.c +++ b/src/cadet/test_cadet.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2011, 2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,13 +14,14 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file cadet/test_cadet.c - * - * @brief Test for the cadet service: retransmission of traffic. + * @author Bart Polot + * @author Christian Grothoff + * @brief Test for the cadet service using mq API. */ #include #include "platform.h" @@ -31,9 +32,20 @@ /** - * How namy messages to send + * Ugly workaround to unify data handlers on incoming and outgoing channels. */ -#define TOTAL_PACKETS 20000 +struct CadetTestChannelWrapper +{ + /** + * Channel pointer. + */ + struct GNUNET_CADET_Channel *ch; +}; + +/** + * How many messages to send by default. + */ +#define TOTAL_PACKETS 500 /* Cannot exceed 64k! */ /** * How long until we give up on connecting the peers? @@ -41,10 +53,15 @@ #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120) /** - * Time to wait for stuff that should be rather fast + * Time to wait by default for stuff that should be rather fast. */ #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20) +/** + * How fast do we send messages? + */ +#define SEND_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 10) + /** * DIFFERENT TESTS TO RUN */ @@ -64,13 +81,23 @@ static int test; /** * String with test name */ -char *test_name; +static char *test_name; /** * Flag to send traffic leaf->root in speed tests to test BCK_ACK logic. */ static int test_backwards = GNUNET_NO; +/** + * How many packets to send. + */ +static unsigned int total_packets; + +/** + * Time to wait for fast operations. + */ +static struct GNUNET_TIME_Relative short_time; + /** * How many events have happened */ @@ -79,27 +106,32 @@ static int ok; /** * Number of events expected to conclude the test successfully. */ -int ok_goal; +static int ok_goal; /** - * Size of each test packet + * Size of each test packet's payload */ -size_t size_payload = sizeof (struct GNUNET_MessageHeader) + sizeof (uint32_t); +static size_t size_payload = sizeof (uint32_t); /** * Operation to get peer ids. */ -struct GNUNET_TESTBED_Operation *t_op[2]; +static struct GNUNET_TESTBED_Operation *t_op[2]; /** * Peer ids. */ -struct GNUNET_PeerIdentity *p_id[2]; +static struct GNUNET_PeerIdentity *p_id[2]; + +/** + * Port ID + */ +static struct GNUNET_HashCode port; /** * Peer ids counter. */ -unsigned int p_ids; +static unsigned int p_ids; /** * Is the setup initialized? @@ -144,17 +176,17 @@ struct GNUNET_CADET_TEST_Context *test_ctx; /** * Task called to disconnect peers. */ -static struct GNUNET_SCHEDULER_Task * disconnect_task; +static struct GNUNET_SCHEDULER_Task *disconnect_task; /** * Task To perform tests */ -static struct GNUNET_SCHEDULER_Task * test_task; +static struct GNUNET_SCHEDULER_Task *test_task; /** - * Task called to shutdown test. + * Task runnining #send_next_msg(). */ -static struct GNUNET_SCHEDULER_Task * shutdown_handle; +static struct GNUNET_SCHEDULER_Task *send_next_msg_task; /** * Cadet handle for the root peer @@ -169,7 +201,7 @@ static struct GNUNET_CADET_Handle *h2; /** * Channel handle for the root peer */ -static struct GNUNET_CADET_Channel *ch; +static struct GNUNET_CADET_Channel *outgoing_ch; /** * Channel handle for the dest peer @@ -202,23 +234,35 @@ static unsigned int ka_sent; */ static unsigned int ka_received; +/** + * How many messages were dropped by CADET because of full buffers? + */ +static unsigned int msg_dropped; + + +/******************************************************************************/ + + +/******************************************************************************/ + /** - * Get the client number considered as the "target" or "receiver", depending on + * Get the channel considered as the "target" or "receiver", depending on * the test type and size. * - * @return Peer # of the target client, either 0 (for backward tests) or - * the last peer in the line (for other tests). + * @return Channel handle of the target client, either 0 (for backward tests) + * or the last peer in the line (for other tests). */ -static unsigned int -get_expected_target () +static struct GNUNET_CADET_Channel * +get_target_channel () { if (SPEED == test && GNUNET_YES == test_backwards) - return 0; + return outgoing_ch; else - return peers_requested - 1; + return incoming_ch; } + /** * Show the results of the test (banwidth acheived) and log them to GAUGER */ @@ -228,36 +272,27 @@ show_end_data (void) static struct GNUNET_TIME_Absolute end_time; static struct GNUNET_TIME_Relative total_time; - end_time = GNUNET_TIME_absolute_get(); - total_time = GNUNET_TIME_absolute_get_difference(start_time, end_time); - FPRINTF (stderr, "\nResults of test \"%s\"\n", test_name); - FPRINTF (stderr, "Test time %s\n", - GNUNET_STRINGS_relative_time_to_string (total_time, - GNUNET_YES)); - FPRINTF (stderr, "Test bandwidth: %f kb/s\n", - 4 * TOTAL_PACKETS * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms - FPRINTF (stderr, "Test throughput: %f packets/s\n\n", - TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms - GAUGER ("CADET", test_name, - TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000), + end_time = GNUNET_TIME_absolute_get (); + total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time); + FPRINTF (stderr, + "\nResults of test \"%s\"\n", + test_name); + FPRINTF (stderr, + "Test time %s\n", + GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES)); + FPRINTF (stderr, + "Test bandwidth: %f kb/s\n", + 4 * total_packets * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms + FPRINTF (stderr, + "Test throughput: %f packets/s\n\n", + total_packets * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms + GAUGER ("CADET", + test_name, + total_packets * 1000.0 / (total_time.rel_value_us / 1000), "packets/s"); } -/** - * Shut down peergroup, clean up. - * - * @param cls Closure (unused). - * @param tc Task Context. - */ -static void -shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n"); - shutdown_handle = NULL; -} - - /** * Disconnect from cadet services af all peers, call shutdown. * @@ -265,29 +300,22 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param tc Task Context. */ static void -disconnect_cadet_peers (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +disconnect_cadet_peers (void *cls) { long line = (long) cls; - unsigned int i; - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "disconnecting cadet peers due to SHUTDOWN! called from %ld\n", - line); - else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "disconnecting cadet service of peers, called from line %ld\n", - line); disconnect_task = NULL; - for (i = 0; i < 2; i++) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "disconnecting cadet service of peers, called from line %ld\n", + line); + for (unsigned int i = 0; i < 2; i++) { GNUNET_TESTBED_operation_done (t_op[i]); } - if (NULL != ch) + if (NULL != outgoing_ch) { - GNUNET_CADET_channel_destroy (ch); - ch = NULL; + GNUNET_CADET_channel_destroy (outgoing_ch); + outgoing_ch = NULL; } if (NULL != incoming_ch) { @@ -295,15 +323,41 @@ disconnect_cadet_peers (void *cls, incoming_ch = NULL; } GNUNET_CADET_TEST_cleanup (test_ctx); - if (NULL != shutdown_handle) + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Shut down peergroup, clean up. + * + * @param cls Closure (unused). + * @param tc Task Context. + */ +static void +shutdown_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ending test.\n"); + if (NULL != send_next_msg_task) { - GNUNET_SCHEDULER_cancel (shutdown_handle); + GNUNET_SCHEDULER_cancel (send_next_msg_task); + send_next_msg_task = NULL; + } + if (NULL != test_task) + { + GNUNET_SCHEDULER_cancel (test_task); + test_task = NULL; + } + if (NULL != disconnect_task) + { + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = + GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, + (void *) __LINE__); } - shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); } - /** * Stats callback. Finish the stats testbed operation and when all stats have * been iterated, shutdown the test. @@ -314,18 +368,25 @@ disconnect_cadet_peers (void *cls, * operation has executed successfully. */ static void -stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg) +stats_cont (void *cls, + struct GNUNET_TESTBED_Operation *op, + const char *emsg) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, " KA sent: %u, KA received: %u\n", - ka_sent, ka_received); - if (KEEPALIVE == test && (ka_sent < 2 || ka_sent > ka_received + 1)) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "KA sent: %u, KA received: %u\n", + ka_sent, + ka_received); + if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1))) + { + GNUNET_break (0); ok--; + } GNUNET_TESTBED_operation_done (stats_op); if (NULL != disconnect_task) GNUNET_SCHEDULER_cancel (disconnect_task); - disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, cls); - + disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, + cls); } @@ -337,26 +398,31 @@ stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg) * @param subsystem name of subsystem that created the statistic * @param name the name of the datum * @param value the current value - * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not - * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration */ static int stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer, - const char *subsystem, const char *name, - uint64_t value, int is_persistent) + const char *subsystem, const char *name, uint64_t value, + int is_persistent) { static const char *s_sent = "# keepalives sent"; static const char *s_recv = "# keepalives received"; + static const char *rdrops = "# messages dropped due to full buffer"; + static const char *cdrops = "# messages dropped due to slow client"; uint32_t i; i = GNUNET_TESTBED_get_index (peer); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", - i, subsystem, name, value); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i, + subsystem, name, (unsigned long long) value); if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i) ka_sent = value; - - if (0 == strncmp(s_recv, name, strlen (s_recv)) && peers_requested - 1 == i) + if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i) ka_received = value; + if (0 == strncmp (rdrops, name, strlen (rdrops))) + msg_dropped += value; + if (0 == strncmp (cdrops, name, strlen (cdrops))) + msg_dropped += value; return GNUNET_OK; } @@ -365,36 +431,32 @@ stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer, /** * Task to gather all statistics. * - * @param cls Closure (NULL). - * @param tc Task Context. + * @param cls Closure (line from which the task was scheduled). */ static void -gather_stats_and_exit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +gather_stats_and_exit (void *cls) { - disconnect_task = NULL; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "gathering statistics\n"); - - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - { - disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, - (void *) __LINE__); - return; - } - + long l = (long) cls; - if (NULL != ch) + disconnect_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "gathering statistics from line %ld\n", + l); + if (NULL != outgoing_ch) { - GNUNET_CADET_channel_destroy (ch); - ch = NULL; + GNUNET_CADET_channel_destroy (outgoing_ch); + outgoing_ch = NULL; } - stats_op = GNUNET_TESTBED_get_statistics (peers_running, testbed_peers, - "cadet", NULL, - stats_iterator, stats_cont, cls); + stats_op = GNUNET_TESTBED_get_statistics (peers_running, + testbed_peers, + "cadet", + NULL, + &stats_iterator, + stats_cont, + cls); } - /** * Abort test: schedule disconnect and shutdown immediately * @@ -403,217 +465,212 @@ gather_stats_and_exit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void abort_test (long line) { - if (disconnect_task != NULL) + if (NULL != disconnect_task) { GNUNET_SCHEDULER_cancel (disconnect_task); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Aborting test from %ld\n", line); - disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, - (void *) line); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Aborting test from %ld\n", + line); + disconnect_task = + GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, + (void *) line); } } -/** - * Transmit ready callback. - * - * @param cls Closure (message type). - * @param size Size of the tranmist buffer. - * @param buf Pointer to the beginning of the buffer. - * - * @return Number of bytes written to buf. - */ -static size_t -tmt_rdy (void *cls, size_t size, void *buf); - /** - * Task to schedule a new data transmission. + * Send a message on the channel with the appropriate size and payload. * - * @param cls Closure (peer #). - * @param tc Task Context. + * Update the appropriate *_sent counter. + * + * @param channel Channel to send the message on. */ static void -data_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +send_test_message (struct GNUNET_CADET_Channel *channel) { - struct GNUNET_CADET_TransmitHandle *th; - struct GNUNET_CADET_Channel *channel; - long src; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Data task\n"); - - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - return; - - if (GNUNET_YES == test_backwards) - { - channel = incoming_ch; - src = peers_requested - 1; - } - else - { - channel = ch; - src = 0; - } + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *msg; + uint32_t *data; + int payload; + int size; - if (NULL == channel) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending test message on channel %p\n", + channel); + size = size_payload; + if (GNUNET_NO == initialized) { - GNUNET_break (0); - return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n"); + size += 1000; + payload = data_sent; + if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer + data_sent++; } - th = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - size_payload + data_sent, - &tmt_rdy, (void *) src); - if (NULL == th) + else if (SPEED == test || SPEED_ACK == test) { - unsigned long i = (unsigned long) cls; - - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Retransmission\n"); - if (0 == i) + if (get_target_channel() == channel) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " in 1 ms\n"); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS, - &data_task, (void *) 1L); + payload = ack_sent; + size += ack_sent; + ack_sent++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending ACK %u [%d bytes]\n", + payload, size); } else { - i++; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "in %u ms\n", i); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply( - GNUNET_TIME_UNIT_MILLISECONDS, - i), - &data_task, (void *) i); + payload = data_sent; + size += data_sent; + data_sent++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending DATA %u [%d bytes]\n", + data_sent, size); } } + else if (FORWARD == test) + { + payload = ack_sent; + } + else if (P2P_SIGNAL == test) + { + payload = data_sent; + } + else + { + GNUNET_assert (0); + } + env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY); + + data = (uint32_t *) &msg[1]; + *data = htonl (payload); + GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env); } /** - * Transmit ready callback + * Task to request a new data transmission in a SPEED test, without waiting + * for previous messages to be sent/arrrive. * - * @param cls Closure (peer # which is sending the data). - * @param size Size of the buffer we have. - * @param buf Buffer to copy data to. + * @param cls Closure (unused). */ -size_t -tmt_rdy (void *cls, size_t size, void *buf) +static void +send_next_msg (void *cls) { - struct GNUNET_MessageHeader *msg = buf; - size_t msg_size; - uint32_t *data; - long id = (long) cls; - unsigned int counter; + struct GNUNET_CADET_Channel *channel; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmt_rdy on %ld, filling buffer\n", id); - counter = get_expected_target () == id ? ack_sent : data_sent; - msg_size = size_payload + counter; - if (size < msg_size || NULL == buf) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "size %u, buf %p, data_sent %u, ack_received %u\n", - size, buf, data_sent, ack_received); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ok %u, ok goal %u\n", ok, ok_goal); - GNUNET_break (ok >= ok_goal - 2); + send_next_msg_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending next message: %d\n", + data_sent); - return 0; - } - msg->size = htons (msg_size); - msg->type = htons (1); - data = (uint32_t *) &msg[1]; - *data = htonl (counter); - if (GNUNET_NO == initialized) + channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch; + GNUNET_assert (NULL != channel); + GNUNET_assert (SPEED == test); + send_test_message (channel); + if (data_sent < total_packets) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending initializer\n"); - msg_size = size_payload + 1000; - if (SPEED_ACK == test) - data_sent++; + /* SPEED test: Send all messages as soon as possible */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling message %d\n", + data_sent + 1); + send_next_msg_task = + GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL, + &send_next_msg, + NULL); } - else if (SPEED == test || SPEED_ACK == test) +} + + +/** + * Every few messages cancel the timeout task and re-schedule it again, to + * avoid timing out when traffic keeps coming. + * + * @param line Code line number to log if a timeout occurs. + */ +static void +reschedule_timeout_task (long line) +{ + if ((ok % 10) == 0) { - if (get_expected_target() == id) - ack_sent++; - else - data_sent++; - counter++; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, " Sent message %d size %u\n", - counter, msg_size); - if (data_sent < TOTAL_PACKETS && SPEED == test) + if (NULL != disconnect_task) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Scheduling message %d\n", - counter + 1); - GNUNET_SCHEDULER_add_now (&data_task, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "reschedule timeout every 10 messages\n"); + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time, + &gather_stats_and_exit, + (void *) line); } } +} + - return msg_size; +/** + * Check if payload is sane (size contains payload). + * + * @param cls should match #ch + * @param message The actual message. + * @return #GNUNET_OK to keep the channel open, + * #GNUNET_SYSERR to close it (signal serious error). + */ +static int +check_data (void *cls, const struct GNUNET_MessageHeader *message) +{ + if (sizeof (struct GNUNET_MessageHeader) >= ntohs (message->size)) + return GNUNET_SYSERR; + return GNUNET_OK; /* all is well-formed */ } /** * Function is called whenever a message is received. * - * @param cls closure (set from GNUNET_CADET_connect) - * @param channel connection to the other end - * @param channel_ctx place to store local state associated with the channel + * @param cls closure (set from GNUNET_CADET_connect(), peer number) * @param message the actual message - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) */ -int -data_callback (void *cls, struct GNUNET_CADET_Channel *channel, - void **channel_ctx, - const struct GNUNET_MessageHeader *message) +static void +handle_data (void *cls, + const struct GNUNET_MessageHeader *message) { - long client = (long) cls; - long expected_target_client; + struct CadetTestChannelWrapper *ch = cls; + struct GNUNET_CADET_Channel *channel = ch->ch; uint32_t *data; uint32_t payload; - unsigned int counter; + int *counter; ok++; - counter = get_expected_target () == client ? data_received : ack_received; - GNUNET_CADET_receive_done (channel); + counter = get_target_channel () == channel ? &data_received : &ack_received; - if ((ok % 10) == 0) + reschedule_timeout_task ((long) __LINE__); + + if (channel == outgoing_ch) { - if (NULL != disconnect_task) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, " reschedule timeout\n"); - GNUNET_SCHEDULER_cancel (disconnect_task); - disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, - &gather_stats_and_exit, - (void *) __LINE__); - } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message.\n"); } - - switch (client) + else if (channel == incoming_ch) { - case 0L: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n"); - break; - case 1L: - case 4L: - GNUNET_assert (client == peers_requested - 1); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client %li got a message.\n", - client); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Client %li not valid.\n", client); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client got a message.\n"); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unknown channel %p.\n", channel); GNUNET_assert (0); } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: (%d/%d)\n", ok, ok_goal); data = (uint32_t *) &message[1]; payload = ntohl (*data); - if (payload == counter) + if (payload == *counter) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, " payload as expected: %u\n", payload); } else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " payload %u, expected: %u\n", - payload, counter); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " payload %u, expected: %u\n", + payload, *counter); } - expected_target_client = get_expected_target (); if (GNUNET_NO == initialized) { @@ -621,47 +678,40 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel, start_time = GNUNET_TIME_absolute_get (); if (SPEED == test) { - GNUNET_assert (peers_requested - 1 == client); - GNUNET_SCHEDULER_add_now (&data_task, NULL); - return GNUNET_OK; + GNUNET_assert (incoming_ch == channel); + send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL); + return; } } - counter++; - if (client == expected_target_client) /* Normally 4 */ + (*counter)++; + if (get_target_channel () == channel) /* Got "data" */ { - data_received++; GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received); if (SPEED != test || (ok_goal - 2) == ok) { /* Send ACK */ - GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - size_payload + ack_sent, &tmt_rdy, - (void *) client); - return GNUNET_OK; + send_test_message (channel); + return; } else { - if (data_received < TOTAL_PACKETS) - return GNUNET_OK; + if (data_received < total_packets) + return; } } - else /* Normally 0 */ + else /* Got "ack" */ { if (SPEED_ACK == test || SPEED == test) { - ack_received++; GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received); - GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - size_payload + data_sent, &tmt_rdy, - (void *) client); - if (ack_received < TOTAL_PACKETS && SPEED != test) - return GNUNET_OK; + /* Send more data */ + send_test_message (channel); + if (ack_received < total_packets && SPEED != test) + return; if (ok == 2 && SPEED == test) - return GNUNET_OK; - show_end_data(); + return; + show_end_data (); } if (test == P2P_SIGNAL) { @@ -670,110 +720,115 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel, } else { - GNUNET_CADET_channel_destroy (ch); - ch = NULL; + GNUNET_CADET_channel_destroy (outgoing_ch); + outgoing_ch = NULL; } } - - return GNUNET_OK; } /** - * Handlers, for diverse services - */ -static struct GNUNET_CADET_MessageHandler handlers[] = { - {&data_callback, 1, sizeof (struct GNUNET_MessageHeader)}, - {NULL, 0, 0} -}; - - -/** - * Method called whenever another peer has added us to a channel - * the other peer initiated. + * Method called whenever a peer connects to a port in MQ-based CADET. * - * @param cls Closure. + * @param cls Closure from #GNUNET_CADET_open_port (peer # as long). * @param channel New handle to the channel. - * @param initiator Peer that started the channel. - * @param port Port this channel is connected to. - * @param options channel option flags - * @return Initial channel context for the channel - * (can be NULL -- that's not an error). + * @param source Peer that started this channel. + * @return Closure for the incoming @a channel. It's given to: + * - The #GNUNET_CADET_DisconnectEventHandler (given to + * #GNUNET_CADET_open_port) when the channel dies. + * - Each the #GNUNET_MQ_MessageCallback handlers for each message + * received on the @a channel. */ static void * -incoming_channel (void *cls, struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - uint32_t port, enum GNUNET_CADET_ChannelOption options) +connect_handler (void *cls, struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) { + struct CadetTestChannelWrapper *ch; + long peer = (long) cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Incoming channel from %s to peer %d\n", - GNUNET_i2s (initiator), (long) cls); + "Incoming channel from %s to %ld: %p\n", + GNUNET_i2s (source), peer, channel); ok++; GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok); - if ((long) cls == peers_requested - 1) + if (peer == peers_requested - 1) + { + if (NULL != incoming_ch) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Duplicate incoming channel for client %lu\n", (long) cls); + GNUNET_assert (0); + } incoming_ch = channel; + } else { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Incoming channel for unknown client %lu\n", (long) cls); - GNUNET_break(0); + "Incoming channel for unexpected peer #%lu\n", (long) cls); + GNUNET_assert (0); } if (NULL != disconnect_task) { GNUNET_SCHEDULER_cancel (disconnect_task); - disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, + disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time, &gather_stats_and_exit, (void *) __LINE__); } - return NULL; + /* TODO: cannot return channel as-is, in order to unify the data handlers */ + ch = GNUNET_new (struct CadetTestChannelWrapper); + ch->ch = channel; + + return ch; } + /** - * Function called whenever an inbound channel is destroyed. Should clean up - * any associated state. + * Function called whenever an MQ-channel is destroyed, even if the destruction + * was requested by #GNUNET_CADET_channel_destroy. + * It must NOT call #GNUNET_CADET_channel_destroy on the channel. * - * @param cls closure (set from GNUNET_CADET_connect) - * @param channel connection to the other end (henceforth invalid) - * @param channel_ctx place where local state associated - * with the channel is stored + * It should clean up any associated state, including cancelling any pending + * transmission on this channel. + * + * @param cls Channel closure (channel wrapper). + * @param channel Connection to the other end (henceforth invalid). */ static void -channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) +disconnect_handler (void *cls, + const struct GNUNET_CADET_Channel *channel) { - long i = (long) cls; + struct CadetTestChannelWrapper *ch_w = cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Incoming channel disconnected at peer %ld\n", i); - if (peers_running - 1 == i) + "Channel disconnected at %d\n", + ok); + GNUNET_assert (ch_w->ch == channel); + if (channel == incoming_ch) { ok++; - GNUNET_break (channel == incoming_ch); incoming_ch = NULL; } - else if (0L == i) + else if (outgoing_ch == channel) { if (P2P_SIGNAL == test) { - ok ++; + ok++; } - GNUNET_break (channel == ch); - ch = NULL; + outgoing_ch = NULL; } else GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Unknown peer! %d\n", i); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok); - + "Unknown channel! %p\n", + channel); if (NULL != disconnect_task) { GNUNET_SCHEDULER_cancel (disconnect_task); - disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit, - (void *) __LINE__); + disconnect_task = + GNUNET_SCHEDULER_add_now (&gather_stats_and_exit, + (void *) __LINE__); } - - return; + GNUNET_free (ch_w); } @@ -781,24 +836,29 @@ channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel, * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES. * * Testcase continues when the root receives confirmation of connected peers, - * on callback funtion ch. + * on callback function ch. * * @param cls Closure (unused). - * @param tc Task Context. */ static void -do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +start_test (void *cls) { + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end () + }; + struct CadetTestChannelWrapper *ch; enum GNUNET_CADET_ChannelOption flags; - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - return; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\n"); - + test_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test\n"); if (NULL != disconnect_task) { GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = NULL; } flags = GNUNET_CADET_OPTION_DEFAULT; @@ -807,25 +867,37 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) test = SPEED; flags |= GNUNET_CADET_OPTION_RELIABLE; } - ch = GNUNET_CADET_channel_create (h1, NULL, p_id[1], 1, flags); - disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, + ch = GNUNET_new (struct CadetTestChannelWrapper); + outgoing_ch = GNUNET_CADET_channel_create (h1, + ch, + p_id[1], + &port, + flags, + NULL, + &disconnect_handler, + handlers); + + ch->ch = outgoing_ch; + + disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time, &gather_stats_and_exit, (void *) __LINE__); if (KEEPALIVE == test) - return; /* Don't send any data. */ + return; /* Don't send any data. */ + - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending data initializer...\n"); data_received = 0; data_sent = 0; ack_received = 0; ack_sent = 0; - GNUNET_CADET_notify_transmit_ready (ch, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - size_payload + 1000, - &tmt_rdy, (void *) 0L); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending data initializer on channel %p...\n", + outgoing_ch); + send_test_message (outgoing_ch); } + /** * Callback to be called when the requested peer information is available * @@ -843,24 +915,31 @@ pi_cb (void *cls, { long i = (long) cls; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "id callback for %ld\n", i); - - if (NULL == pinfo || NULL != emsg) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ID callback for %ld\n", + i); + if ( (NULL == pinfo) || + (NULL != emsg) ) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "pi_cb: %s\n", + emsg); abort_test (__LINE__); return; } p_id[i] = pinfo->result.id; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " id: %s\n", GNUNET_i2s (p_id[i])); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "id: %s\n", + GNUNET_i2s (p_id[i])); p_ids++; if (p_ids < 2) return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n"); - test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &do_test, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got all IDs, starting test\n"); + test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL); } + /** * test main: start test when all peers are connected * @@ -868,7 +947,7 @@ pi_cb (void *cls, * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end. * @param num_peers Number of peers that are running. * @param peers Array of peers. - * @param cadetes Handle to each of the CADETs of the peers. + * @param cadets Handle to each of the CADETs of the peers. */ static void tmain (void *cls, @@ -885,17 +964,19 @@ tmain (void *cls, testbed_peers = peers; h1 = cadets[0]; h2 = cadets[num_peers - 1]; - disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, + disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time, &disconnect_cadet_peers, (void *) __LINE__); - shutdown_handle = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0], GNUNET_TESTBED_PIT_IDENTITY, - &pi_cb, (void *) 0L); + &pi_cb, + (void *) 0L); t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1], GNUNET_TESTBED_PIT_IDENTITY, - &pi_cb, (void *) 1L); + &pi_cb, + (void *) 1L); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n"); } @@ -906,14 +987,46 @@ tmain (void *cls, int main (int argc, char *argv[]) { - initialized = GNUNET_NO; - static uint32_t ports[2]; + static const struct GNUNET_HashCode *ports[2]; + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end () + }; const char *config_file; + char port_id[] = "test port"; + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_relative_time ('t', + "time", + "short_time", + gettext_noop ("set short timeout"), + &short_time), + + GNUNET_GETOPT_option_uint ('m', + "messages", + "NUM_MESSAGES", + gettext_noop ("set number of messages to send"), + &total_packets), + + GNUNET_GETOPT_OPTION_END + }; + + initialized = GNUNET_NO; GNUNET_log_setup ("test", "DEBUG", NULL); - config_file = "test_cadet.conf"; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start\n"); + total_packets = TOTAL_PACKETS; + short_time = SHORT_TIME; + if (-1 == GNUNET_GETOPT_run (argv[0], options, argc, argv)) + { + FPRINTF (stderr, "test failed: problem with CLI parameters\n"); + exit (1); + } + + config_file = "test_cadet.conf"; + GNUNET_CRYPTO_hash (port_id, sizeof (port_id), &port); /* Find out requested size */ if (strstr (argv[0], "_2_") != NULL) @@ -951,11 +1064,11 @@ main (int argc, char *argv[]) { /* Test is supposed to generate the following callbacks: * 1 incoming channel (@dest) - * TOTAL_PACKETS received data packet (@dest) - * TOTAL_PACKETS received data packet (@orig) + * total_packets received data packet (@dest) + * total_packets received data packet (@orig) * 1 received channel destroy (@dest) */ - ok_goal = TOTAL_PACKETS * 2 + 2; + ok_goal = total_packets * 2 + 2; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED_ACK\n"); test = SPEED_ACK; test_name = "speed ack"; @@ -965,11 +1078,11 @@ main (int argc, char *argv[]) /* Test is supposed to generate the following callbacks: * 1 incoming channel (@dest) * 1 initial packet (@dest) - * TOTAL_PACKETS received data packet (@dest) + * total_packets received data packet (@dest) * 1 received data packet (@orig) * 1 received channel destroy (@dest) */ - ok_goal = TOTAL_PACKETS + 4; + ok_goal = total_packets + 4; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED\n"); if (strstr (argv[0], "_reliable") != NULL) { @@ -1008,22 +1121,24 @@ main (int argc, char *argv[]) } p_ids = 0; - ports[0] = 1; - ports[1] = 0; - GNUNET_CADET_TEST_run ("test_cadet_small", - config_file, - peers_requested, - &tmain, - NULL, /* tmain cls */ - &incoming_channel, - &channel_cleaner, - handlers, - ports); - - if (ok_goal > ok) + ports[0] = &port; + ports[1] = NULL; + GNUNET_CADET_TEST_ruN ("test_cadet_small", + config_file, + peers_requested, + &tmain, + NULL, /* tmain cls */ + &connect_handler, + NULL, + &disconnect_handler, + handlers, + ports); + if (NULL != strstr (argv[0], "_reliable")) + msg_dropped = 0; /* dropped should be retransmitted */ + + if (ok_goal > ok - msg_dropped) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "FAILED! (%d/%d)\n", ok, ok_goal); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal); return 1; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n"); @@ -1031,4 +1146,3 @@ main (int argc, char *argv[]) } /* end of test_cadet.c */ -