global reindent, now with uncrustify hook enabled
[oweals/gnunet.git] / src / cadet / test_cadet.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file cadet/test_cadet.c
22  * @author Bart Polot
23  * @author Christian Grothoff
24  * @brief Test for the cadet service using mq API.
25  */
26 #include <stdio.h>
27 #include "platform.h"
28 #include "cadet_test_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_statistics_service.h"
31 #include <gauger.h>
32
33
34 /**
35  * Ugly workaround to unify data handlers on incoming and outgoing channels.
36  */
37 struct CadetTestChannelWrapper
38 {
39   /**
40    * Channel pointer.
41    */
42   struct GNUNET_CADET_Channel *ch;
43 };
44
45 /**
46  * How many messages to send by default.
47  */
48 #define TOTAL_PACKETS 500       /* Cannot exceed 64k! */
49
50 /**
51  * How long until we give up on connecting the peers?
52  */
53 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
54
55 /**
56  * Time to wait by default  for stuff that should be rather fast.
57  */
58 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)
59
60 /**
61  * How fast do we send messages?
62  */
63 #define SEND_INTERVAL GNUNET_TIME_relative_multiply ( \
64     GNUNET_TIME_UNIT_MILLISECONDS, 10)
65
66 /**
67  * DIFFERENT TESTS TO RUN
68  */
69 #define SETUP 0
70 #define FORWARD 1
71 #define KEEPALIVE 2
72 #define SPEED 3
73 #define SPEED_ACK 4
74 #define SPEED_REL 8
75 #define P2P_SIGNAL 10
76 #define REOPEN 11
77
78 /**
79  * Which test are we running?
80  */
81 static int test;
82
83 /**
84  * String with test name
85  */
86 static char *test_name;
87
88 /**
89  * Flag to send traffic leaf->root in speed tests to test BCK_ACK logic.
90  */
91 static int test_backwards = GNUNET_NO;
92
93 /**
94  * How many packets to send.
95  */
96 static unsigned int total_packets;
97
98 /**
99  * Time to wait for fast operations.
100  */
101 static struct GNUNET_TIME_Relative short_time;
102
103 /**
104  * How many events have happened
105  */
106 static int ok;
107
108 /**
109  * Number of events expected to conclude the test successfully.
110  */
111 static int ok_goal;
112
113 /**
114  * Size of each test packet's payload
115  */
116 static size_t size_payload = sizeof(uint32_t);
117
118 /**
119  * Operation to get peer ids.
120  */
121 static struct GNUNET_TESTBED_Operation *t_op[2];
122
123 /**
124  * Peer ids.
125  */
126 static struct GNUNET_PeerIdentity *p_id[2];
127
128 /**
129  * Port ID
130  */
131 static struct GNUNET_HashCode port;
132
133 /**
134  * Peer ids counter.
135  */
136 static unsigned int p_ids;
137
138 /**
139  * Is the setup initialized?
140  */
141 static int initialized;
142
143 /**
144  * Number of payload packes sent.
145  */
146 static int data_sent;
147
148 /**
149  * Number of payload packets received.
150  */
151 static int data_received;
152
153 /**
154  * Number of payload packed acknowledgements sent.
155  */
156 static int ack_sent;
157
158 /**
159  * Number of payload packed explicitly (app level) acknowledged.
160  */
161 static int ack_received;
162
163 /**
164  * Total number of peers asked to run.
165  */
166 static unsigned long long peers_requested;
167
168 /**
169  * Number of currently running peers (should be same as @c peers_requested).
170  */
171 static unsigned long long peers_running;
172
173 /**
174  * Test context (to shut down).
175  */
176 struct GNUNET_CADET_TEST_Context *test_ctx;
177
178 /**
179  * Task called to disconnect peers.
180  */
181 static struct GNUNET_SCHEDULER_Task *disconnect_task;
182
183 /**
184  * Task called to reconnect peers.
185  */
186 static struct GNUNET_SCHEDULER_Task *reconnect_task;
187
188 /**
189  * Task To perform tests
190  */
191 static struct GNUNET_SCHEDULER_Task *test_task;
192
193 /**
194  * Task runnining #send_next_msg().
195  */
196 static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
197
198 /**
199  * Cadet handle for the root peer
200  */
201 static struct GNUNET_CADET_Handle *h1;
202
203 /**
204  * Cadet handle for the first leaf peer
205  */
206 static struct GNUNET_CADET_Handle *h2;
207
208 /**
209  * Channel handle for the root peer
210  */
211 static struct GNUNET_CADET_Channel *outgoing_ch;
212
213 /**
214  * Channel handle for the dest peer
215  */
216 static struct GNUNET_CADET_Channel *incoming_ch;
217
218 /**
219  * Time we started the data transmission (after channel has been established
220  * and initilized).
221  */
222 static struct GNUNET_TIME_Absolute start_time;
223
224 /**
225  * Peers handle.
226  */
227 static struct GNUNET_TESTBED_Peer **testbed_peers;
228
229 /**
230  * Statistics operation handle.
231  */
232 static struct GNUNET_TESTBED_Operation *stats_op;
233
234 /**
235  * Keepalives sent.
236  */
237 static unsigned int ka_sent;
238
239 /**
240  * Keepalives received.
241  */
242 static unsigned int ka_received;
243
244 /**
245  * How many messages were dropped by CADET because of full buffers?
246  */
247 static unsigned int msg_dropped;
248
249
250 /******************************************************************************/
251
252
253 /******************************************************************************/
254
255
256 /**
257  * Get the channel considered as the "target" or "receiver", depending on
258  * the test type and size.
259  *
260  * @return Channel handle of the target client, either 0 (for backward tests)
261  *         or the last peer in the line (for other tests).
262  */
263 static struct GNUNET_CADET_Channel *
264 get_target_channel ()
265 {
266   if ((SPEED == test)&&(GNUNET_YES == test_backwards))
267     return outgoing_ch;
268   else
269     return incoming_ch;
270 }
271
272
273 /**
274  * Show the results of the test (banwidth acheived) and log them to GAUGER
275  */
276 static void
277 show_end_data (void)
278 {
279   static struct GNUNET_TIME_Absolute end_time;
280   static struct GNUNET_TIME_Relative total_time;
281
282   end_time = GNUNET_TIME_absolute_get ();
283   total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
284   fprintf (stderr,
285            "\nResults of test \"%s\"\n",
286            test_name);
287   fprintf (stderr,
288            "Test time %s\n",
289            GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
290   fprintf (stderr,
291            "Test bandwidth: %f kb/s\n",
292            4 * total_packets * 1.0 / (total_time.rel_value_us / 1000));    // 4bytes * ms
293   fprintf (stderr,
294            "Test throughput: %f packets/s\n\n",
295            total_packets * 1000.0 / (total_time.rel_value_us / 1000));     // packets * ms
296   GAUGER ("CADET",
297           test_name,
298           total_packets * 1000.0 / (total_time.rel_value_us / 1000),
299           "packets/s");
300 }
301
302
303 /**
304  * Disconnect from cadet services af all peers, call shutdown.
305  *
306  * @param cls Closure (line number from which termination was requested).
307  * @param tc Task Context.
308  */
309 static void
310 disconnect_cadet_peers (void *cls)
311 {
312   long line = (long) cls;
313
314   disconnect_task = NULL;
315   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
316               "disconnecting cadet service of peers, called from line %ld\n",
317               line);
318   for (unsigned int i = 0; i < 2; i++)
319   {
320     GNUNET_TESTBED_operation_done (t_op[i]);
321   }
322   if (NULL != outgoing_ch)
323   {
324     GNUNET_CADET_channel_destroy (outgoing_ch);
325     outgoing_ch = NULL;
326   }
327   if (NULL != incoming_ch)
328   {
329     GNUNET_CADET_channel_destroy (incoming_ch);
330     incoming_ch = NULL;
331   }
332   GNUNET_CADET_TEST_cleanup (test_ctx);
333   GNUNET_SCHEDULER_shutdown ();
334 }
335
336
337 /**
338  * Shut down peergroup, clean up.
339  *
340  * @param cls Closure (unused).
341  * @param tc Task Context.
342  */
343 static void
344 shutdown_task (void *cls)
345 {
346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
347               "Ending test.\n");
348   if (NULL != send_next_msg_task)
349   {
350     GNUNET_SCHEDULER_cancel (send_next_msg_task);
351     send_next_msg_task = NULL;
352   }
353   if (NULL != test_task)
354   {
355     GNUNET_SCHEDULER_cancel (test_task);
356     test_task = NULL;
357   }
358   if (NULL != disconnect_task)
359   {
360     GNUNET_SCHEDULER_cancel (disconnect_task);
361     disconnect_task =
362       GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
363                                 (void *) __LINE__);
364   }
365 }
366
367
368 /**
369  * Stats callback. Finish the stats testbed operation and when all stats have
370  * been iterated, shutdown the test.
371  *
372  * @param cls Closure (line number from which termination was requested).
373  * @param op the operation that has been finished
374  * @param emsg error message in case the operation has failed; will be NULL if
375  *          operation has executed successfully.
376  */
377 static void
378 stats_cont (void *cls,
379             struct GNUNET_TESTBED_Operation *op,
380             const char *emsg)
381 {
382   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
383               "KA sent: %u, KA received: %u\n",
384               ka_sent,
385               ka_received);
386   if (((KEEPALIVE == test)||(REOPEN == test)) &&
387       ((ka_sent < 2) || (ka_sent > ka_received + 1)))
388   {
389     GNUNET_break (0);
390     ok--;
391   }
392   GNUNET_TESTBED_operation_done (stats_op);
393
394   if (NULL != disconnect_task)
395     GNUNET_SCHEDULER_cancel (disconnect_task);
396   disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
397                                               cls);
398 }
399
400
401 /**
402  * Process statistic values.
403  *
404  * @param cls closure (line number, unused)
405  * @param peer the peer the statistic belong to
406  * @param subsystem name of subsystem that created the statistic
407  * @param name the name of the datum
408  * @param value the current value
409  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
410  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
411  */
412 static int
413 stats_iterator (void *cls,
414                 const struct GNUNET_TESTBED_Peer *peer,
415                 const char *subsystem,
416                 const char *name,
417                 uint64_t value,
418                 int is_persistent)
419 {
420   static const char *s_sent = "# keepalives sent";
421   static const char *s_recv = "# keepalives received";
422   static const char *rdrops = "# messages dropped due to full buffer";
423   static const char *cdrops = "# messages dropped due to slow client";
424   uint32_t i;
425
426   i = GNUNET_TESTBED_get_index (peer);
427   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
428               subsystem, name, (unsigned long long) value);
429   if ((0 == strncmp (s_sent, name, strlen (s_sent)))&&(0 == i))
430     ka_sent = value;
431   if ((0 == strncmp (s_recv, name, strlen (s_recv)))&&(peers_requested - 1 ==
432                                                        i) )
433     ka_received = value;
434   if (0 == strncmp (rdrops, name, strlen (rdrops)))
435     msg_dropped += value;
436   if (0 == strncmp (cdrops, name, strlen (cdrops)))
437     msg_dropped += value;
438
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Task to gather all statistics.
445  *
446  * @param cls Closure (line from which the task was scheduled).
447  */
448 static void
449 gather_stats_and_exit (void *cls)
450 {
451   long l = (long) cls;
452
453   disconnect_task = NULL;
454   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
455               "gathering statistics from line %ld\n",
456               l);
457   if (NULL != outgoing_ch)
458   {
459     GNUNET_CADET_channel_destroy (outgoing_ch);
460     outgoing_ch = NULL;
461   }
462   stats_op = GNUNET_TESTBED_get_statistics (peers_running,
463                                             testbed_peers,
464                                             "cadet",
465                                             NULL,
466                                             &stats_iterator,
467                                             stats_cont,
468                                             cls);
469 }
470
471
472 /**
473  * Send a message on the channel with the appropriate size and payload.
474  *
475  * Update the appropriate *_sent counter.
476  *
477  * @param channel Channel to send the message on.
478  */
479 static void
480 send_test_message (struct GNUNET_CADET_Channel *channel);
481
482 /**
483  * Check if payload is sane (size contains payload).
484  *
485  * @param cls should match #ch
486  * @param message The actual message.
487  * @return #GNUNET_OK to keep the channel open,
488  *         #GNUNET_SYSERR to close it (signal serious error).
489  */
490 static int
491 check_data (void *cls,
492             const struct GNUNET_MessageHeader *message);
493
494 /**
495  * Function is called whenever a message is received.
496  *
497  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
498  * @param message the actual message
499  */
500 static void
501 handle_data (void *cls,
502              const struct GNUNET_MessageHeader *message);
503
504 /**
505  * Function called whenever an MQ-channel is destroyed, unless the destruction
506  * was requested by #GNUNET_CADET_channel_destroy.
507  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
508  *
509  * It should clean up any associated state, including cancelling any pending
510  * transmission on this channel.
511  *
512  * @param cls Channel closure (channel wrapper).
513  * @param channel Connection to the other end (henceforth invalid).
514  */
515 static void
516 disconnect_handler (void *cls,
517                     const struct GNUNET_CADET_Channel *channel);
518
519
520 /**
521  * Task to reconnect to other peer.
522  *
523  * @param cls Closure (line from which the task was scheduled).
524  */
525 static void
526 reconnect_op (void *cls)
527 {
528   struct GNUNET_MQ_MessageHandler handlers[] = {
529     GNUNET_MQ_hd_var_size (data,
530                            GNUNET_MESSAGE_TYPE_DUMMY,
531                            struct GNUNET_MessageHeader,
532                            NULL),
533     GNUNET_MQ_handler_end ()
534   };
535   long l = (long) cls;
536   struct CadetTestChannelWrapper *ch;
537
538   reconnect_task = NULL;
539   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
540               "reconnecting from line %ld\n",
541               l);
542   if (NULL != outgoing_ch)
543   {
544     GNUNET_CADET_channel_destroy (outgoing_ch);
545     outgoing_ch = NULL;
546   }
547   ch = GNUNET_new (struct CadetTestChannelWrapper);
548   outgoing_ch = GNUNET_CADET_channel_create (h1,
549                                              ch,
550                                              p_id[1],
551                                              &port,
552                                              NULL,
553                                              &disconnect_handler,
554                                              handlers);
555   ch->ch = outgoing_ch;
556   send_test_message (outgoing_ch);
557 }
558
559 /**
560  * Function called whenever an MQ-channel is destroyed, unless the destruction
561  * was requested by #GNUNET_CADET_channel_destroy.
562  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
563  *
564  * It should clean up any associated state, including cancelling any pending
565  * transmission on this channel.
566  *
567  * @param cls Channel closure (channel wrapper).
568  * @param channel Connection to the other end (henceforth invalid).
569  */
570 static void
571 disconnect_handler (void *cls,
572                     const struct GNUNET_CADET_Channel *channel)
573 {
574   struct CadetTestChannelWrapper *ch_w = cls;
575
576   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
577               "Channel disconnected at %d\n",
578               ok);
579   GNUNET_assert (ch_w->ch == channel);
580   if (channel == incoming_ch)
581   {
582     ok++;
583     incoming_ch = NULL;
584   }
585   else if (outgoing_ch == channel)
586   {
587     if (P2P_SIGNAL == test)
588     {
589       ok++;
590     }
591     outgoing_ch = NULL;
592   }
593   else
594     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
595                 "Unknown channel! %p\n",
596                 channel);
597   if ((NULL != disconnect_task)&&(REOPEN != test))
598   {
599     GNUNET_SCHEDULER_cancel (disconnect_task);
600     disconnect_task =
601       GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
602                                 (void *) __LINE__);
603   }
604   else if ((NULL != reconnect_task)&&(REOPEN == test))
605   {
606     GNUNET_SCHEDULER_cancel (reconnect_task);
607     reconnect_task =
608       GNUNET_SCHEDULER_add_now (&reconnect_op,
609                                 (void *) __LINE__);
610   }
611   GNUNET_free (ch_w);
612 }
613
614
615 /**
616  * Abort test: schedule disconnect and shutdown immediately
617  *
618  * @param line Line in the code the abort is requested from (__LINE__).
619  */
620 static void
621 abort_test (long line)
622 {
623   if (NULL != disconnect_task)
624   {
625     GNUNET_SCHEDULER_cancel (disconnect_task);
626     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
627                 "Aborting test from %ld\n",
628                 line);
629     disconnect_task =
630       GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
631                                 (void *) line);
632   }
633 }
634
635
636 /**
637  * Send a message on the channel with the appropriate size and payload.
638  *
639  * Update the appropriate *_sent counter.
640  *
641  * @param channel Channel to send the message on.
642  */
643 static void
644 send_test_message (struct GNUNET_CADET_Channel *channel)
645 {
646   struct GNUNET_MQ_Envelope *env;
647   struct GNUNET_MessageHeader *msg;
648   uint32_t *data;
649   int payload;
650   int size;
651
652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
653               "Sending test message on channel %p\n",
654               channel);
655   size = size_payload;
656   if (GNUNET_NO == initialized)
657   {
658     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
659     size += 1000;
660     payload = data_sent;
661     if (SPEED_ACK == test)   // FIXME unify SPEED_ACK with an initializer
662       data_sent++;
663   }
664   else if ((SPEED == test)||(SPEED_ACK == test))
665   {
666     if (get_target_channel () == channel)
667     {
668       payload = ack_sent;
669       size += ack_sent;
670       ack_sent++;
671       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672                   "Sending ACK %u [%d bytes]\n",
673                   payload, size);
674     }
675     else
676     {
677       payload = data_sent;
678       size += data_sent;
679       data_sent++;
680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
681                   "Sending DATA %u [%d bytes]\n",
682                   data_sent, size);
683     }
684   }
685   else if (FORWARD == test)
686   {
687     payload = ack_sent;
688   }
689   else if (P2P_SIGNAL == test)
690   {
691     payload = data_sent;
692   }
693   else if (REOPEN == test)
694   {
695     payload = data_sent;
696     data_sent++;
697     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
698                 "Sending DATA %u [%d bytes]\n",
699                 data_sent, size);
700   }
701   else
702   {
703     GNUNET_assert (0);
704   }
705   env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
706
707   data = (uint32_t *) &msg[1];
708   *data = htonl (payload);
709   GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
710 }
711
712
713 /**
714  * Task to request a new data transmission in a SPEED test, without waiting
715  * for previous messages to be sent/arrrive.
716  *
717  * @param cls Closure (unused).
718  */
719 static void
720 send_next_msg (void *cls)
721 {
722   struct GNUNET_CADET_Channel *channel;
723
724   send_next_msg_task = NULL;
725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726               "Sending next message: %d\n",
727               data_sent);
728
729   channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
730   GNUNET_assert (NULL != channel);
731   GNUNET_assert (SPEED == test);
732   send_test_message (channel);
733   if (data_sent < total_packets)
734   {
735     /* SPEED test: Send all messages as soon as possible */
736     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737                 "Scheduling message %d\n",
738                 data_sent + 1);
739     send_next_msg_task =
740       GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
741                                     &send_next_msg,
742                                     NULL);
743   }
744 }
745
746
747 /**
748  * Every few messages cancel the timeout task and re-schedule it again, to
749  * avoid timing out when traffic keeps coming.
750  *
751  * @param line Code line number to log if a timeout occurs.
752  */
753 static void
754 reschedule_timeout_task (long line)
755 {
756   if ((ok % 10) == 0)
757   {
758     if (NULL != disconnect_task)
759     {
760       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
761                   "reschedule timeout every 10 messages\n");
762       GNUNET_SCHEDULER_cancel (disconnect_task);
763       disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
764                                                       &gather_stats_and_exit,
765                                                       (void *) line);
766     }
767   }
768 }
769
770
771 /**
772  * Check if payload is sane (size contains payload).
773  *
774  * @param cls should match #ch
775  * @param message The actual message.
776  * @return #GNUNET_OK to keep the channel open,
777  *         #GNUNET_SYSERR to close it (signal serious error).
778  */
779 static int
780 check_data (void *cls,
781             const struct GNUNET_MessageHeader *message)
782 {
783   return GNUNET_OK;             /* all is well-formed */
784 }
785
786
787 /**
788  * Function is called whenever a message is received.
789  *
790  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
791  * @param message the actual message
792  */
793 static void
794 handle_data (void *cls,
795              const struct GNUNET_MessageHeader *message)
796 {
797   struct CadetTestChannelWrapper *ch = cls;
798   struct GNUNET_CADET_Channel *channel = ch->ch;
799   uint32_t *data;
800   uint32_t payload;
801   int *counter;
802
803   ok++;
804   GNUNET_CADET_receive_done (channel);
805   counter = get_target_channel () == channel ? &data_received : &ack_received;
806
807   reschedule_timeout_task ((long) __LINE__);
808
809   if (channel == outgoing_ch)
810   {
811     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
812                 "Root client got a message.\n");
813   }
814   else if (channel == incoming_ch)
815   {
816     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
817                 "Leaf client got a message.\n");
818   }
819   else
820   {
821     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
822                 "Unknown channel %p.\n",
823                 channel);
824     GNUNET_assert (0);
825   }
826
827   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
828               " ok: (%d/%d)\n",
829               ok,
830               ok_goal);
831   data = (uint32_t *) &message[1];
832   payload = ntohl (*data);
833   if (payload == *counter)
834   {
835     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
836                 " payload as expected: %u\n",
837                 payload);
838   }
839   else
840   {
841     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
842                 " payload %u, expected: %u\n",
843                 payload, *counter);
844   }
845
846   if (GNUNET_NO == initialized)
847   {
848     initialized = GNUNET_YES;
849     start_time = GNUNET_TIME_absolute_get ();
850     if (SPEED == test)
851     {
852       GNUNET_assert (incoming_ch == channel);
853       send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg,
854                                                      NULL);
855       return;
856     }
857   }
858
859   (*counter)++;
860   if (get_target_channel () == channel)  /* Got "data" */
861   {
862     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received);
863     if ((SPEED != test) ||( (ok_goal - 2) == ok) )
864     {
865       /* Send ACK */
866       send_test_message (channel);
867       return;
868     }
869     else
870     {
871       if (data_received < total_packets)
872         return;
873     }
874   }
875   else /* Got "ack" */
876   {
877     if ((SPEED_ACK == test) ||(SPEED == test) )
878     {
879       GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
880       /* Send more data */
881       send_test_message (channel);
882       if ((ack_received < total_packets) &&(SPEED != test) )
883         return;
884       if ((ok == 2) &&(SPEED == test) )
885         return;
886       show_end_data ();
887     }
888     if (test == P2P_SIGNAL)
889     {
890       GNUNET_CADET_channel_destroy (incoming_ch);
891       incoming_ch = NULL;
892     }
893     else
894     {
895       GNUNET_CADET_channel_destroy (outgoing_ch);
896       outgoing_ch = NULL;
897     }
898   }
899 }
900
901
902 /**
903  * Method called whenever a peer connects to a port in MQ-based CADET.
904  *
905  * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
906  * @param channel New handle to the channel.
907  * @param source Peer that started this channel.
908  * @return Closure for the incoming @a channel. It's given to:
909  *         - The #GNUNET_CADET_DisconnectEventHandler (given to
910  *           #GNUNET_CADET_open_port) when the channel dies.
911  *         - Each the #GNUNET_MQ_MessageCallback handlers for each message
912  *           received on the @a channel.
913  */
914 static void *
915 connect_handler (void *cls,
916                  struct GNUNET_CADET_Channel *channel,
917                  const struct GNUNET_PeerIdentity *source)
918 {
919   struct CadetTestChannelWrapper *ch;
920   long peer = (long) cls;
921
922   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
923               "Incoming channel from %s to %ld: %p\n",
924               GNUNET_i2s (source),
925               peer,
926               channel);
927   ok++;
928   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
929               " ok: %d\n",
930               ok);
931   if (peer == peers_requested - 1)
932   {
933     if (NULL != incoming_ch)
934     {
935       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
936                   "Duplicate incoming channel for client %lu\n",
937                   (long) cls);
938       GNUNET_assert (0);
939     }
940     incoming_ch = channel;
941   }
942   else
943   {
944     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
945                 "Incoming channel for unexpected peer #%lu\n",
946                 (long) cls);
947     GNUNET_assert (0);
948   }
949   if ((NULL != disconnect_task)&&(REOPEN != test))
950   {
951     GNUNET_SCHEDULER_cancel (disconnect_task);
952     disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
953                                                     &gather_stats_and_exit,
954                                                     (void *) __LINE__);
955   }
956   else if ((NULL != disconnect_task) && (REOPEN == test))
957   {
958     GNUNET_SCHEDULER_cancel (disconnect_task);
959     disconnect_task = GNUNET_SCHEDULER_add_delayed (
960       GNUNET_TIME_relative_multiply (short_time, 2),
961       &gather_stats_and_exit,
962       (void *) __LINE__);
963   }
964
965   if ((NULL != reconnect_task) && (REOPEN == test))
966   {
967     GNUNET_SCHEDULER_cancel (reconnect_task);
968     reconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
969                                                    &reconnect_op,
970                                                    (void *) __LINE__);
971   }
972
973   /* TODO: cannot return channel as-is, in order to unify the data handlers */
974   ch = GNUNET_new (struct CadetTestChannelWrapper);
975   ch->ch = channel;
976
977   return ch;
978 }
979
980
981 /**
982  * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
983  *
984  * Testcase continues when the root receives confirmation of connected peers,
985  * on callback function ch.
986  *
987  * @param cls Closure (unused).
988  */
989 static void
990 start_test (void *cls)
991 {
992   struct GNUNET_MQ_MessageHandler handlers[] = {
993     GNUNET_MQ_hd_var_size (data,
994                            GNUNET_MESSAGE_TYPE_DUMMY,
995                            struct GNUNET_MessageHeader,
996                            NULL),
997     GNUNET_MQ_handler_end ()
998   };
999   struct CadetTestChannelWrapper *ch;
1000
1001   test_task = NULL;
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test: %s\n", test_name);
1003   if (NULL != disconnect_task)
1004   {
1005     GNUNET_SCHEDULER_cancel (disconnect_task);
1006     disconnect_task = NULL;
1007   }
1008
1009   if (SPEED_REL == test)
1010   {
1011     test = SPEED;
1012   }
1013
1014   ch = GNUNET_new (struct CadetTestChannelWrapper);
1015   outgoing_ch = GNUNET_CADET_channel_create (h1,
1016                                              ch,
1017                                              p_id[1],
1018                                              &port,
1019                                              NULL,
1020                                              &disconnect_handler,
1021                                              handlers);
1022
1023   ch->ch = outgoing_ch;
1024
1025   disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
1026                                                   &gather_stats_and_exit,
1027                                                   (void *) __LINE__);
1028   if (KEEPALIVE == test)
1029     return;                     /* Don't send any data. */
1030
1031   data_received = 0;
1032   data_sent = 0;
1033   ack_received = 0;
1034   ack_sent = 0;
1035   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036               "Sending data initializer on channel %p...\n",
1037               outgoing_ch);
1038   send_test_message (outgoing_ch);
1039   if (REOPEN == test)
1040   {
1041     reconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
1042                                                    &reconnect_op,
1043                                                    (void *) __LINE__);
1044     GNUNET_SCHEDULER_cancel (disconnect_task);
1045     disconnect_task = GNUNET_SCHEDULER_add_delayed (
1046       GNUNET_TIME_relative_multiply (short_time, 2),
1047       &gather_stats_and_exit,
1048       (void *) __LINE__);
1049   }
1050 }
1051
1052
1053 /**
1054  * Callback to be called when the requested peer information is available
1055  *
1056  * @param cls the closure from GNUNET_TESTBED_peer_get_information()
1057  * @param op the operation this callback corresponds to
1058  * @param pinfo the result; will be NULL if the operation has failed
1059  * @param emsg error message if the operation has failed;
1060  *             NULL if the operation is successfull
1061  */
1062 static void
1063 pi_cb (void *cls,
1064        struct GNUNET_TESTBED_Operation *op,
1065        const struct GNUNET_TESTBED_PeerInformation *pinfo,
1066        const char *emsg)
1067 {
1068   long i = (long) cls;
1069
1070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071               "ID callback for %ld\n",
1072               i);
1073   if ((NULL == pinfo) ||
1074       (NULL != emsg))
1075   {
1076     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1077                 "pi_cb: %s\n",
1078                 emsg);
1079     abort_test (__LINE__);
1080     return;
1081   }
1082   p_id[i] = pinfo->result.id;
1083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084               "id: %s\n",
1085               GNUNET_i2s (p_id[i]));
1086   p_ids++;
1087   if (p_ids < 2)
1088     return;
1089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090               "Got all IDs, starting test\n");
1091   test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL);
1092 }
1093
1094
1095 /**
1096  * test main: start test when all peers are connected
1097  *
1098  * @param cls Closure.
1099  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
1100  * @param num_peers Number of peers that are running.
1101  * @param peers Array of peers.
1102  * @param cadets Handle to each of the CADETs of the peers.
1103  */
1104 static void
1105 tmain (void *cls,
1106        struct GNUNET_CADET_TEST_Context *ctx,
1107        unsigned int num_peers,
1108        struct GNUNET_TESTBED_Peer **peers,
1109        struct GNUNET_CADET_Handle **cadets)
1110 {
1111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n");
1112   ok = 0;
1113   test_ctx = ctx;
1114   peers_running = num_peers;
1115   GNUNET_assert (peers_running == peers_requested);
1116   testbed_peers = peers;
1117   h1 = cadets[0];
1118   h2 = cadets[num_peers - 1];
1119   disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
1120                                                   &disconnect_cadet_peers,
1121                                                   (void *) __LINE__);
1122   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1123                                  NULL);
1124   t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
1125                                                  GNUNET_TESTBED_PIT_IDENTITY,
1126                                                  &pi_cb,
1127                                                  (void *) 0L);
1128   t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
1129                                                  GNUNET_TESTBED_PIT_IDENTITY,
1130                                                  &pi_cb,
1131                                                  (void *) 1L);
1132   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
1133 }
1134
1135
1136 /**
1137  * Main: start test
1138  */
1139 int
1140 main (int argc, char *argv[])
1141 {
1142   static const struct GNUNET_HashCode *ports[2];
1143   struct GNUNET_MQ_MessageHandler handlers[] = {
1144     GNUNET_MQ_hd_var_size (data,
1145                            GNUNET_MESSAGE_TYPE_DUMMY,
1146                            struct GNUNET_MessageHeader,
1147                            NULL),
1148     GNUNET_MQ_handler_end ()
1149   };
1150   const char *config_file;
1151   char port_id[] = "test port";
1152   struct GNUNET_GETOPT_CommandLineOption options[] = {
1153     GNUNET_GETOPT_option_relative_time ('t',
1154                                         "time",
1155                                         "short_time",
1156                                         gettext_noop ("set short timeout"),
1157                                         &short_time),
1158     GNUNET_GETOPT_option_uint ('m',
1159                                "messages",
1160                                "NUM_MESSAGES",
1161                                gettext_noop ("set number of messages to send"),
1162                                &total_packets),
1163
1164     GNUNET_GETOPT_OPTION_END
1165   };
1166
1167
1168   initialized = GNUNET_NO;
1169   GNUNET_log_setup ("test", "DEBUG", NULL);
1170
1171   total_packets = TOTAL_PACKETS;
1172   short_time = SHORT_TIME;
1173   if (-1 == GNUNET_GETOPT_run (argv[0], options, argc, argv))
1174   {
1175     fprintf (stderr, "test failed: problem with CLI parameters\n");
1176     exit (1);
1177   }
1178
1179   config_file = "test_cadet.conf";
1180   GNUNET_CRYPTO_hash (port_id, sizeof(port_id), &port);
1181
1182   /* Find out requested size */
1183   if (strstr (argv[0], "_2_") != NULL)
1184   {
1185     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DIRECT CONNECTIONs\n");
1186     peers_requested = 2;
1187   }
1188   else if (strstr (argv[0], "_5_") != NULL)
1189   {
1190     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "5 PEER LINE\n");
1191     peers_requested = 5;
1192   }
1193   else if (strstr (argv[0], "_6_") != NULL)
1194   {
1195     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "6 PEER LINE\n");
1196     peers_requested = 6;
1197   }
1198   else
1199   {
1200     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "SIZE UNKNOWN, USING 2\n");
1201     peers_requested = 2;
1202   }
1203
1204   /* Find out requested test */
1205   if (strstr (argv[0], "_forward") != NULL)
1206   {
1207     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "FORWARD\n");
1208     test = FORWARD;
1209     test_name = "unicast";
1210     ok_goal = 4;
1211   }
1212   else if (strstr (argv[0], "_signal") != NULL)
1213   {
1214     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SIGNAL\n");
1215     test = P2P_SIGNAL;
1216     test_name = "signal";
1217     ok_goal = 4;
1218   }
1219   else if (strstr (argv[0], "_speed_ack") != NULL)
1220   {
1221     /* Test is supposed to generate the following callbacks:
1222      * 1 incoming channel (@dest)
1223      * total_packets received data packet (@dest)
1224      * total_packets received data packet (@orig)
1225      * 1 received channel destroy (@dest) FIXME #5818
1226      */
1227     ok_goal = total_packets * 2 + 2;
1228     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED_ACK\n");
1229     test = SPEED_ACK;
1230     test_name = "speed ack";
1231   }
1232   else if (strstr (argv[0], "_speed") != NULL)
1233   {
1234     /* Test is supposed to generate the following callbacks:
1235      * 1 incoming channel (@dest)
1236      * 1 initial packet (@dest)
1237      * total_packets received data packet (@dest)
1238      * 1 received data packet (@orig)
1239      * 1 received channel destroy (@dest)  FIXME #5818
1240      */
1241     ok_goal = total_packets + 4;
1242     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED\n");
1243     if (strstr (argv[0], "_reliable") != NULL)
1244     {
1245       test = SPEED_REL;
1246       test_name = "speed reliable";
1247       config_file = "test_cadet_drop.conf";
1248     }
1249     else
1250     {
1251       test = SPEED;
1252       test_name = "speed";
1253     }
1254   }
1255   else if (strstr (argv[0], "_keepalive") != NULL)
1256   {
1257     test = KEEPALIVE;
1258     test_name = "keepalive";
1259     /* Test is supposed to generate the following callbacks:
1260      * 1 incoming channel (@dest)
1261      * [wait]
1262      * 1 received channel destroy (@dest)  FIXME #5818
1263      */
1264     ok_goal = 1;
1265   }
1266   else if (strstr (argv[0], "_reopen") != NULL)
1267   {
1268     test = REOPEN;
1269     test_name = "reopen";
1270     ///* Test is supposed to generate the following callbacks:
1271     // * 1 incoming channel (@dest)
1272     // * [wait]
1273     // * 1 received channel destroy (@dest)  FIXME #5818
1274     // */
1275     ok_goal = 6;
1276   }
1277   else
1278   {
1279     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "UNKNOWN\n");
1280     test = SETUP;
1281     ok_goal = 0;
1282   }
1283
1284   if (strstr (argv[0], "backwards") != NULL)
1285   {
1286     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "BACKWARDS (LEAF TO ROOT)\n");
1287     test_backwards = GNUNET_YES;
1288     GNUNET_asprintf (&test_name, "backwards %s", test_name);
1289   }
1290
1291   p_ids = 0;
1292   ports[0] = &port;
1293   ports[1] = NULL;
1294   GNUNET_CADET_TEST_ruN ("test_cadet_small",
1295                          config_file,
1296                          peers_requested,
1297                          &tmain,
1298                          NULL,        /* tmain cls */
1299                          &connect_handler,
1300                          NULL,
1301                          &disconnect_handler,
1302                          handlers,
1303                          ports);
1304   if (NULL != strstr (argv[0], "_reliable"))
1305     msg_dropped = 0;            /* dropped should be retransmitted */
1306
1307   if (ok_goal > ok - msg_dropped)
1308   {
1309     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal);
1310     return 1;
1311   }
1312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n");
1313   return 0;
1314 }
1315
1316 /* end of test_cadet.c */