80a6e8227c5d8d10025b0396c5341af7f1418829
[oweals/gnunet.git] / src / cadet / test_cadet.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file cadet/test_cadet.c
22  * @author Bart Polot
23  * @author Christian Grothoff
24  * @brief Test for the cadet service using mq API.
25  */
26 #include <stdio.h>
27 #include "platform.h"
28 #include "cadet_test_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_statistics_service.h"
31 #include <gauger.h>
32
33
34 /**
35  * Ugly workaround to unify data handlers on incoming and outgoing channels.
36  */
37 struct CadetTestChannelWrapper
38 {
39   /**
40    * Channel pointer.
41    */
42   struct GNUNET_CADET_Channel *ch;
43 };
44
45 /**
46  * How many messages to send by default.
47  */
48 #define TOTAL_PACKETS 500       /* Cannot exceed 64k! */
49
50 /**
51  * How long until we give up on connecting the peers?
52  */
53 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
54
55 /**
56  * Time to wait by default  for stuff that should be rather fast.
57  */
58 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)
59
60 /**
61  * How fast do we send messages?
62  */
63 #define SEND_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 10)
64
65 /**
66  * DIFFERENT TESTS TO RUN
67  */
68 #define SETUP 0
69 #define FORWARD 1
70 #define KEEPALIVE 2
71 #define SPEED 3
72 #define SPEED_ACK 4
73 #define SPEED_REL 8
74 #define P2P_SIGNAL 10
75 #define REOPEN 11
76
77 /**
78  * Which test are we running?
79  */
80 static int test;
81
82 /**
83  * String with test name
84  */
85 static char *test_name;
86
87 /**
88  * Flag to send traffic leaf->root in speed tests to test BCK_ACK logic.
89  */
90 static int test_backwards = GNUNET_NO;
91
92 /**
93  * How many packets to send.
94  */
95 static unsigned int total_packets;
96
97 /**
98  * Time to wait for fast operations.
99  */
100 static struct GNUNET_TIME_Relative short_time;
101
102 /**
103  * How many events have happened
104  */
105 static int ok;
106
107 /**
108  * Number of events expected to conclude the test successfully.
109  */
110 static int ok_goal;
111
112 /**
113  * Size of each test packet's payload
114  */
115 static size_t size_payload = sizeof (uint32_t);
116
117 /**
118  * Operation to get peer ids.
119  */
120 static struct GNUNET_TESTBED_Operation *t_op[2];
121
122 /**
123  * Peer ids.
124  */
125 static struct GNUNET_PeerIdentity *p_id[2];
126
127 /**
128  * Port ID
129  */
130 static struct GNUNET_HashCode port;
131
132 /**
133  * Peer ids counter.
134  */
135 static unsigned int p_ids;
136
137 /**
138  * Is the setup initialized?
139  */
140 static int initialized;
141
142 /**
143  * Number of payload packes sent.
144  */
145 static int data_sent;
146
147 /**
148  * Number of payload packets received.
149  */
150 static int data_received;
151
152 /**
153  * Number of payload packed acknowledgements sent.
154  */
155 static int ack_sent;
156
157 /**
158  * Number of payload packed explicitly (app level) acknowledged.
159  */
160 static int ack_received;
161
162 /**
163  * Total number of peers asked to run.
164  */
165 static unsigned long long peers_requested;
166
167 /**
168  * Number of currently running peers (should be same as @c peers_requested).
169  */
170 static unsigned long long peers_running;
171
172 /**
173  * Test context (to shut down).
174  */
175 struct GNUNET_CADET_TEST_Context *test_ctx;
176
177 /**
178  * Task called to disconnect peers.
179  */
180 static struct GNUNET_SCHEDULER_Task *disconnect_task;
181
182 /**
183  * Task called to reconnect peers.
184  */
185 static struct GNUNET_SCHEDULER_Task *reconnect_task;
186
187 /**
188  * Task To perform tests
189  */
190 static struct GNUNET_SCHEDULER_Task *test_task;
191
192 /**
193  * Task runnining #send_next_msg().
194  */
195 static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
196
197 /**
198  * Cadet handle for the root peer
199  */
200 static struct GNUNET_CADET_Handle *h1;
201
202 /**
203  * Cadet handle for the first leaf peer
204  */
205 static struct GNUNET_CADET_Handle *h2;
206
207 /**
208  * Channel handle for the root peer
209  */
210 static struct GNUNET_CADET_Channel *outgoing_ch;
211
212 /**
213  * Channel handle for the dest peer
214  */
215 static struct GNUNET_CADET_Channel *incoming_ch;
216
217 /**
218  * Time we started the data transmission (after channel has been established
219  * and initilized).
220  */
221 static struct GNUNET_TIME_Absolute start_time;
222
223 /**
224  * Peers handle.
225  */
226 static struct GNUNET_TESTBED_Peer **testbed_peers;
227
228 /**
229  * Statistics operation handle.
230  */
231 static struct GNUNET_TESTBED_Operation *stats_op;
232
233 /**
234  * Keepalives sent.
235  */
236 static unsigned int ka_sent;
237
238 /**
239  * Keepalives received.
240  */
241 static unsigned int ka_received;
242
243 /**
244  * How many messages were dropped by CADET because of full buffers?
245  */
246 static unsigned int msg_dropped;
247
248
249 /******************************************************************************/
250
251
252 /******************************************************************************/
253
254
255 /**
256  * Get the channel considered as the "target" or "receiver", depending on
257  * the test type and size.
258  *
259  * @return Channel handle of the target client, either 0 (for backward tests)
260  *         or the last peer in the line (for other tests).
261  */
262 static struct GNUNET_CADET_Channel *
263 get_target_channel ()
264 {
265   if (SPEED == test && GNUNET_YES == test_backwards)
266     return outgoing_ch;
267   else
268     return incoming_ch;
269 }
270
271
272 /**
273  * Show the results of the test (banwidth acheived) and log them to GAUGER
274  */
275 static void
276 show_end_data (void)
277 {
278   static struct GNUNET_TIME_Absolute end_time;
279   static struct GNUNET_TIME_Relative total_time;
280
281   end_time = GNUNET_TIME_absolute_get ();
282   total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
283   FPRINTF (stderr,
284            "\nResults of test \"%s\"\n",
285            test_name);
286   FPRINTF (stderr,
287            "Test time %s\n",
288            GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
289   FPRINTF (stderr,
290            "Test bandwidth: %f kb/s\n",
291            4 * total_packets * 1.0 / (total_time.rel_value_us / 1000));    // 4bytes * ms
292   FPRINTF (stderr,
293            "Test throughput: %f packets/s\n\n",
294            total_packets * 1000.0 / (total_time.rel_value_us / 1000));     // packets * ms
295   GAUGER ("CADET",
296           test_name,
297           total_packets * 1000.0 / (total_time.rel_value_us / 1000),
298           "packets/s");
299 }
300
301
302 /**
303  * Disconnect from cadet services af all peers, call shutdown.
304  *
305  * @param cls Closure (line number from which termination was requested).
306  * @param tc Task Context.
307  */
308 static void
309 disconnect_cadet_peers (void *cls)
310 {
311   long line = (long) cls;
312
313   disconnect_task = NULL;
314   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
315               "disconnecting cadet service of peers, called from line %ld\n",
316               line);
317   for (unsigned int i = 0; i < 2; i++)
318   {
319     GNUNET_TESTBED_operation_done (t_op[i]);
320   }
321   if (NULL != outgoing_ch)
322   {
323     GNUNET_CADET_channel_destroy (outgoing_ch);
324     outgoing_ch = NULL;
325   }
326   if (NULL != incoming_ch)
327   {
328     GNUNET_CADET_channel_destroy (incoming_ch);
329     incoming_ch = NULL;
330   }
331   GNUNET_CADET_TEST_cleanup (test_ctx);
332   GNUNET_SCHEDULER_shutdown ();
333 }
334
335
336 /**
337  * Shut down peergroup, clean up.
338  *
339  * @param cls Closure (unused).
340  * @param tc Task Context.
341  */
342 static void
343 shutdown_task (void *cls)
344 {
345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346               "Ending test.\n");
347   if (NULL != send_next_msg_task)
348   {
349     GNUNET_SCHEDULER_cancel (send_next_msg_task);
350     send_next_msg_task = NULL;
351   }
352   if (NULL != test_task)
353   {
354     GNUNET_SCHEDULER_cancel (test_task);
355     test_task = NULL;
356   }
357   if (NULL != disconnect_task)
358   {
359     GNUNET_SCHEDULER_cancel (disconnect_task);
360     disconnect_task =
361         GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
362                                   (void *) __LINE__);
363   }
364 }
365
366
367 /**
368  * Stats callback. Finish the stats testbed operation and when all stats have
369  * been iterated, shutdown the test.
370  *
371  * @param cls Closure (line number from which termination was requested).
372  * @param op the operation that has been finished
373  * @param emsg error message in case the operation has failed; will be NULL if
374  *          operation has executed successfully.
375  */
376 static void
377 stats_cont (void *cls,
378             struct GNUNET_TESTBED_Operation *op,
379             const char *emsg)
380 {
381   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
382               "KA sent: %u, KA received: %u\n",
383               ka_sent,
384               ka_received);
385   if ((KEEPALIVE == test || REOPEN == test) &&
386       ((ka_sent < 2) || (ka_sent > ka_received + 1)))
387   {
388     GNUNET_break (0);
389     ok--;
390   }
391   GNUNET_TESTBED_operation_done (stats_op);
392
393   if (NULL != disconnect_task)
394     GNUNET_SCHEDULER_cancel (disconnect_task);
395   disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
396                                               cls);
397 }
398
399
400 /**
401  * Process statistic values.
402  *
403  * @param cls closure (line number, unused)
404  * @param peer the peer the statistic belong to
405  * @param subsystem name of subsystem that created the statistic
406  * @param name the name of the datum
407  * @param value the current value
408  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
409  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
410  */
411 static int
412 stats_iterator (void *cls,
413                 const struct GNUNET_TESTBED_Peer *peer,
414                 const char *subsystem,
415                 const char *name,
416                 uint64_t value,
417                 int is_persistent)
418 {
419   static const char *s_sent = "# keepalives sent";
420   static const char *s_recv = "# keepalives received";
421   static const char *rdrops = "# messages dropped due to full buffer";
422   static const char *cdrops = "# messages dropped due to slow client";
423   uint32_t i;
424
425   i = GNUNET_TESTBED_get_index (peer);
426   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
427               subsystem, name, (unsigned long long) value);
428   if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
429     ka_sent = value;
430   if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
431     ka_received = value;
432   if (0 == strncmp (rdrops, name, strlen (rdrops)))
433     msg_dropped += value;
434   if (0 == strncmp (cdrops, name, strlen (cdrops)))
435     msg_dropped += value;
436
437   return GNUNET_OK;
438 }
439
440
441 /**
442  * Task to gather all statistics.
443  *
444  * @param cls Closure (line from which the task was scheduled).
445  */
446 static void
447 gather_stats_and_exit (void *cls)
448 {
449   long l = (long) cls;
450
451   disconnect_task = NULL;
452   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
453               "gathering statistics from line %ld\n",
454               l);
455   if (NULL != outgoing_ch)
456   {
457     GNUNET_CADET_channel_destroy (outgoing_ch);
458     outgoing_ch = NULL;
459   }
460   stats_op = GNUNET_TESTBED_get_statistics (peers_running,
461                                             testbed_peers,
462                                             "cadet",
463                                             NULL,
464                                             &stats_iterator,
465                                             stats_cont,
466                                             cls);
467 }
468
469
470 /**
471  * Send a message on the channel with the appropriate size and payload.
472  *
473  * Update the appropriate *_sent counter.
474  *
475  * @param channel Channel to send the message on.
476  */
477 static void
478 send_test_message (struct GNUNET_CADET_Channel *channel);
479
480 /**
481  * Check if payload is sane (size contains payload).
482  *
483  * @param cls should match #ch
484  * @param message The actual message.
485  * @return #GNUNET_OK to keep the channel open,
486  *         #GNUNET_SYSERR to close it (signal serious error).
487  */
488 static int
489 check_data (void *cls,
490             const struct GNUNET_MessageHeader *message);
491
492 /**
493  * Function is called whenever a message is received.
494  *
495  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
496  * @param message the actual message
497  */
498 static void
499 handle_data (void *cls,
500              const struct GNUNET_MessageHeader *message);
501
502 /**
503  * Function called whenever an MQ-channel is destroyed, unless the destruction
504  * was requested by #GNUNET_CADET_channel_destroy.
505  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
506  *
507  * It should clean up any associated state, including cancelling any pending
508  * transmission on this channel.
509  *
510  * @param cls Channel closure (channel wrapper).
511  * @param channel Connection to the other end (henceforth invalid).
512  */
513 static void
514 disconnect_handler (void *cls,
515                      const struct GNUNET_CADET_Channel *channel);
516
517
518 /**
519  * Task to reconnect to other peer.
520  *
521  * @param cls Closure (line from which the task was scheduled).
522  */
523 static void
524 reconnect_op (void *cls)
525 {
526   struct GNUNET_MQ_MessageHandler handlers[] = {
527     GNUNET_MQ_hd_var_size (data,
528                            GNUNET_MESSAGE_TYPE_DUMMY,
529                            struct GNUNET_MessageHeader,
530                            NULL),
531     GNUNET_MQ_handler_end ()
532   };
533   long l = (long) cls;
534   struct CadetTestChannelWrapper *ch;
535
536   reconnect_task = NULL;
537   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
538               "reconnecting from line %ld\n",
539               l);
540   if (NULL != outgoing_ch)
541   {
542     GNUNET_CADET_channel_destroy (outgoing_ch);
543     outgoing_ch = NULL;
544   }
545   ch = GNUNET_new (struct CadetTestChannelWrapper);
546   outgoing_ch = GNUNET_CADET_channel_create (h1,
547                                              ch,
548                                              p_id[1],
549                                              &port,
550                                              NULL,
551                                              &disconnect_handler,
552                                              handlers);
553   ch->ch = outgoing_ch;
554   send_test_message (outgoing_ch);
555 }
556
557 /**
558  * Function called whenever an MQ-channel is destroyed, unless the destruction
559  * was requested by #GNUNET_CADET_channel_destroy.
560  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
561  *
562  * It should clean up any associated state, including cancelling any pending
563  * transmission on this channel.
564  *
565  * @param cls Channel closure (channel wrapper).
566  * @param channel Connection to the other end (henceforth invalid).
567  */
568 static void
569 disconnect_handler (void *cls,
570                      const struct GNUNET_CADET_Channel *channel)
571 {
572   struct CadetTestChannelWrapper *ch_w = cls;
573
574   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
575               "Channel disconnected at %d\n",
576               ok);
577   GNUNET_assert (ch_w->ch == channel);
578   if (channel == incoming_ch)
579   {
580     ok++;
581     incoming_ch = NULL;
582   }
583   else if (outgoing_ch == channel)
584   {
585     if (P2P_SIGNAL == test)
586     {
587       ok++;
588     }
589     outgoing_ch = NULL;
590   }
591   else
592     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
593                 "Unknown channel! %p\n",
594                 channel);
595   if (NULL != disconnect_task && REOPEN != test)
596   {
597     GNUNET_SCHEDULER_cancel (disconnect_task);
598     disconnect_task =
599         GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
600                                   (void *) __LINE__);
601   }
602   else if (NULL != reconnect_task && REOPEN == test)
603   {
604     GNUNET_SCHEDULER_cancel (reconnect_task);
605     reconnect_task =
606         GNUNET_SCHEDULER_add_now (&reconnect_op,
607                                   (void *) __LINE__);
608   }
609   GNUNET_free (ch_w);
610 }
611
612
613 /**
614  * Abort test: schedule disconnect and shutdown immediately
615  *
616  * @param line Line in the code the abort is requested from (__LINE__).
617  */
618 static void
619 abort_test (long line)
620 {
621   if (NULL != disconnect_task)
622   {
623     GNUNET_SCHEDULER_cancel (disconnect_task);
624     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
625                 "Aborting test from %ld\n",
626                 line);
627     disconnect_task =
628         GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
629                                   (void *) line);
630   }
631 }
632
633
634 /**
635  * Send a message on the channel with the appropriate size and payload.
636  *
637  * Update the appropriate *_sent counter.
638  *
639  * @param channel Channel to send the message on.
640  */
641 static void
642 send_test_message (struct GNUNET_CADET_Channel *channel)
643 {
644   struct GNUNET_MQ_Envelope *env;
645   struct GNUNET_MessageHeader *msg;
646   uint32_t *data;
647   int payload;
648   int size;
649
650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651               "Sending test message on channel %p\n",
652               channel);
653   size = size_payload;
654   if (GNUNET_NO == initialized)
655   {
656     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
657     size += 1000;
658     payload = data_sent;
659     if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
660         data_sent++;
661   }
662   else if (SPEED == test || SPEED_ACK == test)
663   {
664     if (get_target_channel() == channel)
665     {
666       payload = ack_sent;
667       size += ack_sent;
668       ack_sent++;
669       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670                   "Sending ACK %u [%d bytes]\n",
671                   payload, size);
672     }
673     else
674     {
675       payload = data_sent;
676       size += data_sent;
677       data_sent++;
678       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679                   "Sending DATA %u [%d bytes]\n",
680                   data_sent, size);
681     }
682   }
683   else if (FORWARD == test)
684   {
685     payload = ack_sent;
686   }
687   else if (P2P_SIGNAL == test)
688   {
689     payload = data_sent;
690   }
691   else if (REOPEN == test)
692   {
693     payload = data_sent;
694     data_sent++;
695     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
696                 "Sending DATA %u [%d bytes]\n",
697                 data_sent, size);
698   }
699   else
700   {
701     GNUNET_assert (0);
702   }
703   env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
704
705   data = (uint32_t *) &msg[1];
706   *data = htonl (payload);
707   GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
708 }
709
710
711 /**
712  * Task to request a new data transmission in a SPEED test, without waiting
713  * for previous messages to be sent/arrrive.
714  *
715  * @param cls Closure (unused).
716  */
717 static void
718 send_next_msg (void *cls)
719 {
720   struct GNUNET_CADET_Channel *channel;
721
722   send_next_msg_task = NULL;
723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724               "Sending next message: %d\n",
725               data_sent);
726
727   channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
728   GNUNET_assert (NULL != channel);
729   GNUNET_assert (SPEED == test);
730   send_test_message (channel);
731   if (data_sent < total_packets)
732   {
733     /* SPEED test: Send all messages as soon as possible */
734     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735                 "Scheduling message %d\n",
736                 data_sent + 1);
737     send_next_msg_task =
738       GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
739                                       &send_next_msg,
740                                       NULL);
741   }
742 }
743
744
745 /**
746  * Every few messages cancel the timeout task and re-schedule it again, to
747  * avoid timing out when traffic keeps coming.
748  *
749  * @param line Code line number to log if a timeout occurs.
750  */
751 static void
752 reschedule_timeout_task (long line)
753 {
754   if ((ok % 10) == 0)
755   {
756     if (NULL != disconnect_task)
757     {
758       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759                   "reschedule timeout every 10 messages\n");
760       GNUNET_SCHEDULER_cancel (disconnect_task);
761       disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
762                                                       &gather_stats_and_exit,
763                                                       (void *) line);
764     }
765   }
766 }
767
768
769 /**
770  * Check if payload is sane (size contains payload).
771  *
772  * @param cls should match #ch
773  * @param message The actual message.
774  * @return #GNUNET_OK to keep the channel open,
775  *         #GNUNET_SYSERR to close it (signal serious error).
776  */
777 static int
778 check_data (void *cls,
779             const struct GNUNET_MessageHeader *message)
780 {
781   return GNUNET_OK;             /* all is well-formed */
782 }
783
784
785 /**
786  * Function is called whenever a message is received.
787  *
788  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
789  * @param message the actual message
790  */
791 static void
792 handle_data (void *cls,
793              const struct GNUNET_MessageHeader *message)
794 {
795   struct CadetTestChannelWrapper *ch = cls;
796   struct GNUNET_CADET_Channel *channel = ch->ch;
797   uint32_t *data;
798   uint32_t payload;
799   int *counter;
800
801   ok++;
802   GNUNET_CADET_receive_done (channel);
803   counter = get_target_channel () == channel ? &data_received : &ack_received;
804
805   reschedule_timeout_task ((long) __LINE__);
806
807   if (channel == outgoing_ch)
808   {
809     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
810                 "Root client got a message.\n");
811   }
812   else if (channel == incoming_ch)
813   {
814     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
815                 "Leaf client got a message.\n");
816   }
817   else
818   {
819     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
820                 "Unknown channel %p.\n",
821                 channel);
822     GNUNET_assert (0);
823   }
824
825   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
826               " ok: (%d/%d)\n",
827               ok,
828               ok_goal);
829   data = (uint32_t *) &message[1];
830   payload = ntohl (*data);
831   if (payload == *counter)
832   {
833     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
834                 " payload as expected: %u\n",
835                 payload);
836   }
837   else
838   {
839     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
840                 " payload %u, expected: %u\n",
841                 payload, *counter);
842   }
843
844   if (GNUNET_NO == initialized)
845   {
846     initialized = GNUNET_YES;
847     start_time = GNUNET_TIME_absolute_get ();
848     if (SPEED == test)
849     {
850       GNUNET_assert (incoming_ch == channel);
851       send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg,
852                                                      NULL);
853       return;
854     }
855   }
856
857   (*counter)++;
858   if (get_target_channel () == channel) /* Got "data" */
859   {
860     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received);
861     if (SPEED != test || (ok_goal - 2) == ok)
862     {
863       /* Send ACK */
864       send_test_message (channel);
865       return;
866     }
867     else
868     {
869       if (data_received < total_packets)
870         return;
871     }
872   }
873   else /* Got "ack" */
874   {
875     if (SPEED_ACK == test || SPEED == test)
876     {
877       GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
878       /* Send more data */
879       send_test_message (channel);
880       if (ack_received < total_packets && SPEED != test)
881         return;
882       if (ok == 2 && SPEED == test)
883         return;
884       show_end_data ();
885     }
886     if (test == P2P_SIGNAL)
887     {
888       GNUNET_CADET_channel_destroy (incoming_ch);
889       incoming_ch = NULL;
890     }
891     else
892     {
893       GNUNET_CADET_channel_destroy (outgoing_ch);
894       outgoing_ch = NULL;
895     }
896   }
897 }
898
899
900 /**
901  * Method called whenever a peer connects to a port in MQ-based CADET.
902  *
903  * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
904  * @param channel New handle to the channel.
905  * @param source Peer that started this channel.
906  * @return Closure for the incoming @a channel. It's given to:
907  *         - The #GNUNET_CADET_DisconnectEventHandler (given to
908  *           #GNUNET_CADET_open_port) when the channel dies.
909  *         - Each the #GNUNET_MQ_MessageCallback handlers for each message
910  *           received on the @a channel.
911  */
912 static void *
913 connect_handler (void *cls,
914                  struct GNUNET_CADET_Channel *channel,
915                  const struct GNUNET_PeerIdentity *source)
916 {
917   struct CadetTestChannelWrapper *ch;
918   long peer = (long) cls;
919
920   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
921               "Incoming channel from %s to %ld: %p\n",
922               GNUNET_i2s (source),
923               peer,
924               channel);
925   ok++;
926   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
927               " ok: %d\n",
928               ok);
929   if (peer == peers_requested - 1)
930   {
931     if (NULL != incoming_ch)
932     {
933       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
934                   "Duplicate incoming channel for client %lu\n",
935                   (long) cls);
936       GNUNET_assert (0);
937     }
938     incoming_ch = channel;
939   }
940   else
941   {
942     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
943                 "Incoming channel for unexpected peer #%lu\n",
944                 (long) cls);
945     GNUNET_assert (0);
946   }
947   if (NULL != disconnect_task && REOPEN != test)
948   {
949     GNUNET_SCHEDULER_cancel (disconnect_task);
950     disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
951                                                     &gather_stats_and_exit,
952                                                     (void *) __LINE__);
953   }
954   else if ((NULL != disconnect_task) && (REOPEN == test))
955   {
956     GNUNET_SCHEDULER_cancel (disconnect_task);
957     disconnect_task = GNUNET_SCHEDULER_add_delayed (
958         GNUNET_TIME_relative_multiply (short_time, 2),
959         &gather_stats_and_exit,
960         (void *) __LINE__);
961   }
962
963   if ((NULL != reconnect_task) && (REOPEN == test))
964   {
965     GNUNET_SCHEDULER_cancel (reconnect_task);
966     reconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
967                                                    &reconnect_op,
968                                                    (void *) __LINE__);
969   }
970
971   /* TODO: cannot return channel as-is, in order to unify the data handlers */
972   ch = GNUNET_new (struct CadetTestChannelWrapper);
973   ch->ch = channel;
974
975   return ch;
976 }
977
978
979 /**
980  * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
981  *
982  * Testcase continues when the root receives confirmation of connected peers,
983  * on callback function ch.
984  *
985  * @param cls Closure (unused).
986  */
987 static void
988 start_test (void *cls)
989 {
990   struct GNUNET_MQ_MessageHandler handlers[] = {
991     GNUNET_MQ_hd_var_size (data,
992                            GNUNET_MESSAGE_TYPE_DUMMY,
993                            struct GNUNET_MessageHeader,
994                            NULL),
995     GNUNET_MQ_handler_end ()
996   };
997   struct CadetTestChannelWrapper *ch;
998
999   test_task = NULL;
1000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test: %s\n", test_name);
1001   if (NULL != disconnect_task)
1002   {
1003     GNUNET_SCHEDULER_cancel (disconnect_task);
1004     disconnect_task = NULL;
1005   }
1006
1007   if (SPEED_REL == test)
1008   {
1009     test = SPEED;
1010   }
1011
1012   ch = GNUNET_new (struct CadetTestChannelWrapper);
1013   outgoing_ch = GNUNET_CADET_channel_create (h1,
1014                                              ch,
1015                                              p_id[1],
1016                                              &port,
1017                                              NULL,
1018                                              &disconnect_handler,
1019                                              handlers);
1020
1021   ch->ch = outgoing_ch;
1022
1023   disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
1024                                                   &gather_stats_and_exit,
1025                                                   (void *) __LINE__);
1026   if (KEEPALIVE == test)
1027     return;                     /* Don't send any data. */
1028
1029   data_received = 0;
1030   data_sent = 0;
1031   ack_received = 0;
1032   ack_sent = 0;
1033   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034               "Sending data initializer on channel %p...\n",
1035               outgoing_ch);
1036   send_test_message (outgoing_ch);
1037   if (REOPEN == test)
1038   {
1039     reconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
1040                                                    &reconnect_op,
1041                                                    (void *) __LINE__);
1042     GNUNET_SCHEDULER_cancel (disconnect_task);
1043     disconnect_task = GNUNET_SCHEDULER_add_delayed (
1044         GNUNET_TIME_relative_multiply (short_time, 2),
1045         &gather_stats_and_exit,
1046         (void *) __LINE__);
1047   }
1048
1049 }
1050
1051
1052 /**
1053  * Callback to be called when the requested peer information is available
1054  *
1055  * @param cls the closure from GNUNET_TESTBED_peer_get_information()
1056  * @param op the operation this callback corresponds to
1057  * @param pinfo the result; will be NULL if the operation has failed
1058  * @param emsg error message if the operation has failed;
1059  *             NULL if the operation is successfull
1060  */
1061 static void
1062 pi_cb (void *cls,
1063        struct GNUNET_TESTBED_Operation *op,
1064        const struct GNUNET_TESTBED_PeerInformation *pinfo,
1065        const char *emsg)
1066 {
1067   long i = (long) cls;
1068
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070               "ID callback for %ld\n",
1071               i);
1072   if ( (NULL == pinfo) ||
1073        (NULL != emsg) )
1074   {
1075     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1076                 "pi_cb: %s\n",
1077                 emsg);
1078     abort_test (__LINE__);
1079     return;
1080   }
1081   p_id[i] = pinfo->result.id;
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083               "id: %s\n",
1084               GNUNET_i2s (p_id[i]));
1085   p_ids++;
1086   if (p_ids < 2)
1087     return;
1088   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1089               "Got all IDs, starting test\n");
1090   test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL);
1091 }
1092
1093
1094 /**
1095  * test main: start test when all peers are connected
1096  *
1097  * @param cls Closure.
1098  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
1099  * @param num_peers Number of peers that are running.
1100  * @param peers Array of peers.
1101  * @param cadets Handle to each of the CADETs of the peers.
1102  */
1103 static void
1104 tmain (void *cls,
1105        struct GNUNET_CADET_TEST_Context *ctx,
1106        unsigned int num_peers,
1107        struct GNUNET_TESTBED_Peer **peers,
1108        struct GNUNET_CADET_Handle **cadets)
1109 {
1110   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n");
1111   ok = 0;
1112   test_ctx = ctx;
1113   peers_running = num_peers;
1114   GNUNET_assert (peers_running == peers_requested);
1115   testbed_peers = peers;
1116   h1 = cadets[0];
1117   h2 = cadets[num_peers - 1];
1118   disconnect_task = GNUNET_SCHEDULER_add_delayed (short_time,
1119                                                   &disconnect_cadet_peers,
1120                                                   (void *) __LINE__);
1121   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1122                                  NULL);
1123   t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
1124                                                  GNUNET_TESTBED_PIT_IDENTITY,
1125                                                  &pi_cb,
1126                                                  (void *) 0L);
1127   t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
1128                                                  GNUNET_TESTBED_PIT_IDENTITY,
1129                                                  &pi_cb,
1130                                                  (void *) 1L);
1131   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
1132 }
1133
1134
1135 /**
1136  * Main: start test
1137  */
1138 int
1139 main (int argc, char *argv[])
1140 {
1141   static const struct GNUNET_HashCode *ports[2];
1142   struct GNUNET_MQ_MessageHandler handlers[] = {
1143     GNUNET_MQ_hd_var_size (data,
1144                            GNUNET_MESSAGE_TYPE_DUMMY,
1145                            struct GNUNET_MessageHeader,
1146                            NULL),
1147     GNUNET_MQ_handler_end ()
1148   };
1149   const char *config_file;
1150   char port_id[] = "test port";
1151   struct GNUNET_GETOPT_CommandLineOption options[] = {
1152     GNUNET_GETOPT_option_relative_time ('t',
1153                                         "time",
1154                                         "short_time",
1155                                         gettext_noop ("set short timeout"),
1156                                         &short_time),
1157     GNUNET_GETOPT_option_uint ('m',
1158                                "messages",
1159                                "NUM_MESSAGES",
1160                                gettext_noop ("set number of messages to send"),
1161                                &total_packets),
1162
1163     GNUNET_GETOPT_OPTION_END
1164   };
1165
1166
1167   initialized = GNUNET_NO;
1168   GNUNET_log_setup ("test", "DEBUG", NULL);
1169
1170   total_packets = TOTAL_PACKETS;
1171   short_time = SHORT_TIME;
1172   if (-1 == GNUNET_GETOPT_run (argv[0], options, argc, argv))
1173   {
1174     FPRINTF (stderr, "test failed: problem with CLI parameters\n");
1175     exit (1);
1176   }
1177
1178   config_file = "test_cadet.conf";
1179   GNUNET_CRYPTO_hash (port_id, sizeof (port_id), &port);
1180
1181   /* Find out requested size */
1182   if (strstr (argv[0], "_2_") != NULL)
1183   {
1184     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DIRECT CONNECTIONs\n");
1185     peers_requested = 2;
1186   }
1187   else if (strstr (argv[0], "_5_") != NULL)
1188   {
1189     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "5 PEER LINE\n");
1190     peers_requested = 5;
1191   }
1192   else if (strstr (argv[0], "_6_") != NULL)
1193   {
1194     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "6 PEER LINE\n");
1195     peers_requested = 6;
1196   }
1197   else
1198   {
1199     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "SIZE UNKNOWN, USING 2\n");
1200     peers_requested = 2;
1201   }
1202
1203   /* Find out requested test */
1204   if (strstr (argv[0], "_forward") != NULL)
1205   {
1206     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "FORWARD\n");
1207     test = FORWARD;
1208     test_name = "unicast";
1209     ok_goal = 4;
1210   }
1211   else if (strstr (argv[0], "_signal") != NULL)
1212   {
1213     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SIGNAL\n");
1214     test = P2P_SIGNAL;
1215     test_name = "signal";
1216     ok_goal = 4;
1217   }
1218   else if (strstr (argv[0], "_speed_ack") != NULL)
1219   {
1220     /* Test is supposed to generate the following callbacks:
1221      * 1 incoming channel (@dest)
1222      * total_packets received data packet (@dest)
1223      * total_packets received data packet (@orig)
1224      * 1 received channel destroy (@dest) FIXME #5818
1225      */
1226     ok_goal = total_packets * 2 + 2;
1227     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED_ACK\n");
1228     test = SPEED_ACK;
1229     test_name = "speed ack";
1230   }
1231   else if (strstr (argv[0], "_speed") != NULL)
1232   {
1233     /* Test is supposed to generate the following callbacks:
1234      * 1 incoming channel (@dest)
1235      * 1 initial packet (@dest)
1236      * total_packets received data packet (@dest)
1237      * 1 received data packet (@orig)
1238      * 1 received channel destroy (@dest)  FIXME #5818
1239      */
1240     ok_goal = total_packets + 4;
1241     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SPEED\n");
1242     if (strstr (argv[0], "_reliable") != NULL)
1243     {
1244       test = SPEED_REL;
1245       test_name = "speed reliable";
1246       config_file = "test_cadet_drop.conf";
1247     }
1248     else
1249     {
1250       test = SPEED;
1251       test_name = "speed";
1252     }
1253   }
1254   else if (strstr (argv[0], "_keepalive") != NULL)
1255   {
1256     test = KEEPALIVE;
1257     test_name = "keepalive";
1258     /* Test is supposed to generate the following callbacks:
1259      * 1 incoming channel (@dest)
1260      * [wait]
1261      * 1 received channel destroy (@dest)  FIXME #5818
1262      */
1263     ok_goal = 1;
1264   }
1265   else if (strstr (argv[0], "_reopen") != NULL)
1266   {
1267     test = REOPEN;
1268     test_name = "reopen";
1269     ///* Test is supposed to generate the following callbacks:
1270     // * 1 incoming channel (@dest)
1271     // * [wait]
1272     // * 1 received channel destroy (@dest)  FIXME #5818
1273     // */
1274     ok_goal = 6;
1275   }
1276   else
1277   {
1278     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "UNKNOWN\n");
1279     test = SETUP;
1280     ok_goal = 0;
1281   }
1282
1283   if (strstr (argv[0], "backwards") != NULL)
1284   {
1285     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "BACKWARDS (LEAF TO ROOT)\n");
1286     test_backwards = GNUNET_YES;
1287     GNUNET_asprintf (&test_name, "backwards %s", test_name);
1288   }
1289
1290   p_ids = 0;
1291   ports[0] = &port;
1292   ports[1] = NULL;
1293   GNUNET_CADET_TEST_ruN ("test_cadet_small",
1294                          config_file,
1295                          peers_requested,
1296                          &tmain,
1297                          NULL,        /* tmain cls */
1298                          &connect_handler,
1299                          NULL,
1300                          &disconnect_handler,
1301                          handlers,
1302                          ports);
1303   if (NULL != strstr (argv[0], "_reliable"))
1304     msg_dropped = 0;            /* dropped should be retransmitted */
1305
1306   if (ok_goal > ok - msg_dropped)
1307   {
1308     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal);
1309     return 1;
1310   }
1311   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n");
1312   return 0;
1313 }
1314
1315 /* end of test_cadet.c */