glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / cadet / gnunet-cadet-profiler.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15 /**
16  * @file cadet/gnunet-cadet-profiler.c
17  *
18  * @brief Profiler for cadet experiments.
19  */
20 #include <stdio.h>
21 #include "platform.h"
22 #include "cadet_test_lib.h"
23 #include "gnunet_cadet_service.h"
24 #include "gnunet_statistics_service.h"
25
26
27 #define PING 1
28 #define PONG 2
29
30
31 /**
32  * Paximum ping period in milliseconds. Real period = rand (0, PING_PERIOD)
33  */
34 #define PING_PERIOD 500
35
36 /**
37  * How long until we give up on connecting the peers?
38  */
39 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
40
41 /**
42  * Time to wait for stuff that should be rather fast
43  */
44 #define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300)
45
46 /**
47  * Total number of rounds.
48  */
49 #define number_rounds sizeof(rounds)/sizeof(rounds[0])
50
51 /**
52  * Ratio of peers active. First round always is 1.0.
53  */
54 static float rounds[] = {0.8, 0.6, 0.8, 0.5, 0.3, 0.8, 0.0};
55
56 /**
57  * Message type for pings.
58  */
59 struct CadetPingMessage
60 {
61   /**
62    * Header. Type PING/PONG.
63    */
64   struct GNUNET_MessageHeader header;
65
66   /**
67    * Message number.
68    */
69   uint32_t counter;
70
71   /**
72    * Time the message was sent.
73    */
74   struct GNUNET_TIME_AbsoluteNBO timestamp;
75
76   /**
77    * Round number.
78    */
79   uint32_t round_number;
80 };
81
82 /**
83  * Peer description.
84  */
85 struct CadetPeer
86 {
87   /**
88    * Testbed Operation (to get peer id, etc).
89    */
90   struct GNUNET_TESTBED_Operation *op;
91
92   /**
93    * Peer ID.
94    */
95   struct GNUNET_PeerIdentity id;
96
97   /**
98    * Cadet handle for the root peer
99    */
100   struct GNUNET_CADET_Handle *cadet;
101
102   /**
103    * Channel handle for the root peer
104    */
105   struct GNUNET_CADET_Channel *ch;
106
107   /**
108    * Channel handle for the dest peer
109    */
110   struct GNUNET_CADET_Channel *incoming_ch;
111
112   /**
113    * Channel handle for a warmup channel.
114    */
115   struct GNUNET_CADET_Channel *warmup_ch;
116
117   /**
118    * Number of payload packes sent
119    */
120   int data_sent;
121
122   /**
123    * Number of payload packets received
124    */
125   int data_received;
126
127   /**
128    * Is peer up?
129    */
130   int up;
131
132   /**
133    * Destinaton to ping.
134    */
135   struct CadetPeer *dest;
136
137   /**
138    * Incoming channel for pings.
139    */
140   struct CadetPeer *incoming;
141
142   /**
143    * Task to do the next ping.
144    */
145   struct GNUNET_SCHEDULER_Task *ping_task;
146
147   /**
148    * NTR operation for the next ping.
149    */
150   struct GNUNET_CADET_TransmitHandle *ping_ntr;
151
152   float mean[number_rounds];
153   float var[number_rounds];
154   unsigned int pongs[number_rounds];
155   unsigned int pings[number_rounds];
156
157 };
158
159 /**
160  * Duration of each round.
161  */
162 static struct GNUNET_TIME_Relative round_time;
163
164 /**
165  * GNUNET_PeerIdentity -> CadetPeer
166  */
167 static struct GNUNET_CONTAINER_MultiPeerMap *ids;
168
169 /**
170  * Testbed peer handles.
171  */
172 static struct GNUNET_TESTBED_Peer **testbed_handles;
173
174 /**
175  * Testbed Operation (to get stats).
176  */
177 static struct GNUNET_TESTBED_Operation *stats_op;
178
179 /**
180  * Operation to get peer ids.
181  */
182 static struct CadetPeer *peers;
183
184 /**
185  * Peer ids counter.
186  */
187 static unsigned int p_ids;
188
189 /**
190  * Total number of peers.
191  */
192 static unsigned long long peers_total;
193
194 /**
195  * Number of currently running peers.
196  */
197 static unsigned long long peers_running;
198
199 /**
200  * Number of peers doing pings.
201  */
202 static unsigned long long peers_pinging;
203
204 /**
205  * Test context (to shut down).
206  */
207 static struct GNUNET_CADET_TEST_Context *test_ctx;
208
209 /**
210  * Task called to disconnect peers, before shutdown.
211  */
212 static struct GNUNET_SCHEDULER_Task *disconnect_task;
213
214 /**
215  * Task to perform tests
216  */
217 static struct GNUNET_SCHEDULER_Task *test_task;
218
219 /**
220  * Round number.
221  */
222 static unsigned int current_round;
223
224 /**
225  * Do preconnect? (Each peer creates a tunnel to one other peer).
226  */
227 static int do_warmup;
228
229 /**
230  * Warmup progress.
231  */
232 static unsigned int peers_warmup;
233
234 /**
235  * Flag to notify callbacks not to generate any new traffic anymore.
236  */
237 static int test_finished;
238
239 /**
240  * Task running each round of the benchmark.
241  */
242 static struct GNUNET_SCHEDULER_Task *round_task;
243
244
245 /**
246  * START THE TEST ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
247  *
248  * Testcase continues when the root receives confirmation of connected peers,
249  * on callback funtion ch.
250  *
251  * @param cls Closure (unsued).
252  */
253 static void
254 start_test (void *cls);
255
256
257 /**
258  * Calculate a random delay.
259  *
260  * @param max Exclusive maximum, in ms.
261  *
262  * @return A time between 0 a max-1 ms.
263  */
264 static struct GNUNET_TIME_Relative
265 delay_ms_rnd (unsigned int max)
266 {
267   unsigned int rnd;
268
269   rnd = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max);
270   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, rnd);
271 }
272
273
274 /**
275  * Get the index of a peer in the peers array.
276  *
277  * @param peer Peer whose index to get.
278  *
279  * @return Index of peer in peers.
280  */
281 static unsigned int
282 get_index (struct CadetPeer *peer)
283 {
284   return peer - peers;
285 }
286
287
288 /**
289  * Show the results of the test (banwidth acheived) and log them to GAUGER
290  */
291 static void
292 show_end_data (void)
293 {
294   struct CadetPeer *peer;
295   unsigned int i;
296   unsigned int j;
297
298   for (i = 0; i < number_rounds; i++)
299   {
300     for (j = 0; j < peers_pinging; j++)
301     {
302       peer = &peers[j];
303       FPRINTF (stdout,
304                "ROUND %3u PEER %3u: %10.2f / %10.2f, PINGS: %3u, PONGS: %3u\n",
305                i, j, peer->mean[i], sqrt (peer->var[i] / (peer->pongs[i] - 1)),
306                peer->pings[i], peer->pongs[i]);
307     }
308   }
309 }
310
311
312 /**
313  * Disconnect from cadet services af all peers, call shutdown.
314  *
315  * @param cls Closure (unused).
316  */
317 static void
318 disconnect_cadet_peers (void *cls)
319 {
320   long line = (long) cls;
321   unsigned int i;
322
323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324               "disconnecting cadet service, called from line %ld\n",
325               line);
326   disconnect_task = NULL;
327   for (i = 0; i < peers_total; i++)
328   {
329     if (NULL != peers[i].op)
330       GNUNET_TESTBED_operation_done (peers[i].op);
331
332     if (peers[i].up != GNUNET_YES)
333       continue;
334
335     if (NULL != peers[i].ch)
336     {
337       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
338                   "%u: channel %p\n", i, peers[i].ch);
339       GNUNET_CADET_channel_destroy (peers[i].ch);
340     }
341     if (NULL != peers[i].warmup_ch)
342     {
343       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
344                   "%u: warmup channel %p\n",
345                   i, peers[i].warmup_ch);
346       GNUNET_CADET_channel_destroy (peers[i].warmup_ch);
347     }
348     if (NULL != peers[i].incoming_ch)
349     {
350       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
351                   "%u: incoming channel %p\n",
352                   i, peers[i].incoming_ch);
353       GNUNET_CADET_channel_destroy (peers[i].incoming_ch);
354     }
355   }
356   GNUNET_CADET_TEST_cleanup (test_ctx);
357   GNUNET_SCHEDULER_shutdown ();
358 }
359
360
361 /**
362  * Shut down peergroup, clean up.
363  *
364  * @param cls Closure (unused).
365  */
366 static void
367 shutdown_task (void *cls)
368 {
369   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
370               "Ending test.\n");
371   if (NULL != disconnect_task)
372   {
373     GNUNET_SCHEDULER_cancel (disconnect_task);
374     disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
375                                                 (void *) __LINE__);
376   }
377   if (NULL != round_task)
378   {
379     GNUNET_SCHEDULER_cancel (round_task);
380     round_task = NULL;
381   }
382   if (NULL != test_task)
383   {
384     GNUNET_SCHEDULER_cancel (test_task);
385     test_task = NULL;
386   }
387 }
388
389
390 /**
391  * Finish test normally: schedule disconnect and shutdown
392  *
393  * @param line Line in the code the abort is requested from (__LINE__).
394  */
395 static void
396 abort_test (long line)
397 {
398   if (disconnect_task != NULL)
399   {
400     GNUNET_SCHEDULER_cancel (disconnect_task);
401     disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
402                                                 (void *) line);
403   }
404 }
405
406 /**
407  * Stats callback. Finish the stats testbed operation and when all stats have
408  * been iterated, shutdown the test.
409  *
410  * @param cls closure
411  * @param op the operation that has been finished
412  * @param emsg error message in case the operation has failed; will be NULL if
413  *          operation has executed successfully.
414  */
415 static void
416 stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg)
417 {
418   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "... collecting statistics done.\n");
419   GNUNET_TESTBED_operation_done (stats_op);
420
421   if (NULL != disconnect_task)
422     GNUNET_SCHEDULER_cancel (disconnect_task);
423   disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
424                                               (void *) __LINE__);
425
426 }
427
428
429 /**
430  * Process statistic values.
431  *
432  * @param cls closure
433  * @param peer the peer the statistic belong to
434  * @param subsystem name of subsystem that created the statistic
435  * @param name the name of the datum
436  * @param value the current value
437  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
438  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
439  */
440 static int
441 stats_iterator (void *cls,
442                 const struct GNUNET_TESTBED_Peer *peer,
443                 const char *subsystem,
444                 const char *name,
445                 uint64_t value,
446                 int is_persistent)
447 {
448   uint32_t i;
449
450   i = GNUNET_TESTBED_get_index (peer);
451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452               " STATS %u - %s [%s]: %llu\n",
453               i, subsystem, name,
454               (unsigned long long) value);
455
456   return GNUNET_OK;
457 }
458
459
460 /**
461  * Task check that keepalives were sent and received.
462  *
463  * @param cls Closure (NULL).
464  */
465 static void
466 collect_stats (void *cls)
467 {
468   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
469               "Start collecting statistics...\n");
470   stats_op = GNUNET_TESTBED_get_statistics (peers_total,
471                                             testbed_handles,
472                                             NULL, NULL,
473                                             &stats_iterator,
474                                             &stats_cont, NULL);
475 }
476
477
478 /**
479  * @brief Finish profiler normally. Signal finish and start collecting stats.
480  *
481  * @param cls Closure (unused).
482  */
483 static void
484 finish_profiler (void *cls)
485 {
486   test_finished = GNUNET_YES;
487   show_end_data ();
488   GNUNET_SCHEDULER_add_now (&collect_stats, NULL);
489 }
490
491
492 /**
493  * Set the total number of running peers.
494  *
495  * @param target Desired number of running peers.
496  */
497 static void
498 adjust_running_peers (unsigned int target)
499 {
500   struct GNUNET_TESTBED_Operation *op;
501   unsigned int delta;
502   unsigned int run;
503   unsigned int i;
504   unsigned int r;
505
506   GNUNET_assert (target <= peers_total);
507
508   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adjust peers to %u\n", target);
509   if (target > peers_running)
510   {
511     delta = target - peers_running;
512     run = GNUNET_YES;
513   }
514   else
515   {
516     delta = peers_running - target;
517     run = GNUNET_NO;
518   }
519
520   for (i = 0; i < delta; i++)
521   {
522     do {
523       r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
524                                     peers_total - peers_pinging);
525       r += peers_pinging;
526     } while (peers[r].up == run || NULL != peers[r].incoming);
527     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "St%s peer %u: %s\n",
528                 run ? "arting" : "opping", r, GNUNET_i2s (&peers[r].id));
529
530     if (NULL != peers[r].ping_task)
531     {
532       GNUNET_SCHEDULER_cancel (peers[r].ping_task);
533       peers[r].ping_task = NULL;
534     }
535     if (NULL != peers[r].ping_ntr)
536     {
537       GNUNET_CADET_notify_transmit_ready_cancel (peers[r].ping_ntr);
538       peers[r].ping_ntr = NULL;
539     }
540     peers[r].up = run;
541
542     if (NULL != peers[r].ch)
543       GNUNET_CADET_channel_destroy (peers[r].ch);
544     peers[r].ch = NULL;
545     if (NULL != peers[r].dest)
546     {
547       if (NULL != peers[r].dest->incoming_ch)
548         GNUNET_CADET_channel_destroy (peers[r].dest->incoming_ch);
549       peers[r].dest->incoming_ch = NULL;
550     }
551
552     op = GNUNET_TESTBED_peer_manage_service (&peers[r], testbed_handles[r],
553                                              "cadet", NULL, NULL, run);
554     GNUNET_break (NULL != op);
555     peers_running += run ? 1 : -1;
556     GNUNET_assert (peers_running > 0);
557   }
558 }
559
560
561 /**
562  * @brief Move to next round.
563  *
564  * @param cls Closure (round #).
565  */
566 static void
567 next_rnd (void *cls)
568 {
569   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
570               "ROUND %u\n",
571               current_round);
572   if (0.0 == rounds[current_round])
573   {
574     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Finishing\n");
575     GNUNET_SCHEDULER_add_now (&finish_profiler, NULL);
576     return;
577   }
578   adjust_running_peers (rounds[current_round] * peers_total);
579   current_round++;
580
581   round_task = GNUNET_SCHEDULER_add_delayed (round_time,
582                                              &next_rnd,
583                                              NULL);
584 }
585
586
587 /**
588  * Transmit ping callback.
589  *
590  * @param cls Closure (peer for PING, NULL for PONG).
591  * @param size Size of the tranmist buffer.
592  * @param buf Pointer to the beginning of the buffer.
593  *
594  * @return Number of bytes written to buf.
595  */
596 static size_t
597 tmt_rdy_ping (void *cls, size_t size, void *buf);
598
599
600 /**
601  * Transmit pong callback.
602  *
603  * @param cls Closure (copy of PING message, to be freed).
604  * @param size Size of the buffer we have.
605  * @param buf Buffer to copy data to.
606  */
607 static size_t
608 tmt_rdy_pong (void *cls, size_t size, void *buf)
609 {
610   struct CadetPingMessage *ping = cls;
611   struct CadetPingMessage *pong;
612
613   if (0 == size || NULL == buf)
614   {
615     GNUNET_free (ping);
616     return 0;
617   }
618   pong = (struct CadetPingMessage *) buf;
619   GNUNET_memcpy (pong, ping, sizeof (*ping));
620   pong->header.type = htons (PONG);
621
622   GNUNET_free (ping);
623   return sizeof (*ping);
624 }
625
626
627 /**
628  * @brief Send a ping to destination
629  *
630  * @param cls Closure (peer).
631  */
632 static void
633 ping (void *cls)
634 {
635   struct CadetPeer *peer = cls;
636
637   peer->ping_task = NULL;
638   if (GNUNET_YES == test_finished)
639     return;
640   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
641               "%u -> %u (%u)\n",
642               get_index (peer),
643               get_index (peer->dest),
644               peer->data_sent);
645   peer->ping_ntr = GNUNET_CADET_notify_transmit_ready (peer->ch, GNUNET_NO,
646                                                        GNUNET_TIME_UNIT_FOREVER_REL,
647                                                        sizeof (struct CadetPingMessage),
648                                                        &tmt_rdy_ping, peer);
649 }
650
651 /**
652  * @brief Reply with a pong to origin.
653  *
654  * @param cls Closure (peer).
655  * @param tc Task context.
656  */
657 static void
658 pong (struct GNUNET_CADET_Channel *channel,
659       const struct CadetPingMessage *ping)
660 {
661   struct CadetPingMessage *copy;
662
663   copy = GNUNET_new (struct CadetPingMessage);
664   *copy = *ping;
665   GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
666                                      GNUNET_TIME_UNIT_FOREVER_REL,
667                                      sizeof (struct CadetPingMessage),
668                                      &tmt_rdy_pong, copy);
669 }
670
671
672 /**
673  * Transmit ping callback
674  *
675  * @param cls Closure (peer).
676  * @param size Size of the buffer we have.
677  * @param buf Buffer to copy data to.
678  */
679 static size_t
680 tmt_rdy_ping (void *cls, size_t size, void *buf)
681 {
682   struct CadetPeer *peer = cls;
683   struct CadetPingMessage *msg = buf;
684
685   peer->ping_ntr = NULL;
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687               "tmt_rdy called, filling buffer\n");
688   if (size < sizeof (struct CadetPingMessage) || NULL == buf)
689   {
690     GNUNET_break (GNUNET_YES == test_finished);
691     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
692                 "size %u, buf %p, data_sent %u, data_received %u\n",
693                 (unsigned int) size,
694                 buf,
695                 peer->data_sent,
696                 peer->data_received);
697
698     return 0;
699   }
700   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
701               "Sending: msg %d\n",
702               peer->data_sent);
703   msg->header.size = htons (size);
704   msg->header.type = htons (PING);
705   msg->counter = htonl (peer->data_sent++);
706   msg->round_number = htonl (current_round);
707   msg->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
708   peer->pings[current_round]++;
709   peer->ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (PING_PERIOD),
710                                                   &ping, peer);
711
712   return sizeof (struct CadetPingMessage);
713 }
714
715
716 /**
717  * Function is called whenever a PING message is received.
718  *
719  * @param cls closure (peer #, set from GNUNET_CADET_connect)
720  * @param channel connection to the other end
721  * @param channel_ctx place to store local state associated with the channel
722  * @param message the actual message
723  * @return GNUNET_OK to keep the connection open,
724  *         GNUNET_SYSERR to close it (signal serious error)
725  */
726 int
727 ping_handler (void *cls, struct GNUNET_CADET_Channel *channel,
728               void **channel_ctx,
729               const struct GNUNET_MessageHeader *message)
730 {
731   long n = (long) cls;
732
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "%u got PING\n",
735               (unsigned int) n);
736   GNUNET_CADET_receive_done (channel);
737   if (GNUNET_NO == test_finished)
738     pong (channel, (struct CadetPingMessage *) message);
739
740   return GNUNET_OK;
741 }
742
743
744 /**
745  * Function is called whenever a PONG message is received.
746  *
747  * @param cls closure (peer #, set from GNUNET_CADET_connect)
748  * @param channel connection to the other end
749  * @param channel_ctx place to store local state associated with the channel
750  * @param message the actual message
751  * @return GNUNET_OK to keep the connection open,
752  *         GNUNET_SYSERR to close it (signal serious error)
753  */
754 int
755 pong_handler (void *cls, struct GNUNET_CADET_Channel *channel,
756               void **channel_ctx,
757               const struct GNUNET_MessageHeader *message)
758 {
759   long n = (long) cls;
760   struct CadetPeer *peer;
761   struct CadetPingMessage *msg;
762   struct GNUNET_TIME_Absolute send_time;
763   struct GNUNET_TIME_Relative latency;
764   unsigned int r /* Ping round */;
765   float delta;
766
767   GNUNET_CADET_receive_done (channel);
768   peer = &peers[n];
769
770   msg = (struct CadetPingMessage *) message;
771
772   send_time = GNUNET_TIME_absolute_ntoh (msg->timestamp);
773   latency = GNUNET_TIME_absolute_get_duration (send_time);
774   r = ntohl (msg->round_number);
775   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <- %u (%u) latency: %s\n",
776               get_index (peer),
777               get_index (peer->dest),
778               (uint32_t) ntohl (msg->counter),
779               GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_NO));
780
781   /* Online variance calculation */
782   peer->pongs[r]++;
783   delta = latency.rel_value_us - peer->mean[r];
784   peer->mean[r] = peer->mean[r] + delta/peer->pongs[r];
785   peer->var[r] += delta * (latency.rel_value_us - peer->mean[r]);
786
787   return GNUNET_OK;
788 }
789
790
791 /**
792  * Handlers, for diverse services
793  */
794 static struct GNUNET_CADET_MessageHandler handlers[] = {
795   {&ping_handler, PING, sizeof (struct CadetPingMessage)},
796   {&pong_handler, PONG, sizeof (struct CadetPingMessage)},
797   {NULL, 0, 0}
798 };
799
800
801 /**
802  * Method called whenever another peer has added us to a channel
803  * the other peer initiated.
804  *
805  * @param cls Closure.
806  * @param channel New handle to the channel.
807  * @param initiator Peer that started the channel.
808  * @param port Port this channel is connected to.
809  * @param options channel option flags
810  * @return Initial channel context for the channel
811  *         (can be NULL -- that's not an error).
812  */
813 static void *
814 incoming_channel (void *cls, struct GNUNET_CADET_Channel *channel,
815                  const struct GNUNET_PeerIdentity *initiator,
816                  const struct GNUNET_HashCode *port,
817                   enum GNUNET_CADET_ChannelOption options)
818 {
819   long n = (long) cls;
820   struct CadetPeer *peer;
821
822   peer = GNUNET_CONTAINER_multipeermap_get (ids, initiator);
823   GNUNET_assert (NULL != peer);
824   if (NULL == peers[n].incoming)
825   {
826     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
827                 "WARMUP %3u: %u <= %u\n",
828                 peers_warmup,
829                 (unsigned int) n,
830                 get_index (peer));
831     peers_warmup++;
832     if (peers_warmup < peers_total)
833       return NULL;
834     if (NULL != test_task)
835     {
836       GNUNET_SCHEDULER_cancel (test_task);
837       test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
838                                                 &start_test, NULL);
839     }
840     return NULL;
841   }
842   GNUNET_assert (peer == peers[n].incoming);
843   GNUNET_assert (peer->dest == &peers[n]);
844   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
845               "%u <= %u %p\n",
846               (unsigned int) n,
847               get_index (peer),
848               channel);
849   peers[n].incoming_ch = channel;
850
851   return NULL;
852 }
853
854 /**
855  * Function called whenever an inbound channel is destroyed.  Should clean up
856  * any associated state.
857  *
858  * @param cls closure (set from GNUNET_CADET_connect)
859  * @param channel connection to the other end (henceforth invalid)
860  * @param channel_ctx place where local state associated
861  *                   with the channel is stored
862  */
863 static void
864 channel_cleaner (void *cls,
865                  const struct GNUNET_CADET_Channel *channel,
866                  void *channel_ctx)
867 {
868   long n = (long) cls;
869   struct CadetPeer *peer = &peers[n];
870
871   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
872               "Channel %p disconnected at peer %ld\n", channel, n);
873   if (peer->ch == channel)
874     peer->ch = NULL;
875 }
876
877
878 /**
879  * Select a random peer that has no incoming channel
880  *
881  * @param peer ID of the peer connecting. NULL if irrelevant (warmup).
882  *
883  * @return Random peer not yet connected to.
884  */
885 static struct CadetPeer *
886 select_random_peer (struct CadetPeer *peer)
887 {
888   unsigned int r;
889
890   do
891   {
892     r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, peers_total);
893   } while (NULL != peers[r].incoming);
894   peers[r].incoming = peer;
895
896   return &peers[r];
897 }
898
899 /**
900  * START THE TEST ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
901  *
902  * Testcase continues when the root receives confirmation of connected peers,
903  * on callback funtion ch.
904  *
905  * @param cls Closure (unsued).
906  */
907 static void
908 start_test (void *cls)
909 {
910   enum GNUNET_CADET_ChannelOption flags;
911   unsigned long i;
912
913   test_task = NULL;
914   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start profiler\n");
915
916   flags = GNUNET_CADET_OPTION_DEFAULT;
917   for (i = 0; i < peers_pinging; i++)
918   {
919     peers[i].dest = select_random_peer (&peers[i]);
920     peers[i].ch = GNUNET_CADET_channel_create (peers[i].cadet, NULL,
921                                                &peers[i].dest->id,
922                                                GC_u2h (1), flags);
923     if (NULL == peers[i].ch)
924     {
925       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Channel %lu failed\n", i);
926       GNUNET_CADET_TEST_cleanup (test_ctx);
927       return;
928     }
929     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
930                 "%lu => %u %p\n",
931                 i,
932                 get_index (peers[i].dest),
933                 peers[i].ch);
934     peers[i].ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (2000),
935                                                        &ping, &peers[i]);
936   }
937   peers_running = peers_total;
938   if (NULL != disconnect_task)
939     GNUNET_SCHEDULER_cancel (disconnect_task);
940   disconnect_task =
941     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(round_time,
942                                                                 number_rounds + 1),
943                                   &disconnect_cadet_peers,
944                                   (void *) __LINE__);
945   round_task = GNUNET_SCHEDULER_add_delayed (round_time,
946                                              &next_rnd,
947                                              NULL);
948 }
949
950
951 /**
952  * Do warmup: create some channels to spread information about the topology.
953  */
954 static void
955 warmup (void)
956 {
957   struct CadetPeer *peer;
958   unsigned int i;
959
960   for (i = 0; i < peers_total; i++)
961   {
962     peer = select_random_peer (NULL);
963     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "WARMUP %u => %u\n",
964                 i, get_index (peer));
965     peers[i].warmup_ch =
966       GNUNET_CADET_channel_create (peers[i].cadet, NULL, &peer->id,
967                                   GC_u2h (1), GNUNET_CADET_OPTION_DEFAULT);
968     if (NULL == peers[i].warmup_ch)
969     {
970       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Warmup %u failed\n", i);
971       GNUNET_CADET_TEST_cleanup (test_ctx);
972       return;
973     }
974   }
975 }
976
977
978 /**
979  * Callback to be called when the requested peer information is available
980  *
981  * @param cls the closure from GNUNET_TESTBED_peer_get_information()
982  * @param op the operation this callback corresponds to
983  * @param pinfo the result; will be NULL if the operation has failed
984  * @param emsg error message if the operation has failed;
985  *             NULL if the operation is successfull
986  */
987 static void
988 peer_id_cb (void *cls,
989             struct GNUNET_TESTBED_Operation *op,
990             const struct GNUNET_TESTBED_PeerInformation *pinfo,
991             const char *emsg)
992 {
993   long n = (long) cls;
994
995   if (NULL == pinfo || NULL != emsg)
996   {
997     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg);
998     abort_test (__LINE__);
999     return;
1000   }
1001   peers[n].id = *(pinfo->result.id);
1002   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003               "%ld  id: %s\n",
1004               n,
1005               GNUNET_i2s (&peers[n].id));
1006   GNUNET_break (GNUNET_OK ==
1007                 GNUNET_CONTAINER_multipeermap_put (ids, &peers[n].id, &peers[n],
1008                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1009
1010   GNUNET_TESTBED_operation_done (peers[n].op);
1011   peers[n].op = NULL;
1012
1013   p_ids++;
1014   if (p_ids < peers_total)
1015     return;
1016   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got all IDs, starting profiler\n");
1017   if (do_warmup)
1018   {
1019     struct GNUNET_TIME_Relative delay;
1020
1021     warmup();
1022     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1023                                            100 * peers_total);
1024     test_task = GNUNET_SCHEDULER_add_delayed (delay, &start_test, NULL);
1025     return; /* start_test from incoming_channel */
1026   }
1027   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting in a second...\n");
1028   test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1029                                             &start_test, NULL);
1030 }
1031
1032
1033 /**
1034  * test main: start test when all peers are connected
1035  *
1036  * @param cls Closure.
1037  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
1038  * @param num_peers Number of peers that are running.
1039  * @param testbed_peers Array of peers.
1040  * @param cadetes Handle to each of the CADETs of the peers.
1041  */
1042 static void
1043 tmain (void *cls,
1044        struct GNUNET_CADET_TEST_Context *ctx,
1045        unsigned int num_peers,
1046        struct GNUNET_TESTBED_Peer **testbed_peers,
1047        struct GNUNET_CADET_Handle **cadetes)
1048 {
1049   unsigned long i;
1050
1051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1052               "test main\n");
1053   test_ctx = ctx;
1054   GNUNET_assert (peers_total == num_peers);
1055   peers_running = num_peers;
1056   testbed_handles = testbed_peers;
1057   disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
1058                                                   &disconnect_cadet_peers,
1059                                                   (void *) __LINE__);
1060   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1061   for (i = 0; i < peers_total; i++)
1062   {
1063     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                 "requesting id %ld\n",
1065                 i);
1066     peers[i].up = GNUNET_YES;
1067     peers[i].cadet = cadetes[i];
1068     peers[i].op =
1069       GNUNET_TESTBED_peer_get_information (testbed_handles[i],
1070                                            GNUNET_TESTBED_PIT_IDENTITY,
1071                                            &peer_id_cb, (void *) i);
1072   }
1073   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "requested peer ids\n");
1074   /* Continues from pi_cb -> do_test */
1075 }
1076
1077
1078 /**
1079  * Main: start profiler.
1080  */
1081 int
1082 main (int argc, char *argv[])
1083 {
1084   static const struct GNUNET_HashCode *ports[2];
1085   const char *config_file;
1086
1087   config_file = ".profiler.conf";
1088
1089   if (4 > argc)
1090   {
1091     fprintf (stderr,
1092              "usage: %s ROUND_TIME PEERS PINGS [DO_WARMUP]\n",
1093              argv[0]);
1094     fprintf (stderr,
1095              "example: %s 30s 16 1 Y\n",
1096              argv[0]);
1097     return 1;
1098   }
1099
1100   if (GNUNET_OK !=
1101       GNUNET_STRINGS_fancy_time_to_relative (argv[1],
1102                                              &round_time))
1103   {
1104     fprintf (stderr,
1105              "%s is not a valid time\n",
1106              argv[1]);
1107     return 1;
1108   }
1109
1110   peers_total = atoll (argv[2]);
1111   if (2 > peers_total)
1112   {
1113     fprintf (stderr,
1114              "%s peers is not valid (> 2)\n",
1115              argv[1]);
1116     return 1;
1117   }
1118   peers = GNUNET_new_array (peers_total,
1119                             struct CadetPeer);
1120   peers_pinging = atoll (argv[3]);
1121
1122   if (peers_total < 2 * peers_pinging)
1123   {
1124     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1125                 "not enough peers, total should be > 2 * peers_pinging\n");
1126     return 1;
1127   }
1128
1129   do_warmup = (5 > argc || argv[4][0] != 'N');
1130
1131   ids = GNUNET_CONTAINER_multipeermap_create (2 * peers_total,
1132                                               GNUNET_YES);
1133   GNUNET_assert (NULL != ids);
1134   p_ids = 0;
1135   test_finished = GNUNET_NO;
1136   ports[0] = GC_u2h (1);
1137   ports[1] = 0;
1138   GNUNET_CADET_TEST_run ("cadet-profiler", config_file, peers_total,
1139                         &tmain, NULL, /* tmain cls */
1140                         &incoming_channel, &channel_cleaner,
1141                         handlers, ports);
1142   GNUNET_free (peers);
1143
1144   return 0;
1145 }
1146
1147 /* end of gnunet-cadet-profiler.c */