uncrustify as demanded.
[oweals/gnunet.git] / src / cadet / test_cadet.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file cadet/test_cadet.c
22  * @author Bart Polot
23  * @author Christian Grothoff
24  * @brief Test for the cadet service using mq API.
25  */
26 #include <stdio.h>
27 #include "platform.h"
28 #include "cadet_test_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_statistics_service.h"
31 #include <gauger.h>
32
33
/**
 * Thin wrapper around a CADET channel pointer.  An instance is used as the
 * per-channel closure so that one set of data/disconnect handlers can serve
 * both the incoming and the outgoing channel.
 */
struct CadetTestChannelWrapper
{
  /**
   * The channel this wrapper belongs to.
   */
  struct GNUNET_CADET_Channel *ch;
};
43
/**
 * Default number of messages to send.
 */
#define TOTAL_PACKETS 500       /* Cannot exceed 64k! */

/**
 * Upper bound on the time we wait for the peers to get connected.
 */
#define TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 120)

/**
 * Default waiting time for operations that are expected to be quick.
 */
#define SHORT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20)

/**
 * Pause between two consecutive messages.
 */
#define SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 10)

/**
 * Identifiers of the individual test variants.
 */
#define SETUP 0
#define FORWARD 1
#define KEEPALIVE 2
#define SPEED 3
#define SPEED_ACK 4
#define SPEED_REL 8
#define P2P_SIGNAL 10
#define REOPEN 11
75
76 /**
77  * Which test are we running?
78  */
79 static int test;
80
81 /**
82  * String with test name
83  */
84 static char *test_name;
85
86 /**
87  * Flag to send traffic leaf->root in speed tests to test BCK_ACK logic.
88  */
89 static int test_backwards = GNUNET_NO;
90
91 /**
92  * How many packets to send.
93  */
94 static unsigned int total_packets;
95
96 /**
97  * Time to wait for fast operations.
98  */
99 static struct GNUNET_TIME_Relative short_time;
100
101 /**
102  * How many events have happened
103  */
104 static int ok;
105
106 /**
107  * Number of events expected to conclude the test successfully.
108  */
109 static int ok_goal;
110
111 /**
112  * Size of each test packet's payload
113  */
114 static size_t size_payload = sizeof(uint32_t);
115
116 /**
117  * Operation to get peer ids.
118  */
119 static struct GNUNET_TESTBED_Operation *t_op[2];
120
121 /**
122  * Peer ids.
123  */
124 static struct GNUNET_PeerIdentity *p_id[2];
125
126 /**
127  * Port ID
128  */
129 static struct GNUNET_HashCode port;
130
131 /**
132  * Peer ids counter.
133  */
134 static unsigned int p_ids;
135
136 /**
137  * Is the setup initialized?
138  */
139 static int initialized;
140
141 /**
142  * Number of payload packes sent.
143  */
144 static int data_sent;
145
146 /**
147  * Number of payload packets received.
148  */
149 static int data_received;
150
151 /**
152  * Number of payload packed acknowledgements sent.
153  */
154 static int ack_sent;
155
156 /**
157  * Number of payload packed explicitly (app level) acknowledged.
158  */
159 static int ack_received;
160
161 /**
162  * Total number of peers asked to run.
163  */
164 static unsigned long long peers_requested;
165
166 /**
167  * Number of currently running peers (should be same as @c peers_requested).
168  */
169 static unsigned long long peers_running;
170
171 /**
172  * Test context (to shut down).
173  */
174 struct GNUNET_CADET_TEST_Context *test_ctx;
175
176 /**
177  * Task called to disconnect peers.
178  */
179 static struct GNUNET_SCHEDULER_Task *disconnect_task;
180
181 /**
182  * Task called to reconnect peers.
183  */
184 static struct GNUNET_SCHEDULER_Task *reconnect_task;
185
186 /**
187  * Task To perform tests
188  */
189 static struct GNUNET_SCHEDULER_Task *test_task;
190
191 /**
192  * Task runnining #send_next_msg().
193  */
194 static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
195
196 /**
197  * Cadet handle for the root peer
198  */
199 static struct GNUNET_CADET_Handle *h1;
200
201 /**
202  * Cadet handle for the first leaf peer
203  */
204 static struct GNUNET_CADET_Handle *h2;
205
206 /**
207  * Channel handle for the root peer
208  */
209 static struct GNUNET_CADET_Channel *outgoing_ch;
210
211 /**
212  * Channel handle for the dest peer
213  */
214 static struct GNUNET_CADET_Channel *incoming_ch;
215
216 /**
217  * Time we started the data transmission (after channel has been established
218  * and initilized).
219  */
220 static struct GNUNET_TIME_Absolute start_time;
221
222 /**
223  * Peers handle.
224  */
225 static struct GNUNET_TESTBED_Peer **testbed_peers;
226
227 /**
228  * Statistics operation handle.
229  */
230 static struct GNUNET_TESTBED_Operation *stats_op;
231
232 /**
233  * Keepalives sent.
234  */
235 static unsigned int ka_sent;
236
237 /**
238  * Keepalives received.
239  */
240 static unsigned int ka_received;
241
242 /**
243  * How many messages were dropped by CADET because of full buffers?
244  */
245 static unsigned int msg_dropped;
246
247
248 /******************************************************************************/
249
250
251 /******************************************************************************/
252
253
254 /**
255  * Get the channel considered as the "target" or "receiver", depending on
256  * the test type and size.
257  *
258  * @return Channel handle of the target client, either 0 (for backward tests)
259  *         or the last peer in the line (for other tests).
260  */
261 static struct GNUNET_CADET_Channel *
262 get_target_channel()
263 {
264   if (SPEED == test && GNUNET_YES == test_backwards)
265     return outgoing_ch;
266   else
267     return incoming_ch;
268 }
269
270
271 /**
272  * Show the results of the test (banwidth acheived) and log them to GAUGER
273  */
274 static void
275 show_end_data(void)
276 {
277   static struct GNUNET_TIME_Absolute end_time;
278   static struct GNUNET_TIME_Relative total_time;
279
280   end_time = GNUNET_TIME_absolute_get();
281   total_time = GNUNET_TIME_absolute_get_difference(start_time, end_time);
282   fprintf(stderr,
283           "\nResults of test \"%s\"\n",
284           test_name);
285   fprintf(stderr,
286           "Test time %s\n",
287           GNUNET_STRINGS_relative_time_to_string(total_time, GNUNET_YES));
288   fprintf(stderr,
289           "Test bandwidth: %f kb/s\n",
290           4 * total_packets * 1.0 / (total_time.rel_value_us / 1000));     // 4bytes * ms
291   fprintf(stderr,
292           "Test throughput: %f packets/s\n\n",
293           total_packets * 1000.0 / (total_time.rel_value_us / 1000));      // packets * ms
294   GAUGER("CADET",
295          test_name,
296          total_packets * 1000.0 / (total_time.rel_value_us / 1000),
297          "packets/s");
298 }
299
300
301 /**
302  * Disconnect from cadet services af all peers, call shutdown.
303  *
304  * @param cls Closure (line number from which termination was requested).
305  * @param tc Task Context.
306  */
307 static void
308 disconnect_cadet_peers(void *cls)
309 {
310   long line = (long)cls;
311
312   disconnect_task = NULL;
313   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
314              "disconnecting cadet service of peers, called from line %ld\n",
315              line);
316   for (unsigned int i = 0; i < 2; i++)
317     {
318       GNUNET_TESTBED_operation_done(t_op[i]);
319     }
320   if (NULL != outgoing_ch)
321     {
322       GNUNET_CADET_channel_destroy(outgoing_ch);
323       outgoing_ch = NULL;
324     }
325   if (NULL != incoming_ch)
326     {
327       GNUNET_CADET_channel_destroy(incoming_ch);
328       incoming_ch = NULL;
329     }
330   GNUNET_CADET_TEST_cleanup(test_ctx);
331   GNUNET_SCHEDULER_shutdown();
332 }
333
334
335 /**
336  * Shut down peergroup, clean up.
337  *
338  * @param cls Closure (unused).
339  * @param tc Task Context.
340  */
341 static void
342 shutdown_task(void *cls)
343 {
344   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
345              "Ending test.\n");
346   if (NULL != send_next_msg_task)
347     {
348       GNUNET_SCHEDULER_cancel(send_next_msg_task);
349       send_next_msg_task = NULL;
350     }
351   if (NULL != test_task)
352     {
353       GNUNET_SCHEDULER_cancel(test_task);
354       test_task = NULL;
355     }
356   if (NULL != disconnect_task)
357     {
358       GNUNET_SCHEDULER_cancel(disconnect_task);
359       disconnect_task =
360         GNUNET_SCHEDULER_add_now(&disconnect_cadet_peers,
361                                  (void *)__LINE__);
362     }
363 }
364
365
366 /**
367  * Stats callback. Finish the stats testbed operation and when all stats have
368  * been iterated, shutdown the test.
369  *
370  * @param cls Closure (line number from which termination was requested).
371  * @param op the operation that has been finished
372  * @param emsg error message in case the operation has failed; will be NULL if
373  *          operation has executed successfully.
374  */
375 static void
376 stats_cont(void *cls,
377            struct GNUNET_TESTBED_Operation *op,
378            const char *emsg)
379 {
380   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
381              "KA sent: %u, KA received: %u\n",
382              ka_sent,
383              ka_received);
384   if ((KEEPALIVE == test || REOPEN == test) &&
385       ((ka_sent < 2) || (ka_sent > ka_received + 1)))
386     {
387       GNUNET_break(0);
388       ok--;
389     }
390   GNUNET_TESTBED_operation_done(stats_op);
391
392   if (NULL != disconnect_task)
393     GNUNET_SCHEDULER_cancel(disconnect_task);
394   disconnect_task = GNUNET_SCHEDULER_add_now(&disconnect_cadet_peers,
395                                              cls);
396 }
397
398
399 /**
400  * Process statistic values.
401  *
402  * @param cls closure (line number, unused)
403  * @param peer the peer the statistic belong to
404  * @param subsystem name of subsystem that created the statistic
405  * @param name the name of the datum
406  * @param value the current value
407  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
408  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
409  */
410 static int
411 stats_iterator(void *cls,
412                const struct GNUNET_TESTBED_Peer *peer,
413                const char *subsystem,
414                const char *name,
415                uint64_t value,
416                int is_persistent)
417 {
418   static const char *s_sent = "# keepalives sent";
419   static const char *s_recv = "# keepalives received";
420   static const char *rdrops = "# messages dropped due to full buffer";
421   static const char *cdrops = "# messages dropped due to slow client";
422   uint32_t i;
423
424   i = GNUNET_TESTBED_get_index(peer);
425   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
426              subsystem, name, (unsigned long long)value);
427   if (0 == strncmp(s_sent, name, strlen(s_sent)) && 0 == i)
428     ka_sent = value;
429   if (0 == strncmp(s_recv, name, strlen(s_recv)) && peers_requested - 1 == i)
430     ka_received = value;
431   if (0 == strncmp(rdrops, name, strlen(rdrops)))
432     msg_dropped += value;
433   if (0 == strncmp(cdrops, name, strlen(cdrops)))
434     msg_dropped += value;
435
436   return GNUNET_OK;
437 }
438
439
440 /**
441  * Task to gather all statistics.
442  *
443  * @param cls Closure (line from which the task was scheduled).
444  */
445 static void
446 gather_stats_and_exit(void *cls)
447 {
448   long l = (long)cls;
449
450   disconnect_task = NULL;
451   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
452              "gathering statistics from line %ld\n",
453              l);
454   if (NULL != outgoing_ch)
455     {
456       GNUNET_CADET_channel_destroy(outgoing_ch);
457       outgoing_ch = NULL;
458     }
459   stats_op = GNUNET_TESTBED_get_statistics(peers_running,
460                                            testbed_peers,
461                                            "cadet",
462                                            NULL,
463                                            &stats_iterator,
464                                            stats_cont,
465                                            cls);
466 }
467
468
469 /**
470  * Send a message on the channel with the appropriate size and payload.
471  *
472  * Update the appropriate *_sent counter.
473  *
474  * @param channel Channel to send the message on.
475  */
476 static void
477 send_test_message(struct GNUNET_CADET_Channel *channel);
478
479 /**
480  * Check if payload is sane (size contains payload).
481  *
482  * @param cls should match #ch
483  * @param message The actual message.
484  * @return #GNUNET_OK to keep the channel open,
485  *         #GNUNET_SYSERR to close it (signal serious error).
486  */
487 static int
488 check_data(void *cls,
489            const struct GNUNET_MessageHeader *message);
490
491 /**
492  * Function is called whenever a message is received.
493  *
494  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
495  * @param message the actual message
496  */
497 static void
498 handle_data(void *cls,
499             const struct GNUNET_MessageHeader *message);
500
501 /**
502  * Function called whenever an MQ-channel is destroyed, unless the destruction
503  * was requested by #GNUNET_CADET_channel_destroy.
504  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
505  *
506  * It should clean up any associated state, including cancelling any pending
507  * transmission on this channel.
508  *
509  * @param cls Channel closure (channel wrapper).
510  * @param channel Connection to the other end (henceforth invalid).
511  */
512 static void
513 disconnect_handler(void *cls,
514                    const struct GNUNET_CADET_Channel *channel);
515
516
517 /**
518  * Task to reconnect to other peer.
519  *
520  * @param cls Closure (line from which the task was scheduled).
521  */
522 static void
523 reconnect_op(void *cls)
524 {
525   struct GNUNET_MQ_MessageHandler handlers[] = {
526     GNUNET_MQ_hd_var_size(data,
527                           GNUNET_MESSAGE_TYPE_DUMMY,
528                           struct GNUNET_MessageHeader,
529                           NULL),
530     GNUNET_MQ_handler_end()
531   };
532   long l = (long)cls;
533   struct CadetTestChannelWrapper *ch;
534
535   reconnect_task = NULL;
536   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
537              "reconnecting from line %ld\n",
538              l);
539   if (NULL != outgoing_ch)
540     {
541       GNUNET_CADET_channel_destroy(outgoing_ch);
542       outgoing_ch = NULL;
543     }
544   ch = GNUNET_new(struct CadetTestChannelWrapper);
545   outgoing_ch = GNUNET_CADET_channel_create(h1,
546                                             ch,
547                                             p_id[1],
548                                             &port,
549                                             NULL,
550                                             &disconnect_handler,
551                                             handlers);
552   ch->ch = outgoing_ch;
553   send_test_message(outgoing_ch);
554 }
555
556 /**
557  * Function called whenever an MQ-channel is destroyed, unless the destruction
558  * was requested by #GNUNET_CADET_channel_destroy.
559  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
560  *
561  * It should clean up any associated state, including cancelling any pending
562  * transmission on this channel.
563  *
564  * @param cls Channel closure (channel wrapper).
565  * @param channel Connection to the other end (henceforth invalid).
566  */
567 static void
568 disconnect_handler(void *cls,
569                    const struct GNUNET_CADET_Channel *channel)
570 {
571   struct CadetTestChannelWrapper *ch_w = cls;
572
573   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
574              "Channel disconnected at %d\n",
575              ok);
576   GNUNET_assert(ch_w->ch == channel);
577   if (channel == incoming_ch)
578     {
579       ok++;
580       incoming_ch = NULL;
581     }
582   else if (outgoing_ch == channel)
583     {
584       if (P2P_SIGNAL == test)
585         {
586           ok++;
587         }
588       outgoing_ch = NULL;
589     }
590   else
591     GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
592                "Unknown channel! %p\n",
593                channel);
594   if (NULL != disconnect_task && REOPEN != test)
595     {
596       GNUNET_SCHEDULER_cancel(disconnect_task);
597       disconnect_task =
598         GNUNET_SCHEDULER_add_now(&gather_stats_and_exit,
599                                  (void *)__LINE__);
600     }
601   else if (NULL != reconnect_task && REOPEN == test)
602     {
603       GNUNET_SCHEDULER_cancel(reconnect_task);
604       reconnect_task =
605         GNUNET_SCHEDULER_add_now(&reconnect_op,
606                                  (void *)__LINE__);
607     }
608   GNUNET_free(ch_w);
609 }
610
611
612 /**
613  * Abort test: schedule disconnect and shutdown immediately
614  *
615  * @param line Line in the code the abort is requested from (__LINE__).
616  */
617 static void
618 abort_test(long line)
619 {
620   if (NULL != disconnect_task)
621     {
622       GNUNET_SCHEDULER_cancel(disconnect_task);
623       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
624                  "Aborting test from %ld\n",
625                  line);
626       disconnect_task =
627         GNUNET_SCHEDULER_add_now(&disconnect_cadet_peers,
628                                  (void *)line);
629     }
630 }
631
632
633 /**
634  * Send a message on the channel with the appropriate size and payload.
635  *
636  * Update the appropriate *_sent counter.
637  *
638  * @param channel Channel to send the message on.
639  */
640 static void
641 send_test_message(struct GNUNET_CADET_Channel *channel)
642 {
643   struct GNUNET_MQ_Envelope *env;
644   struct GNUNET_MessageHeader *msg;
645   uint32_t *data;
646   int payload;
647   int size;
648
649   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
650              "Sending test message on channel %p\n",
651              channel);
652   size = size_payload;
653   if (GNUNET_NO == initialized)
654     {
655       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
656       size += 1000;
657       payload = data_sent;
658       if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
659         data_sent++;
660     }
661   else if (SPEED == test || SPEED_ACK == test)
662     {
663       if (get_target_channel() == channel)
664         {
665           payload = ack_sent;
666           size += ack_sent;
667           ack_sent++;
668           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
669                      "Sending ACK %u [%d bytes]\n",
670                      payload, size);
671         }
672       else
673         {
674           payload = data_sent;
675           size += data_sent;
676           data_sent++;
677           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
678                      "Sending DATA %u [%d bytes]\n",
679                      data_sent, size);
680         }
681     }
682   else if (FORWARD == test)
683     {
684       payload = ack_sent;
685     }
686   else if (P2P_SIGNAL == test)
687     {
688       payload = data_sent;
689     }
690   else if (REOPEN == test)
691     {
692       payload = data_sent;
693       data_sent++;
694       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
695                  "Sending DATA %u [%d bytes]\n",
696                  data_sent, size);
697     }
698   else
699     {
700       GNUNET_assert(0);
701     }
702   env = GNUNET_MQ_msg_extra(msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
703
704   data = (uint32_t *)&msg[1];
705   *data = htonl(payload);
706   GNUNET_MQ_send(GNUNET_CADET_get_mq(channel), env);
707 }
708
709
710 /**
711  * Task to request a new data transmission in a SPEED test, without waiting
712  * for previous messages to be sent/arrrive.
713  *
714  * @param cls Closure (unused).
715  */
716 static void
717 send_next_msg(void *cls)
718 {
719   struct GNUNET_CADET_Channel *channel;
720
721   send_next_msg_task = NULL;
722   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
723              "Sending next message: %d\n",
724              data_sent);
725
726   channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
727   GNUNET_assert(NULL != channel);
728   GNUNET_assert(SPEED == test);
729   send_test_message(channel);
730   if (data_sent < total_packets)
731     {
732       /* SPEED test: Send all messages as soon as possible */
733       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
734                  "Scheduling message %d\n",
735                  data_sent + 1);
736       send_next_msg_task =
737         GNUNET_SCHEDULER_add_delayed(SEND_INTERVAL,
738                                      &send_next_msg,
739                                      NULL);
740     }
741 }
742
743
744 /**
745  * Every few messages cancel the timeout task and re-schedule it again, to
746  * avoid timing out when traffic keeps coming.
747  *
748  * @param line Code line number to log if a timeout occurs.
749  */
750 static void
751 reschedule_timeout_task(long line)
752 {
753   if ((ok % 10) == 0)
754     {
755       if (NULL != disconnect_task)
756         {
757           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
758                      "reschedule timeout every 10 messages\n");
759           GNUNET_SCHEDULER_cancel(disconnect_task);
760           disconnect_task = GNUNET_SCHEDULER_add_delayed(short_time,
761                                                          &gather_stats_and_exit,
762                                                          (void *)line);
763         }
764     }
765 }
766
767
768 /**
769  * Check if payload is sane (size contains payload).
770  *
771  * @param cls should match #ch
772  * @param message The actual message.
773  * @return #GNUNET_OK to keep the channel open,
774  *         #GNUNET_SYSERR to close it (signal serious error).
775  */
776 static int
777 check_data(void *cls,
778            const struct GNUNET_MessageHeader *message)
779 {
780   return GNUNET_OK;             /* all is well-formed */
781 }
782
783
784 /**
785  * Function is called whenever a message is received.
786  *
787  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
788  * @param message the actual message
789  */
790 static void
791 handle_data(void *cls,
792             const struct GNUNET_MessageHeader *message)
793 {
794   struct CadetTestChannelWrapper *ch = cls;
795   struct GNUNET_CADET_Channel *channel = ch->ch;
796   uint32_t *data;
797   uint32_t payload;
798   int *counter;
799
800   ok++;
801   GNUNET_CADET_receive_done(channel);
802   counter = get_target_channel() == channel ? &data_received : &ack_received;
803
804   reschedule_timeout_task((long)__LINE__);
805
806   if (channel == outgoing_ch)
807     {
808       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
809                  "Root client got a message.\n");
810     }
811   else if (channel == incoming_ch)
812     {
813       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
814                  "Leaf client got a message.\n");
815     }
816   else
817     {
818       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
819                  "Unknown channel %p.\n",
820                  channel);
821       GNUNET_assert(0);
822     }
823
824   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
825              " ok: (%d/%d)\n",
826              ok,
827              ok_goal);
828   data = (uint32_t *)&message[1];
829   payload = ntohl(*data);
830   if (payload == *counter)
831     {
832       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
833                  " payload as expected: %u\n",
834                  payload);
835     }
836   else
837     {
838       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
839                  " payload %u, expected: %u\n",
840                  payload, *counter);
841     }
842
843   if (GNUNET_NO == initialized)
844     {
845       initialized = GNUNET_YES;
846       start_time = GNUNET_TIME_absolute_get();
847       if (SPEED == test)
848         {
849           GNUNET_assert(incoming_ch == channel);
850           send_next_msg_task = GNUNET_SCHEDULER_add_now(&send_next_msg,
851                                                         NULL);
852           return;
853         }
854     }
855
856   (*counter)++;
857   if (get_target_channel() == channel)  /* Got "data" */
858     {
859       GNUNET_log(GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received);
860       if (SPEED != test || (ok_goal - 2) == ok)
861         {
862           /* Send ACK */
863           send_test_message(channel);
864           return;
865         }
866       else
867         {
868           if (data_received < total_packets)
869             return;
870         }
871     }
872   else /* Got "ack" */
873     {
874       if (SPEED_ACK == test || SPEED == test)
875         {
876           GNUNET_log(GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
877           /* Send more data */
878           send_test_message(channel);
879           if (ack_received < total_packets && SPEED != test)
880             return;
881           if (ok == 2 && SPEED == test)
882             return;
883           show_end_data();
884         }
885       if (test == P2P_SIGNAL)
886         {
887           GNUNET_CADET_channel_destroy(incoming_ch);
888           incoming_ch = NULL;
889         }
890       else
891         {
892           GNUNET_CADET_channel_destroy(outgoing_ch);
893           outgoing_ch = NULL;
894         }
895     }
896 }
897
898
899 /**
900  * Method called whenever a peer connects to a port in MQ-based CADET.
901  *
902  * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
903  * @param channel New handle to the channel.
904  * @param source Peer that started this channel.
905  * @return Closure for the incoming @a channel. It's given to:
906  *         - The #GNUNET_CADET_DisconnectEventHandler (given to
907  *           #GNUNET_CADET_open_port) when the channel dies.
908  *         - Each the #GNUNET_MQ_MessageCallback handlers for each message
909  *           received on the @a channel.
910  */
911 static void *
912 connect_handler(void *cls,
913                 struct GNUNET_CADET_Channel *channel,
914                 const struct GNUNET_PeerIdentity *source)
915 {
916   struct CadetTestChannelWrapper *ch;
917   long peer = (long)cls;
918
919   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
920              "Incoming channel from %s to %ld: %p\n",
921              GNUNET_i2s(source),
922              peer,
923              channel);
924   ok++;
925   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
926              " ok: %d\n",
927              ok);
928   if (peer == peers_requested - 1)
929     {
930       if (NULL != incoming_ch)
931         {
932           GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
933                      "Duplicate incoming channel for client %lu\n",
934                      (long)cls);
935           GNUNET_assert(0);
936         }
937       incoming_ch = channel;
938     }
939   else
940     {
941       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
942                  "Incoming channel for unexpected peer #%lu\n",
943                  (long)cls);
944       GNUNET_assert(0);
945     }
946   if (NULL != disconnect_task && REOPEN != test)
947     {
948       GNUNET_SCHEDULER_cancel(disconnect_task);
949       disconnect_task = GNUNET_SCHEDULER_add_delayed(short_time,
950                                                      &gather_stats_and_exit,
951                                                      (void *)__LINE__);
952     }
953   else if ((NULL != disconnect_task) && (REOPEN == test))
954     {
955       GNUNET_SCHEDULER_cancel(disconnect_task);
956       disconnect_task = GNUNET_SCHEDULER_add_delayed(
957         GNUNET_TIME_relative_multiply(short_time, 2),
958         &gather_stats_and_exit,
959         (void *)__LINE__);
960     }
961
962   if ((NULL != reconnect_task) && (REOPEN == test))
963     {
964       GNUNET_SCHEDULER_cancel(reconnect_task);
965       reconnect_task = GNUNET_SCHEDULER_add_delayed(short_time,
966                                                     &reconnect_op,
967                                                     (void *)__LINE__);
968     }
969
970   /* TODO: cannot return channel as-is, in order to unify the data handlers */
971   ch = GNUNET_new(struct CadetTestChannelWrapper);
972   ch->ch = channel;
973
974   return ch;
975 }
976
977
978 /**
979  * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
980  *
981  * Testcase continues when the root receives confirmation of connected peers,
982  * on callback function ch.
983  *
984  * @param cls Closure (unused).
985  */
986 static void
987 start_test(void *cls)
988 {
989   struct GNUNET_MQ_MessageHandler handlers[] = {
990     GNUNET_MQ_hd_var_size(data,
991                           GNUNET_MESSAGE_TYPE_DUMMY,
992                           struct GNUNET_MessageHeader,
993                           NULL),
994     GNUNET_MQ_handler_end()
995   };
996   struct CadetTestChannelWrapper *ch;
997
998   test_task = NULL;
999   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "start_test: %s\n", test_name);
1000   if (NULL != disconnect_task)
1001     {
1002       GNUNET_SCHEDULER_cancel(disconnect_task);
1003       disconnect_task = NULL;
1004     }
1005
1006   if (SPEED_REL == test)
1007     {
1008       test = SPEED;
1009     }
1010
1011   ch = GNUNET_new(struct CadetTestChannelWrapper);
1012   outgoing_ch = GNUNET_CADET_channel_create(h1,
1013                                             ch,
1014                                             p_id[1],
1015                                             &port,
1016                                             NULL,
1017                                             &disconnect_handler,
1018                                             handlers);
1019
1020   ch->ch = outgoing_ch;
1021
1022   disconnect_task = GNUNET_SCHEDULER_add_delayed(short_time,
1023                                                  &gather_stats_and_exit,
1024                                                  (void *)__LINE__);
1025   if (KEEPALIVE == test)
1026     return;                     /* Don't send any data. */
1027
1028   data_received = 0;
1029   data_sent = 0;
1030   ack_received = 0;
1031   ack_sent = 0;
1032   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1033              "Sending data initializer on channel %p...\n",
1034              outgoing_ch);
1035   send_test_message(outgoing_ch);
1036   if (REOPEN == test)
1037     {
1038       reconnect_task = GNUNET_SCHEDULER_add_delayed(short_time,
1039                                                     &reconnect_op,
1040                                                     (void *)__LINE__);
1041       GNUNET_SCHEDULER_cancel(disconnect_task);
1042       disconnect_task = GNUNET_SCHEDULER_add_delayed(
1043         GNUNET_TIME_relative_multiply(short_time, 2),
1044         &gather_stats_and_exit,
1045         (void *)__LINE__);
1046     }
1047 }
1048
1049
1050 /**
1051  * Callback to be called when the requested peer information is available
1052  *
1053  * @param cls the closure from GNUNET_TESTBED_peer_get_information()
1054  * @param op the operation this callback corresponds to
1055  * @param pinfo the result; will be NULL if the operation has failed
1056  * @param emsg error message if the operation has failed;
1057  *             NULL if the operation is successfull
1058  */
1059 static void
1060 pi_cb(void *cls,
1061       struct GNUNET_TESTBED_Operation *op,
1062       const struct GNUNET_TESTBED_PeerInformation *pinfo,
1063       const char *emsg)
1064 {
1065   long i = (long)cls;
1066
1067   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1068              "ID callback for %ld\n",
1069              i);
1070   if ((NULL == pinfo) ||
1071       (NULL != emsg))
1072     {
1073       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1074                  "pi_cb: %s\n",
1075                  emsg);
1076       abort_test(__LINE__);
1077       return;
1078     }
1079   p_id[i] = pinfo->result.id;
1080   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1081              "id: %s\n",
1082              GNUNET_i2s(p_id[i]));
1083   p_ids++;
1084   if (p_ids < 2)
1085     return;
1086   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1087              "Got all IDs, starting test\n");
1088   test_task = GNUNET_SCHEDULER_add_now(&start_test, NULL);
1089 }
1090
1091
1092 /**
1093  * test main: start test when all peers are connected
1094  *
1095  * @param cls Closure.
1096  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
1097  * @param num_peers Number of peers that are running.
1098  * @param peers Array of peers.
1099  * @param cadets Handle to each of the CADETs of the peers.
1100  */
1101 static void
1102 tmain(void *cls,
1103       struct GNUNET_CADET_TEST_Context *ctx,
1104       unsigned int num_peers,
1105       struct GNUNET_TESTBED_Peer **peers,
1106       struct GNUNET_CADET_Handle **cadets)
1107 {
1108   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "test main\n");
1109   ok = 0;
1110   test_ctx = ctx;
1111   peers_running = num_peers;
1112   GNUNET_assert(peers_running == peers_requested);
1113   testbed_peers = peers;
1114   h1 = cadets[0];
1115   h2 = cadets[num_peers - 1];
1116   disconnect_task = GNUNET_SCHEDULER_add_delayed(short_time,
1117                                                  &disconnect_cadet_peers,
1118                                                  (void *)__LINE__);
1119   GNUNET_SCHEDULER_add_shutdown(&shutdown_task,
1120                                 NULL);
1121   t_op[0] = GNUNET_TESTBED_peer_get_information(peers[0],
1122                                                 GNUNET_TESTBED_PIT_IDENTITY,
1123                                                 &pi_cb,
1124                                                 (void *)0L);
1125   t_op[1] = GNUNET_TESTBED_peer_get_information(peers[num_peers - 1],
1126                                                 GNUNET_TESTBED_PIT_IDENTITY,
1127                                                 &pi_cb,
1128                                                 (void *)1L);
1129   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
1130 }
1131
1132
1133 /**
1134  * Main: start test
1135  */
1136 int
1137 main(int argc, char *argv[])
1138 {
1139   static const struct GNUNET_HashCode *ports[2];
1140   struct GNUNET_MQ_MessageHandler handlers[] = {
1141     GNUNET_MQ_hd_var_size(data,
1142                           GNUNET_MESSAGE_TYPE_DUMMY,
1143                           struct GNUNET_MessageHeader,
1144                           NULL),
1145     GNUNET_MQ_handler_end()
1146   };
1147   const char *config_file;
1148   char port_id[] = "test port";
1149   struct GNUNET_GETOPT_CommandLineOption options[] = {
1150     GNUNET_GETOPT_option_relative_time('t',
1151                                        "time",
1152                                        "short_time",
1153                                        gettext_noop("set short timeout"),
1154                                        &short_time),
1155     GNUNET_GETOPT_option_uint('m',
1156                               "messages",
1157                               "NUM_MESSAGES",
1158                               gettext_noop("set number of messages to send"),
1159                               &total_packets),
1160
1161     GNUNET_GETOPT_OPTION_END
1162   };
1163
1164
1165   initialized = GNUNET_NO;
1166   GNUNET_log_setup("test", "DEBUG", NULL);
1167
1168   total_packets = TOTAL_PACKETS;
1169   short_time = SHORT_TIME;
1170   if (-1 == GNUNET_GETOPT_run(argv[0], options, argc, argv))
1171     {
1172       fprintf(stderr, "test failed: problem with CLI parameters\n");
1173       exit(1);
1174     }
1175
1176   config_file = "test_cadet.conf";
1177   GNUNET_CRYPTO_hash(port_id, sizeof(port_id), &port);
1178
1179   /* Find out requested size */
1180   if (strstr(argv[0], "_2_") != NULL)
1181     {
1182       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DIRECT CONNECTIONs\n");
1183       peers_requested = 2;
1184     }
1185   else if (strstr(argv[0], "_5_") != NULL)
1186     {
1187       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "5 PEER LINE\n");
1188       peers_requested = 5;
1189     }
1190   else if (strstr(argv[0], "_6_") != NULL)
1191     {
1192       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "6 PEER LINE\n");
1193       peers_requested = 6;
1194     }
1195   else
1196     {
1197       GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "SIZE UNKNOWN, USING 2\n");
1198       peers_requested = 2;
1199     }
1200
1201   /* Find out requested test */
1202   if (strstr(argv[0], "_forward") != NULL)
1203     {
1204       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "FORWARD\n");
1205       test = FORWARD;
1206       test_name = "unicast";
1207       ok_goal = 4;
1208     }
1209   else if (strstr(argv[0], "_signal") != NULL)
1210     {
1211       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "SIGNAL\n");
1212       test = P2P_SIGNAL;
1213       test_name = "signal";
1214       ok_goal = 4;
1215     }
1216   else if (strstr(argv[0], "_speed_ack") != NULL)
1217     {
1218       /* Test is supposed to generate the following callbacks:
1219        * 1 incoming channel (@dest)
1220        * total_packets received data packet (@dest)
1221        * total_packets received data packet (@orig)
1222        * 1 received channel destroy (@dest) FIXME #5818
1223        */
1224       ok_goal = total_packets * 2 + 2;
1225       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "SPEED_ACK\n");
1226       test = SPEED_ACK;
1227       test_name = "speed ack";
1228     }
1229   else if (strstr(argv[0], "_speed") != NULL)
1230     {
1231       /* Test is supposed to generate the following callbacks:
1232        * 1 incoming channel (@dest)
1233        * 1 initial packet (@dest)
1234        * total_packets received data packet (@dest)
1235        * 1 received data packet (@orig)
1236        * 1 received channel destroy (@dest)  FIXME #5818
1237        */
1238       ok_goal = total_packets + 4;
1239       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "SPEED\n");
1240       if (strstr(argv[0], "_reliable") != NULL)
1241         {
1242           test = SPEED_REL;
1243           test_name = "speed reliable";
1244           config_file = "test_cadet_drop.conf";
1245         }
1246       else
1247         {
1248           test = SPEED;
1249           test_name = "speed";
1250         }
1251     }
1252   else if (strstr(argv[0], "_keepalive") != NULL)
1253     {
1254       test = KEEPALIVE;
1255       test_name = "keepalive";
1256       /* Test is supposed to generate the following callbacks:
1257        * 1 incoming channel (@dest)
1258        * [wait]
1259        * 1 received channel destroy (@dest)  FIXME #5818
1260        */
1261       ok_goal = 1;
1262     }
1263   else if (strstr(argv[0], "_reopen") != NULL)
1264     {
1265       test = REOPEN;
1266       test_name = "reopen";
1267       ///* Test is supposed to generate the following callbacks:
1268       // * 1 incoming channel (@dest)
1269       // * [wait]
1270       // * 1 received channel destroy (@dest)  FIXME #5818
1271       // */
1272       ok_goal = 6;
1273     }
1274   else
1275     {
1276       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "UNKNOWN\n");
1277       test = SETUP;
1278       ok_goal = 0;
1279     }
1280
1281   if (strstr(argv[0], "backwards") != NULL)
1282     {
1283       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "BACKWARDS (LEAF TO ROOT)\n");
1284       test_backwards = GNUNET_YES;
1285       GNUNET_asprintf(&test_name, "backwards %s", test_name);
1286     }
1287
1288   p_ids = 0;
1289   ports[0] = &port;
1290   ports[1] = NULL;
1291   GNUNET_CADET_TEST_ruN("test_cadet_small",
1292                         config_file,
1293                         peers_requested,
1294                         &tmain,
1295                         NULL,         /* tmain cls */
1296                         &connect_handler,
1297                         NULL,
1298                         &disconnect_handler,
1299                         handlers,
1300                         ports);
1301   if (NULL != strstr(argv[0], "_reliable"))
1302     msg_dropped = 0;            /* dropped should be retransmitted */
1303
1304   if (ok_goal > ok - msg_dropped)
1305     {
1306       GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal);
1307       return 1;
1308     }
1309   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "success\n");
1310   return 0;
1311 }
1312
1313 /* end of test_cadet.c */