- use global hashmap for IDs
[oweals/gnunet.git] / src / mesh / mesh_profiler.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file mesh/mesh_profiler.c
22  *
23  * @brief Profiler for mesh experiments.
24  */
25 #include <stdio.h>
26 #include "platform.h"
27 #include "mesh_test_lib.h"
28 #include "gnunet_mesh_service.h"
29 #include "gnunet_statistics_service.h"
30
31
32 /**
33  * How namy messages to send
34  */
35 #define TOTAL_PACKETS 1000
36
37 /**
38  * How namy peers to run
39  */
40 #define TOTAL_PEERS 1000
41
42 /**
43  * How long until we give up on connecting the peers?
44  */
45 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
46
47 /**
48  * Time to wait for stuff that should be rather fast
49  */
50 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
51
52
53 struct MeshPeer
54 {
55   /**
56    * Testbed Operation (to get peer id, etc).
57    */
58   struct GNUNET_TESTBED_Operation *op;
59
60   /**
61    * Peer ID.
62    */
63   struct GNUNET_PeerIdentity id;
64
65   /**
66    * Mesh handle for the root peer
67    */
68   struct GNUNET_MESH_Handle *mesh;
69
70   /**
71    * Channel handle for the root peer
72    */
73   struct GNUNET_MESH_Channel *ch;
74
75   /**
76    * Channel handle for the dest peer
77    */
78   struct GNUNET_MESH_Channel *incoming_ch;
79
80   /**
81    * Number of payload packes sent
82    */
83   int data_sent;
84
85   /**
86    * Number of payload packets received
87    */
88   int data_received;
89
90   struct MeshPeer *dest;
91   struct MeshPeer *incoming;
92   GNUNET_SCHEDULER_TaskIdentifier ping_task;
93   struct GNUNET_TIME_Absolute timestamp;
94 };
95
96 /**
97  * GNUNET_PeerIdentity -> MeshPeer
98  */
99 static struct GNUNET_CONTAINER_MultiPeerMap *ids;
100
101 /**
102  * Testbed peer handles.
103  */
104 static struct GNUNET_TESTBED_Peer **testbed_handles;
105
106 /**
107  * Testbed Operation (to get stats).
108  */
109 static struct GNUNET_TESTBED_Operation *stats_op;
110
111 /**
112  * How many events have happened
113  */
114 static int ok;
115
116 /**
117  * Number of events expected to conclude the test successfully.
118  */
119 static int ok_goal;
120
121 /**
122  * Size of each test packet
123  */
124 size_t size_payload = sizeof (struct GNUNET_MessageHeader) + sizeof (uint32_t);
125
126 /**
127  * Operation to get peer ids.
128  */
129 struct MeshPeer peers[TOTAL_PEERS];
130
131 /**
132  * Peer ids counter.
133  */
134 static unsigned int p_ids;
135
136 /**
137  * Total number of currently running peers.
138  */
139 static unsigned long long peers_running;
140
141 /**
142  * Test context (to shut down).
143  */
144 static struct GNUNET_MESH_TEST_Context *test_ctx;
145
146 /**
147  * Task called to shutdown test.
148  */
149 static GNUNET_SCHEDULER_TaskIdentifier shutdown_handle;
150
151 /**
152  * Task called to disconnect peers, before shutdown.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier disconnect_task;
155
156 /**
157  * Task to perform tests
158  */
159 static GNUNET_SCHEDULER_TaskIdentifier test_task;
160
161
162 /**
163  * Flag to notify callbacks not to generate any new traffic anymore.
164  */
165 static int test_finished;
166
167 /**
168  * Calculate a random delay.
169  *
170  * @param max Exclusive maximum, in ms.
171  *
172  * @return A time between 0 a max-1 ms.
173  */
174 static struct GNUNET_TIME_Relative
175 delay_ms_rnd (unsigned int max)
176 {
177   unsigned int rnd;
178
179   rnd = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max);
180   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, rnd);
181 }
182
183
184 /**
185  * Get the index of a peer in the peers array.
186  *
187  * @param peer Peer whose index to get.
188  *
189  * @return Index of peer in peers.
190  */
191 static unsigned int
192 get_index (struct MeshPeer *peer)
193 {
194   return peer - peers;
195 }
196
197
198 /**
199  * Show the results of the test (banwidth acheived) and log them to GAUGER
200  */
201 static void
202 show_end_data (void)
203 {
204 }
205
206
207 /**
208  * Shut down peergroup, clean up.
209  *
210  * @param cls Closure (unused).
211  * @param tc Task Context.
212  */
213 static void
214 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
215 {
216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n");
217   shutdown_handle = GNUNET_SCHEDULER_NO_TASK;
218 }
219
220
221 /**
222  * Disconnect from mesh services af all peers, call shutdown.
223  *
224  * @param cls Closure (unused).
225  * @param tc Task Context.
226  */
227 static void
228 disconnect_mesh_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
229 {
230   long line = (long) cls;
231   unsigned int i;
232
233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
234               "disconnecting mesh service of peers, called from line %ld\n",
235               line);
236   disconnect_task = GNUNET_SCHEDULER_NO_TASK;
237   for (i = 0; i < TOTAL_PEERS; i++)
238   {
239     GNUNET_TESTBED_operation_done (peers[i].op);
240     GNUNET_MESH_channel_destroy (peers[i].ch);
241     GNUNET_MESH_channel_destroy (peers[i].incoming_ch);
242   }
243   GNUNET_MESH_TEST_cleanup (test_ctx);
244   if (GNUNET_SCHEDULER_NO_TASK != shutdown_handle)
245   {
246     GNUNET_SCHEDULER_cancel (shutdown_handle);
247   }
248   shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
249 }
250
251
252 /**
253  * Finish test normally: schedule disconnect and shutdown
254  *
255  * @param line Line in the code the abort is requested from (__LINE__).
256  */
257 static void
258 abort_test (long line)
259 {
260   if (disconnect_task != GNUNET_SCHEDULER_NO_TASK)
261   {
262     GNUNET_SCHEDULER_cancel (disconnect_task);
263     disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_mesh_peers,
264                                                 (void *) line);
265   }
266 }
267
268 /**
269  * Stats callback. Finish the stats testbed operation and when all stats have
270  * been iterated, shutdown the test.
271  *
272  * @param cls closure
273  * @param op the operation that has been finished
274  * @param emsg error message in case the operation has failed; will be NULL if
275  *          operation has executed successfully.
276  */
277 static void
278 stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg)
279 {
280   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "... collecting statistics done.\n");
281   GNUNET_TESTBED_operation_done (stats_op);
282
283   if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
284     GNUNET_SCHEDULER_cancel (disconnect_task);
285   disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_mesh_peers,
286                                               (void *) __LINE__);
287
288 }
289
290
291 /**
292  * Process statistic values.
293  *
294  * @param cls closure
295  * @param peer the peer the statistic belong to
296  * @param subsystem name of subsystem that created the statistic
297  * @param name the name of the datum
298  * @param value the current value
299  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
300  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
301  */
302 static int
303 stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer,
304                 const char *subsystem, const char *name,
305                 uint64_t value, int is_persistent)
306 {
307   uint32_t i;
308
309   i = GNUNET_TESTBED_get_index (peer);
310   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  %u - %s [%s]: %llu\n",
311               i, subsystem, name, value);
312
313   return GNUNET_OK;
314 }
315
316
317 /**
318  * Task check that keepalives were sent and received.
319  *
320  * @param cls Closure (NULL).
321  * @param tc Task Context.
322  */
323 static void
324 collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
325 {
326   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
327     return;
328
329   disconnect_task = GNUNET_SCHEDULER_NO_TASK;
330   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start collecting statistics...\n");
331   stats_op = GNUNET_TESTBED_get_statistics (TOTAL_PEERS, testbed_handles,
332                                             NULL, NULL,
333                                             stats_iterator, stats_cont, NULL);
334 }
335
336
337 /**
338  * @brief Finish profiler normally.
339  *
340  * @param cls Closure (unused).
341  * @param tc Task context.
342  */
343 static void
344 finish_profiler (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
345 {
346   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
347     return;
348
349   test_finished = GNUNET_YES;
350   show_end_data();
351   GNUNET_SCHEDULER_add_now (&collect_stats, NULL);
352 }
353
354
355
356 /**
357  * Transmit ready callback.
358  *
359  * @param cls Closure (peer).
360  * @param size Size of the tranmist buffer.
361  * @param buf Pointer to the beginning of the buffer.
362  *
363  * @return Number of bytes written to buf.
364  */
365 static size_t
366 tmt_rdy (void *cls, size_t size, void *buf);
367
368
369 /**
370  * @brief Send data to destination
371  *
372  * @param cls Closure (peer).
373  * @param tc Task context.
374  */
375 static void
376 ping (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
377 {
378   struct MeshPeer *peer = (struct MeshPeer *) cls;
379
380   peer->ping_task = GNUNET_SCHEDULER_NO_TASK;
381   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
382     return;
383
384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u -> %u\n",
385               get_index (peer), get_index (peer->dest));
386
387   GNUNET_MESH_notify_transmit_ready (peer->ch, GNUNET_NO,
388                                      GNUNET_TIME_UNIT_FOREVER_REL,
389                                      size_payload, &tmt_rdy, peer);
390 }
391
392
393 /**
394  * Transmit ready callback
395  *
396  * @param cls Closure (peer).
397  * @param size Size of the buffer we have.
398  * @param buf Buffer to copy data to.
399  */
400 size_t
401 tmt_rdy (void *cls, size_t size, void *buf)
402 {
403   struct MeshPeer *peer = (struct MeshPeer *) cls;
404   struct GNUNET_MessageHeader *msg = buf;
405   uint32_t *data;
406   unsigned int s;
407
408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmt_rdy called, filling buffer\n");
409   if (size < size_payload || NULL == buf)
410   {
411     GNUNET_break (ok >= ok_goal - 2);
412     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413                 "size %u, buf %p, data_sent %u, data_received %u\n",
414                 size, buf, peer->data_sent, peer->data_received);
415     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ok %u, ok goal %u\n", ok, ok_goal);
416
417     return 0;
418   }
419   msg->size = htons (size);
420   msg->type = htons ((long) cls);
421   data = (uint32_t *) &msg[1];
422   *data = htonl (peer->data_sent);
423   if (0 == peer->data_sent)
424   {
425     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent: initializer\n");
426     s = 5;
427   }
428   else
429   {
430     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent: msg %d\n", peer->data_sent);
431     s = 60;
432   }
433   peer->data_sent++;
434   peer->timestamp = GNUNET_TIME_absolute_get ();
435   peer->ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (s * 1000),
436                                                   &ping, peer);
437
438   return size_payload;
439 }
440
441
442 /**
443  * Function is called whenever a message is received.
444  *
445  * @param cls closure (peer #, set from GNUNET_MESH_connect)
446  * @param channel connection to the other end
447  * @param channel_ctx place to store local state associated with the channel
448  * @param message the actual message
449  * @return GNUNET_OK to keep the connection open,
450  *         GNUNET_SYSERR to close it (signal serious error)
451  */
452 int
453 data_callback (void *cls, struct GNUNET_MESH_Channel *channel,
454                void **channel_ctx,
455                const struct GNUNET_MessageHeader *message)
456 {
457   long n = (long) cls;
458   struct MeshPeer *peer;
459   struct GNUNET_TIME_Relative latency;
460
461   GNUNET_MESH_receive_done (channel);
462   peer = &peers[n];
463
464   GNUNET_assert (0 != peer->timestamp.abs_value_us);
465   latency = GNUNET_TIME_absolute_get_duration (peer->incoming->timestamp);
466   FPRINTF (stderr, "%u -> %ld latency: %s\n",
467            get_index (peer->incoming), n,
468            GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_NO));
469   peer->timestamp.abs_value_us = 0;
470
471   return GNUNET_OK;
472 }
473
474
475 /**
476  * Handlers, for diverse services
477  */
478 static struct GNUNET_MESH_MessageHandler handlers[] = {
479   {&data_callback, 1, sizeof (struct GNUNET_MessageHeader)},
480   {NULL, 0, 0}
481 };
482
483
484 /**
485  * Method called whenever another peer has added us to a channel
486  * the other peer initiated.
487  *
488  * @param cls Closure.
489  * @param channel New handle to the channel.
490  * @param initiator Peer that started the channel.
491  * @param port Port this channel is connected to.
492  * @param options channel option flags
493  * @return Initial channel context for the channel
494  *         (can be NULL -- that's not an error).
495  */
496 static void *
497 incoming_channel (void *cls, struct GNUNET_MESH_Channel *channel,
498                  const struct GNUNET_PeerIdentity *initiator,
499                  uint32_t port, enum GNUNET_MESH_ChannelOption options)
500 {
501   long n = (long) cls;
502   struct MeshPeer *peer;
503
504   peer = GNUNET_CONTAINER_multipeermap_get (ids, initiator);
505   GNUNET_assert (NULL != peer);
506   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <= %u\n", n, get_index (peer));
507   peers[n].incoming_ch = channel;
508
509   if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
510   {
511     GNUNET_SCHEDULER_cancel (disconnect_task);
512     disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
513                                                     &disconnect_mesh_peers,
514                                                     (void *) __LINE__);
515   }
516
517   return NULL;
518 }
519
520 /**
521  * Function called whenever an inbound channel is destroyed.  Should clean up
522  * any associated state.
523  *
524  * @param cls closure (set from GNUNET_MESH_connect)
525  * @param channel connection to the other end (henceforth invalid)
526  * @param channel_ctx place where local state associated
527  *                   with the channel is stored
528  */
529 static void
530 channel_cleaner (void *cls, const struct GNUNET_MESH_Channel *channel,
531                  void *channel_ctx)
532 {
533   long n = (long) cls;
534
535   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
536               "Incoming channel disconnected at peer %ld\n", n);
537   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
538
539   if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
540   {
541     GNUNET_SCHEDULER_cancel (disconnect_task);
542     disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_mesh_peers,
543                                                 (void *) __LINE__);
544   }
545 }
546
547
548 /**
549  * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE MESH SERVICES.
550  *
551  * Testcase continues when the root receives confirmation of connected peers,
552  * on callback funtion ch.
553  *
554  * @param cls Closure (unsued).
555  * @param tc Task Context.
556  */
557 static void
558 do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
559 {
560   enum GNUNET_MESH_ChannelOption flags;
561   unsigned long i;
562
563   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
564     return;
565
566   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start profiler\n");
567
568   if (GNUNET_SCHEDULER_NO_TASK != disconnect_task)
569     GNUNET_SCHEDULER_cancel (disconnect_task);
570   disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
571                                                   &disconnect_mesh_peers,
572                                                   (void *) __LINE__);
573
574   flags = GNUNET_MESH_OPTION_DEFAULT;
575   for (i = 0; i < TOTAL_PEERS; i++)
576   {
577     unsigned int r;
578     r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, TOTAL_PEERS);
579     peers[i].dest = &peers[r];
580     peers[i].ch = GNUNET_MESH_channel_create (peers[i].mesh, NULL,
581                                               &peers[i].dest->id,
582                                               1, flags);
583     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u => %u\n", i, r);
584     peers[i].ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (2000),
585                                                        &ping, &peers[i]);
586   }
587 }
588
589
590 /**
591  * Callback to be called when the requested peer information is available
592  *
593  * @param cls the closure from GNUNET_TESTBED_peer_get_information()
594  * @param op the operation this callback corresponds to
595  * @param pinfo the result; will be NULL if the operation has failed
596  * @param emsg error message if the operation has failed;
597  *             NULL if the operation is successfull
598  */
599 static void
600 peer_id_cb (void *cls,
601        struct GNUNET_TESTBED_Operation *op,
602        const struct GNUNET_TESTBED_PeerInformation *pinfo,
603        const char *emsg)
604 {
605   long n = (long) cls;
606
607   if (NULL == pinfo || NULL != emsg)
608   {
609     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg);
610     abort_test (__LINE__);
611     return;
612   }
613   peers[n].id = *(pinfo->result.id);
614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %u  id: %s\n",
615               n, GNUNET_i2s (&peers[n].id));
616   GNUNET_break (GNUNET_OK ==
617                 GNUNET_CONTAINER_multipeermap_put (ids, &peers[n].id, &peers[n],
618                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
619   p_ids++;
620   if (p_ids < TOTAL_PEERS)
621     return;
622   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got all IDs, starting profiler\n");
623   test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
624                                             &do_test, NULL);
625 }
626
627 /**
628  * test main: start test when all peers are connected
629  *
630  * @param cls Closure.
631  * @param ctx Argument to give to GNUNET_MESH_TEST_cleanup on test end.
632  * @param num_peers Number of peers that are running.
633  * @param testbed_peers Array of peers.
634  * @param meshes Handle to each of the MESHs of the peers.
635  */
636 static void
637 tmain (void *cls,
638        struct GNUNET_MESH_TEST_Context *ctx,
639        unsigned int num_peers,
640        struct GNUNET_TESTBED_Peer **testbed_peers,
641        struct GNUNET_MESH_Handle **meshes)
642 {
643   unsigned long i;
644
645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n");
646   ok = 0;
647   test_ctx = ctx;
648   GNUNET_assert (TOTAL_PEERS == num_peers);
649   peers_running = num_peers;
650   testbed_handles = testbed_handles;
651   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
652                                 &finish_profiler, NULL);
653   disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
654                                                   &disconnect_mesh_peers,
655                                                   (void *) __LINE__);
656   shutdown_handle = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
657                                                   &shutdown_task, NULL);
658   for (i = 0; i < TOTAL_PEERS; i++)
659   {
660     peers[i].mesh = meshes[i];
661     peers[i].op =
662       GNUNET_TESTBED_peer_get_information (testbed_handles[i],
663                                            GNUNET_TESTBED_PIT_IDENTITY,
664                                            &peer_id_cb, (void *) i);
665   }
666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
667   /* Continues from pi_cb -> do_test */
668 }
669
670
671 /**
672  * Main: start profiler.
673  */
674 int
675 main (int argc, char *argv[])
676 {
677   static uint32_t ports[2];
678   const char *config_file;
679
680   config_file = "test_mesh.conf";
681
682   ids = GNUNET_CONTAINER_multipeermap_create (2 * TOTAL_PEERS, GNUNET_YES);
683   GNUNET_assert (NULL != ids);
684   p_ids = 0;
685   test_finished = GNUNET_NO;
686   ports[0] = 1;
687   ports[1] = 0;
688   GNUNET_MESH_TEST_run ("mesh_profiler", config_file, TOTAL_PEERS,
689                         &tmain, NULL, /* tmain cls */
690                         &incoming_channel, &channel_cleaner,
691                         handlers, ports);
692
693   if (ok_goal > ok)
694   {
695     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
696                 "FAILED! (%d/%d)\n", ok, ok_goal);
697     return 1;
698   }
699   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n");
700   return 0;
701 }
702
703 /* end of test_mesh_small.c */
704