- rename
[oweals/gnunet.git] / src / stream / test_stream_2peers_halfclose.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_2peers_halfclose.c
23  * @brief Testcases for Stream API halfclosed connections between 2 peers
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_testbed_service.h"
29 #include "gnunet_mesh_service.h"
30 #include "gnunet_stream_lib.h"
31
32 /**
33  * Number of peers
34  */
35 #define NUM_PEERS 2
36
37 #define TIME_REL_SECS(sec) \
38   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
39
40 /**
41  * Structure for holding peer's sockets and IO Handles
42  */
43 struct PeerData
44 {
45   /**
46    * The testbed peer handle corresponding to this peer
47    */
48   struct GNUNET_TESTBED_Peer *peer;
49
50   /**
51    * Peer's stream socket
52    */
53   struct GNUNET_STREAM_Socket *socket;
54
55   /**
56    * Peer's io write handle
57    */
58   struct GNUNET_STREAM_WriteHandle *io_write_handle;
59
60   /**
61    * Peer's io read handle
62    */
63   struct GNUNET_STREAM_ReadHandle *io_read_handle;
64
65   /**
66    * Peer's shutdown handle
67    */
68   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
69
70   /**
71    * Testbed operation handle specific for this peer
72    */
73   struct GNUNET_TESTBED_Operation *op;
74
75   /**
76    * Our Peer id
77    */
78   struct GNUNET_PeerIdentity our_id;
79
80   /**
81    * Bytes the peer has written
82    */
83   unsigned int bytes_wrote;
84
85   /**
86    * Byte the peer has read
87    */
88   unsigned int bytes_read;
89
90   /**
91    * GNUNET_YES if the peer has successfully completed the current test
92    */
93   unsigned int test_ok;
94
95   /**
96    * The shutdown operation that has to be used by the stream_shutdown_task
97    */
98   int shutdown_operation;
99 };
100
101
102 /**
103  * Enumeration for various tests that are to be passed in the same order as
104  * below
105  */
106 enum Test
107 {
108   /**
109    * Peer1 writing; Peer2 reading
110    */
111   PEER1_WRITE,
112
113   /**
114    * Peer1 write shutdown; Peer2 should get an error when it tries to read;
115    */
116   PEER1_WRITE_SHUTDOWN,
117
118   /**
119    * Peer1 reads; Peer2 writes (connection is halfclosed)
120    */
121   PEER1_HALFCLOSE_READ,
122
123   /**
124    * Peer1 attempts to write; Should fail with stream already shutdown error
125    */
126   PEER1_HALFCLOSE_WRITE_FAIL,
127
128   /**
129    * Peer1 read shutdown; Peer2 should get stream shutdown error during write
130    */
131   PEER1_READ_SHUTDOWN,
132
133   /**
134    * All tests successfully finished
135    */
136   SUCCESS
137 };
138
139
140 /**
141  * Different states in test setup
142  */
143 enum SetupState
144 {
145   /**
146    * The initial state
147    */
148   INIT,
149
150   /**
151    * Get the identity of peer 1
152    */
153   PEER1_GET_IDENTITY,
154
155   /**
156    * Get the identity of peer 2
157    */
158   PEER2_GET_IDENTITY,
159
160   /**
161    * Connect to stream service of peer 2
162    */
163   PEER2_STREAM_CONNECT,
164   
165   /**
166    * Connect to stream service of peer 1
167    */
168   PEER1_STREAM_CONNECT
169
170 };
171
172
173 /**
174  * Peer1 writes first and then calls for SHUT_WR
175  * Peer2 reads first and then calls for SHUT_RD
176  * Attempt to write again by Peer1 should be rejected
177  * Attempt to read again by Peer2 should be rejected
178  * Peer1 then reads from Peer2 which writes
179  */
180 static struct PeerData peer1;
181 static struct PeerData peer2;
182
183 /**
184  * Task for aborting the test case if it takes too long
185  */
186 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
187
188 /**
189  * Task for reading from stream
190  */
191 static GNUNET_SCHEDULER_TaskIdentifier read_task;
192
193 static char *data = "ABCD";
194
195 /**
196  * Handle to testbed operation
197  */
198 struct GNUNET_TESTBED_Operation *op;
199
200 /**
201  * Final testing result
202  */
203 static int result;
204
205 /**
206  * Current running test
207  */
208 enum Test current_test;
209
210 /**
211  * State is test setup
212  */
213 enum SetupState setup_state;
214
215
216 /**
217  * Input processor
218  *
219  * @param cls the closure from GNUNET_STREAM_write/read
220  * @param status the status of the stream at the time this function is called
221  * @param data traffic from the other side
222  * @param size the number of bytes available in data read 
223  * @return number of bytes of processed from 'data' (any data remaining should be
224  *         given to the next time the read processor is called).
225  */
226 static size_t
227 input_processor (void *cls,
228                  enum GNUNET_STREAM_Status status,
229                  const void *input_data,
230                  size_t size);
231
232
233 /**
234  * The transition function; responsible for the transitions among tests
235  */
236 static void
237 transition();
238
239
240 /**
241  * Task for calling STREAM_read
242  *
243  * @param cls the peer data entity
244  * @param tc the task context
245  */
246 static void
247 stream_read_task (void *cls,
248                   const struct GNUNET_SCHEDULER_TaskContext *tc)
249 {
250   struct PeerData *peer = cls;
251   
252   peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
253                                              GNUNET_TIME_relative_multiply
254                                              (GNUNET_TIME_UNIT_SECONDS, 5),
255                                              &input_processor,
256                                              cls);
257   switch (current_test)
258     {
259     case PEER1_WRITE_SHUTDOWN:
260       GNUNET_assert (&peer2 == peer);
261       GNUNET_assert (NULL == peer->io_read_handle);
262       transition ();            /* to PEER1_HALFCLOSE_READ */
263       break;
264     default:
265       GNUNET_assert (NULL != peer->io_read_handle);
266     }
267 }
268
269
270 /**
271  * The write completion function; called upon writing some data to stream or
272  * upon error
273  *
274  * @param cls the closure from GNUNET_STREAM_write/read
275  * @param status the status of the stream at the time this function is called
276  * @param size the number of bytes read or written
277  */
278 static void 
279 write_completion (void *cls,
280                   enum GNUNET_STREAM_Status status,
281                   size_t size);
282
283
284 /**
285  * Task for calling STREAM_write
286  *
287  * @param cls the peer data entity
288  * @param tc the task context
289  */
290 static void
291 stream_write_task (void *cls,
292                    const struct GNUNET_SCHEDULER_TaskContext *tc)
293 {
294   struct PeerData *peer = cls;
295   
296   peer->io_write_handle = 
297     GNUNET_STREAM_write (peer->socket,
298                          (void *) data,
299                          strlen(data) - peer->bytes_wrote,
300                          GNUNET_TIME_relative_multiply
301                          (GNUNET_TIME_UNIT_SECONDS, 5),
302                          &write_completion,
303                          peer);
304   switch (current_test)
305     {
306     case PEER1_HALFCLOSE_WRITE_FAIL:
307       GNUNET_assert (&peer1 == peer);
308       GNUNET_assert (NULL == peer->io_write_handle);
309       transition();             /* To PEER1_READ_SHUTDOWN */
310       break;
311     case PEER1_READ_SHUTDOWN:
312       GNUNET_assert (&peer2 == peer);
313       GNUNET_assert (NULL == peer->io_write_handle);
314       transition ();            /* To SUCCESS */
315       break;
316     default:
317         GNUNET_assert (NULL != peer->io_write_handle);
318     }
319 }
320
321
322 /**
323  * Close sockets and stop testing deamons nicely
324  */
325 static void
326 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
327 {
328   if (NULL != peer2.socket)
329     GNUNET_STREAM_close (peer2.socket);
330   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
331     GNUNET_SCHEDULER_cancel (abort_task);
332   if (NULL != peer2.op)
333     GNUNET_TESTBED_operation_done (peer2.op);
334   else
335     GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
336 }
337
338
339 /**
340  * Completion callback for shutdown
341  *
342  * @param cls the closure from GNUNET_STREAM_shutdown call
343  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
344  *          SHUT_RDWR) 
345  */
346 void 
347 shutdown_completion (void *cls,
348                      int operation)
349 {
350   switch (current_test)
351     {
352     case PEER1_WRITE:
353       GNUNET_assert (0);
354     case PEER1_WRITE_SHUTDOWN:
355       GNUNET_assert (cls == &peer1);
356       GNUNET_assert (SHUT_WR == operation);
357       peer1.test_ok = GNUNET_YES;
358       /* Peer2 should read with error */
359       peer2.bytes_read = 0;
360       GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
361       break;
362     case PEER1_READ_SHUTDOWN:
363       peer1.test_ok = GNUNET_YES;
364       peer2.bytes_wrote = 0;
365       GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
366       break;
367     case PEER1_HALFCLOSE_READ:
368     case PEER1_HALFCLOSE_WRITE_FAIL:
369     case SUCCESS:
370       GNUNET_assert (0);        /* We shouldn't reach here */
371     }
372 }
373
374
375 /**
376  * Task for calling STREAM_shutdown
377  *
378  * @param cls the peer entity
379  * @param tc the TaskContext
380  */
381 static void
382 stream_shutdown_task (void *cls,
383                       const struct GNUNET_SCHEDULER_TaskContext *tc)
384 {
385   struct PeerData *peer = cls;
386
387   peer->shutdown_handle = GNUNET_STREAM_shutdown (peer->socket,
388                                                   peer->shutdown_operation,
389                                                   &shutdown_completion,
390                                                   peer);
391   GNUNET_assert (NULL != peer->shutdown_handle);
392 }
393
394
395 /**
396  * Something went wrong and timed out. Kill everything and set error flag
397  */
398 static void
399 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
400 {
401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
402   if (0 != read_task)
403     {
404       GNUNET_SCHEDULER_cancel (read_task);
405     }
406   result = GNUNET_SYSERR;
407   abort_task = 0;
408   do_close (cls, tc);  
409 }
410
411
412 /**
413  * The transition function; responsible for the transitions among tests
414  */
415 static void
416 transition()
417 {
418   if ((GNUNET_YES == peer1.test_ok) && (GNUNET_YES == peer2.test_ok))
419     {
420       peer1.test_ok = GNUNET_NO;
421       peer2.test_ok = GNUNET_NO;
422       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423                   "TEST %d SUCCESSFULL\n", current_test);
424       switch (current_test)
425         {
426         case PEER1_WRITE:
427           current_test = PEER1_WRITE_SHUTDOWN;
428           /* Peer1 should shutdown writing */
429           peer1.shutdown_operation = SHUT_WR;
430           GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
431           break;
432         case PEER1_WRITE_SHUTDOWN:
433           current_test = PEER1_HALFCLOSE_READ;
434           /* Peer2 should be able to write successfully */
435           peer2.bytes_wrote = 0;
436           GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
437           
438           /* Peer1 should be able to read successfully */
439           peer1.bytes_read = 0;
440           GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
441           break;
442         case PEER1_HALFCLOSE_READ:
443           current_test = PEER1_HALFCLOSE_WRITE_FAIL;
444           peer1.bytes_wrote = 0;
445           peer2.bytes_read = 0;
446           peer2.test_ok = GNUNET_YES;
447           GNUNET_SCHEDULER_add_now (&stream_write_task, &peer1);
448           break;
449         case PEER1_HALFCLOSE_WRITE_FAIL:
450           current_test = PEER1_READ_SHUTDOWN;
451           peer1.shutdown_operation = SHUT_RD;
452           GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
453           break;
454         case PEER1_READ_SHUTDOWN:
455           current_test = SUCCESS;
456           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457                       "All tests successful\n");
458           GNUNET_SCHEDULER_add_now (&do_close, NULL);
459           break;
460         case SUCCESS:
461           GNUNET_assert (0);    /* We shouldn't reach here */
462           
463         }
464     }
465 }
466
467 /**
468  * The write completion function; called upon writing some data to stream or
469  * upon error
470  *
471  * @param cls the closure from GNUNET_STREAM_write/read
472  * @param status the status of the stream at the time this function is called
473  * @param size the number of bytes read or written
474  */
475 static void 
476 write_completion (void *cls,
477                   enum GNUNET_STREAM_Status status,
478                   size_t size)
479 {
480   struct PeerData *peer = cls;
481
482   switch (current_test)
483     {
484     case PEER1_WRITE:
485     case PEER1_HALFCLOSE_READ:
486
487     GNUNET_assert (GNUNET_STREAM_OK == status);
488     GNUNET_assert (size <= strlen (data));
489     peer->bytes_wrote += size;
490
491     if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
492       {
493         GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
494       }
495     else
496       {
497         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                     "Writing completed\n");
499
500         if (&peer1 == peer)
501           {
502             peer1.test_ok = GNUNET_YES;
503             transition ();       /* to PEER1_WRITE_SHUTDOWN */
504           }
505         else            /* This will happen during PEER1_HALFCLOSE_READ */
506           {
507             peer2.test_ok = GNUNET_YES;
508             transition ();      /* to PEER1_HALFCLOSE_WRITE_FAIL */
509           }
510       }
511     break;
512     case PEER1_HALFCLOSE_WRITE_FAIL:
513       GNUNET_assert (peer == &peer1);
514       GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
515       GNUNET_assert (0 == size);
516       peer1.test_ok = GNUNET_YES;
517       break;
518     case PEER1_READ_SHUTDOWN:
519       GNUNET_assert (peer == &peer2);
520       GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
521       GNUNET_assert (0 == size);
522       peer2.test_ok = GNUNET_YES;
523       break;
524     case PEER1_WRITE_SHUTDOWN:
525     case SUCCESS:
526       GNUNET_assert (0);        /* We shouldn't reach here */
527     } 
528 }
529
530
531 /**
532  * Function executed after stream has been established
533  *
534  * @param cls the closure from GNUNET_STREAM_open
535  * @param socket socket to use to communicate with the other side (read/write)
536  */
537 static void 
538 stream_open_cb (void *cls,
539                 struct GNUNET_STREAM_Socket *socket)
540 {
541   struct PeerData *peer;
542
543   GNUNET_assert (socket == peer1.socket);
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545               "%s: Stream established from peer1\n",
546               GNUNET_i2s (&peer1.our_id));
547   peer = (struct PeerData *) cls;
548   peer->bytes_wrote = 0;
549   GNUNET_assert (socket == peer1.socket);
550   GNUNET_assert (socket == peer->socket);
551   peer1.test_ok = GNUNET_NO;
552   peer2.test_ok = GNUNET_NO;
553   current_test = PEER1_WRITE;
554   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
555 }
556
557
558 /**
559  * Input processor
560  *
561  * @param cls the closure from GNUNET_STREAM_write/read
562  * @param status the status of the stream at the time this function is called
563  * @param data traffic from the other side
564  * @param size the number of bytes available in data read 
565  * @return number of bytes of processed from 'data' (any data remaining should be
566  *         given to the next time the read processor is called).
567  */
568 static size_t
569 input_processor (void *cls,
570                  enum GNUNET_STREAM_Status status,
571                  const void *input_data,
572                  size_t size)
573 {
574   struct PeerData *peer;
575
576   peer = (struct PeerData *) cls;
577
578   switch (current_test)
579     {
580     case PEER1_WRITE:
581     case PEER1_HALFCLOSE_READ:
582       if (GNUNET_STREAM_TIMEOUT == status)
583         {
584           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
585                       "Read operation timedout - reading again!\n");
586           GNUNET_assert (0 == size);
587           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
588           return 0;
589         }
590
591       GNUNET_assert (GNUNET_STREAM_OK == status);
592       GNUNET_assert (size <= strlen (data));
593       GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
594                                    (const char *) input_data,
595                                    size));
596       peer->bytes_read += size;
597   
598       if (peer->bytes_read < strlen (data))
599         {
600           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
601         }
602       else  
603         {
604           if (&peer2 == peer) /* Peer2 has completed reading; should write */
605             {
606               peer2.test_ok = GNUNET_YES;
607               transition ();    /* Transition to PEER1_WRITE_SHUTDOWN */
608             }
609           else         /* Peer1 has completed reading. End of tests */
610             {
611               peer1.test_ok = GNUNET_YES;
612               transition ();    /* to PEER1_HALFCLOSE_WRITE_FAIL */
613             }
614         }
615       break;
616     case PEER1_WRITE_SHUTDOWN:
617       GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
618       peer2.test_ok = GNUNET_YES;
619       break;
620     case PEER1_HALFCLOSE_WRITE_FAIL:
621     case PEER1_READ_SHUTDOWN:
622     case SUCCESS:
623       GNUNET_assert (0);        /* We shouldn't reach here */
624     }
625   
626   return size;
627 }
628
629   
630 /**
631  * Scheduler call back; to be executed when a new stream is connected
632  * Called from listen connect for peer2
633  */
634 static void
635 stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
636 {
637   read_task = GNUNET_SCHEDULER_NO_TASK;
638   GNUNET_assert (NULL != cls);
639   peer2.bytes_read = 0;
640   GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
641 }
642
643
644 /**
645  * Functions of this type are called upon new stream connection from other peers
646  *
647  * @param cls the closure from GNUNET_STREAM_listen
648  * @param socket the socket representing the stream
649  * @param initiator the identity of the peer who wants to establish a stream
650  *            with us
651  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
652  *             stream (the socket will be invalid after the call)
653  */
654 static int
655 stream_listen_cb (void *cls,
656                   struct GNUNET_STREAM_Socket *socket,
657                   const struct GNUNET_PeerIdentity *initiator)
658 {
659   if ((NULL == socket) || (NULL == initiator))
660   {
661     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
662     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
663       GNUNET_SCHEDULER_cancel (abort_task);
664     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
665     return GNUNET_OK;
666   }
667   GNUNET_assert (socket != peer1.socket);
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "%s: Peer connected: %s\n",
670               GNUNET_i2s (&peer2.our_id),
671               GNUNET_i2s(initiator));
672   peer2.socket = socket;
673   /* FIXME: reading should be done right now instead of a scheduled call */
674   read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
675   return GNUNET_OK;
676 }
677
678
679 /**
680  * Listen success callback; connects a peer to stream as client
681  */
682 static void
683 stream_connect (void);
684
685
686 /**
687  * Adapter function called to destroy a connection to
688  * a service.
689  * 
690  * @param cls closure
691  * @param op_result service handle returned from the connect adapter
692  */
693 static void
694 stream_da (void *cls, void *op_result)
695 {
696   struct GNUNET_STREAM_ListenSocket *lsocket;
697
698   if (&peer2 == cls)
699   {
700     lsocket = op_result;
701     GNUNET_STREAM_listen_close (lsocket);
702     if (NULL != peer1.op)
703       GNUNET_TESTBED_operation_done (peer1.op);
704     else
705       GNUNET_SCHEDULER_shutdown ();
706     return;
707   }
708   if (&peer1 == cls)
709   {
710     GNUNET_assert (op_result == peer1.socket);
711     GNUNET_STREAM_close (peer1.socket);
712     GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
713     return;
714   }
715   GNUNET_assert (0);
716 }
717
718
719 /**
720  * Adapter function called to establish a connection to
721  * a service.
722  * 
723  * @param cls closure
724  * @param cfg configuration of the peer to connect to; will be available until
725  *          GNUNET_TESTBED_operation_done() is called on the operation returned
726  *          from GNUNET_TESTBED_service_connect()
727  * @return service handle to return in 'op_result', NULL on error
728  */
729 static void * 
730 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
731 {
732   struct GNUNET_STREAM_ListenSocket *lsocket;
733   
734   switch (setup_state)
735   {
736   case PEER2_STREAM_CONNECT:
737     lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
738                                     GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
739                                     &stream_connect, GNUNET_STREAM_OPTION_END);
740     GNUNET_assert (NULL != lsocket);
741     return lsocket;
742   case PEER1_STREAM_CONNECT:
743     peer1.socket = GNUNET_STREAM_open (cfg, &peer2.our_id, 10, &stream_open_cb,
744                                        &peer1, GNUNET_STREAM_OPTION_END);
745     GNUNET_assert (NULL != peer1.socket);
746     return peer1.socket;
747   default:
748     GNUNET_assert (0);
749   }
750 }
751
752
753 /**
754  * Listen success callback; connects a peer to stream as client
755  */
756 static void
757 stream_connect (void)
758
759   GNUNET_assert (PEER2_STREAM_CONNECT == setup_state);
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");  
761   peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
762                                              NULL, NULL,
763                                              stream_ca, stream_da, &peer1);
764   setup_state = PEER1_STREAM_CONNECT;
765 }
766
767
768 /**
769  * Callback to be called when the requested peer information is available
770  *
771  * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
772  * @param op the operation this callback corresponds to
773  * @param pinfo the result; will be NULL if the operation has failed
774  * @param emsg error message if the operation has failed; will be NULL if the
775  *          operation is successfull
776  */
777 static void 
778 peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
779              const struct GNUNET_TESTBED_PeerInformation *pinfo,
780              const char *emsg)
781 {
782   GNUNET_assert (NULL == emsg);
783   GNUNET_assert (op == op_);
784   switch (setup_state)
785   {
786   case PEER1_GET_IDENTITY:
787     memcpy (&peer1.our_id, pinfo->result.id, 
788             sizeof (struct GNUNET_PeerIdentity));
789     GNUNET_TESTBED_operation_done (op);
790     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
791                 (&peer1.our_id));
792     op = GNUNET_TESTBED_peer_get_information (peer2.peer,
793                                               GNUNET_TESTBED_PIT_IDENTITY,
794                                               &peerinfo_cb, NULL);
795     setup_state = PEER2_GET_IDENTITY;
796     break;
797   case PEER2_GET_IDENTITY:
798     memcpy (&peer2.our_id, pinfo->result.id,
799             sizeof (struct GNUNET_PeerIdentity));
800     GNUNET_TESTBED_operation_done (op);
801     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
802                 (&peer2.our_id));
803     peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
804                                                NULL, NULL,
805                                                stream_ca, stream_da, &peer2);
806     setup_state = PEER2_STREAM_CONNECT;
807     break;
808   default:
809     GNUNET_assert (0);
810   }
811 }
812
813
814 /**
815  * Controller event callback
816  *
817  * @param cls NULL
818  * @param event the controller event
819  */
820 static void 
821 controller_event_cb (void *cls,
822                      const struct GNUNET_TESTBED_EventInformation *event)
823 {
824   switch (event->type)
825   {
826   case GNUNET_TESTBED_ET_CONNECT:
827     GNUNET_assert (INIT == setup_state);
828     GNUNET_TESTBED_operation_done (op);
829     op = GNUNET_TESTBED_peer_get_information (peer1.peer,
830                                               GNUNET_TESTBED_PIT_IDENTITY,
831                                               &peerinfo_cb, NULL);
832     setup_state = PEER1_GET_IDENTITY;
833     break;
834   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
835     switch (setup_state)
836     {
837     case PEER1_STREAM_CONNECT:
838     case PEER2_STREAM_CONNECT:
839       GNUNET_assert (NULL == event->details.operation_finished.emsg);
840       break;
841     default:
842       GNUNET_assert (0);
843     }
844     break;
845   default:
846     GNUNET_assert (0);
847   }
848 }
849
850
851 /**
852  * Signature of a main function for a testcase.
853  *
854  * @param cls closure
855  * @param num_peers number of peers in 'peers'
856  * @param peers handle to peers run in the testbed
857  */
858 static void
859 test_master (void *cls, unsigned int num_peers,
860              struct GNUNET_TESTBED_Peer **peers)
861 {
862   GNUNET_assert (NULL != peers);
863   GNUNET_assert (NULL != peers[0]);
864   GNUNET_assert (NULL != peers[1]);
865   peer1.peer = peers[0];
866   peer2.peer = peers[1];
867   op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL, peer2.peer, peer1.peer);
868   setup_state = INIT;
869   abort_task =
870     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
871                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
872                                   NULL);
873 }
874
875
876 /**
877  * Main function
878  */
879 int main (int argc, char **argv)
880 {
881   uint64_t event_mask;  
882
883   result = GNUNET_NO;
884   event_mask = 0;
885   event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT);
886   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
887   GNUNET_TESTBED_test_run ("test_stream_2peers_halfclose",
888                            "test_stream_local.conf", NUM_PEERS, event_mask,
889                            &controller_event_cb, NULL, &test_master, NULL);
890   if (GNUNET_SYSERR == result)
891     return 1;
892   return 0;
893 }