2 This file is part of GNUnet.
3 Copyright (C) 2011, 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file cadet/test_cadet_flow.c
18 * @author Christian Grothoff
19 * @brief Test for flow control of CADET service
23 #include "cadet_test_lib.h"
24 #include "gnunet_cadet_service.h"
25 #include "gnunet_statistics_service.h"
30 * Ugly workaround to unify data handlers on incoming and outgoing channels.
32 struct CadetTestChannelWrapper
37 struct GNUNET_CADET_Channel *ch;
41 * How many messages to send by default.
43 #define TOTAL_PACKETS_DEFAULT 500
46 * How long until we give up on connecting the peers?
48 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
51 * Time to wait by default for stuff that should be rather fast.
53 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)
56 * How fast do we send messages?
58 #define SEND_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 10)
62 * How many packets to send.
64 static unsigned int total_packets = TOTAL_PACKETS_DEFAULT;
67 * Time to wait for fast operations.
69 static struct GNUNET_TIME_Relative short_time;
72 * Size of each test packet's payload
74 static size_t size_payload = sizeof (uint32_t);
77 * Operation to get peer ids.
79 static struct GNUNET_TESTBED_Operation *t_op[2];
84 static struct GNUNET_PeerIdentity *p_id[2];
89 static struct GNUNET_HashCode port;
94 static unsigned int p_ids;
97 * Is the setup initialized?
99 static int initialized;
102 * Number of payload packes sent.
104 static int data_sent;
107 * Number of payload packets received.
109 static int data_received;
112 * Number of payload packed acknowledgements sent.
117 * Number of payload packed explicitly (app level) acknowledged.
119 static int ack_received;
122 * Total number of peers asked to run.
124 static unsigned int peers_requested = 2;
127 * Number of currently running peers (should be same as @c peers_requested).
129 static unsigned int peers_running;
132 * Test context (to shut down).
134 struct GNUNET_CADET_TEST_Context *test_ctx;
137 * Task called to disconnect peers.
139 static struct GNUNET_SCHEDULER_Task *disconnect_task;
142 * Task To perform tests
144 static struct GNUNET_SCHEDULER_Task *test_task;
147 * Task runnining #send_next_msg().
149 static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
152 * Cadet handle for the root peer
154 static struct GNUNET_CADET_Handle *h1;
157 * Cadet handle for the first leaf peer
159 static struct GNUNET_CADET_Handle *h2;
162 * Channel handle for the root peer
164 static struct GNUNET_CADET_Channel *outgoing_ch;
167 * Channel handle for the dest peer
169 static struct GNUNET_CADET_Channel *incoming_ch;
172 * Time we started the data transmission (after channel has been established
175 static struct GNUNET_TIME_Absolute start_time;
180 static struct GNUNET_TESTBED_Peer **testbed_peers;
183 * Statistics operation handle.
185 static struct GNUNET_TESTBED_Operation *stats_op;
190 static unsigned int ka_sent;
193 * Keepalives received.
195 static unsigned int ka_received;
198 * How many messages were dropped by CADET because of full buffers?
200 static unsigned int msg_dropped;
204 * Show the results of the test (banwidth acheived) and log them to GAUGER
209 static struct GNUNET_TIME_Absolute end_time;
210 static struct GNUNET_TIME_Relative total_time;
212 end_time = GNUNET_TIME_absolute_get ();
213 total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
215 "\nResults of test \"%s\"\n",
219 GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
221 "Test bandwidth: %f kb/s\n",
222 4 * total_packets * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms
224 "Test throughput: %f packets/s\n\n",
225 total_packets * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms
228 total_packets * 1000.0 / (total_time.rel_value_us / 1000),
234 * Shut down peergroup, clean up.
236 * @param cls Closure (unused).
237 * @param tc Task Context.
240 shutdown_task (void *cls)
242 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 if (NULL != send_next_msg_task)
246 GNUNET_SCHEDULER_cancel (send_next_msg_task);
247 send_next_msg_task = NULL;
249 if (NULL != test_task)
251 GNUNET_SCHEDULER_cancel (test_task);
254 for (unsigned int i = 0; i < 2; i++)
255 GNUNET_TESTBED_operation_done (t_op[i]);
256 if (NULL != outgoing_ch)
258 GNUNET_CADET_channel_destroy (outgoing_ch);
261 if (NULL != incoming_ch)
263 GNUNET_CADET_channel_destroy (incoming_ch);
266 GNUNET_CADET_TEST_cleanup (test_ctx);
271 * Stats callback. Finish the stats testbed operation and when all stats have
272 * been iterated, shutdown the test.
274 * @param cls Closure (line number from which termination was requested).
275 * @param op the operation that has been finished
276 * @param emsg error message in case the operation has failed; will be NULL if
277 * operation has executed successfully.
280 stats_cont (void *cls,
281 struct GNUNET_TESTBED_Operation *op,
284 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
285 "KA sent: %u, KA received: %u\n",
288 if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
293 GNUNET_TESTBED_operation_done (stats_op);
295 if (NULL != disconnect_task)
296 GNUNET_SCHEDULER_cancel (disconnect_task);
297 disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
303 * Process statistic values.
305 * @param cls closure (line number, unused)
306 * @param peer the peer the statistic belong to
307 * @param subsystem name of subsystem that created the statistic
308 * @param name the name of the datum
309 * @param value the current value
310 * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
311 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
314 stats_iterator (void *cls,
315 const struct GNUNET_TESTBED_Peer *peer,
316 const char *subsystem,
321 static const char *s_sent = "# keepalives sent";
322 static const char *s_recv = "# keepalives received";
323 static const char *rdrops = "# messages dropped due to full buffer";
324 static const char *cdrops = "# messages dropped due to slow client";
327 i = GNUNET_TESTBED_get_index (peer);
328 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
329 subsystem, name, (unsigned long long) value);
330 if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
332 if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
334 if (0 == strncmp (rdrops, name, strlen (rdrops)))
335 msg_dropped += value;
336 if (0 == strncmp (cdrops, name, strlen (cdrops)))
337 msg_dropped += value;
344 * Task to gather all statistics.
346 * @param cls Closure (line from which the task was scheduled).
349 gather_stats_and_exit (void *cls)
353 disconnect_task = NULL;
354 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
355 "gathering statistics from line %ld\n",
357 if (NULL != outgoing_ch)
359 GNUNET_CADET_channel_destroy (outgoing_ch);
362 stats_op = GNUNET_TESTBED_get_statistics (peers_running,
373 * Abort test: schedule disconnect and shutdown immediately
375 * @param line Line in the code the abort is requested from (__LINE__).
378 abort_test (long line)
380 if (NULL != disconnect_task)
382 GNUNET_SCHEDULER_cancel (disconnect_task);
383 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
384 "Aborting test from %ld\n",
387 GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
394 * Send a message on the channel with the appropriate size and payload.
396 * Update the appropriate *_sent counter.
398 * @param channel Channel to send the message on.
401 send_test_message (struct GNUNET_CADET_Channel *channel)
403 struct GNUNET_MQ_Envelope *env;
404 struct GNUNET_MessageHeader *msg;
409 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
410 "Sending test message on channel %p\n",
413 if (GNUNET_NO == initialized)
415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
418 if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
421 else if (SPEED == test || SPEED_ACK == test)
423 if (get_target_channel() == channel)
428 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
429 "Sending ACK %u [%d bytes]\n",
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "Sending DATA %u [%d bytes]\n",
442 else if (FORWARD == test)
446 else if (P2P_SIGNAL == test)
454 env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
456 data = (uint32_t *) &msg[1];
457 *data = htonl (payload);
458 GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
463 * Task to request a new data transmission in a SPEED test, without waiting
464 * for previous messages to be sent/arrrive.
466 * @param cls Closure (unused).
469 send_next_msg (void *cls)
471 struct GNUNET_CADET_Channel *channel;
473 send_next_msg_task = NULL;
474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475 "Sending next message: %d\n",
478 channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
479 GNUNET_assert (NULL != channel);
480 GNUNET_assert (SPEED == test);
481 send_test_message (channel);
482 if (data_sent < total_packets)
484 /* SPEED test: Send all messages as soon as possible */
485 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
486 "Scheduling message %d\n",
489 GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
497 * Check if payload is sane (size contains payload).
499 * @param cls should match #ch
500 * @param message The actual message.
501 * @return #GNUNET_OK to keep the channel open,
502 * #GNUNET_SYSERR to close it (signal serious error).
505 check_data (void *cls,
506 const struct GNUNET_MessageHeader *message)
508 return GNUNET_OK; /* all is well-formed */
513 * Function is called whenever a message is received.
515 * @param cls closure (set from GNUNET_CADET_connect(), peer number)
516 * @param message the actual message
519 handle_data (void *cls,
520 const struct GNUNET_MessageHeader *message)
522 struct CadetTestChannelWrapper *ch = cls;
523 struct GNUNET_CADET_Channel *channel = ch->ch;
528 GNUNET_CADET_receive_done (channel);
529 counter = get_target_channel () == channel ? &data_received : &ack_received;
530 if (channel == outgoing_ch)
532 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
533 "Root client got a message.\n");
535 else if (channel == incoming_ch)
537 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
538 "Leaf client got a message.\n");
542 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
543 "Unknown channel %p.\n",
548 data = (uint32_t *) &message[1];
549 payload = ntohl (*data);
550 if (payload == *counter)
552 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
553 "Payload as expected: %u\n",
558 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
559 "Received payload %u, expected: %u\n",
563 if (get_target_channel () == channel) /* Got "data" */
565 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
566 " received data %u\n",
568 if (data_received < total_packets)
573 if (SPEED_ACK == test || SPEED == test)
575 GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
577 send_test_message (channel);
578 if (ack_received < total_packets && SPEED != test)
580 if (ok == 2 && SPEED == test)
584 if (test == P2P_SIGNAL)
586 GNUNET_CADET_channel_destroy (incoming_ch);
591 GNUNET_CADET_channel_destroy (outgoing_ch);
599 * Method called whenever a peer connects to a port in MQ-based CADET.
601 * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
602 * @param channel New handle to the channel.
603 * @param source Peer that started this channel.
604 * @return Closure for the incoming @a channel. It's given to:
605 * - The #GNUNET_CADET_DisconnectEventHandler (given to
606 * #GNUNET_CADET_open_port) when the channel dies.
607 * - Each the #GNUNET_MQ_MessageCallback handlers for each message
608 * received on the @a channel.
611 connect_handler (void *cls,
612 struct GNUNET_CADET_Channel *channel,
613 const struct GNUNET_PeerIdentity *source)
615 struct CadetTestChannelWrapper *ch;
616 long peer = (long) cls;
618 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
619 "Incoming channel from %s to %ld: %p\n",
623 if (peer == peers_requested - 1)
625 if (NULL != incoming_ch)
627 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
628 "Duplicate incoming channel for client %lu\n",
632 incoming_ch = channel;
636 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
637 "Incoming channel for unexpected peer #%lu\n",
641 ch = GNUNET_new (struct CadetTestChannelWrapper);
649 * Function called whenever an MQ-channel is destroyed, even if the destruction
650 * was requested by #GNUNET_CADET_channel_destroy.
651 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
653 * It should clean up any associated state, including cancelling any pending
654 * transmission on this channel.
656 * @param cls Channel closure (channel wrapper).
657 * @param channel Connection to the other end (henceforth invalid).
660 disconnect_handler (void *cls,
661 const struct GNUNET_CADET_Channel *channel)
663 struct CadetTestChannelWrapper *ch_w = cls;
665 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
666 "Channel disconnected at %d\n",
668 GNUNET_assert (ch_w->ch == channel);
669 if (channel == incoming_ch)
671 else if (outgoing_ch == channel)
674 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
675 "Disconnect on unknown channel %p\n",
677 if (NULL != disconnect_task)
678 GNUNET_SCHEDULER_cancel (disconnect_task);
679 disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
686 * Start the testcase, we know the peers and have handles to CADET.
688 * Testcase continues when the root receives confirmation of connected peers,
689 * on callback function ch.
691 * @param cls Closure (unused).
694 start_test (void *cls)
696 struct GNUNET_MQ_MessageHandler handlers[] = {
697 GNUNET_MQ_hd_var_size (data,
698 GNUNET_MESSAGE_TYPE_DUMMY,
699 struct GNUNET_MessageHeader,
701 GNUNET_MQ_handler_end ()
703 struct CadetTestChannelWrapper *ch;
704 enum GNUNET_CADET_ChannelOption flags;
707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709 start_time = GNUNET_TIME_absolute_get ();
710 ch = GNUNET_new (struct CadetTestChannelWrapper);
711 outgoing_ch = GNUNET_CADET_channel_create (h1,
719 ch->ch = outgoing_ch;
720 GNUNET_assert (NULL == disconnect_task);
722 = GNUNET_SCHEDULER_add_delayed (short_time,
723 &gather_stats_and_exit,
725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726 "Sending data initializer on channel %p...\n",
728 send_test_message (outgoing_ch);
733 * Callback to be called when the requested peer information is available
735 * @param cls the closure from GNUNET_TESTBED_peer_get_information()
736 * @param op the operation this callback corresponds to
737 * @param pinfo the result; will be NULL if the operation has failed
738 * @param emsg error message if the operation has failed;
739 * NULL if the operation is successfull
743 struct GNUNET_TESTBED_Operation *op,
744 const struct GNUNET_TESTBED_PeerInformation *pinfo,
749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750 "ID callback for %ld\n",
752 if ( (NULL == pinfo) ||
755 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
758 abort_test (__LINE__);
761 p_id[i] = pinfo->result.id;
762 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764 GNUNET_i2s (p_id[i]));
768 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769 "Got all IDs, starting test\n");
770 test_task = GNUNET_SCHEDULER_add_now (&start_test,
776 * test main: start test when all peers are connected
778 * @param cls Closure.
779 * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
780 * @param num_peers Number of peers that are running.
781 * @param peers Array of peers.
782 * @param cadets Handle to each of the CADETs of the peers.
786 struct GNUNET_CADET_TEST_Context *ctx,
787 unsigned int num_peers,
788 struct GNUNET_TESTBED_Peer **peers,
789 struct GNUNET_CADET_Handle **cadets)
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 peers_running = num_peers;
795 GNUNET_assert (peers_running == peers_requested);
796 testbed_peers = peers;
798 h2 = cadets[num_peers - 1];
799 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
802 t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
803 GNUNET_TESTBED_PIT_IDENTITY,
806 t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
807 GNUNET_TESTBED_PIT_IDENTITY,
810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811 "requested peer ids\n");
822 static const struct GNUNET_HashCode *ports[2];
823 struct GNUNET_MQ_MessageHandler handlers[] = {
824 GNUNET_MQ_hd_var_size (data,
825 GNUNET_MESSAGE_TYPE_DUMMY,
826 struct GNUNET_MessageHeader,
828 GNUNET_MQ_handler_end ()
830 const char *config_file = "test_cadet.conf";
831 char port_id[] = "test port";
832 struct GNUNET_GETOPT_CommandLineOption options[] = {
833 GNUNET_GETOPT_option_relative_time ('t',
836 gettext_noop ("set short timeout"),
838 GNUNET_GETOPT_option_uint ('m',
841 gettext_noop ("set number of messages to send"),
843 GNUNET_GETOPT_option_uint ('p',
846 gettext_noop ("number of peers to launch"),
848 GNUNET_GETOPT_OPTION_END
851 GNUNET_log_setup ("test-cadet-flow",
854 total_packets = TOTAL_PACKETS;
855 short_time = SHORT_TIME;
856 if (-1 == GNUNET_GETOPT_run (argv[0],
862 "test failed: problem with CLI parameters\n");
865 GNUNET_CRYPTO_hash (port_id,
870 GNUNET_CADET_TEST_ruN ("test_cadet_flow",
874 NULL, /* tmain cls */
883 /* end of test_cadet_flow.c */