adapting next test to new MQ API
[oweals/gnunet.git] / src / core / test_core_api_reliability.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2015, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file core/test_core_api_reliability.c
22  * @brief testcase for core_api.c focusing on reliable transmission (with TCP)
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_arm_service.h"
27 #include "gnunet_core_service.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_ats_service.h"
30 #include "gnunet_transport_service.h"
31 #include "gnunet_transport_hello_service.h"
32 #include <gauger.h>
33
34 /**
35  * Note that this value must not significantly exceed
36  * 'MAX_PENDING' in 'gnunet-service-transport.c', otherwise
37  * messages may be dropped even for a reliable transport.
38  */
39 #define TOTAL_MSGS (600 * 10)
40
41 /**
42  * How long until we give up on transmitting the message?
43  */
44 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600)
45
46 /**
47  * What delay do we request from the core service for transmission?
48  */
49 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
50
51 #define MTYPE 12345
52
53
54 static unsigned long long total_bytes;
55
56 static struct GNUNET_TIME_Absolute start_time;
57
58 static struct GNUNET_SCHEDULER_Task *err_task;
59
60
61 struct PeerContext
62 {
63   struct GNUNET_CONFIGURATION_Handle *cfg;
64   struct GNUNET_CORE_Handle *ch;
65   struct GNUNET_MQ_Handle *mq;
66   struct GNUNET_PeerIdentity id;
67   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
68   struct GNUNET_MessageHeader *hello;
69   struct GNUNET_TRANSPORT_HelloGetHandle *ghh;
70   struct GNUNET_ATS_ConnectivityHandle *ats;
71   struct GNUNET_ATS_ConnectivitySuggestHandle *ats_sh;
72   int connect_status;
73   struct GNUNET_OS_Process *arm_proc;
74 };
75
76 static struct PeerContext p1;
77
78 static struct PeerContext p2;
79
80 static int ok;
81
82 static int32_t tr_n;
83
84
85 #define OKPP do { ok++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Now at stage %u at %s:%u\n", ok, __FILE__, __LINE__); } while (0)
86
87 struct TestMessage
88 {
89   struct GNUNET_MessageHeader header;
90   uint32_t num GNUNET_PACKED;
91 };
92
93
94 static unsigned int
95 get_size (unsigned int iter)
96 {
97   unsigned int ret;
98
99   if (iter < 60000)
100     return iter + sizeof (struct TestMessage);
101   ret = (iter * iter * iter);
102   return sizeof (struct TestMessage) + (ret % 60000);
103 }
104
105
106 static void
107 terminate_peer (struct PeerContext *p)
108 {
109   if (NULL != p->ch)
110   {
111     GNUNET_CORE_disconnecT (p->ch);
112     p->ch = NULL;
113   }
114   if (NULL != p->ghh)
115   {
116     GNUNET_TRANSPORT_hello_get_cancel (p->ghh);
117     p->ghh = NULL;
118   }
119   if (NULL != p->oh)
120   {
121     GNUNET_TRANSPORT_offer_hello_cancel (p->oh);
122     p->oh = NULL;
123   }
124   if (NULL != p->ats_sh)
125   {
126     GNUNET_ATS_connectivity_suggest_cancel (p->ats_sh);
127     p->ats_sh = NULL;
128   }
129   if (NULL != p->ats)
130   {
131     GNUNET_ATS_connectivity_done (p->ats);
132     p->ats = NULL;
133   }
134 }
135
136
137 static void
138 terminate_task_error (void *cls)
139 {
140   err_task = NULL;
141   GNUNET_break (0);
142   GNUNET_SCHEDULER_shutdown ();
143   ok = 42;
144 }
145
146
147 static void
148 do_shutdown (void *cls)
149 {
150   unsigned long long delta;
151
152   delta = GNUNET_TIME_absolute_get_duration (start_time).rel_value_us;
153   FPRINTF (stderr,
154            "\nThroughput was %llu kb/s\n",
155            total_bytes * 1000000LL / 1024 / delta);
156   GAUGER ("CORE",
157           "Core throughput/s",
158           total_bytes * 1000000LL / 1024 / delta,
159           "kb/s");
160   if (NULL != err_task)
161   {
162     GNUNET_SCHEDULER_cancel (err_task);
163     err_task = NULL;
164   }
165   terminate_peer (&p1);
166   terminate_peer (&p2);
167   
168 }
169
170
171 static void
172 send_message (struct GNUNET_MQ_Handle *mq,
173               int32_t num)
174 {
175   struct GNUNET_MQ_Envelope *env;
176   struct TestMessage *hdr;
177   unsigned int s;
178
179   GNUNET_assert (NULL != mq);
180   GNUNET_assert (tr_n < TOTAL_MSGS);
181   s = get_size (tr_n);
182   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
183               "Sending message %u of size %u\n",
184               tr_n,
185               s);
186   env = GNUNET_MQ_msg_extra (hdr,
187                              s - sizeof (struct TestMessage),
188                              MTYPE);
189   hdr->num = htonl (tr_n);
190   memset (&hdr[1],
191           tr_n,
192           s - sizeof (struct TestMessage));
193   tr_n++;
194   GNUNET_SCHEDULER_cancel (err_task);
195   err_task =
196       GNUNET_SCHEDULER_add_delayed (TIMEOUT,
197                                     &terminate_task_error,
198                                     NULL);
199   total_bytes += s;
200   GNUNET_MQ_send (mq,
201                   env);
202 }
203
204
205 static void *
206 connect_notify (void *cls,
207                 const struct GNUNET_PeerIdentity *peer,
208                 struct GNUNET_MQ_Handle *mq)
209 {
210   struct PeerContext *pc = cls;
211
212   if (0 == memcmp (&pc->id,
213                    peer,
214                    sizeof (struct GNUNET_PeerIdentity)))
215     return (void *) peer;
216   pc->mq = mq;
217   GNUNET_assert (0 == pc->connect_status);
218   pc->connect_status = 1;
219   if (pc == &p1)
220   {
221     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
222                 "Encrypted connection established to peer `%s'\n",
223                 GNUNET_i2s (peer));
224     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
225                 "Asking core (1) for transmission to peer `%s'\n",
226                 GNUNET_i2s (&p2.id));
227     GNUNET_SCHEDULER_cancel (err_task);
228     err_task =
229         GNUNET_SCHEDULER_add_delayed (TIMEOUT,
230                                       &terminate_task_error,
231                                       NULL);
232     start_time = GNUNET_TIME_absolute_get ();
233     send_message (mq,
234                   0);
235   }
236   return (void *) peer;
237 }
238
239
240 static void
241 disconnect_notify (void *cls,
242                    const struct GNUNET_PeerIdentity *peer,
243                    void *internal_cls)
244 {
245   struct PeerContext *pc = cls;
246
247   if (0 == memcmp (&pc->id,
248                    peer,
249                    sizeof (struct GNUNET_PeerIdentity)))
250     return;
251   pc->mq = NULL;
252   pc->connect_status = 0;
253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254               "Encrypted connection to `%s' cut\n",
255               GNUNET_i2s (peer));
256 }
257
258
259 static int
260 check_test (void *cls,
261             const struct TestMessage *hdr)
262 {
263   return GNUNET_OK; /* accept all */
264 }
265
266
267 static void
268 handle_test (void *cls,
269              const struct TestMessage *hdr)
270 {
271   static int n;
272   unsigned int s;
273
274   s = get_size (n);
275   if (ntohs (hdr->header.size) != s)
276   {
277     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
278                 "Expected message %u of size %u, got %u bytes of message %u\n",
279                 n,
280                 s,
281                 ntohs (hdr->header.size),
282                 ntohl (hdr->num));
283     GNUNET_SCHEDULER_cancel (err_task);
284     err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
285                                          NULL);
286     return;
287   }
288   if (ntohl (hdr->num) != n)
289   {
290     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
291                 "Expected message %u of size %u, got %u bytes of message %u\n",
292                 n,
293                 s,
294                 (unsigned int) ntohs (hdr->header.size),
295                 (unsigned int) ntohl (hdr->num));
296     GNUNET_SCHEDULER_cancel (err_task);
297     err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
298                                          NULL);
299     return;
300   }
301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302               "Got message %u of size %u\n",
303               (unsigned int) ntohl (hdr->num),
304               (unsigned int) ntohs (hdr->header.size));
305   n++;
306   if (0 == (n % (TOTAL_MSGS / 100)))
307     FPRINTF (stderr,
308              "%s",
309              ".");
310   if (n == TOTAL_MSGS)
311   {
312     ok = 0;
313     GNUNET_SCHEDULER_shutdown ();
314   }
315   else
316   {
317     if (n == tr_n)
318     {
319       send_message (p1.mq,
320                     tr_n);
321     }  
322   }
323 }
324
325
326 static void
327 init_notify (void *cls,
328              const struct GNUNET_PeerIdentity *my_identity)
329 {
330   GNUNET_MQ_hd_var_size (test,
331                          MTYPE,
332                          struct TestMessage);
333   struct PeerContext *p = cls;
334   struct GNUNET_MQ_MessageHandler handlers[] = {
335     make_test_handler (NULL),
336     GNUNET_MQ_handler_end ()
337   };
338
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Connection to CORE service of `%s' established\n",
341               GNUNET_i2s (my_identity));
342   p->id = *my_identity;
343   if (cls == &p1)
344   {
345     GNUNET_assert (ok == 2);
346     OKPP;
347     /* connect p2 */
348     GNUNET_assert (NULL !=
349                    (p2.ch = GNUNET_CORE_connecT (p2.cfg,
350                                                  &p2,
351                                                  &init_notify,
352                                                  &connect_notify,
353                                                  &disconnect_notify,
354                                                  handlers)));
355   }
356   else
357   {
358     GNUNET_assert (ok == 3);
359     OKPP;
360     GNUNET_assert (cls == &p2);
361     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362                 "Asking transport (1) to connect to peer `%s'\n",
363                 GNUNET_i2s (&p2.id));
364     p1.ats_sh = GNUNET_ATS_connectivity_suggest (p1.ats,
365                                                  &p2.id,
366                                                  1);
367   }
368 }
369
370
371 static void
372 offer_hello_done (void *cls)
373 {
374   struct PeerContext *p = cls;
375
376   p->oh = NULL;
377 }
378
379
380 static void
381 process_hello (void *cls,
382                const struct GNUNET_MessageHeader *message)
383 {
384   struct PeerContext *p = cls;
385
386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387               "Received (my) `%s' from transport service\n", "HELLO");
388   GNUNET_assert (message != NULL);
389   p->hello = GNUNET_copy_message (message);
390   if ((p == &p1) && (NULL == p2.oh))
391     p2.oh = GNUNET_TRANSPORT_offer_hello (p2.cfg,
392                                           message,
393                                           &offer_hello_done,
394                                           &p2);
395   if ((p == &p2) && (NULL == p1.oh))
396     p1.oh = GNUNET_TRANSPORT_offer_hello (p1.cfg,
397                                           message,
398                                           &offer_hello_done,
399                                           &p1);
400
401   if ((p == &p1) && (p2.hello != NULL) && (NULL == p1.oh) )
402     p1.oh = GNUNET_TRANSPORT_offer_hello (p1.cfg,
403                                           p2.hello,
404                                           &offer_hello_done,
405                                           &p1);
406   if ((p == &p2) && (p1.hello != NULL) && (NULL == p2.oh) )
407     p2.oh = GNUNET_TRANSPORT_offer_hello (p2.cfg,
408                                           p1.hello,
409                                           &offer_hello_done,
410                                           &p2);
411 }
412
413
414 static void
415 setup_peer (struct PeerContext *p,
416             const char *cfgname)
417 {
418   char *binary;
419
420   binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-arm");
421   p->cfg = GNUNET_CONFIGURATION_create ();
422   p->arm_proc
423     = GNUNET_OS_start_process (GNUNET_YES,
424                                GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
425                                NULL, NULL, NULL,
426                                binary,
427                                "gnunet-service-arm",
428                                "-c",
429                                cfgname,
430                                NULL);
431   GNUNET_assert (GNUNET_OK ==
432                  GNUNET_CONFIGURATION_load (p->cfg,
433                                             cfgname));
434   p->ats = GNUNET_ATS_connectivity_init (p->cfg);
435   GNUNET_assert (NULL != p->ats);
436   p->ghh = GNUNET_TRANSPORT_hello_get (p->cfg,
437                                        GNUNET_TRANSPORT_AC_ANY,
438                                        &process_hello,
439                                        p);
440   GNUNET_free (binary);
441 }
442
443
444 static void
445 run (void *cls,
446      char *const *args,
447      const char *cfgfile,
448      const struct GNUNET_CONFIGURATION_Handle *cfg)
449 {
450   GNUNET_MQ_hd_fixed_size (test,
451                            MTYPE,
452                            struct TestMessage);
453   struct GNUNET_MQ_MessageHandler handlers[] = {
454     make_test_handler (NULL),
455     GNUNET_MQ_handler_end ()
456   };
457
458   GNUNET_assert (ok == 1);
459   OKPP;
460   setup_peer (&p1,
461               "test_core_api_peer1.conf");
462   setup_peer (&p2,
463               "test_core_api_peer2.conf");
464   err_task =
465       GNUNET_SCHEDULER_add_delayed (TIMEOUT,
466                                     &terminate_task_error,
467                                     NULL);
468   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
469                                  NULL);
470
471   GNUNET_assert (NULL !=
472                  (p1.ch = GNUNET_CORE_connecT (p1.cfg,
473                                                &p1,
474                                                &init_notify,
475                                                &connect_notify,
476                                                &disconnect_notify,
477                                                handlers)));
478 }
479
480
481 static void
482 stop_arm (struct PeerContext *p)
483 {
484   if (0 != GNUNET_OS_process_kill (p->arm_proc,
485                                    GNUNET_TERM_SIG))
486     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
487                          "kill");
488   if (GNUNET_OK != GNUNET_OS_process_wait (p->arm_proc))
489     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
490                          "waitpid");
491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
492               "ARM process %u stopped\n",
493               GNUNET_OS_process_get_pid (p->arm_proc));
494   GNUNET_OS_process_destroy (p->arm_proc);
495   p->arm_proc = NULL;
496   GNUNET_CONFIGURATION_destroy (p->cfg);
497 }
498
499
500 int
501 main (int argc,
502       char *argv1[])
503 {
504   char *const argv[] = {
505     "test-core-api-reliability",
506     "-c",
507     "test_core_api_data.conf",
508     NULL
509   };
510   struct GNUNET_GETOPT_CommandLineOption options[] = {
511     GNUNET_GETOPT_OPTION_END
512   };
513   ok = 1;
514   GNUNET_log_setup ("test-core-api-reliability",
515                     "WARNING",
516                     NULL);
517   GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
518                       argv,
519                       "test-core-api-reliability",
520                       "nohelp",
521                       options,
522                       &run,
523                       &ok);
524   stop_arm (&p1);
525   stop_arm (&p2);
526   GNUNET_DISK_directory_remove ("/tmp/test-gnunet-core-peer-1");
527   GNUNET_DISK_directory_remove ("/tmp/test-gnunet-core-peer-2");
528
529   return ok;
530 }
531
532 /* end of test_core_api_reliability.c */