-converting CORE service to new transport MQ API
[oweals/gnunet.git] / src / core / test_core_api_reliability.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file core/test_core_api_reliability.c
22  * @brief testcase for core_api.c focusing on reliable transmission (with TCP)
23  */
24 #include "platform.h"
25 #include "gnunet_arm_service.h"
26 #include "gnunet_core_service.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_ats_service.h"
29 #include "gnunet_transport_service.h"
30 #include <gauger.h>
31
32 /**
33  * Note that this value must not significantly exceed
34  * 'MAX_PENDING' in 'gnunet-service-transport.c', otherwise
35  * messages may be dropped even for a reliable transport.
36  */
37 #define TOTAL_MSGS (600 * 10)
38
39 /**
40  * How long until we give up on transmitting the message?
41  */
42 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600)
43
44 /**
45  * What delay do we request from the core service for transmission?
46  */
47 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
48
49 #define MTYPE 12345
50
51
52 static unsigned long long total_bytes;
53
54 static struct GNUNET_TIME_Absolute start_time;
55
56 static struct GNUNET_SCHEDULER_Task *err_task;
57
58
59 struct PeerContext
60 {
61   struct GNUNET_CONFIGURATION_Handle *cfg;
62   struct GNUNET_CORE_Handle *ch;
63   struct GNUNET_PeerIdentity id;
64   struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
65   struct GNUNET_MessageHeader *hello;
66   struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
67   struct GNUNET_ATS_ConnectivityHandle *ats;
68   struct GNUNET_ATS_ConnectivitySuggestHandle *ats_sh;
69   int connect_status;
70   struct GNUNET_OS_Process *arm_proc;
71 };
72
73 static struct PeerContext p1;
74
75 static struct PeerContext p2;
76
77 static struct GNUNET_CORE_TransmitHandle *nth;
78
79 static int ok;
80
81 static int32_t tr_n;
82
83
84 #define OKPP do { ok++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Now at stage %u at %s:%u\n", ok, __FILE__, __LINE__); } while (0)
85
86 struct TestMessage
87 {
88   struct GNUNET_MessageHeader header;
89   uint32_t num GNUNET_PACKED;
90 };
91
92
93 static unsigned int
94 get_size (unsigned int iter)
95 {
96   unsigned int ret;
97
98   if (iter < 60000)
99     return iter + sizeof (struct TestMessage);
100   ret = (iter * iter * iter);
101   return sizeof (struct TestMessage) + (ret % 60000);
102 }
103
104
105 static void
106 terminate_peer (struct PeerContext *p)
107 {
108   if (NULL != p->ch)
109   {
110     GNUNET_CORE_disconnect (p->ch);
111     p->ch = NULL;
112   }
113   if (NULL != p->ghh)
114   {
115     GNUNET_TRANSPORT_get_hello_cancel (p->ghh);
116     p->ghh = NULL;
117   }
118   if (NULL != p->oh)
119   {
120     GNUNET_TRANSPORT_offer_hello_cancel (p->oh);
121     p->oh = NULL;
122   }
123   if (NULL != p->ats_sh)
124   {
125     GNUNET_ATS_connectivity_suggest_cancel (p->ats_sh);
126     p->ats_sh = NULL;
127   }
128   if (NULL != p->ats)
129   {
130     GNUNET_ATS_connectivity_done (p->ats);
131     p->ats = NULL;
132   }
133 }
134
135
136 static void
137 terminate_task_error (void *cls)
138 {
139   err_task = NULL;
140   GNUNET_break (0);
141   GNUNET_SCHEDULER_shutdown ();
142   ok = 42;
143 }
144
145
146 static void
147 do_shutdown (void *cls)
148 {
149   unsigned long long delta;
150
151   delta = GNUNET_TIME_absolute_get_duration (start_time).rel_value_us;
152   FPRINTF (stderr,
153            "\nThroughput was %llu kb/s\n",
154            total_bytes * 1000000LL / 1024 / delta);
155   GAUGER ("CORE",
156           "Core throughput/s",
157           total_bytes * 1000000LL / 1024 / delta,
158           "kb/s");
159   if (NULL != err_task)
160   {
161     GNUNET_SCHEDULER_cancel (err_task);
162     err_task = NULL;
163   }
164   if (NULL != nth)
165   {
166     GNUNET_CORE_notify_transmit_ready_cancel (nth);
167     nth = NULL;
168   }
169   terminate_peer (&p1);
170   terminate_peer (&p2);
171   
172 }
173
174
175 static size_t
176 transmit_ready (void *cls,
177                 size_t size,
178                 void *buf)
179 {
180   char *cbuf = buf;
181   struct TestMessage hdr;
182   unsigned int s;
183   unsigned int ret;
184
185   nth = NULL;
186   GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
187   if (NULL == buf)
188   {
189     if (NULL != p1.ch)
190       GNUNET_break (NULL !=
191                     (nth = GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
192                                                               GNUNET_CORE_PRIO_BEST_EFFORT,
193                                                               FAST_TIMEOUT,
194                                                               &p2.id,
195                                                               get_size (tr_n),
196                                                               &transmit_ready,
197                                                               &p1)));
198     return 0;
199   }
200   GNUNET_assert (tr_n < TOTAL_MSGS);
201   ret = 0;
202   s = get_size (tr_n);
203   GNUNET_assert (size >= s);
204   GNUNET_assert (buf != NULL);
205   cbuf = buf;
206   do
207   {
208     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
209                 "Sending message %u of size %u at offset %u\n",
210                 tr_n,
211                 s,
212                 ret);
213     hdr.header.size = htons (s);
214     hdr.header.type = htons (MTYPE);
215     hdr.num = htonl (tr_n);
216     GNUNET_memcpy (&cbuf[ret], &hdr, sizeof (struct TestMessage));
217     ret += sizeof (struct TestMessage);
218     memset (&cbuf[ret], tr_n, s - sizeof (struct TestMessage));
219     ret += s - sizeof (struct TestMessage);
220     tr_n++;
221     s = get_size (tr_n);
222     if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16))
223       break;                    /* sometimes pack buffer full, sometimes not */
224   }
225   while (size - ret >= s);
226   GNUNET_SCHEDULER_cancel (err_task);
227   err_task =
228       GNUNET_SCHEDULER_add_delayed (TIMEOUT,
229                                     &terminate_task_error,
230                                     NULL);
231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
232               "Returning total message block of size %u\n",
233               ret);
234   total_bytes += ret;
235   return ret;
236 }
237
238
239 static void
240 connect_notify (void *cls,
241                 const struct GNUNET_PeerIdentity *peer)
242 {
243   struct PeerContext *pc = cls;
244
245   if (0 == memcmp (&pc->id,
246                    peer,
247                    sizeof (struct GNUNET_PeerIdentity)))
248     return;
249   GNUNET_assert (0 == pc->connect_status);
250   pc->connect_status = 1;
251   if (pc == &p1)
252   {
253     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254                 "Encrypted connection established to peer `%s'\n",
255                 GNUNET_i2s (peer));
256     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257                 "Asking core (1) for transmission to peer `%s'\n",
258                 GNUNET_i2s (&p2.id));
259     GNUNET_SCHEDULER_cancel (err_task);
260     err_task =
261         GNUNET_SCHEDULER_add_delayed (TIMEOUT,
262                                       &terminate_task_error,
263                                       NULL);
264     start_time = GNUNET_TIME_absolute_get ();
265     GNUNET_break (NULL !=
266                   (nth = GNUNET_CORE_notify_transmit_ready (p1.ch,
267                                                             GNUNET_NO,
268                                                             GNUNET_CORE_PRIO_BEST_EFFORT,
269                                                             TIMEOUT,
270                                                             &p2.id,
271                                                             get_size (0),
272                                                             &transmit_ready,
273                                                             &p1)));
274   }
275 }
276
277
278 static void
279 disconnect_notify (void *cls,
280                    const struct GNUNET_PeerIdentity *peer)
281 {
282   struct PeerContext *pc = cls;
283
284   if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity)))
285     return;
286   pc->connect_status = 0;
287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288               "Encrypted connection to `%s' cut\n",
289               GNUNET_i2s (peer));
290 }
291
292
293 static int
294 inbound_notify (void *cls,
295                 const struct GNUNET_PeerIdentity *other,
296                 const struct GNUNET_MessageHeader *message)
297 {
298   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
299               "Core provides inbound data from `%s'.\n",
300               GNUNET_i2s (other));
301   return GNUNET_OK;
302 }
303
304
305 static int
306 outbound_notify (void *cls,
307                  const struct GNUNET_PeerIdentity *other,
308                  const struct GNUNET_MessageHeader *message)
309 {
310   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
311               "Core notifies about outbound data for `%s'.\n",
312               GNUNET_i2s (other));
313   return GNUNET_OK;
314 }
315
316
317 static size_t
318 transmit_ready (void *cls,
319                 size_t size,
320                 void *buf);
321
322
323 static int
324 process_mtype (void *cls,
325                const struct GNUNET_PeerIdentity *peer,
326                const struct GNUNET_MessageHeader *message)
327 {
328   static int n;
329   unsigned int s;
330   const struct TestMessage *hdr;
331
332   hdr = (const struct TestMessage *) message;
333   s = get_size (n);
334   if (MTYPE != ntohs (message->type))
335     return GNUNET_SYSERR;
336   if (ntohs (message->size) != s)
337   {
338     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
339                 "Expected message %u of size %u, got %u bytes of message %u\n",
340                 n, s,
341                 ntohs (message->size),
342                 ntohl (hdr->num));
343     GNUNET_SCHEDULER_cancel (err_task);
344     err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
345                                          NULL);
346     return GNUNET_SYSERR;
347   }
348   if (ntohl (hdr->num) != n)
349   {
350     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
351                 "Expected message %u of size %u, got %u bytes of message %u\n",
352                 n, s,
353                 ntohs (message->size),
354                 ntohl (hdr->num));
355     GNUNET_SCHEDULER_cancel (err_task);
356     err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
357                                          NULL);
358     return GNUNET_SYSERR;
359   }
360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361               "Got message %u of size %u\n",
362               ntohl (hdr->num),
363               ntohs (message->size));
364   n++;
365   if (0 == (n % (TOTAL_MSGS / 100)))
366     FPRINTF (stderr, "%s",  ".");
367   if (n == TOTAL_MSGS)
368   {
369     ok = 0;
370     GNUNET_SCHEDULER_shutdown ();
371   }
372   else
373   {
374     if (n == tr_n)
375       GNUNET_break (NULL !=
376                     GNUNET_CORE_notify_transmit_ready (p1.ch,
377                                                        GNUNET_NO /* no cork */,
378                                                        GNUNET_CORE_PRIO_BEST_EFFORT,
379                                                        FAST_TIMEOUT /* ignored! */,
380                                                        &p2.id,
381                                                        get_size (tr_n),
382                                                        &transmit_ready, &p1));
383   }
384   return GNUNET_OK;
385 }
386
387
388 static struct GNUNET_CORE_MessageHandler handlers[] = {
389   {&process_mtype, MTYPE, 0},
390   {NULL, 0, 0}
391 };
392
393
394 static void
395 init_notify (void *cls,
396              const struct GNUNET_PeerIdentity *my_identity)
397 {
398   struct PeerContext *p = cls;
399
400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401               "Connection to CORE service of `%s' established\n",
402               GNUNET_i2s (my_identity));
403   p->id = *my_identity;
404   if (cls == &p1)
405   {
406     GNUNET_assert (ok == 2);
407     OKPP;
408     /* connect p2 */
409     GNUNET_assert (NULL != (p2.ch = GNUNET_CORE_connect (p2.cfg, &p2,
410                                                          &init_notify,
411                                                          &connect_notify,
412                                                          &disconnect_notify,
413                                                          &inbound_notify, GNUNET_YES,
414                                                          &outbound_notify, GNUNET_YES,
415                                                          handlers)));
416   }
417   else
418   {
419     GNUNET_assert (ok == 3);
420     OKPP;
421     GNUNET_assert (cls == &p2);
422     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423                 "Asking transport (1) to connect to peer `%s'\n",
424                 GNUNET_i2s (&p2.id));
425     p1.ats_sh = GNUNET_ATS_connectivity_suggest (p1.ats,
426                                                  &p2.id,
427                                                  1);
428   }
429 }
430
431
432 static void
433 offer_hello_done (void *cls)
434 {
435   struct PeerContext *p = cls;
436
437   p->oh = NULL;
438 }
439
440
441 static void
442 process_hello (void *cls,
443                const struct GNUNET_MessageHeader *message)
444 {
445   struct PeerContext *p = cls;
446
447   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448               "Received (my) `%s' from transport service\n", "HELLO");
449   GNUNET_assert (message != NULL);
450   p->hello = GNUNET_copy_message (message);
451   if ((p == &p1) && (NULL == p2.oh))
452     p2.oh = GNUNET_TRANSPORT_offer_hello (p2.cfg,
453                                           message,
454                                           &offer_hello_done,
455                                           &p2);
456   if ((p == &p2) && (NULL == p1.oh))
457     p1.oh = GNUNET_TRANSPORT_offer_hello (p1.cfg,
458                                           message,
459                                           &offer_hello_done,
460                                           &p1);
461
462   if ((p == &p1) && (p2.hello != NULL) && (NULL == p1.oh) )
463     p1.oh = GNUNET_TRANSPORT_offer_hello (p1.cfg,
464                                           p2.hello,
465                                           &offer_hello_done,
466                                           &p1);
467   if ((p == &p2) && (p1.hello != NULL) && (NULL == p2.oh) )
468     p2.oh = GNUNET_TRANSPORT_offer_hello (p2.cfg,
469                                           p1.hello,
470                                           &offer_hello_done,
471                                           &p2);
472 }
473
474
475 static void
476 setup_peer (struct PeerContext *p,
477             const char *cfgname)
478 {
479   char *binary;
480
481   binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-arm");
482   p->cfg = GNUNET_CONFIGURATION_create ();
483   p->arm_proc =
484     GNUNET_OS_start_process (GNUNET_YES, GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
485                              NULL, NULL, NULL,
486                              binary,
487                              "gnunet-service-arm",
488                              "-c", cfgname, NULL);
489   GNUNET_assert (GNUNET_OK ==
490                  GNUNET_CONFIGURATION_load (p->cfg,
491                                             cfgname));
492   p->ats = GNUNET_ATS_connectivity_init (p->cfg);
493   GNUNET_assert (NULL != p->ats);
494   p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
495                                        &process_hello,
496                                        p);
497   GNUNET_free (binary);
498 }
499
500
501 static void
502 run (void *cls,
503      char *const *args,
504      const char *cfgfile,
505      const struct GNUNET_CONFIGURATION_Handle *cfg)
506 {
507   GNUNET_assert (ok == 1);
508   OKPP;
509   setup_peer (&p1, "test_core_api_peer1.conf");
510   setup_peer (&p2, "test_core_api_peer2.conf");
511   err_task =
512       GNUNET_SCHEDULER_add_delayed (TIMEOUT,
513                                     &terminate_task_error,
514                                     NULL);
515   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
516                                  NULL);
517
518   GNUNET_assert (NULL !=
519                  (p1.ch = GNUNET_CORE_connect (p1.cfg, &p1,
520                                                &init_notify,
521                                                &connect_notify,
522                                                &disconnect_notify,
523                                                &inbound_notify,
524                                                GNUNET_YES,
525                                                &outbound_notify,
526                                                GNUNET_YES,
527                                                handlers)));
528 }
529
530
531 static void
532 stop_arm (struct PeerContext *p)
533 {
534   if (0 != GNUNET_OS_process_kill (p->arm_proc,
535                                    GNUNET_TERM_SIG))
536     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
537                          "kill");
538   if (GNUNET_OK != GNUNET_OS_process_wait (p->arm_proc))
539     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
540                          "waitpid");
541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542               "ARM process %u stopped\n",
543               GNUNET_OS_process_get_pid (p->arm_proc));
544   GNUNET_OS_process_destroy (p->arm_proc);
545   p->arm_proc = NULL;
546   GNUNET_CONFIGURATION_destroy (p->cfg);
547 }
548
549
550 int
551 main (int argc, char *argv1[])
552 {
553   char *const argv[] = {
554     "test-core-api-reliability",
555     "-c",
556     "test_core_api_data.conf",
557     NULL
558   };
559   struct GNUNET_GETOPT_CommandLineOption options[] = {
560     GNUNET_GETOPT_OPTION_END
561   };
562   ok = 1;
563   GNUNET_log_setup ("test-core-api-reliability",
564                     "WARNING",
565                     NULL);
566   GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
567                       argv,
568                       "test-core-api-reliability",
569                       "nohelp",
570                       options,
571                       &run,
572                       &ok);
573   stop_arm (&p1);
574   stop_arm (&p2);
575   GNUNET_DISK_directory_remove ("/tmp/test-gnunet-core-peer-1");
576   GNUNET_DISK_directory_remove ("/tmp/test-gnunet-core-peer-2");
577
578   return ok;
579 }
580
581 /* end of test_core_api_reliability.c */