add avg latency and various bugfixes
[oweals/gnunet.git] / src / transport / test_communicator_basic.c
1 /*
2     This file is part of GNUnet.
3     Copyright (C) 2019 GNUnet e.V.
4
5     GNUnet is free software: you can redistribute it and/or modify it
6     under the terms of the GNU Affero General Public License as published
7     by the Free Software Foundation, either version 3 of the License,
8     or (at your option) any later version.
9
10     GNUnet is distributed in the hope that it will be useful, but
11     WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13     Affero General Public License for more details.
14
15     You should have received a copy of the GNU Affero General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18     SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22 * @file transport/test_communicator_basic.c
23 * @brief test the communicators
24 * @author Julius Bünger
25 * @author Martin Schanzenbach
26 */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "transport-testing2.h"
30 #include "gnunet_ats_transport_service.h"
31 #include "gnunet_signatures.h"
32 #include "gnunet_testing_lib.h"
33 #include "transport.h"
34
35 #include <inttypes.h>
36
37
38 #define LOG(kind, ...) GNUNET_log_from (kind, \
39                                         "test_transport_communicator", \
40                                         __VA_ARGS__)
41
42 #define NUM_PEERS 2
43
44 static struct GNUNET_PeerIdentity peer_id[NUM_PEERS];
45
46 static char *communicator_binary;
47
48 static struct
49 GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_hs[NUM_PEERS];
50
51 static struct GNUNET_CONFIGURATION_Handle *cfg_peers[NUM_PEERS];
52
53 static char **cfg_peers_name;
54
55 static int ret;
56
57 static struct GNUNET_TIME_Absolute start_short;
58
59 static struct GNUNET_TIME_Absolute start_long;
60
61 static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc;
62
63 #define SHORT_MESSAGE_SIZE 128
64
65 #define LONG_MESSAGE_SIZE 32000
66
67 #define SHORT_BURST_SECONDS 2
68
69 #define LONG_BURST_SECONDS 2
70
71 #define SHORT_BURST_WINDOW \
72   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,SHORT_BURST_SECONDS)
73
74 #define LONG_BURST_WINDOW \
75   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,SHORT_BURST_SECONDS)
76
77 #define BURST_SHORT 0
78
79 #define BURST_LONG 1
80
81 #define SIZE_CHECK 2
82
83 #define MAX_BUF_LEN 1
84
85 static int buf_len = 0;
86
87 static uint32_t ack = 0;
88
89 static int phase;
90
91 static size_t num_received = 0;
92
93 static uint64_t avg_latency = 0;
94
95 static void
96 communicator_available_cb (void *cls,
97                            struct
98                            GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
99                            *tc_h,
100                            enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
101                            char *address_prefix)
102 {
103   LOG (GNUNET_ERROR_TYPE_INFO,
104        "Communicator available. (cc: %u, prefix: %s)\n",
105        cc,
106        address_prefix);
107 }
108
109
110 static void
111 add_address_cb (void *cls,
112                 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
113                 tc_h,
114                 const char *address,
115                 struct GNUNET_TIME_Relative expiration,
116                 uint32_t aid,
117                 enum GNUNET_NetworkType nt)
118 {
119   LOG (GNUNET_ERROR_TYPE_DEBUG,
120        "New address. (addr: %s, expir: %" PRIu32 ", ID: %" PRIu32 ", nt: %u\n",
121        address,
122        expiration.rel_value_us,
123        aid,
124        nt);
125   // addresses[1] = GNUNET_strdup (address);
126   if (0 == strcmp ((char*) cls, cfg_peers_name[NUM_PEERS - 1]))
127     GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (tc_hs[0],
128                                                                 &peer_id[
129                                                                   NUM_PEERS
130                                                                   - 1],
131                                                                 address);
132 }
133
134
135 /**
136  * @brief Callback that informs whether the requested queue will be
137  * established
138  *
139  * Implements #GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback.
140  *
141  * @param cls Closure - unused
142  * @param tc_h Communicator handle - unused
143  * @param will_try #GNUNET_YES if queue will be established
144  *                #GNUNET_NO if queue will not be established (bogous address)
145  */
146 static void
147 queue_create_reply_cb (void *cls,
148                        struct
149                        GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
150                        tc_h,
151                        int will_try)
152 {
153   if (GNUNET_YES == will_try)
154     LOG (GNUNET_ERROR_TYPE_DEBUG,
155          "Queue will be established!\n");
156   else
157     LOG (GNUNET_ERROR_TYPE_WARNING,
158          "Queue won't be established (bougus address?)!\n");
159 }
160
161
162 static char*
163 make_payload (size_t payload_size)
164 {
165   char *payload = GNUNET_malloc (payload_size);
166   struct GNUNET_TIME_Absolute ts;
167   struct GNUNET_TIME_AbsoluteNBO ts_n;
168   GNUNET_assert (payload_size >= 8); // So that out timestamp fits
169   ts = GNUNET_TIME_absolute_get ();
170   ts_n = GNUNET_TIME_absolute_hton (ts);
171   memset (payload, 0, payload_size);
172   memcpy (payload, &ts_n, sizeof (struct GNUNET_TIME_AbsoluteNBO));
173   return payload;
174 }
175
176
177 static void
178 size_test (void *cls)
179 {
180   char *payload;
181   phase = SIZE_CHECK;
182
183   if (ack < 64000) // Leave some room for our protocol.
184   {
185     payload = make_payload (ack);
186     GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
187                                                           payload,
188                                                           ack);
189     GNUNET_free (payload);
190     return;
191   }
192   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
193               "Size packet test done.\n");
194   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
195               "#packets: %lu -- latency: %lu\n",
196               num_received,
197               avg_latency);
198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
199               "Finished\n");
200   GNUNET_SCHEDULER_shutdown ();
201   // Finished!
202 }
203
204
205 static void
206 long_test (void *cls)
207 {
208   struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration (
209     start_long);
210   char *payload;
211   if (LONG_BURST_WINDOW.rel_value_us > duration.rel_value_us)
212   {
213     //FIXME: Not sure how aggressive we should be here, our transport does not
214     //implement congestion control or flow control... (also for the other three
215     if (buf_len < MAX_BUF_LEN)
216     {
217       payload = make_payload (LONG_MESSAGE_SIZE);
218       GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
219                                                             payload,
220                                                             LONG_MESSAGE_SIZE);
221       buf_len++;
222       GNUNET_free (payload);
223       GNUNET_SCHEDULER_add_now (&long_test, NULL);
224     }
225     return;
226   }
227   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
228               "Long size packet test done.\n");
229   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
230               "goodput: %lu b/s -- #packets: %lu -- latency: %lu\n",
231               (LONG_MESSAGE_SIZE * num_received) / LONG_BURST_SECONDS,
232               num_received,
233               avg_latency);
234   ack = 10;
235   num_received = 0;
236   buf_len = 0;
237   avg_latency = 0;
238   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &size_test, NULL);
239 }
240
241
242 static void
243 short_test (void *cls)
244 {
245   struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration (
246     start_short);
247   char *payload;
248   if (SHORT_BURST_WINDOW.rel_value_us > duration.rel_value_us)
249   {
250     if (buf_len < MAX_BUF_LEN)
251     {
252       payload = make_payload (SHORT_MESSAGE_SIZE);
253       GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
254                                                             payload,
255                                                             SHORT_MESSAGE_SIZE);
256       buf_len++;
257       GNUNET_free (payload);
258       GNUNET_SCHEDULER_add_now (&short_test, NULL);
259     }
260     return;
261   }
262   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
263               "Short size packet test done.\n");
264   GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
265               "goodput: %lu b/s -- #packets: %lu -- latency: %lu\n",
266               (SHORT_MESSAGE_SIZE * num_received) / SHORT_BURST_SECONDS,
267               num_received,
268               avg_latency);
269   start_long = GNUNET_TIME_absolute_get ();
270   phase = BURST_LONG;
271   buf_len = 0;
272   avg_latency = 0;
273   num_received = 0;
274   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &long_test, NULL);
275 }
276
277
278 /**
279  * @brief Handle opening of queue
280  *
281  * Issues sending of test data
282  *
283  * Implements #GNUNET_TRANSPORT_TESTING_AddQueueCallback
284  *
285  * @param cls Closure
286  * @param tc_h Communicator handle
287  * @param tc_queue Handle to newly opened queue
288  */
289 static void
290 add_queue_cb (void *cls,
291               struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
292               struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *
293               tc_queue)
294 {
295   if (0 != strcmp ((char*) cls, cfg_peers_name[0]))
296     return; // TODO?
297   LOG (GNUNET_ERROR_TYPE_DEBUG,
298        "Queue established, starting test...\n");
299   start_short = GNUNET_TIME_absolute_get ();
300   my_tc = tc_queue;
301   buf_len = 0;
302   phase = BURST_SHORT;
303   GNUNET_SCHEDULER_add_now (&short_test, tc_queue);
304 }
305
306
307 static void
308 update_avg_latency (const char*payload)
309 {
310   struct GNUNET_TIME_AbsoluteNBO *ts_n;
311   struct GNUNET_TIME_Absolute ts;
312   struct GNUNET_TIME_Relative latency;
313
314   ts_n = (struct GNUNET_TIME_AbsoluteNBO *) payload;
315   ts = GNUNET_TIME_absolute_ntoh (*ts_n);
316   latency = GNUNET_TIME_absolute_get_duration (ts);
317   if (1 == num_received)
318     avg_latency = latency.rel_value_us;
319   else
320     avg_latency = ((avg_latency * (num_received - 1)) + latency.rel_value_us)
321                   / num_received;
322
323 }
324
325
326 /**
327  * @brief Handle an incoming message
328  *
329  * Implements #GNUNET_TRANSPORT_TESTING_IncomingMessageCallback
330
331  * @param cls Closure
332  * @param tc_h Handle to the receiving communicator
333  * @param msg Received message
334  */
335 void
336 incoming_message_cb (void *cls,
337                      struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
338                      *tc_h,
339                      const char*payload,
340                      size_t payload_len)
341 {
342   if (0 != strcmp ((char*) cls, cfg_peers_name[NUM_PEERS - 1]))
343     return; // TODO?
344   if (phase == BURST_SHORT)
345   {
346     GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
347     num_received++;
348     update_avg_latency (payload);
349     if (buf_len == MAX_BUF_LEN)
350       GNUNET_SCHEDULER_add_now (&short_test, NULL);
351     buf_len--;
352   }
353   else if (phase == BURST_LONG)
354   {
355     if (LONG_MESSAGE_SIZE != payload_len)
356       return; // Ignore
357     num_received++;
358     update_avg_latency (payload);
359     if (buf_len == MAX_BUF_LEN)
360       GNUNET_SCHEDULER_add_now (&long_test, NULL);
361     buf_len--;
362   }
363   else         // if (phase == SIZE_CHECK) {
364   {
365     // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366     //          "Receiving payload with size %lu...\n", payload_len);
367     if (ack != payload_len)
368     {
369       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
370                   "Error receiving message, corrupted.\n");
371       ret = 1;
372       GNUNET_SCHEDULER_shutdown ();
373       return;
374     }
375     num_received++;
376     update_avg_latency (payload);
377     ack += 5; // Next expected message size
378     GNUNET_SCHEDULER_add_now (&size_test, NULL);
379   }
380 }
381
382
383 /**
384  * @brief Main function called by the scheduler
385  *
386  * @param cls Closure - Handle to configuration
387  */
388 static void
389 run (void *cls)
390 {
391   ret = 0;
392   num_received = 0;
393   for (int i = 0; i < NUM_PEERS; i++)
394   {
395     tc_hs[i] = GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
396       "transport",
397       communicator_binary,
398       cfg_peers_name[i],
399       &communicator_available_cb,
400       &add_address_cb,
401       &queue_create_reply_cb,
402       &add_queue_cb,
403       &incoming_message_cb,
404       cfg_peers_name[i]); /* cls */
405   }
406 }
407
408
409 int
410 main (int argc,
411       char *const *argv)
412 {
413   struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
414   char *communicator_name;
415   char *cfg_peer;
416   ret = 1;
417
418   communicator_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
419   GNUNET_asprintf (&communicator_binary, "gnunet-communicator-%s",
420                    communicator_name);
421   cfg_peers_name = GNUNET_malloc (sizeof(char*) * NUM_PEERS);
422   if (GNUNET_OK != GNUNET_log_setup ("test_communicator_basic",
423                                      "DEBUG",
424                                      "test_communicator_basic.log"))
425   {
426     fprintf (stderr, "Unable to setup log\n");
427     GNUNET_break (0);
428     return 2;
429   }
430   for (int i = 0; i < NUM_PEERS; i++)
431   {
432     GNUNET_asprintf ((&cfg_peer),
433                      "test_communicator_%s_peer%u.conf",
434                      communicator_name, i + 1);
435     cfg_peers_name[i] = cfg_peer;
436     cfg_peers[i] = GNUNET_CONFIGURATION_create ();
437     if (GNUNET_YES ==
438         GNUNET_DISK_file_test (cfg_peers_name[i]))
439     {
440       if (GNUNET_SYSERR ==
441           GNUNET_CONFIGURATION_load (cfg_peers[i],
442                                      cfg_peers_name[i]))
443       {
444         fprintf (stderr,
445                  "Malformed configuration file `%s', exiting ...\n",
446                  cfg_peers_name[i]);
447         return 1;
448       }
449     }
450     else
451     {
452       if (GNUNET_SYSERR ==
453           GNUNET_CONFIGURATION_load (cfg_peers[i],
454                                      NULL))
455       {
456         fprintf (stderr,
457                  "Configuration file %s does not exist, exiting ...\n",
458                  cfg_peers_name[i]);
459         return 1;
460       }
461     }
462     private_key =
463       GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg_peers[i]);
464     if (NULL == private_key)
465     {
466       LOG (GNUNET_ERROR_TYPE_ERROR,
467            "Unable to get peer ID\n");
468       return 1;
469     }
470     GNUNET_CRYPTO_eddsa_key_get_public (private_key,
471                                         &peer_id[i].public_key);
472     GNUNET_free (private_key);
473     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
474                 "Identity of peer %u is %s\n",
475                 i, GNUNET_i2s_full (&peer_id[i]));
476   }
477   fprintf (stderr, "Starting test...\n");
478   GNUNET_SCHEDULER_run (&run,
479                         NULL);
480   return ret;
481 }