changes to traffic generation
[oweals/gnunet.git] / src / ats-tests / ats-testing.c
1 /*
2  This file is part of GNUnet.
3  (C) 2010-2013 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file ats-tests/ats-testing.c
22  * @brief ats testing library: setup topology and provide logging to test ats
23  * solvers
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "ats-testing.h"
28
29 /**
30  * Overall state of the performance benchmark
31  */
32 struct BenchmarkState
33 {
34   /**
35    * Are we connected to ATS service of all peers: GNUNET_YES/NO
36    */
37   int connected_ATS_service;
38
39   /**
40    * Are we connected to CORE service of all peers: GNUNET_YES/NO
41    */
42   int connected_COMM_service;
43
44   /**
45    * Are we connected to all peers: GNUNET_YES/NO
46    */
47   int connected_PEERS;
48
49   /**
50    * Are we connected to all slave peers on CORE level: GNUNET_YES/NO
51    */
52   int connected_CORE;
53
54   /**
55    * Are we connected to CORE service of all peers: GNUNET_YES/NO
56    */
57   int benchmarking;
58 };
59
60
61
62
63 /**
64  * Connect peers with testbed
65  */
66 struct TestbedConnectOperation
67 {
68   /**
69    * The benchmarking master initiating this connection
70    */
71   struct BenchmarkPeer *master;
72
73   /**
74    * The benchmarking slave to connect to
75    */
76   struct BenchmarkPeer *slave;
77
78   /**
79    * Testbed operation to connect peers
80    */
81   struct GNUNET_TESTBED_Operation *connect_op;
82 };
83
84 struct GNUNET_ATS_TEST_Topology
85 {
86   /**
87    * Shutdown task
88    */
89   GNUNET_SCHEDULER_TaskIdentifier shutdown_task;
90
91   /**
92    * Progress task
93    */
94   GNUNET_SCHEDULER_TaskIdentifier progress_task;
95
96   /**
97    * Test result
98    */
99   int result;
100
101   /**
102    * Test result logging
103    */
104   int logging;
105
106   /**Test core (GNUNET_YES) or transport (GNUNET_NO)
107    */
108   int test_core;
109
110   /**
111    * Solver string
112    */
113   char *solver;
114
115   /**
116    * Preference string
117    */
118   char *testname;
119
120   /**
121    * Preference string
122    */
123   char *pref_str;
124
125   /**
126    * ATS preference value
127    */
128   int pref_val;
129
130   /**
131    * Number master peers
132    */
133   unsigned int num_masters;
134
135   /**
136    * Array of master peers
137    */
138   struct BenchmarkPeer *mps;
139
140   /**
141    * Number slave peers
142    */
143   unsigned int num_slaves;
144
145   /**
146    * Array of slave peers
147    */
148   struct BenchmarkPeer *sps;
149
150   /**
151    * Benchmark duration
152    */
153   struct GNUNET_TIME_Relative perf_duration;
154
155   /**
156    * Logging frequency
157    */
158   struct GNUNET_TIME_Relative log_frequency;
159
160   /**
161    * Benchmark state
162    */
163   struct BenchmarkState state;
164
165   struct GNUNET_CORE_MessageHandler *handlers;
166
167   GNUNET_TRANSPORT_ReceiveCallback transport_recv_cb;
168
169   GNUNET_ATS_TESTING_TopologySetupDoneCallback done_cb;
170   GNUNET_ATS_AddressInformationCallback ats_perf_cb;
171   void *done_cb_cls;
172 };
173
174
175
176 static struct GNUNET_ATS_TEST_Topology *top;
177
178 struct TrafficGenerator *tg_head;
179 struct TrafficGenerator *tg_tail;
180
181 struct TrafficGenerator
182 {
183   struct TrafficGenerator *prev;
184   struct TrafficGenerator *next;
185
186   struct BenchmarkPeer *src;
187   struct BenchmarkPartner *dest;
188   unsigned int rate;
189   GNUNET_SCHEDULER_Task next_send;
190   struct GNUNET_TIME_Absolute last_sent;
191   struct GNUNET_TIME_Relative delta;
192 };
193
194 /**
195  * Shutdown nicely
196  *
197  * @param cls NULL
198  * @param tc the task context
199  */
200 static void
201 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
202 {
203   int c_m;
204   int c_s;
205   int c_op;
206   struct BenchmarkPeer *p;
207   struct TrafficGenerator *cur;
208   struct TrafficGenerator *next;
209
210   if (GNUNET_YES == top->logging)
211     GNUNET_ATS_TEST_logging_stop ();
212
213   top->shutdown_task = GNUNET_SCHEDULER_NO_TASK;
214
215   top->state.benchmarking = GNUNET_NO;
216
217   next = tg_head;
218   for (cur = next; NULL != cur; cur = next)
219   {
220       next = cur->next;
221       GNUNET_ATS_TEST_generate_traffic_stop(cur);
222   }
223
224   GNUNET_log(GNUNET_ERROR_TYPE_INFO, _("Benchmarking done\n"));
225
226   for (c_m = 0; c_m < top->num_masters; c_m++)
227   {
228     p = &top->mps[c_m];
229     if (NULL != top->mps[c_m].peer_id_op)
230     {
231       GNUNET_TESTBED_operation_done (p->peer_id_op);
232       p->peer_id_op = NULL;
233     }
234
235     if (GNUNET_SCHEDULER_NO_TASK != p->ats_task)
236       GNUNET_SCHEDULER_cancel (p->ats_task);
237     p->ats_task = GNUNET_SCHEDULER_NO_TASK;
238
239     for (c_op = 0; c_op < p->num_partners; c_op++)
240     {
241       if (NULL != p->partners[c_op].cth)
242       {
243         GNUNET_CORE_notify_transmit_ready_cancel (p->partners[c_op].cth);
244         p->partners[c_op].cth = NULL;
245       }
246       if (NULL != p->partners[c_op].tth)
247       {
248         GNUNET_TRANSPORT_notify_transmit_ready_cancel (p->partners[c_op].tth);
249         p->partners[c_op].tth = NULL;
250       }
251       if ( (NULL != p->core_connect_ops) &&
252            (NULL != p->core_connect_ops[c_op].connect_op) )
253       {
254         GNUNET_log(GNUNET_ERROR_TYPE_INFO,
255             _("Failed to connect peer 0 and %u\n"), c_op);
256         GNUNET_TESTBED_operation_done (
257             p->core_connect_ops[c_op].connect_op);
258         p->core_connect_ops[c_op].connect_op = NULL;
259       }
260     }
261
262     if (NULL != p->ats_perf_op)
263     {
264       GNUNET_TESTBED_operation_done (p->ats_perf_op);
265       p->ats_perf_op = NULL;
266     }
267
268     if (NULL != p->comm_op)
269     {
270       GNUNET_TESTBED_operation_done (p->comm_op);
271       p->comm_op = NULL;
272     }
273     GNUNET_free_non_null (p->core_connect_ops);
274     GNUNET_free(p->partners);
275     p->partners = NULL;
276   }
277
278   for (c_s = 0; c_s < top->num_slaves; c_s++)
279   {
280     p = &top->sps[c_s];
281     if (NULL != p->peer_id_op)
282     {
283       GNUNET_TESTBED_operation_done (p->peer_id_op);
284       p->peer_id_op = NULL;
285     }
286
287     for (c_op = 0; c_op < p->num_partners; c_op++)
288     {
289       if (NULL != p->partners[c_op].cth)
290       {
291         GNUNET_CORE_notify_transmit_ready_cancel (p->partners[c_op].cth);
292         p->partners[c_op].cth = NULL;
293       }
294       if (NULL != p->partners[c_op].tth)
295       {
296         GNUNET_TRANSPORT_notify_transmit_ready_cancel (p->partners[c_op].tth);
297         p->partners[c_op].tth = NULL;
298       }
299     }
300     if (NULL != p->ats_perf_op)
301     {
302       GNUNET_TESTBED_operation_done (p->ats_perf_op);
303       p->ats_perf_op = NULL;
304     }
305     if (NULL != p->comm_op)
306     {
307       GNUNET_TESTBED_operation_done (p->comm_op);
308       p->comm_op = NULL;
309     }
310     GNUNET_free(p->partners);
311     p->partners = NULL;
312   }
313   GNUNET_SCHEDULER_shutdown ();
314   GNUNET_free (top);
315   top = NULL;
316 }
317
318 static struct BenchmarkPartner *
319 find_partner (struct BenchmarkPeer *me, const struct GNUNET_PeerIdentity * peer)
320 {
321   int c_m;
322   GNUNET_assert (NULL != me);
323   GNUNET_assert (NULL != peer);
324
325   for (c_m = 0; c_m < me->num_partners; c_m++)
326   {
327     /* Find a partner with other as destination */
328     if (0 == memcmp (peer, &me->partners[c_m].dest->id,
329             sizeof(struct GNUNET_PeerIdentity)))
330     {
331       return &me->partners[c_m];
332     }
333   }
334
335   return NULL;
336 }
337
338
339 static struct BenchmarkPeer *
340 find_peer (const struct GNUNET_PeerIdentity * peer)
341 {
342   int c_p;
343
344   for (c_p = 0; c_p < top->num_masters; c_p++)
345   {
346     if (0 == memcmp (&top->mps[c_p].id, peer, sizeof(struct GNUNET_PeerIdentity)))
347       return &top->mps[c_p];
348   }
349
350   for (c_p = 0; c_p < top->num_slaves; c_p++)
351   {
352     if (0 == memcmp (&top->sps[c_p].id, peer, sizeof(struct GNUNET_PeerIdentity)))
353       return &top->sps[c_p];
354   }
355   return NULL ;
356 }
357
358
359 /**
360  * Method called whenever a given peer connects.
361  *
362  * @param cls closure
363  * @param peer peer identity this notification is about
364  */
365 static void
366 comm_connect_cb (void *cls, const struct GNUNET_PeerIdentity * peer)
367 {
368   struct BenchmarkPeer *me = cls;
369   struct BenchmarkPeer *remote;
370   char *id;
371   int c;
372   int completed;
373
374   remote = find_peer (peer);
375   if (NULL == remote)
376   {
377     GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Unknown peer connected: `%s'\n", GNUNET_i2s (peer));
378     GNUNET_break(0);
379     return;
380   }
381
382   id = GNUNET_strdup (GNUNET_i2s (&me->id));
383   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s [%u] `%s' connected to %s [%u] %s\n",
384       (me->master == GNUNET_YES) ? "Master": "Slave", me->no, id,
385       (remote->master == GNUNET_YES) ? "Master": "Slave", remote->no,
386       GNUNET_i2s (peer));
387
388   me->core_connections++;
389   if ((GNUNET_YES == me->master) && (GNUNET_NO == remote->master)
390       && (GNUNET_NO == top->state.connected_CORE))
391   {
392     me->core_slave_connections++;
393
394     if (me->core_slave_connections == top->num_slaves)
395     {
396       GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Master [%u] connected all slaves\n",
397           me->no);
398     }
399     completed = GNUNET_YES;
400     for (c = 0; c < top->num_masters; c++)
401     {
402       if (top->mps[c].core_slave_connections != top->num_slaves)
403         completed = GNUNET_NO;
404     }
405     if (GNUNET_YES == completed)
406     {
407       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
408           "All master peers connected all slave peers\n", id,
409           GNUNET_i2s (peer));
410       top->state.connected_CORE = GNUNET_YES;
411       /* Notify about setup done */
412       if (NULL != top->done_cb)
413         top->done_cb (top->done_cb_cls, top->mps, top->sps);
414     }
415   }
416   GNUNET_free(id);
417 }
418
419 static void
420 comm_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity * peer)
421 {
422   struct BenchmarkPeer *me = cls;
423   struct BenchmarkPartner *p;
424   char *id;
425
426   if (NULL == (p = find_partner (me, peer)))
427     return;
428
429   id = GNUNET_strdup (GNUNET_i2s (&me->id));
430   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s disconnected from %s \n", id,
431       GNUNET_i2s (peer));
432   GNUNET_assert(me->core_connections > 0);
433   me->core_connections--;
434
435   if ((GNUNET_YES == top->state.benchmarking)
436       && ((GNUNET_YES == me->master) || (GNUNET_YES == p->dest->master)))
437   {
438     GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
439         "%s disconnected from %s while benchmarking \n", id, GNUNET_i2s (peer));
440     if (NULL != p->tth)
441     {
442       GNUNET_TRANSPORT_notify_transmit_ready_cancel (p->tth);
443       p->tth = NULL;
444     }
445     if (NULL != p->cth)
446     {
447       GNUNET_CORE_notify_transmit_ready_cancel (p->cth);
448       p->cth = NULL;
449     }
450   }
451   GNUNET_free(id);
452 }
453
454
455 static void *
456 core_connect_adapter (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
457 {
458   struct BenchmarkPeer *me = cls;
459
460   me->ch = GNUNET_CORE_connect (cfg, me, NULL, comm_connect_cb,
461       comm_disconnect_cb, NULL, GNUNET_NO, NULL, GNUNET_NO, top->handlers);
462   if (NULL == me->ch)
463     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to create core connection \n");
464   return me->ch;
465 }
466
467 static void
468 core_disconnect_adapter (void *cls, void *op_result)
469 {
470   struct BenchmarkPeer *me = cls;
471
472   GNUNET_CORE_disconnect (me->ch);
473   me->ch = NULL;
474 }
475
476
477
478 static size_t
479 send_ping_ready_cb (void *cls, size_t size, void *buf)
480 {
481   struct BenchmarkPartner *p = cls;
482   static char msgbuf[TEST_MESSAGE_SIZE];
483   struct GNUNET_MessageHeader *msg;
484
485   if (NULL == buf)
486   {
487     GNUNET_break (0);
488     return 0;
489   }
490   if (size < TEST_MESSAGE_SIZE)
491   {
492     GNUNET_break (0);
493     return 0;
494   }
495
496   GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Master [%u]: Sending PING to [%u]\n",
497       p->me->no, p->dest->no);
498
499   if (top->test_core)
500   {
501       if (NULL == p->cth)
502       {
503         GNUNET_break (0);
504       }
505       p->cth = NULL;
506   }
507   else
508   {
509       if (NULL == p->tth)
510       {
511         GNUNET_break (0);
512       }
513       p->tth = NULL;
514   }
515
516   msg = (struct GNUNET_MessageHeader *) &msgbuf;
517   memset (&msgbuf, 'a', TEST_MESSAGE_SIZE);
518   msg->type = htons (TEST_MESSAGE_TYPE_PING);
519   msg->size = htons (TEST_MESSAGE_SIZE);
520   memcpy (buf, msg, TEST_MESSAGE_SIZE);
521
522   p->messages_sent++;
523   p->bytes_sent += TEST_MESSAGE_SIZE;
524   p->me->total_messages_sent++;
525   p->me->total_bytes_sent += TEST_MESSAGE_SIZE;
526
527   return TEST_MESSAGE_SIZE;
528 }
529
530
531 static void
532 comm_schedule_send (struct BenchmarkPartner *p)
533 {
534   p->last_message_sent = GNUNET_TIME_absolute_get();
535   if (GNUNET_YES == top->test_core)
536   {
537     p->cth = GNUNET_CORE_notify_transmit_ready (
538       p->me->ch, GNUNET_NO, 0, GNUNET_TIME_UNIT_MINUTES, &p->dest->id,
539       TEST_MESSAGE_SIZE, &send_ping_ready_cb, p);
540   }
541   else
542   {
543     p->tth = GNUNET_TRANSPORT_notify_transmit_ready (
544       p->me->th, &p->dest->id, TEST_MESSAGE_SIZE, 0,GNUNET_TIME_UNIT_MINUTES,
545       &send_ping_ready_cb, p);
546   }
547 }
548
549 static struct TrafficGenerator *
550 find_tg (struct BenchmarkPartner *p)
551 {
552   struct TrafficGenerator *cur;
553   for (cur = tg_head; NULL != cur; cur = cur->next)
554     if ( (0 == memcmp (&p->me->id, &cur->dest->me, sizeof (cur->dest->me))) &&
555          (0 == memcmp (&p->dest->id, &cur->dest->dest->id, sizeof (cur->dest->dest->id))) )
556          return cur;
557
558   return NULL;
559 }
560
561 static int
562 core_handle_pong (void *cls, const struct GNUNET_PeerIdentity *other,
563     const struct GNUNET_MessageHeader *message)
564 {
565   struct BenchmarkPeer *me = cls;
566   struct BenchmarkPartner *p = NULL;
567   struct TrafficGenerator *tg;
568
569   if (NULL == (p = find_partner (me, other)))
570   {
571     GNUNET_break(0);
572     return GNUNET_SYSERR;
573   }
574
575   GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
576       "Master [%u]: Received PONG from [%u], next message\n", me->no,
577       p->dest->no);
578
579   p->messages_received++;
580   p->bytes_received += TEST_MESSAGE_SIZE;
581   p->me->total_messages_received++;
582   p->me->total_bytes_received += TEST_MESSAGE_SIZE;
583   p->total_app_rtt += GNUNET_TIME_absolute_get_difference(p->last_message_sent,
584       GNUNET_TIME_absolute_get()).rel_value_us;
585
586
587   tg = find_tg(p);
588 /*
589   if (GNUNET_TIME_absolute_max(GNUNET_TIME_absolute_get(), tg->next_send))*/
590     comm_schedule_send (p);
591   return GNUNET_OK;
592 }
593
594 static size_t
595 comm_send_pong_ready (void *cls, size_t size, void *buf)
596 {
597   static char msgbuf[TEST_MESSAGE_SIZE];
598   struct BenchmarkPartner *p = cls;
599   struct GNUNET_MessageHeader *msg;
600
601   if (GNUNET_YES == top->test_core)
602     p->cth = NULL;
603   else
604     p->tth = NULL;
605
606   p->messages_sent++;
607   p->bytes_sent += TEST_MESSAGE_SIZE;
608   p->me->total_messages_sent++;
609   p->me->total_bytes_sent += TEST_MESSAGE_SIZE;
610
611   msg = (struct GNUNET_MessageHeader *) &msgbuf;
612   memset (&msgbuf, 'a', TEST_MESSAGE_SIZE);
613   msg->type = htons (TEST_MESSAGE_TYPE_PONG);
614   msg->size = htons (TEST_MESSAGE_SIZE);
615   memcpy (buf, msg, TEST_MESSAGE_SIZE);
616
617   return TEST_MESSAGE_SIZE;
618 }
619
620
621 static int
622 core_handle_ping (void *cls, const struct GNUNET_PeerIdentity *other,
623     const struct GNUNET_MessageHeader *message)
624 {
625   struct BenchmarkPeer *me = cls;
626   struct BenchmarkPartner *p = NULL;
627
628   if (NULL == (p = find_partner(me, other)))
629   {
630     GNUNET_break(0);
631     return GNUNET_SYSERR;
632   }
633
634   GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
635       "Slave [%u]: Received PING from [%u], sending PONG\n", me->no,
636       p->dest->no);
637
638   p->messages_received++;
639   p->bytes_received += TEST_MESSAGE_SIZE;
640   p->me->total_messages_received++;
641   p->me->total_bytes_received += TEST_MESSAGE_SIZE;
642
643   if (GNUNET_YES == top->test_core)
644   {
645     GNUNET_assert (NULL == p->cth);
646
647     p->cth = GNUNET_CORE_notify_transmit_ready (me->ch, GNUNET_NO, 0,
648         GNUNET_TIME_UNIT_MINUTES, &p->dest->id, TEST_MESSAGE_SIZE,
649         &comm_send_pong_ready, p);
650   }
651   else
652   {
653     GNUNET_assert (NULL == p->tth);
654     p->tth = GNUNET_TRANSPORT_notify_transmit_ready (me->th, &p->dest->id,
655         TEST_MESSAGE_SIZE, 0, GNUNET_TIME_UNIT_MINUTES, &comm_send_pong_ready,
656         p);
657   }
658   return GNUNET_OK;
659 }
660
661 static void
662 test_recv_cb (void *cls,
663                      const struct GNUNET_PeerIdentity * peer,
664                      const struct GNUNET_MessageHeader * message)
665 {
666   if (TEST_MESSAGE_SIZE != ntohs (message->size) ||
667       (TEST_MESSAGE_TYPE_PING != ntohs (message->type) &&
668      TEST_MESSAGE_TYPE_PONG != ntohs (message->type)))
669   {
670     return;
671   }
672   if (TEST_MESSAGE_TYPE_PING == ntohs (message->type))
673     core_handle_ping (cls, peer, message);
674   if (TEST_MESSAGE_TYPE_PONG == ntohs (message->type))
675     core_handle_pong (cls, peer, message);
676 }
677
678
679 static void *
680 transport_connect_adapter (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
681 {
682   struct BenchmarkPeer *me = cls;
683
684   me->th = GNUNET_TRANSPORT_connect (cfg, &me->id, me, &test_recv_cb,
685       &comm_connect_cb, &comm_disconnect_cb);
686   if (NULL == me->th)
687     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to create transport connection \n");
688   return me->th;
689 }
690
691 static void
692 transport_disconnect_adapter (void *cls, void *op_result)
693 {
694   struct BenchmarkPeer *me = cls;
695
696   GNUNET_TRANSPORT_disconnect (me->th);
697   me->th = NULL;
698 }
699
700
701 static void
702 connect_completion_callback (void *cls, struct GNUNET_TESTBED_Operation *op,
703     const char *emsg)
704 {
705   struct TestbedConnectOperation *cop = cls;
706   static int ops = 0;
707   int c;
708   if (NULL == emsg)
709   {
710     GNUNET_log(GNUNET_ERROR_TYPE_INFO,
711         _("Connected master [%u] with slave [%u]\n"), cop->master->no,
712         cop->slave->no);
713   }
714   else
715   {
716     GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
717         _("Failed to connect master peer [%u] with slave [%u]\n"),
718         cop->master->no, cop->slave->no);
719     GNUNET_break(0);
720     if (GNUNET_SCHEDULER_NO_TASK != top->shutdown_task)
721       GNUNET_SCHEDULER_cancel (top->shutdown_task);
722     top->shutdown_task = GNUNET_SCHEDULER_add_now (do_shutdown, NULL );
723   }
724   GNUNET_TESTBED_operation_done (op);
725   ops++;
726   for (c = 0; c < top->num_slaves; c++)
727   {
728     if (cop == &cop->master->core_connect_ops[c])
729       cop->master->core_connect_ops[c].connect_op = NULL;
730   }
731   if (ops == top->num_masters * top->num_slaves)
732   {
733     top->state.connected_PEERS = GNUNET_YES;
734   }
735 }
736
737 static void
738 do_connect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
739 {
740   int c_m;
741   int c_s;
742   struct BenchmarkPeer *p;
743
744   if ((top->state.connected_ATS_service == GNUNET_NO) ||
745       (top->state.connected_COMM_service == GNUNET_NO))
746     return;
747
748   GNUNET_log(GNUNET_ERROR_TYPE_INFO, _("Connecting peers on CORE level\n"));
749
750   for (c_m = 0; c_m < top->num_masters; c_m++)
751   {
752     p = &top->mps[c_m];
753     p->core_connect_ops = GNUNET_malloc (top->num_slaves *
754         sizeof (struct TestbedConnectOperation));
755
756     for (c_s = 0; c_s < top->num_slaves; c_s++)
757     {
758       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
759           _("Connecting master [%u] with slave [%u]\n"), p->no, top->sps[c_s].no);
760       p->core_connect_ops[c_s].master = p;
761       p->core_connect_ops[c_s].slave = &top->sps[c_s];
762       p->core_connect_ops[c_s].connect_op = GNUNET_TESTBED_overlay_connect (
763           NULL, &connect_completion_callback, &p->core_connect_ops[c_s],
764           top->sps[c_s].peer, p->peer);
765       if (NULL == p->core_connect_ops[c_s].connect_op)
766       {
767         GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
768             _("Could not connect master [%u] and slave [%u]\n"), p->no,
769             top->sps[c_s].no);
770         GNUNET_break(0);
771         if (GNUNET_SCHEDULER_NO_TASK != top->shutdown_task)
772           GNUNET_SCHEDULER_cancel (top->shutdown_task);
773         top->shutdown_task = GNUNET_SCHEDULER_add_now (do_shutdown, NULL );
774         return;
775       }
776     }
777   }
778 }
779
780
781 static void
782 comm_connect_completion_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
783     void *ca_result, const char *emsg)
784 {
785   static int comm_done = 0;
786   if ((NULL != emsg) || (NULL == ca_result))
787   {
788     GNUNET_log(GNUNET_ERROR_TYPE_INFO, _("Initialization failed, shutdown\n"));
789     GNUNET_break(0);
790     if (GNUNET_SCHEDULER_NO_TASK != top->shutdown_task)
791       GNUNET_SCHEDULER_cancel (top->shutdown_task);
792     top->shutdown_task = GNUNET_SCHEDULER_add_now (do_shutdown, NULL );
793     return;
794   }
795   comm_done++;
796
797   if (comm_done == top->num_slaves + top->num_masters)
798   {
799     GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Connected to all %s services\n",
800         (GNUNET_YES == top->test_core) ? "CORE" : "TRANSPORT");
801     top->state.connected_COMM_service = GNUNET_YES;
802     GNUNET_SCHEDULER_add_now (&do_connect_peers, NULL );
803   }
804 }
805
806 static void
807 do_comm_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
808 {
809   int c_s;
810   int c_m;
811   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Connecting to all %s services\n",
812       (GNUNET_YES == top->test_core) ? "CORE" : "TRANSPORT");
813   for (c_m = 0; c_m < top->num_masters; c_m++)
814   {
815     if (GNUNET_YES == top->test_core)
816       top->mps[c_m].comm_op = GNUNET_TESTBED_service_connect (NULL, top->mps[c_m].peer,
817         "core", &comm_connect_completion_cb, NULL, &core_connect_adapter,
818         &core_disconnect_adapter, &top->mps[c_m]);
819     else
820     {
821       top->mps[c_m].comm_op = GNUNET_TESTBED_service_connect (NULL, top->mps[c_m].peer,
822         "transport", &comm_connect_completion_cb, NULL, &transport_connect_adapter,
823         &transport_disconnect_adapter, &top->mps[c_m]);
824     }
825   }
826
827   for (c_s = 0; c_s < top->num_slaves; c_s++)
828   {
829     if (GNUNET_YES == top->test_core)
830       top->sps[c_s].comm_op = GNUNET_TESTBED_service_connect (NULL, top->sps[c_s].peer,
831         "core", &comm_connect_completion_cb, NULL, &core_connect_adapter,
832         &core_disconnect_adapter, &top->sps[c_s]);
833     else
834     {
835       top->sps[c_s].comm_op = GNUNET_TESTBED_service_connect (NULL, top->sps[c_s].peer,
836         "transport", &comm_connect_completion_cb, NULL, &transport_connect_adapter,
837         &transport_disconnect_adapter, &top->sps[c_s]);
838     }
839   }
840 }
841
842 static void
843 ats_performance_info_cb (void *cls, const struct GNUNET_HELLO_Address *address,
844     int address_active, struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
845     struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
846     const struct GNUNET_ATS_Information *ats, uint32_t ats_count)
847 {
848   struct BenchmarkPeer *me = cls;
849   struct BenchmarkPartner *p;
850   int c_a;
851   int log;
852   char *peer_id;
853
854   p = find_partner (me, &address->peer);
855   if (NULL == p)
856   {
857     /* This is not one of my partners
858      * Will happen since the peers will connect to each other due to gossiping
859      */
860     return;
861   }
862   peer_id = GNUNET_strdup (GNUNET_i2s (&me->id));
863
864   log = GNUNET_NO;
865   if ((p->bandwidth_in != ntohl (bandwidth_in.value__)) ||
866       (p->bandwidth_out != ntohl (bandwidth_out.value__)))
867       log = GNUNET_YES;
868   p->bandwidth_in = ntohl (bandwidth_in.value__);
869   p->bandwidth_out = ntohl (bandwidth_out.value__);
870
871   for (c_a = 0; c_a < ats_count; c_a++)
872   {
873     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s [%u] received ATS information: %s %s %u\n",
874         (GNUNET_YES == p->me->master) ? "Master" : "Slave",
875         p->me->no,
876         GNUNET_i2s (&p->dest->id),
877         GNUNET_ATS_print_property_type(ntohl(ats[c_a].type)),
878         ntohl(ats[c_a].value));
879     switch (ntohl (ats[c_a].type ))
880     {
881       case GNUNET_ATS_ARRAY_TERMINATOR:
882         break;
883       case GNUNET_ATS_UTILIZATION_OUT:
884         if (p->ats_utilization_up != ntohl (ats[c_a].value))
885              log = GNUNET_YES;
886          p->ats_utilization_up = ntohl (ats[c_a].value);
887
888          break;
889        case GNUNET_ATS_UTILIZATION_IN:
890          if (p->ats_utilization_down != ntohl (ats[c_a].value))
891              log = GNUNET_YES;
892          p->ats_utilization_down = ntohl (ats[c_a].value);
893          break;
894        case GNUNET_ATS_NETWORK_TYPE:
895          if (p->ats_network_type != ntohl (ats[c_a].value))
896              log = GNUNET_YES;
897          p->ats_network_type = ntohl (ats[c_a].value);
898          break;
899        case GNUNET_ATS_QUALITY_NET_DELAY:
900          if (p->ats_delay != ntohl (ats[c_a].value))
901              log = GNUNET_YES;
902          p->ats_delay = ntohl (ats[c_a].value);
903          break;
904        case GNUNET_ATS_QUALITY_NET_DISTANCE:
905          if (p->ats_distance != ntohl (ats[c_a].value))
906              log = GNUNET_YES;
907          p->ats_distance = ntohl (ats[c_a].value);
908          GNUNET_break (0);
909          break;
910        case GNUNET_ATS_COST_WAN:
911          if (p->ats_cost_wan != ntohl (ats[c_a].value))
912              log = GNUNET_YES;
913          p->ats_cost_wan = ntohl (ats[c_a].value);
914          break;
915        case GNUNET_ATS_COST_LAN:
916          if (p->ats_cost_lan != ntohl (ats[c_a].value))
917              log = GNUNET_YES;
918          p->ats_cost_lan = ntohl (ats[c_a].value);
919          break;
920        case GNUNET_ATS_COST_WLAN:
921          if (p->ats_cost_wlan != ntohl (ats[c_a].value))
922              log = GNUNET_YES;
923          p->ats_cost_wlan = ntohl (ats[c_a].value);
924          break;
925        default:
926          break;
927      }
928    }
929    if ((GNUNET_YES == top->logging) && (GNUNET_YES == log))
930      GNUNET_ATS_TEST_logging_now();
931
932   top->ats_perf_cb (cls, address, address_active, bandwidth_out, bandwidth_in,
933       ats, ats_count);
934   GNUNET_free(peer_id);
935 }
936
937
938 static void *
939 ats_perf_connect_adapter (void *cls,
940     const struct GNUNET_CONFIGURATION_Handle *cfg)
941 {
942   struct BenchmarkPeer *me = cls;
943
944   me->ats_perf_handle = GNUNET_ATS_performance_init (cfg, ats_performance_info_cb, me);
945   if (NULL == me->ats_perf_handle)
946     GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
947         "Failed to create ATS performance handle \n");
948   return me->ats_perf_handle;
949 }
950
951 static void
952 ats_perf_disconnect_adapter (void *cls, void *op_result)
953 {
954   struct BenchmarkPeer *me = cls;
955
956   GNUNET_ATS_performance_done (me->ats_perf_handle);
957   me->ats_perf_handle = NULL;
958 }
959
960 static void
961 ats_connect_completion_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
962     void *ca_result, const char *emsg)
963 {
964   static int op_done = 0;
965
966   if ((NULL != emsg) || (NULL == ca_result))
967   {
968     GNUNET_log(GNUNET_ERROR_TYPE_INFO, _("Initialization failed, shutdown\n"));
969     GNUNET_break(0);
970     if (GNUNET_SCHEDULER_NO_TASK != top->shutdown_task)
971       GNUNET_SCHEDULER_cancel (top->shutdown_task);
972     top->shutdown_task = GNUNET_SCHEDULER_add_now (do_shutdown, NULL );
973     return;
974   }
975   op_done++;
976   if (op_done == (top->num_masters + top->num_slaves))
977   {
978     GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Connected to all ATS services\n");
979     top->state.connected_ATS_service = GNUNET_YES;
980     GNUNET_SCHEDULER_add_now (&do_comm_connect, NULL );
981   }
982 }
983
984
985 static void
986 do_connect_ats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
987 {
988   int c_m;
989   int c_s;
990
991   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Connecting to all ATS services\n");
992   for (c_m = 0; c_m < top->num_masters; c_m++)
993   {
994     top->mps[c_m].ats_perf_op = GNUNET_TESTBED_service_connect (NULL,
995         top->mps[c_m].peer,
996         "ats", ats_connect_completion_cb, NULL,
997         &ats_perf_connect_adapter,
998         &ats_perf_disconnect_adapter, &top->mps[c_m]);
999   }
1000
1001   for (c_s = 0; c_s < top->num_slaves; c_s++)
1002   {
1003     top->sps[c_s].ats_perf_op = GNUNET_TESTBED_service_connect (NULL, top->sps[c_s].peer,
1004         "ats", ats_connect_completion_cb, NULL, &ats_perf_connect_adapter,
1005         &ats_perf_disconnect_adapter, &top->sps[c_s]);
1006   }
1007 }
1008
1009
1010
1011 static void
1012 peerinformation_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op,
1013     const struct GNUNET_TESTBED_PeerInformation*pinfo, const char *emsg)
1014 {
1015   struct BenchmarkPeer *p = cb_cls;
1016   static int done = 0;
1017
1018   GNUNET_assert(pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY);
1019
1020   p->id = *pinfo->result.id;
1021   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%s [%u] has peer id `%s'\n",
1022       (p->master == GNUNET_YES) ? "Master" : "Slave", p->no,
1023       GNUNET_i2s (&p->id));
1024
1025   GNUNET_TESTBED_operation_done (op);
1026   p->peer_id_op = NULL;
1027   done++;
1028
1029   if (done == top->num_slaves + top->num_masters)
1030   {
1031     GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1032         "Retrieved all peer ID, connect to ATS\n");
1033     GNUNET_SCHEDULER_add_now (&do_connect_ats, NULL );
1034   }
1035 }
1036
1037 /**
1038  * Signature of a main function for a testcase.
1039  *
1040  * @param cls closure
1041  * @param num_peers number of peers in 'peers'
1042  * @param peers_ handle to peers run in the testbed
1043  * @param links_succeeded the number of overlay link connection attempts that
1044  *          succeeded
1045  * @param links_failed the number of overlay link connection attempts that
1046  *          failed
1047  */
1048 static void
1049 main_run (void *cls, struct GNUNET_TESTBED_RunHandle *h,
1050           unsigned int num_peers,
1051           struct GNUNET_TESTBED_Peer **peers_,
1052           unsigned int links_succeeded,
1053           unsigned int links_failed)
1054 {
1055   int c_m;
1056   int c_s;
1057   GNUNET_assert(NULL == cls);
1058   GNUNET_assert(top->num_masters + top->num_slaves == num_peers);
1059   GNUNET_assert(NULL != peers_);
1060
1061   top->shutdown_task = GNUNET_SCHEDULER_add_delayed (
1062       GNUNET_TIME_UNIT_FOREVER_REL, &do_shutdown, top);
1063
1064   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting up %u masters and %u slaves\n",
1065       top->num_masters, top->num_slaves);
1066
1067   /* Setup master peers */
1068   for (c_m = 0; c_m < top->num_masters; c_m++)
1069   {
1070     GNUNET_assert(NULL != peers_[c_m]);
1071     top->mps[c_m].peer = peers_[c_m];
1072     top->mps[c_m].no = c_m;
1073     top->mps[c_m].master = GNUNET_YES;
1074     top->mps[c_m].pref_partner = &top->sps[c_m];
1075     top->mps[c_m].pref_value = TEST_ATS_PREFERENCE_DEFAULT;
1076     top->mps[c_m].partners =
1077         GNUNET_malloc (top->num_slaves * sizeof (struct BenchmarkPartner));
1078     top->mps[c_m].num_partners = top->num_slaves;
1079     /* Initialize partners */
1080     for (c_s = 0; c_s < top->num_slaves; c_s++)
1081     {
1082       top->mps[c_m].partners[c_s].me = &top->mps[c_m];
1083       top->mps[c_m].partners[c_s].dest = &top->sps[c_s];
1084     }
1085     /* Get configuration */
1086     top->mps[c_m].peer_id_op = GNUNET_TESTBED_peer_get_information (top->mps[c_m].peer,
1087         GNUNET_TESTBED_PIT_IDENTITY, &peerinformation_cb, &top->mps[c_m]);
1088   }
1089
1090   /* Setup slave peers */
1091   for (c_s = 0; c_s < top->num_slaves; c_s++)
1092   {
1093     GNUNET_assert(NULL != peers_[c_s + top->num_masters]);
1094     top->sps[c_s].peer = peers_[c_s + top->num_masters];
1095     top->sps[c_s].no = c_s + top->num_masters;
1096     top->sps[c_s].master = GNUNET_NO;
1097     top->sps[c_s].partners =
1098         GNUNET_malloc (top->num_masters * sizeof (struct BenchmarkPartner));
1099     top->sps[c_s].num_partners = top->num_masters;
1100     /* Initialize partners */
1101     for (c_m = 0; c_m < top->num_masters; c_m++)
1102     {
1103       top->sps[c_s].partners[c_m].me = &top->sps[c_s];
1104       top->sps[c_s].partners[c_m].dest = &top->mps[c_m];
1105     }
1106     /* Get configuration */
1107     top->sps[c_s].peer_id_op = GNUNET_TESTBED_peer_get_information (top->sps[c_s].peer,
1108         GNUNET_TESTBED_PIT_IDENTITY, &peerinformation_cb, &top->sps[c_s]);
1109   }
1110 }
1111
1112 /**
1113  * Controller event callback
1114  *
1115  * @param cls NULL
1116  * @param event the controller event
1117  */
1118 static void
1119 controller_event_cb (void *cls,
1120     const struct GNUNET_TESTBED_EventInformation *event)
1121 {
1122   struct GNUNET_ATS_TEST_Topology *top = cls;
1123   switch (event->type)
1124   {
1125   case GNUNET_TESTBED_ET_CONNECT:
1126     break;
1127   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
1128     break;
1129   default:
1130     GNUNET_break(0);
1131     GNUNET_SCHEDULER_cancel (top->shutdown_task);
1132     top->shutdown_task = GNUNET_SCHEDULER_add_now (&do_shutdown, NULL );
1133   }
1134 }
1135
1136
1137 void
1138 GNUNET_ATS_TEST_create_topology (char *name, char *cfg_file,
1139     unsigned int num_slaves,
1140     unsigned int num_masters,
1141     int test_core,
1142     GNUNET_ATS_TESTING_TopologySetupDoneCallback done_cb,
1143     void *done_cb_cls,
1144     GNUNET_TRANSPORT_ReceiveCallback transport_recv_cb,
1145     GNUNET_ATS_AddressInformationCallback ats_perf_cb)
1146 {
1147
1148   static struct GNUNET_CORE_MessageHandler handlers[] = {
1149       {&core_handle_ping, TEST_MESSAGE_TYPE_PING, 0 },
1150       {&core_handle_pong, TEST_MESSAGE_TYPE_PONG, 0 },
1151       { NULL, 0, 0 } };
1152
1153   top = GNUNET_new (struct GNUNET_ATS_TEST_Topology);
1154   top->num_masters = num_masters;
1155   top->num_slaves = num_slaves;
1156   top->handlers = handlers;
1157   top->done_cb = done_cb;
1158   top->done_cb_cls = done_cb_cls;
1159   top->test_core = test_core;
1160   top->transport_recv_cb = transport_recv_cb;
1161   top->ats_perf_cb = ats_perf_cb;
1162
1163   top->mps = GNUNET_malloc (num_masters * sizeof (struct BenchmarkPeer));
1164   top->sps = GNUNET_malloc (num_slaves * sizeof (struct BenchmarkPeer));
1165
1166   /* Start topology */
1167   uint64_t event_mask;
1168   event_mask = 0;
1169   event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT);
1170   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
1171   (void) GNUNET_TESTBED_test_run (name, cfg_file,
1172       num_slaves + num_masters, event_mask, &controller_event_cb, NULL,
1173       &main_run, NULL);
1174 }
1175
1176 void
1177 GNUNET_ATS_TEST_shutdown_topology (void)
1178 {
1179   if (NULL == top)
1180     return;
1181
1182   GNUNET_SCHEDULER_shutdown();
1183 }
1184
1185
1186
1187 struct TrafficGenerator *
1188 GNUNET_ATS_TEST_generate_traffic_start (struct BenchmarkPeer *src,
1189     struct BenchmarkPartner *dest, unsigned int rate,
1190     struct GNUNET_TIME_Relative duration)
1191 {
1192   struct TrafficGenerator * tg;
1193   tg = NULL;
1194
1195   tg = GNUNET_new (struct TrafficGenerator);
1196   GNUNET_CONTAINER_DLL_insert (tg_head, tg_tail, tg);
1197   tg->src = src;
1198   tg->dest = dest;
1199   tg->rate = rate;
1200
1201   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1202       "Setting up traffic generator between master[%u] `%s' and slave [%u] `%s' sending with %u Bytes/sec\n",
1203       dest->me->no, GNUNET_i2s (&dest->me->id),
1204       dest->dest->no, GNUNET_i2s (&dest->dest->id),
1205       rate);
1206
1207   if (top->test_core)
1208   {
1209       if (NULL != dest->cth)
1210       {
1211         GNUNET_break (0);
1212         return tg;
1213       }
1214       dest->cth = GNUNET_CORE_notify_transmit_ready (src->ch, GNUNET_NO,
1215           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, &dest->dest->id,
1216           TEST_MESSAGE_SIZE, &send_ping_ready_cb, dest);
1217   }
1218   else
1219   {
1220       if (NULL != dest->tth)
1221       {
1222         GNUNET_break (0);
1223         return tg;
1224       }
1225       dest->tth = GNUNET_TRANSPORT_notify_transmit_ready (src->th, &dest->dest->id,
1226           TEST_MESSAGE_SIZE, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1227           &send_ping_ready_cb, dest);
1228       tg->last_sent = GNUNET_TIME_absolute_get();
1229       tg->delta.rel_value_us = (GNUNET_TIME_UNIT_SECONDS.rel_value_us / (rate / TEST_MESSAGE_SIZE));
1230   }
1231   return tg;
1232 }
1233
1234 void
1235 GNUNET_ATS_TEST_generate_traffic_stop (struct TrafficGenerator *tg)
1236 {
1237   GNUNET_CONTAINER_DLL_remove (tg_head, tg_tail, tg);
1238
1239   if (top->test_core)
1240   {
1241       if (NULL != tg->dest->cth)
1242       {
1243           GNUNET_CORE_notify_transmit_ready_cancel (tg->dest->cth);
1244           tg->dest->cth = NULL;
1245       }
1246   }
1247   else
1248   {
1249       if (NULL != tg->dest->tth)
1250       {
1251           GNUNET_TRANSPORT_notify_transmit_ready_cancel (tg->dest->tth);
1252           tg->dest->tth = NULL;
1253       }
1254   }
1255   GNUNET_free (tg);
1256 }
1257
1258 /* end of file ats-testing.c */