-fix double-sending in stream if finish_cb behaves in a certain way
[oweals/gnunet.git] / src / stream / perf_stream_api.c
1  /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/perf_stream_api.c
23  * @brief performance benchmarks for stream api
24  * @author Sree Harsha Totakura
25  */
26
27 #define LOG(kind, ...)                         \
28   GNUNET_log (kind, __VA_ARGS__);
29
30 /****************************************************************************************/
31 /* Test is setup into the following major steps:                                        */
32 /*    1. Measurements over loopback (1 hop). i.e. we use only one peer and open         */
33 /*       stream connections over loopback. Messages will go through                     */
34 /*       STREAM_API->MESH_API->MESH_SERVICE->MESH_API->STREAM_API.                      */
35 /*    2. Measurements over 2 peers (2 hops). We use testbed to create 2 peers,          */
36 /*       connect them and then create stream connections. Messages will go through      */
37 /*       STREAM_API->MESH_API->MESH_SERVICE->CORE1.....CORE2->MESH_API->STREAM_API      */
38 /*    3. Measurements over 3 peers (3 hops). We use testbed to create 3 peers,          */
39 /*       connect them in a line topology: peer1->peer2->peer3. Messages will go         */
40 /*       through                                                                        */
41 /*       STREAM_API->MESH_API->MESH_SERVICE->CORE1..CORE2..CORE3->MESH_API->STREAM_API. */
42 /****************************************************************************************/
43
44 #include "platform.h"
45 #include "gnunet_common.h"
46 #include "gnunet_util_lib.h"
47 #include "gnunet_testing_lib.h"
48 #include "gnunet_testbed_service.h"
49 #include "gnunet_stream_lib.h"
50   
51 /**
52  * Simple struct to keep track of progress, and print a
53  * nice little percentage meter for long running tasks.
54  */
55 struct ProgressMeter
56 {
57   unsigned int total;
58
59   unsigned int modnum;
60
61   unsigned int dotnum;
62
63   unsigned int completed;
64
65   int print;
66
67   char *startup_string;
68 };
69
70
71 /**
72  * Steps in testing
73  */
74 enum TestStep
75 {
76   /**
77    * Single hop loopback testing
78    */
79   TEST_STEP_1_HOP,
80
81   /**
82    * Testing with 2 peers
83    */
84   TEST_STEP_2_HOP,
85
86   /**
87    * Testing with 3 peers
88    */
89   TEST_STEP_3_HOP
90 };
91
92
93 /**
94  * Structure for holding peer's sockets and IO Handles
95  */
96 struct PeerData
97 {
98   /**
99    * Peer's stream socket
100    */
101   struct GNUNET_STREAM_Socket *socket;
102
103   /**
104    * Peer's io write handle
105    */
106   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
107
108   /**
109    * Peer's io read handle
110    */
111   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
112
113   /**
114    * The peer handle when we use the testbed servie
115    */
116   struct GNUNET_TESTBED_Peer *peer;
117
118   /**
119    * Handle to peer specific opearations while using testbed service
120    */
121   struct GNUNET_TESTBED_Operation *op;
122
123   /**
124    * The identity of this peer
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Peer's shutdown handle
130    */
131   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
132
133   /**
134    * Bytes the peer has written
135    */
136   size_t bytes_wrote;
137
138   /**
139    * Byte the peer has read
140    */
141   size_t bytes_read;
142
143   /**
144    * number of packets sent
145    */
146   unsigned int packets_wrote;
147
148   /**
149    * number of packets read
150    */
151   unsigned int packets_read;
152 };
153
154
155 /**
156  * Enumeration of stages in this testing
157  */
158 enum TestStage
159 {
160   /**
161    * The initial stage
162    */
163   INIT,
164   
165   /**
166    * Uplink testing stage
167    */
168   UPLINK_OK,
169
170   /**
171    * Downlink testing stage
172    */
173   DOWNLINK_OK
174 };
175
176
177 /**
178  * Maximum size of the data which we will transfer during tests
179  */
180 #define DATA_SIZE 5000000      /* 5mB */
181
182 /**
183  * Fixed number of packets we send in each direction during each subtest
184  */
185 #define MAX_PACKETS 1000
186
187 /**
188  * Listen socket of peer2
189  */
190 struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
191
192 /**
193  * Handle to configuration during TEST_STEP_1_HOP
194  */
195 const struct GNUNET_CONFIGURATION_Handle *config;
196
197 /**
198  * Handle for the progress meter
199  */
200 static struct ProgressMeter *meter;
201
202 /**
203  * Placeholder for peer data
204  */
205 static struct PeerData peer_data[3];
206
207 /**
208  * Handle to common operations while using testbed
209  */
210 static struct GNUNET_TESTBED_Operation *common_op;
211
212 /**
213  * Task ID for abort task
214  */
215 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
216
217 /**
218  * Task ID for write task
219  */
220 static GNUNET_SCHEDULER_TaskIdentifier write_task;
221
222 /**
223  * Task ID for read task
224  */
225 static GNUNET_SCHEDULER_TaskIdentifier read_task;
226
227 /**
228  * Absolute time when profiling starts
229  */
230 static struct GNUNET_TIME_Absolute prof_start_time;
231
232 /**
233  * Test time taken for sending the data
234  */
235 static struct GNUNET_TIME_Relative prof_time;
236
237 /**
238  * Random data block. Should generate data first
239  */
240 static uint32_t data[DATA_SIZE / 4];
241
242 /**
243  * Payload sizes to test each major test with
244  */
245 static uint16_t payload_size[] = 
246 { 20, 500, 2000, 7000, 13000, 25000, 50000};//, 60000, 63000, 64000 };
247
248 /**
249  * Current step of testing
250  */
251 static enum TestStep test_step;
252
253 /**
254  * Index for choosing payload size
255  */
256 static unsigned int payload_size_index;
257
258 /**
259  * Number of peers we want to create while using the testbed service
260  */
261 static int num_peers;
262
263 /**
264  * Flag to indicate that the other peer should reset its data read source index
265  */
266 static int reset_read;
267
268 /**
269  * Testing result of a major test
270  */
271 static enum TestStage result;
272
273 /**
274  * Create a meter to keep track of the progress of some task.
275  *
276  * @param total the total number of items to complete
277  * @param start_string a string to prefix the meter with (if printing)
278  * @param print GNUNET_YES to print the meter, GNUNET_NO to count
279  *              internally only
280  *
281  * @return the progress meter
282  */
283 static struct ProgressMeter *
284 create_meter (unsigned int total, char *start_string, int print)
285 {
286   struct ProgressMeter *ret;
287
288   ret = GNUNET_malloc (sizeof (struct ProgressMeter));
289   ret->print = print;
290   ret->total = total;
291   ret->modnum = total / 4;
292   if (ret->modnum == 0)         /* Divide by zero check */
293     ret->modnum = 1;
294   ret->dotnum = (total / 50) + 1;
295   if (start_string != NULL)
296     ret->startup_string = GNUNET_strdup (start_string);
297   else
298     ret->startup_string = GNUNET_strdup ("");
299
300   return ret;
301 }
302
303
304 /**
305  * Update progress meter (increment by one).
306  *
307  * @param meter the meter to update and print info for
308  *
309  * @return GNUNET_YES if called the total requested,
310  *         GNUNET_NO if more items expected
311  */
312 static int
313 update_meter (struct ProgressMeter *meter)
314 {
315   if (meter->print == GNUNET_YES)
316   {
317     if (meter->completed % meter->modnum == 0)
318     {
319       if (meter->completed == 0)
320       {
321         FPRINTF (stdout, "%sProgress: [0%%", meter->startup_string);
322       }
323       else
324         FPRINTF (stdout, "%d%%",
325                  (int) (((float) meter->completed / meter->total) * 100));
326     }
327     else if (meter->completed % meter->dotnum == 0)
328       FPRINTF (stdout, "%s",  ".");
329
330     if (meter->completed + 1 == meter->total)
331       FPRINTF (stdout, "%d%%]\n", 100);
332     fflush (stdout);
333   }
334   meter->completed++;
335
336   if (meter->completed == meter->total)
337     return GNUNET_YES;
338   if (meter->completed > meter->total)
339     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Progress meter overflow!!\n");
340   return GNUNET_NO;
341 }
342
343
344 /**
345  * Reset progress meter.
346  *
347  * @param meter the meter to reset
348  *
349  * @return GNUNET_YES if meter reset,
350  *         GNUNET_SYSERR on error
351  */
352 static int
353 reset_meter (struct ProgressMeter *meter)
354 {
355   if (meter == NULL)
356     return GNUNET_SYSERR;
357
358   meter->completed = 0;
359   return GNUNET_YES;
360 }
361
362
363 /**
364  * Release resources for meter
365  *
366  * @param meter the meter to free
367  */
368 static void
369 free_meter (struct ProgressMeter *meter)
370 {
371   GNUNET_free_non_null (meter->startup_string);
372   GNUNET_free (meter);
373 }
374
375
376 /**
377  * Shutdown nicely
378  */
379 static void
380 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
381 {
382   switch (test_step)
383   {
384   case TEST_STEP_1_HOP:
385     if (NULL != peer_data[0].socket)
386       GNUNET_STREAM_close (peer_data[0].socket);
387     if (NULL != peer_data[1].socket)
388       GNUNET_STREAM_close (peer_data[1].socket);
389     if (NULL != peer2_listen_socket)
390       GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
391     break;
392   case TEST_STEP_2_HOP:
393     if (NULL != peer_data[1].socket)
394       GNUNET_STREAM_close (peer_data[1].socket);
395     if (NULL != peer_data[0].op)
396       GNUNET_TESTBED_operation_done (peer_data[0].op);
397     if (NULL != peer_data[1].op)
398       GNUNET_TESTBED_operation_done (peer_data[1].op);
399     break;
400   case TEST_STEP_3_HOP:
401     GNUNET_break (0);    
402   }  
403   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
404     GNUNET_SCHEDULER_cancel (abort_task);
405   if (GNUNET_SCHEDULER_NO_TASK != write_task)
406     GNUNET_SCHEDULER_cancel (write_task);
407   GNUNET_SCHEDULER_shutdown (); /* Shutdown this testcase */
408   if (NULL != meter)
409   {
410     free_meter (meter);
411     meter = NULL;
412   }
413 }
414
415
416 /**
417  * Something went wrong and timed out. Kill everything and set error flag
418  */
419 static void
420 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
421 {
422   abort_task = GNUNET_SCHEDULER_NO_TASK;
423   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test: ABORT\n");
424   if (GNUNET_SCHEDULER_NO_TASK != read_task)
425     GNUNET_SCHEDULER_cancel (read_task);
426   result = GNUNET_SYSERR;
427   do_close (cls, tc);
428 }
429
430
431 /**
432  * Completion callback for shutdown
433  *
434  * @param cls the closure from GNUNET_STREAM_shutdown call
435  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
436  *          SHUT_RDWR) 
437  */
438 static void 
439 shutdown_completion (void *cls,
440                      int operation)
441 {
442   static int shutdowns;
443
444   if (++shutdowns == 1)
445   {
446     peer_data[0].shutdown_handle = NULL;
447     peer_data[1].shutdown_handle = GNUNET_STREAM_shutdown (peer_data[1].socket, SHUT_RDWR,
448                                                            &shutdown_completion, cls);
449     return;
450   }  
451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
452   GNUNET_SCHEDULER_add_now (&do_close, cls);
453 }
454
455
456 /**
457  * Shutdown sockets gracefully
458  */
459 static void
460 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
461 {
462   peer_data[0].shutdown_handle = GNUNET_STREAM_shutdown (peer_data[0].socket, SHUT_RDWR,
463                                                          &shutdown_completion, cls);
464 }
465
466   
467 /**
468  * Scheduler call back; to be executed when a new stream is connected
469  * Called from listen connect for peer2
470  */
471 static void
472 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
473
474
475 /**
476  * Task for calling STREAM_write with a chunk of random data
477  *
478  * @param cls the peer data entity
479  * @param tc the task context
480  */
481 static void
482 stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
483
484
485 /**
486  * The write completion function; called upon writing some data to stream or
487  * upon error
488  *
489  * @param cls the closure from GNUNET_STREAM_write/read
490  * @param status the status of the stream at the time this function is called
491  * @param size the number of bytes written
492  */
493 static void 
494 write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size)
495 {
496   struct PeerData *pdata = cls;
497   double throughput;
498   double prof_time_sec;
499   unsigned int packets_wrote;
500
501   if (GNUNET_STREAM_OK != status)
502   {
503     GNUNET_SCHEDULER_cancel (abort_task);
504     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
505     return;
506   }
507   GNUNET_assert (size <= DATA_SIZE);
508   packets_wrote = (size + payload_size[payload_size_index] - 1)
509       / payload_size[payload_size_index];
510   pdata->bytes_wrote += size;
511   for (;packets_wrote > 0; packets_wrote--)
512   {    
513     update_meter (meter);
514     pdata->packets_wrote++;
515   }
516   if (pdata->packets_wrote < MAX_PACKETS) /* Have more data to send */
517   {
518     size_t write_amount;
519     
520     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
521     {
522       GNUNET_SCHEDULER_cancel (abort_task);
523       abort_task = 
524           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
525                                         (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
526                                     NULL);
527     }
528     write_amount = (MAX_PACKETS - pdata->packets_wrote) *
529         payload_size[payload_size_index];
530     if (write_amount > DATA_SIZE)
531       write_amount = DATA_SIZE;
532     reset_read = GNUNET_YES;
533     pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
534                                                   write_amount,
535                                                   GNUNET_TIME_UNIT_FOREVER_REL,
536                                                   &write_completion, pdata);
537     GNUNET_assert (NULL != pdata->io_write_handle);
538   }
539   else
540   {
541     free_meter (meter);
542     meter = NULL;
543     prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time);
544     prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000));
545     throughput = ((float) pdata->bytes_wrote) / prof_time_sec;
546     PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00);
547     switch (result)
548     {
549     case INIT:
550       result = UPLINK_OK;
551       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
552       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == write_task);
553       pdata->bytes_read = 0;
554       pdata->packets_read = 0;
555       meter = create_meter (MAX_PACKETS, "Testing Downlink\n", GNUNET_YES);
556       read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer_data[0]);
557       write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, &peer_data[1]);
558       break;
559     case UPLINK_OK:
560       result = DOWNLINK_OK;
561       GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
562       break;
563     case DOWNLINK_OK:
564       GNUNET_assert (0);
565     }
566   }
567 }
568
569
570 /**
571  * Task for calling STREAM_write with a chunk of random data
572  *
573  * @param cls the peer data entity
574  * @param tc the task context
575  */
576 static void
577 stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
578 {
579   struct PeerData *pdata = cls;
580   size_t write_amount;
581
582   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
583   {
584     GNUNET_SCHEDULER_cancel (abort_task);
585     abort_task = 
586       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
587                                     (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
588                                     NULL);
589   }
590   write_task = GNUNET_SCHEDULER_NO_TASK;
591   prof_start_time = GNUNET_TIME_absolute_get ();
592   pdata->bytes_wrote = 0;
593   pdata->packets_wrote = 0;
594   write_amount = MAX_PACKETS * payload_size[payload_size_index];
595   if (write_amount > DATA_SIZE)
596     write_amount = DATA_SIZE;
597   reset_read = GNUNET_YES;
598   pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
599                                                 write_amount,
600                                                 GNUNET_TIME_UNIT_FOREVER_REL,
601                                                 &write_completion, pdata);
602   GNUNET_assert (NULL != pdata->io_write_handle);
603 }
604
605
606 /**
607  * Scheduler call back; to be executed when a new stream is connected
608  * Called from listen connect for peer2
609  */
610 static void
611 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
612
613
614 /**
615  * Input processor
616  *
617  * @param cls peer2
618  * @param status the status of the stream at the time this function is called
619  * @param data traffic from the other side
620  * @param size the number of bytes available in data read 
621  * @return number of bytes of processed from 'data' (any data remaining should be
622  *         given to the next time the read processor is called).
623  */
624 static size_t
625 input_processor (void *cls, enum GNUNET_STREAM_Status status,
626                  const void *input_data, size_t size)
627 {
628   struct PeerData *pdata = cls;
629
630   if (GNUNET_STREAM_OK != status)
631   {
632     GNUNET_SCHEDULER_cancel (abort_task);
633     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
634     return 0;
635   }
636   GNUNET_assert (size <= DATA_SIZE);
637   if (GNUNET_YES == reset_read)
638   {
639     pdata->bytes_read = 0;
640     reset_read = GNUNET_NO;
641   }
642   GNUNET_assert ((pdata->bytes_read + size) <= DATA_SIZE);
643   GNUNET_assert (0 == memcmp (((void *)data ) + pdata->bytes_read, 
644                               input_data, size));
645   pdata->bytes_read += size;
646   pdata->packets_read += (size + payload_size[payload_size_index] - 1)
647       / payload_size[payload_size_index];
648   if (pdata->packets_read < MAX_PACKETS)
649   {
650     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
651     read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
652   }
653   else 
654   {
655     LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
656   }
657   return size;
658 }
659
660   
661 /**
662  * Scheduler call back; to be executed when a new stream is connected
663  * Called from listen connect for peer2
664  */
665 static void
666 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
667 {
668   struct PeerData *pdata = cls;
669
670   read_task = GNUNET_SCHEDULER_NO_TASK;
671   pdata->io_read_handle =
672       GNUNET_STREAM_read (pdata->socket, GNUNET_TIME_UNIT_FOREVER_REL,
673                           &input_processor, pdata);
674   GNUNET_assert (NULL != pdata->io_read_handle);
675 }
676
677
678 /**
679  * Functions of this type are called upon new stream connection from other peers
680  *
681  * @param cls the closure from GNUNET_STREAM_listen
682  * @param socket the socket representing the stream
683  * @param initiator the identity of the peer who wants to establish a stream
684  *            with us
685  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
686  *             stream (the socket will be invalid after the call)
687  */
688 static int
689 stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket,
690                   const struct GNUNET_PeerIdentity *initiator)
691 {
692   struct PeerData *pdata = cls;
693
694   
695   if ((NULL == socket) || (NULL == initiator))
696   {
697     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
698     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
699       GNUNET_SCHEDULER_cancel (abort_task);
700     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
701     return GNUNET_OK;
702   }
703   GNUNET_assert (NULL != socket);
704   GNUNET_assert (pdata == &peer_data[1]);
705   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n",
706               GNUNET_i2s(initiator));
707   pdata->socket = socket;
708   pdata->bytes_read = 0;
709   read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
710   return GNUNET_OK;
711 }
712
713
714 /**
715  * Function executed after stream has been established
716  *
717  * @param cls the closure from GNUNET_STREAM_open
718  * @param socket socket to use to communicate with the other side (read/write)
719  */
720 static void 
721 stream_open_cb (void *cls,
722                 struct GNUNET_STREAM_Socket *socket)
723 {
724   struct PeerData *pdata = cls;
725
726   GNUNET_assert (socket == pdata->socket);
727   meter = create_meter (MAX_PACKETS, "Testing Uplink\n", GNUNET_YES);
728   write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, pdata);
729 }
730
731
732 /**
733  * Listen success callback; connects a peer to stream as client
734  */
735 static void
736 stream_connect (void)
737 {
738   peer_data[0].socket = 
739       GNUNET_STREAM_open (config, &peer_data[1].id, 10, &stream_open_cb,
740                           &peer_data[0],
741                           GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
742                           payload_size[payload_size_index],
743                           GNUNET_STREAM_OPTION_END);
744   GNUNET_assert (NULL != peer_data[0].socket);
745 }
746
747
748 /**
749  * Initialize framework and start test
750  *
751  * @param cls closure
752  * @param cfg configuration of the peer that was started
753  * @param peer identity of the peer that was created
754  */
755 static void
756 run (void *cls, 
757      const struct GNUNET_CONFIGURATION_Handle *cfg,
758      struct GNUNET_TESTING_Peer *peer)
759 {
760   struct GNUNET_PeerIdentity id;
761
762   GNUNET_TESTING_peer_get_identity (peer, &id);
763   config = cfg;
764   peer2_listen_socket = 
765       GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[1],
766                             GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
767                             &stream_connect,
768                             GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
769                             payload_size[payload_size_index],
770                             GNUNET_STREAM_OPTION_END);
771   GNUNET_assert (NULL != peer2_listen_socket);
772   peer_data[0].id = id;
773   peer_data[1].id = id;
774   abort_task =
775       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
776                                     (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
777                                     NULL);
778 }
779
780
781 /**
782  * Adapter function called to establish a connection to
783  * a service.
784  * 
785  * @param cls closure
786  * @param cfg configuration of the peer to connect to; will be available until
787  *          GNUNET_TESTBED_operation_done() is called on the operation returned
788  *          from GNUNET_TESTBED_service_connect()
789  * @return service handle to return in 'op_result', NULL on error
790  */
791 static void * 
792 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg);
793
794
795 /**
796  * Adapter function called to destroy a connection to
797  * a service.
798  * 
799  * @param cls closure
800  * @param op_result service handle returned from the connect adapter
801  */
802 static void
803 stream_da (void *cls, void *op_result)
804 {
805   if (&peer_data[1] == cls)
806   {
807     GNUNET_STREAM_listen_close (op_result);
808     return;
809   }
810   else if (&peer_data[0] == cls)
811   {
812     GNUNET_STREAM_close (op_result);
813     return;
814   }
815   GNUNET_assert (0);
816 }
817
818
819 /**
820  * Listen success callback; connects a peer to stream as client. Called from
821  * testbed stream_ca
822  */
823 static void
824 stream_connect2 (void)
825 {
826   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
827   peer_data[0].op =
828       GNUNET_TESTBED_service_connect (&peer_data[0], peer_data[0].peer,
829                                       "stream", NULL, NULL, stream_ca,
830                                       stream_da, &peer_data[0]);
831 }
832
833
834 /**
835  * Adapter function called to establish a connection to
836  * a service.
837  * 
838  * @param cls closure
839  * @param cfg configuration of the peer to connect to; will be available until
840  *          GNUNET_TESTBED_operation_done() is called on the operation returned
841  *          from GNUNET_TESTBED_service_connect()
842  * @return service handle to return in 'op_result', NULL on error
843  */
844 static void * 
845 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
846 {
847   struct PeerData *pdata = cls;
848
849   if (&peer_data[1] == pdata)
850   {
851     peer2_listen_socket = NULL;
852     peer2_listen_socket =
853         GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, &peer_data[1],
854                               GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
855                               &stream_connect2,
856                               GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
857                               payload_size[payload_size_index],
858                               GNUNET_STREAM_OPTION_END);
859     GNUNET_assert (NULL != peer2_listen_socket);
860     return peer2_listen_socket;
861   }
862   if (&peer_data[0] == pdata)
863   {
864     pdata->socket =
865         GNUNET_STREAM_open (cfg, &peer_data[1].id, 10, &stream_open_cb,
866                             &peer_data[0],
867                             GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
868                             payload_size[payload_size_index],
869                             GNUNET_STREAM_OPTION_END);
870     GNUNET_assert (NULL != pdata->socket);
871     return pdata->socket;
872   }
873   GNUNET_assert (0);
874   return NULL;
875 }
876
877
878 /**
879  * Callback to be called when the requested peer information is available
880  *
881  * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
882  * @param op the operation this callback corresponds to
883  * @param pinfo the result; will be NULL if the operation has failed
884  * @param emsg error message if the operation has failed; will be NULL if the
885  *          operation is successfull
886  */
887 static void 
888 peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op,
889              const struct GNUNET_TESTBED_PeerInformation *pinfo,
890              const char *emsg)
891 {
892   struct PeerData *pdata = cb_cls;
893
894   GNUNET_assert (NULL == emsg);
895   GNUNET_assert (common_op == op);
896   GNUNET_assert (NULL != pdata);
897   memcpy (&pdata->id, pinfo->result.id, sizeof (struct GNUNET_PeerIdentity));
898   GNUNET_TESTBED_operation_done (op);
899   if (pdata == &peer_data[0])
900   {
901     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n",
902                 GNUNET_i2s (&pdata->id));
903     common_op = GNUNET_TESTBED_peer_get_information (peer_data[1].peer,
904                                                      GNUNET_TESTBED_PIT_IDENTITY,
905                                                      &peerinfo_cb, &peer_data[1]);
906   }
907   else if (pdata == &peer_data[1])
908   {
909     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n",
910                 GNUNET_i2s (&pdata->id));
911     if (TEST_STEP_2_HOP == test_step)
912       peer_data[1].op = 
913           GNUNET_TESTBED_service_connect (&peer_data[1], peer_data[1].peer,
914                                           "stream", NULL, NULL, stream_ca,
915                                           stream_da, &peer_data[1]);
916     else
917       GNUNET_break (0);         /* FIXME: 3 hop test case here... */
918   }
919 }
920
921
922 /**
923  * Controller event callback
924  *
925  * @param cls NULL
926  * @param event the controller event
927  */
928 static void
929 controller_event_cb (void *cls,
930                      const struct GNUNET_TESTBED_EventInformation *event)
931 {
932   switch (event->type)
933   {
934   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
935     if (NULL != event->details.operation_finished.emsg)
936     {
937       FPRINTF (stderr, "Error while expecting an operation to succeed:%s \n",
938                event->details.operation_finished.emsg);
939       GNUNET_assert (0);
940     }
941     break;
942   case GNUNET_TESTBED_ET_CONNECT:
943     GNUNET_TESTBED_operation_done (common_op);
944     /* Get the peer identity and configuration of peers */
945     common_op =
946         GNUNET_TESTBED_peer_get_information (peer_data[0].peer,
947                                              GNUNET_TESTBED_PIT_IDENTITY,
948                                              &peerinfo_cb, &peer_data[0]);
949     break;
950   default:
951     GNUNET_assert (0);
952   }
953 }
954
955
956 /**
957  * Signature of a main function for a testcase.
958  *
959  * @param cls closure
960  * @param num_peers number of peers in 'peers'
961  * @param peers handle to peers run in the testbed
962  */
963 static void
964 test_master (void *cls, unsigned int num_peers_,
965              struct GNUNET_TESTBED_Peer **peers)
966 {
967   GNUNET_assert (NULL != peers);
968   GNUNET_assert (NULL != peers[0]);
969   GNUNET_assert (NULL != peers[1]);
970   GNUNET_assert (num_peers_ == num_peers);
971   peer_data[0].peer = peers[0];
972   peer_data[1].peer = peers[1];
973   if (2 == num_peers)
974     common_op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL,
975                                                 peer_data[0].peer,
976                                                 peer_data[1].peer);
977   else
978     GNUNET_break (0);
979   abort_task =
980       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
981                                     (GNUNET_TIME_UNIT_SECONDS, 120), &do_abort,
982                                     NULL);
983 }
984
985
986 /**
987  * Main function
988  */
989 int main (int argc, char **argv)
990 {
991   char *test_name = "perf_stream_api";
992   char *cfg_file = "test_stream_local.conf";
993   uint64_t event_mask;
994   unsigned int count;
995   int ret;
996
997   meter = create_meter ((sizeof (data) / 4), "Generating random data\n", GNUNET_YES);
998   for (count=0; count < (sizeof (data) / 4); count++)
999   {
1000     data[count] = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1001                                             UINT32_MAX);
1002     update_meter (meter);
1003   }
1004   reset_meter (meter);
1005   free_meter (meter);
1006   meter = NULL;
1007   test_step = TEST_STEP_1_HOP;
1008   for (payload_size_index = 0;
1009        payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1010        payload_size_index++)
1011   {
1012     PRINTF ("\nTesting over loopback with payload size %hu\n",
1013             payload_size[payload_size_index]);
1014     (void) memset (peer_data, 0, sizeof (peer_data));
1015     result = INIT;
1016     reset_read = GNUNET_NO;
1017     ret = GNUNET_TESTING_peer_run (test_name, cfg_file, &run, NULL);
1018     if ((0 != ret) || (DOWNLINK_OK != result))
1019       goto return_fail;
1020   }
1021   test_step = TEST_STEP_2_HOP;
1022   num_peers = 2;
1023   event_mask = 0;
1024   event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT);
1025   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
1026   for (payload_size_index = 0;
1027        payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1028        payload_size_index++)
1029   {
1030     PRINTF ("\nTesting over 1 hop with payload size %hu\n",
1031             payload_size[payload_size_index]);
1032     (void) memset (peer_data, 0, sizeof (peer_data));
1033     result = INIT;
1034     reset_read = GNUNET_NO;
1035     GNUNET_TESTBED_test_run (test_name, cfg_file, num_peers, event_mask,
1036                              &controller_event_cb, NULL, &test_master, NULL);
1037     if (DOWNLINK_OK != result)
1038       goto return_fail;
1039   }
1040   test_step = TEST_STEP_3_HOP;
1041   for (payload_size_index = 0; 
1042        payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1043        payload_size_index++)
1044   {
1045     /* Initialize testbed here */
1046   }
1047   return 0;
1048
1049  return_fail:
1050   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test failed\n");
1051   return 1;
1052 }
1053
1054 /* end of perf_stream_api.c */