- dist fix
[oweals/gnunet.git] / src / stream / perf_stream_api.c
1  /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/perf_stream_api.c
23  * @brief performance benchmarks for stream api
24  * @author Sree Harsha Totakura
25  */
26
27 #define LOG(kind, ...)                         \
28   GNUNET_log (kind, __VA_ARGS__);
29
30 /****************************************************************************************/
31 /* Test is setup into the following major steps:                                        */
32 /*    1. Measurements over loopback (1 hop). i.e. we use only one peer and open         */
33 /*       stream connections over loopback. Messages will go through                     */
34 /*       STREAM_API->MESH_API->MESH_SERVICE->MESH_API->STREAM_API.                      */
35 /*    2. Measurements over 2 peers (2 hops). We use testbed to create 2 peers,          */
36 /*       connect them and then create stream connections. Messages will go through      */
37 /*       STREAM_API->MESH_API->MESH_SERVICE->CORE1.....CORE2->MESH_API->STREAM_API      */
38 /*    3. Measurements over 3 peers (3 hops). We use testbed to create 3 peers,          */
39 /*       connect them in a line topology: peer1->peer2->peer3. Messages will go         */
40 /*       through                                                                        */
41 /*       STREAM_API->MESH_API->MESH_SERVICE->CORE1..CORE2..CORE3->MESH_API->STREAM_API. */
42 /****************************************************************************************/
43
44 #include "platform.h"
45 #include "gnunet_common.h"
46 #include "gnunet_util_lib.h"
47 #include "gnunet_testing_lib.h"
48 #include "gnunet_testbed_service.h"
49 #include "gnunet_stream_lib.h"
50   
51 /**
52  * Simple struct to keep track of progress, and print a
53  * nice little percentage meter for long running tasks.
54  */
55 struct ProgressMeter
56 {
57   unsigned int total;
58
59   unsigned int modnum;
60
61   unsigned int dotnum;
62
63   unsigned int completed;
64
65   int print;
66
67   char *startup_string;
68 };
69
70
71 /**
72  * Steps in testing
73  */
74 enum TestStep
75 {
76   /**
77    * Single hop loopback testing
78    */
79   TEST_STEP_1_HOP,
80
81   /**
82    * Testing with 2 peers
83    */
84   TEST_STEP_2_HOP,
85
86   /**
87    * Testing with 3 peers
88    */
89   TEST_STEP_3_HOP
90 };
91
92
93 /**
94  * Structure for holding peer's sockets and IO Handles
95  */
96 struct PeerData
97 {
98   /**
99    * Peer's stream socket
100    */
101   struct GNUNET_STREAM_Socket *socket;
102
103   /**
104    * Peer's io write handle
105    */
106   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
107
108   /**
109    * Peer's io read handle
110    */
111   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
112
113   /**
114    * The peer handle when we use the testbed servie
115    */
116   struct GNUNET_TESTBED_Peer *peer;
117
118   /**
119    * Handle to peer specific opearations while using testbed service
120    */
121   struct GNUNET_TESTBED_Operation *op;
122
123   /**
124    * The identity of this peer
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Bytes the peer has written
130    */
131   size_t bytes_wrote;
132
133   /**
134    * Byte the peer has read
135    */
136   size_t bytes_read;
137
138   /**
139    * number of packets sent
140    */
141   unsigned int packets_wrote;
142
143   /**
144    * number of packets read
145    */
146   unsigned int packets_read;
147 };
148
149
150 /**
151  * Enumeration of stages in this testing
152  */
153 enum TestStage
154 {
155   /**
156    * The initial stage
157    */
158   INIT,
159   
160   /**
161    * Uplink testing stage
162    */
163   UPLINK_OK,
164
165   /**
166    * Downlink testing stage
167    */
168   DOWNLINK_OK
169 };
170
171
172 /**
173  * Maximum size of the data which we will transfer during tests
174  */
175 #define DATA_SIZE 5000000      /* 5mB */
176
177 /**
178  * Fixed number of packets we send in each direction during each subtest
179  */
180 #define MAX_PACKETS 2000
181
182 /**
183  * Listen socket of peer2
184  */
185 struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
186
187 /**
188  * Handle to configuration during TEST_STEP_1_HOP
189  */
190 const struct GNUNET_CONFIGURATION_Handle *config;
191
192 /**
193  * Handle for the progress meter
194  */
195 static struct ProgressMeter *meter;
196
197 /**
198  * Placeholder for peer data
199  */
200 static struct PeerData peer_data[3];
201
202 /**
203  * Handle to common operations while using testbed
204  */
205 static struct GNUNET_TESTBED_Operation *common_op;
206
207 /**
208  * Task ID for abort task
209  */
210 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
211
212 /**
213  * Task ID for write task
214  */
215 static GNUNET_SCHEDULER_TaskIdentifier write_task;
216
217 /**
218  * Task ID for read task
219  */
220 static GNUNET_SCHEDULER_TaskIdentifier read_task;
221
222 /**
223  * Absolute time when profiling starts
224  */
225 static struct GNUNET_TIME_Absolute prof_start_time;
226
227 /**
228  * Test time taken for sending the data
229  */
230 static struct GNUNET_TIME_Relative prof_time;
231
232 /**
233  * Random data block. Should generate data first
234  */
235 static uint32_t data[DATA_SIZE / 4];
236
237 /**
238  * Payload sizes to test each major test with
239  */
240 static uint16_t payload_size[] = 
241 { 20, 500, 2000, 7000, 13000, 25000, 50000};//, 60000, 63000, 64000 };
242
243 /**
244  * Current step of testing
245  */
246 static enum TestStep test_step;
247
248 /**
249  * Index for choosing payload size
250  */
251 static unsigned int payload_size_index;
252
253 /**
254  * Number of peers we want to create while using the testbed service
255  */
256 static int num_peers;
257
258 /**
259  * Flag to indicate that the other peer should reset its data read source index
260  */
261 static int reset_read;
262
263 /**
264  * Testing result of a major test
265  */
266 static enum TestStage result;
267
268 /**
269  * Create a meter to keep track of the progress of some task.
270  *
271  * @param total the total number of items to complete
272  * @param start_string a string to prefix the meter with (if printing)
273  * @param print GNUNET_YES to print the meter, GNUNET_NO to count
274  *              internally only
275  *
276  * @return the progress meter
277  */
278 static struct ProgressMeter *
279 create_meter (unsigned int total, char *start_string, int print)
280 {
281   struct ProgressMeter *ret;
282
283   ret = GNUNET_malloc (sizeof (struct ProgressMeter));
284   ret->print = print;
285   ret->total = total;
286   ret->modnum = total / 4;
287   if (ret->modnum == 0)         /* Divide by zero check */
288     ret->modnum = 1;
289   ret->dotnum = (total / 50) + 1;
290   if (start_string != NULL)
291     ret->startup_string = GNUNET_strdup (start_string);
292   else
293     ret->startup_string = GNUNET_strdup ("");
294
295   return ret;
296 }
297
298
299 /**
300  * Update progress meter (increment by one).
301  *
302  * @param meter the meter to update and print info for
303  *
304  * @return GNUNET_YES if called the total requested,
305  *         GNUNET_NO if more items expected
306  */
307 static int
308 update_meter (struct ProgressMeter *meter)
309 {
310   if (meter->print == GNUNET_YES)
311   {
312     if (meter->completed % meter->modnum == 0)
313     {
314       if (meter->completed == 0)
315       {
316         FPRINTF (stdout, "%sProgress: [0%%", meter->startup_string);
317       }
318       else
319         FPRINTF (stdout, "%d%%",
320                  (int) (((float) meter->completed / meter->total) * 100));
321     }
322     else if (meter->completed % meter->dotnum == 0)
323       FPRINTF (stdout, "%s",  ".");
324
325     if (meter->completed + 1 == meter->total)
326       FPRINTF (stdout, "%d%%]\n", 100);
327     fflush (stdout);
328   }
329   meter->completed++;
330
331   if (meter->completed == meter->total)
332     return GNUNET_YES;
333   if (meter->completed > meter->total)
334     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Progress meter overflow!!\n");
335   return GNUNET_NO;
336 }
337
338
339 /**
340  * Reset progress meter.
341  *
342  * @param meter the meter to reset
343  *
344  * @return GNUNET_YES if meter reset,
345  *         GNUNET_SYSERR on error
346  */
347 static int
348 reset_meter (struct ProgressMeter *meter)
349 {
350   if (meter == NULL)
351     return GNUNET_SYSERR;
352
353   meter->completed = 0;
354   return GNUNET_YES;
355 }
356
357
358 /**
359  * Release resources for meter
360  *
361  * @param meter the meter to free
362  */
363 static void
364 free_meter (struct ProgressMeter *meter)
365 {
366   GNUNET_free_non_null (meter->startup_string);
367   GNUNET_free (meter);
368 }
369
370
371 /**
372  * Shutdown nicely
373  */
374 static void
375 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
376 {
377   switch (test_step)
378   {
379   case TEST_STEP_1_HOP:
380     if (NULL != peer_data[0].socket)
381       GNUNET_STREAM_close (peer_data[0].socket);
382     if (NULL != peer_data[1].socket)
383       GNUNET_STREAM_close (peer_data[1].socket);
384     if (NULL != peer2_listen_socket)
385       GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
386     break;
387   case TEST_STEP_2_HOP:
388     if (NULL != peer_data[1].socket)
389       GNUNET_STREAM_close (peer_data[1].socket);
390     if (NULL != peer_data[0].op)
391       GNUNET_TESTBED_operation_done (peer_data[0].op);
392     if (NULL != peer_data[1].op)
393       GNUNET_TESTBED_operation_done (peer_data[1].op);
394     break;
395   case TEST_STEP_3_HOP:
396     GNUNET_break (0);    
397   }  
398   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
399     GNUNET_SCHEDULER_cancel (abort_task);
400   if (GNUNET_SCHEDULER_NO_TASK != write_task)
401     GNUNET_SCHEDULER_cancel (write_task);
402   GNUNET_SCHEDULER_shutdown (); /* Shutdown this testcase */
403   if (NULL != meter)
404   {
405     free_meter (meter);
406     meter = NULL;
407   }
408 }
409
410
411 /**
412  * Something went wrong and timed out. Kill everything and set error flag
413  */
414 static void
415 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
416 {
417   abort_task = GNUNET_SCHEDULER_NO_TASK;
418   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test: ABORT\n");
419   if (GNUNET_SCHEDULER_NO_TASK != read_task)
420     GNUNET_SCHEDULER_cancel (read_task);
421   result = GNUNET_SYSERR;
422   do_shutdown (cls, tc);
423 }
424
425   
426 /**
427  * Scheduler call back; to be executed when a new stream is connected
428  * Called from listen connect for peer2
429  */
430 static void
431 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
432
433
434 /**
435  * Task for calling STREAM_write with a chunk of random data
436  *
437  * @param cls the peer data entity
438  * @param tc the task context
439  */
440 static void
441 stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
442
443
444 /**
445  * The write completion function; called upon writing some data to stream or
446  * upon error
447  *
448  * @param cls the closure from GNUNET_STREAM_write/read
449  * @param status the status of the stream at the time this function is called
450  * @param size the number of bytes written
451  */
452 static void 
453 write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size)
454 {
455   struct PeerData *pdata = cls;
456   double throughput;
457   double prof_time_sec;
458   unsigned int packets_wrote;
459
460   if (GNUNET_STREAM_OK != status)
461   {
462     GNUNET_SCHEDULER_cancel (abort_task);
463     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
464     return;
465   }
466   GNUNET_assert (size <= DATA_SIZE);
467   packets_wrote = (size + payload_size[payload_size_index] - 1)
468       / payload_size[payload_size_index];
469   pdata->bytes_wrote += size;
470   for (;packets_wrote > 0; packets_wrote--)
471   {    
472     update_meter (meter);
473     pdata->packets_wrote++;
474   }
475   if (pdata->packets_wrote < MAX_PACKETS) /* Have more data to send */
476   {
477     size_t write_amount;
478     
479     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
480     {
481       GNUNET_SCHEDULER_cancel (abort_task);
482       abort_task = 
483           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
484                                         (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
485                                     NULL);
486     }
487     write_amount = (MAX_PACKETS - pdata->packets_wrote) *
488         payload_size[payload_size_index];
489     if (write_amount > DATA_SIZE)
490       write_amount = DATA_SIZE;
491     reset_read = GNUNET_YES;
492     pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
493                                                   write_amount,
494                                                   GNUNET_TIME_UNIT_FOREVER_REL,
495                                                   &write_completion, pdata);
496     GNUNET_assert (NULL != pdata->io_write_handle);
497   }
498   else
499   {
500     free_meter (meter);
501     meter = NULL;
502     prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time);
503     prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000));
504     throughput = ((float) pdata->bytes_wrote) / prof_time_sec;
505     PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00);
506     switch (result)
507     {
508     case INIT:
509       result = UPLINK_OK;
510       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
511       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == write_task);
512       pdata->bytes_read = 0;
513       pdata->packets_read = 0;
514       meter = create_meter (MAX_PACKETS, "Testing Downlink\n", GNUNET_YES);
515       read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer_data[0]);
516       write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, &peer_data[1]);
517       break;
518     case UPLINK_OK:
519       result = DOWNLINK_OK;
520       GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
521       break;
522     case DOWNLINK_OK:
523       GNUNET_assert (0);
524     }
525   }
526 }
527
528
529 /**
530  * Task for calling STREAM_write with a chunk of random data
531  *
532  * @param cls the peer data entity
533  * @param tc the task context
534  */
535 static void
536 stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
537 {
538   struct PeerData *pdata = cls;
539   size_t write_amount;
540
541   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
542   {
543     GNUNET_SCHEDULER_cancel (abort_task);
544     abort_task = 
545       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
546                                     (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
547                                     NULL);
548   }
549   write_task = GNUNET_SCHEDULER_NO_TASK;
550   prof_start_time = GNUNET_TIME_absolute_get ();
551   pdata->bytes_wrote = 0;
552   pdata->packets_wrote = 0;
553   write_amount = MAX_PACKETS * payload_size[payload_size_index];
554   if (write_amount > DATA_SIZE)
555     write_amount = DATA_SIZE;
556   reset_read = GNUNET_YES;
557   pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
558                                                 write_amount,
559                                                 GNUNET_TIME_UNIT_FOREVER_REL,
560                                                 &write_completion, pdata);
561   GNUNET_assert (NULL != pdata->io_write_handle);
562 }
563
564
565 /**
566  * Scheduler call back; to be executed when a new stream is connected
567  * Called from listen connect for peer2
568  */
569 static void
570 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
571
572
573 /**
574  * Input processor
575  *
576  * @param cls peer2
577  * @param status the status of the stream at the time this function is called
578  * @param data traffic from the other side
579  * @param size the number of bytes available in data read 
580  * @return number of bytes of processed from 'data' (any data remaining should be
581  *         given to the next time the read processor is called).
582  */
583 static size_t
584 input_processor (void *cls, enum GNUNET_STREAM_Status status,
585                  const void *input_data, size_t size)
586 {
587   struct PeerData *pdata = cls;
588
589   if (GNUNET_STREAM_OK != status)
590   {
591     GNUNET_SCHEDULER_cancel (abort_task);
592     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
593     return 0;
594   }
595   GNUNET_assert (size <= DATA_SIZE);
596   if (GNUNET_YES == reset_read)
597   {
598     pdata->bytes_read = 0;
599     reset_read = GNUNET_NO;
600   }
601   GNUNET_assert ((pdata->bytes_read + size) <= DATA_SIZE);
602   GNUNET_assert (0 == memcmp (((void *)data ) + pdata->bytes_read, 
603                               input_data, size));
604   pdata->bytes_read += size;
605   pdata->packets_read += (size + payload_size[payload_size_index] - 1)
606       / payload_size[payload_size_index];
607   if (pdata->packets_read < MAX_PACKETS)
608   {
609     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
610     read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
611   }
612   else 
613   {
614     LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
615   }
616   return size;
617 }
618
619   
620 /**
621  * Scheduler call back; to be executed when a new stream is connected
622  * Called from listen connect for peer2
623  */
624 static void
625 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
626 {
627   struct PeerData *pdata = cls;
628
629   read_task = GNUNET_SCHEDULER_NO_TASK;
630   pdata->io_read_handle =
631       GNUNET_STREAM_read (pdata->socket, GNUNET_TIME_UNIT_FOREVER_REL,
632                           &input_processor, pdata);
633   GNUNET_assert (NULL != pdata->io_read_handle);
634 }
635
636
637 /**
638  * Functions of this type are called upon new stream connection from other peers
639  *
640  * @param cls the closure from GNUNET_STREAM_listen
641  * @param socket the socket representing the stream
642  * @param initiator the identity of the peer who wants to establish a stream
643  *            with us
644  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
645  *             stream (the socket will be invalid after the call)
646  */
647 static int
648 stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket,
649                   const struct GNUNET_PeerIdentity *initiator)
650 {
651   struct PeerData *pdata = cls;
652
653   
654   if ((NULL == socket) || (NULL == initiator))
655   {
656     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
657     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
658       GNUNET_SCHEDULER_cancel (abort_task);
659     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
660     return GNUNET_OK;
661   }
662   GNUNET_assert (NULL != socket);
663   GNUNET_assert (pdata == &peer_data[1]);
664   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n",
665               GNUNET_i2s(initiator));
666   pdata->socket = socket;
667   pdata->bytes_read = 0;
668   read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Function executed after stream has been established
675  *
676  * @param cls the closure from GNUNET_STREAM_open
677  * @param socket socket to use to communicate with the other side (read/write)
678  */
679 static void 
680 stream_open_cb (void *cls,
681                 struct GNUNET_STREAM_Socket *socket)
682 {
683   struct PeerData *pdata = cls;
684
685   GNUNET_assert (socket == pdata->socket);
686   meter = create_meter (MAX_PACKETS, "Testing Uplink\n", GNUNET_YES);
687   write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, pdata);
688 }
689
690
691 /**
692  * Listen success callback; connects a peer to stream as client
693  */
694 static void
695 stream_connect (void)
696 {
697   peer_data[0].socket = 
698       GNUNET_STREAM_open (config, &peer_data[1].id, 10, &stream_open_cb,
699                           &peer_data[0],
700                           GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
701                           payload_size[payload_size_index],
702                           GNUNET_STREAM_OPTION_END);
703   GNUNET_assert (NULL != peer_data[0].socket);
704 }
705
706
707 /**
708  * Initialize framework and start test
709  *
710  * @param cls closure
711  * @param cfg configuration of the peer that was started
712  * @param peer identity of the peer that was created
713  */
714 static void
715 run (void *cls, 
716      const struct GNUNET_CONFIGURATION_Handle *cfg,
717      struct GNUNET_TESTING_Peer *peer)
718 {
719   struct GNUNET_PeerIdentity id;
720
721   GNUNET_TESTING_peer_get_identity (peer, &id);
722   config = cfg;
723   peer2_listen_socket = 
724       GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[1],
725                             GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
726                             &stream_connect,
727                             GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
728                             payload_size[payload_size_index],
729                             GNUNET_STREAM_OPTION_END);
730   GNUNET_assert (NULL != peer2_listen_socket);
731   peer_data[0].id = id;
732   peer_data[1].id = id;
733   abort_task =
734       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
735                                     (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
736                                     NULL);
737 }
738
739
740 /**
741  * Adapter function called to establish a connection to
742  * a service.
743  * 
744  * @param cls closure
745  * @param cfg configuration of the peer to connect to; will be available until
746  *          GNUNET_TESTBED_operation_done() is called on the operation returned
747  *          from GNUNET_TESTBED_service_connect()
748  * @return service handle to return in 'op_result', NULL on error
749  */
750 static void * 
751 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg);
752
753
754 /**
755  * Adapter function called to destroy a connection to
756  * a service.
757  * 
758  * @param cls closure
759  * @param op_result service handle returned from the connect adapter
760  */
761 static void
762 stream_da (void *cls, void *op_result)
763 {
764   if (&peer_data[1] == cls)
765   {
766     GNUNET_STREAM_listen_close (op_result);
767     return;
768   }
769   else if (&peer_data[0] == cls)
770   {
771     GNUNET_STREAM_close (op_result);
772     return;
773   }
774   GNUNET_assert (0);
775 }
776
777
778 /**
779  * Listen success callback; connects a peer to stream as client. Called from
780  * testbed stream_ca
781  */
782 static void
783 stream_connect2 (void)
784 {
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
786   peer_data[0].op =
787       GNUNET_TESTBED_service_connect (&peer_data[0], peer_data[0].peer,
788                                       "stream", NULL, NULL, stream_ca,
789                                       stream_da, &peer_data[0]);
790 }
791
792
793 /**
794  * Adapter function called to establish a connection to
795  * a service.
796  * 
797  * @param cls closure
798  * @param cfg configuration of the peer to connect to; will be available until
799  *          GNUNET_TESTBED_operation_done() is called on the operation returned
800  *          from GNUNET_TESTBED_service_connect()
801  * @return service handle to return in 'op_result', NULL on error
802  */
803 static void * 
804 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
805 {
806   struct PeerData *pdata = cls;
807
808   if (&peer_data[1] == pdata)
809   {
810     peer2_listen_socket = NULL;
811     peer2_listen_socket =
812         GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, &peer_data[1],
813                               GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
814                               &stream_connect2,
815                               GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
816                               payload_size[payload_size_index],
817                               GNUNET_STREAM_OPTION_END);
818     GNUNET_assert (NULL != peer2_listen_socket);
819     return peer2_listen_socket;
820   }
821   if (&peer_data[0] == pdata)
822   {
823     pdata->socket =
824         GNUNET_STREAM_open (cfg, &peer_data[1].id, 10, &stream_open_cb,
825                             &peer_data[0],
826                             GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
827                             payload_size[payload_size_index],
828                             GNUNET_STREAM_OPTION_END);
829     GNUNET_assert (NULL != pdata->socket);
830     return pdata->socket;
831   }
832   GNUNET_assert (0);
833   return NULL;
834 }
835
836
837 /**
838  * Callback to be called when the requested peer information is available
839  *
840  * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
841  * @param op the operation this callback corresponds to
842  * @param pinfo the result; will be NULL if the operation has failed
843  * @param emsg error message if the operation has failed; will be NULL if the
844  *          operation is successfull
845  */
846 static void 
847 peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op,
848              const struct GNUNET_TESTBED_PeerInformation *pinfo,
849              const char *emsg)
850 {
851   struct PeerData *pdata = cb_cls;
852
853   GNUNET_assert (NULL == emsg);
854   GNUNET_assert (common_op == op);
855   GNUNET_assert (NULL != pdata);
856   memcpy (&pdata->id, pinfo->result.id, sizeof (struct GNUNET_PeerIdentity));
857   GNUNET_TESTBED_operation_done (op);
858   if (pdata == &peer_data[0])
859   {
860     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n",
861                 GNUNET_i2s (&pdata->id));
862     common_op = GNUNET_TESTBED_peer_get_information (peer_data[1].peer,
863                                                      GNUNET_TESTBED_PIT_IDENTITY,
864                                                      &peerinfo_cb, &peer_data[1]);
865   }
866   else if (pdata == &peer_data[1])
867   {
868     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n",
869                 GNUNET_i2s (&pdata->id));
870     if (TEST_STEP_2_HOP == test_step)
871       peer_data[1].op = 
872           GNUNET_TESTBED_service_connect (&peer_data[1], peer_data[1].peer,
873                                           "stream", NULL, NULL, stream_ca,
874                                           stream_da, &peer_data[1]);
875     else
876       GNUNET_break (0);         /* FIXME: 3 hop test case here... */
877   }
878 }
879
880
881 /**
882  * Controller event callback
883  *
884  * @param cls NULL
885  * @param event the controller event
886  */
887 static void
888 controller_event_cb (void *cls,
889                      const struct GNUNET_TESTBED_EventInformation *event)
890 {
891   switch (event->type)
892   {
893   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
894     if (NULL != event->details.operation_finished.emsg)
895     {
896       FPRINTF (stderr, "Error while expecting an operation to succeed:%s \n",
897                event->details.operation_finished.emsg);
898       GNUNET_assert (0);
899     }
900     break;
901   case GNUNET_TESTBED_ET_CONNECT:
902     GNUNET_TESTBED_operation_done (common_op);
903     /* Get the peer identity and configuration of peers */
904     common_op =
905         GNUNET_TESTBED_peer_get_information (peer_data[0].peer,
906                                              GNUNET_TESTBED_PIT_IDENTITY,
907                                              &peerinfo_cb, &peer_data[0]);
908     break;
909   default:
910     GNUNET_assert (0);
911   }
912 }
913
914
915 /**
916  * Signature of a main function for a testcase.
917  *
918  * @param cls closure
919  * @param num_peers number of peers in 'peers'
920  * @param peers handle to peers run in the testbed
921  */
922 static void
923 test_master (void *cls, unsigned int num_peers_,
924              struct GNUNET_TESTBED_Peer **peers)
925 {
926   GNUNET_assert (NULL != peers);
927   GNUNET_assert (NULL != peers[0]);
928   GNUNET_assert (NULL != peers[1]);
929   GNUNET_assert (num_peers_ == num_peers);
930   peer_data[0].peer = peers[0];
931   peer_data[1].peer = peers[1];
932   if (2 == num_peers)
933     common_op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL,
934                                                 peer_data[0].peer,
935                                                 peer_data[1].peer);
936   else
937     GNUNET_break (0);
938   abort_task =
939       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
940                                     (GNUNET_TIME_UNIT_SECONDS, 120), &do_abort,
941                                     NULL);
942 }
943
944
945 /**
946  * Main function
947  */
948 int main (int argc, char **argv)
949 {
950   char *test_name = "perf_stream_api";
951   char *cfg_file = "test_stream_local.conf";
952   uint64_t event_mask;
953   unsigned int count;
954   int ret;
955
956   meter = create_meter ((sizeof (data) / 4), "Generating random data\n", GNUNET_YES);
957   for (count=0; count < (sizeof (data) / 4); count++)
958   {
959     data[count] = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
960                                             UINT32_MAX);
961     update_meter (meter);
962   }
963   reset_meter (meter);
964   free_meter (meter);
965   meter = NULL;
966   test_step = TEST_STEP_1_HOP;
967   for (payload_size_index = 0;
968        payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
969        payload_size_index++)
970   {
971     PRINTF ("\nTesting over loopback with payload size %hu\n",
972             payload_size[payload_size_index]);
973     (void) memset (peer_data, 0, sizeof (peer_data));
974     result = INIT;
975     reset_read = GNUNET_NO;
976     ret = GNUNET_TESTING_peer_run (test_name, cfg_file, &run, NULL);
977     if ((0 != ret) || (DOWNLINK_OK != result))
978       goto return_fail;
979   }
980   test_step = TEST_STEP_2_HOP;
981   num_peers = 2;
982   event_mask = 0;
983   event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT);
984   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
985   for (payload_size_index = 0;
986        payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
987        payload_size_index++)
988   {
989     PRINTF ("\nTesting over 1 hop with payload size %hu\n",
990             payload_size[payload_size_index]);
991     (void) memset (peer_data, 0, sizeof (peer_data));
992     result = INIT;
993     reset_read = GNUNET_NO;
994     GNUNET_TESTBED_test_run (test_name, cfg_file, num_peers, event_mask,
995                              &controller_event_cb, NULL, &test_master, NULL);
996     if (DOWNLINK_OK != result)
997       goto return_fail;
998   }
999   test_step = TEST_STEP_3_HOP;
1000   for (payload_size_index = 0; 
1001        payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1002        payload_size_index++)
1003   {
1004     /* Initialize testbed here */
1005   }
1006   return 0;
1007
1008  return_fail:
1009   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test failed\n");
1010   return 1;
1011 }
1012
1013 /* end of perf_stream_api.c */