some fixes to the pt/vpn testcase.
[oweals/gnunet.git] / src / stream / test_stream_2peers.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_2peers.c
23  * @brief Stream API testing between 2 peers using testing API
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testbed_service.h"
34
35 /**
36  * Number of peers; Do NOT change this
37  */
38 #define NUM_PEERS 2
39
40 /**
41  * Shorthand for Relative time in seconds
42  */
43 #define TIME_REL_SECS(sec) \
44   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
45
46 /**
47  * Structure for holding peer's sockets and IO Handles
48  */
49 struct PeerData
50 {
51   /**
52    * Handle to testbed peer
53    */
54   struct GNUNET_TESTBED_Peer *peer;
55
56   /**
57    * Peer's stream socket
58    */
59   struct GNUNET_STREAM_Socket *socket;
60
61   /**
62    * Peer's io write handle
63    */
64   struct GNUNET_STREAM_WriteHandle *io_write_handle;
65
66   /**
67    * Peer's io read handle
68    */
69   struct GNUNET_STREAM_ReadHandle *io_read_handle;
70
71   /**
72    * Peer's shutdown handle
73    */
74   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
75
76   /**
77    * The service connect operation to stream
78    */
79   struct GNUNET_TESTBED_Operation *op;
80
81   /**
82    * Our Peer id
83    */
84   struct GNUNET_PeerIdentity our_id;
85
86   /**
87    * Bytes the peer has written
88    */
89   unsigned int bytes_wrote;
90
91   /**
92    * Byte the peer has read
93    */
94   unsigned int bytes_read;
95 };
96
97
98 /**
99  * Different states in test setup
100  */
101 enum SetupState
102 {
103   /**
104    * Get the identity of peer 1
105    */
106   PEER1_GET_IDENTITY,
107
108   /**
109    * Get the identity of peer 2
110    */
111   PEER2_GET_IDENTITY,
112   
113   /**
114    * Connect to stream service of peer 1
115    */
116   PEER1_STREAM_CONNECT,
117
118   /**
119    * Connect to stream service of peer 2
120    */
121   PEER2_STREAM_CONNECT
122
123 };
124
125 /**
126  * Various states during test setup
127  */
128 static enum SetupState setup_state;
129
130 /**
131  * Data context for peer 1
132  */
133 static struct PeerData peer1;
134
135 /**
136  * Data context for peer 2
137  */
138 static struct PeerData peer2;
139
140 /**
141  * Testbed operation handle
142  */
143 static struct GNUNET_TESTBED_Operation *op;
144
145 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
146
147 static char *data = "ABCD";
148 static int result;
149
150 static int writing_success;
151 static int reading_success;
152
153
154 /**
155  * Input processor
156  *
157  * @param cls the closure from GNUNET_STREAM_write/read
158  * @param status the status of the stream at the time this function is called
159  * @param data traffic from the other side
160  * @param size the number of bytes available in data read 
161  * @return number of bytes of processed from 'data' (any data remaining should be
162  *         given to the next time the read processor is called).
163  */
164 static size_t
165 input_processor (void *cls,
166                  enum GNUNET_STREAM_Status status,
167                  const void *input_data,
168                  size_t size);
169
170 /**
171  * Task for calling STREAM_read
172  *
173  * @param cls the peer data entity
174  * @param tc the task context
175  */
176 static void
177 stream_read_task (void *cls,
178                   const struct GNUNET_SCHEDULER_TaskContext *tc)
179 {
180   struct PeerData *peer = cls;
181   
182   peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
183                                              GNUNET_TIME_relative_multiply
184                                              (GNUNET_TIME_UNIT_SECONDS, 5),
185                                              &input_processor,
186                                              peer);
187   GNUNET_assert (NULL != peer->io_read_handle);
188 }
189
190 /**
191  * The write completion function; called upon writing some data to stream or
192  * upon error
193  *
194  * @param cls the closure from GNUNET_STREAM_write/read
195  * @param status the status of the stream at the time this function is called
196  * @param size the number of bytes read or written
197  */
198 static void 
199 write_completion (void *cls,
200                   enum GNUNET_STREAM_Status status,
201                   size_t size);
202
203
204 /**
205  * Task for calling STREAM_write
206  *
207  * @param cls the peer data entity
208  * @param tc the task context
209  */
210 static void
211 stream_write_task (void *cls,
212                    const struct GNUNET_SCHEDULER_TaskContext *tc)
213 {
214   struct PeerData *peer = cls;
215   
216   peer->io_write_handle = 
217     GNUNET_STREAM_write (peer->socket,
218                          (void *) data,
219                          strlen(data) - peer->bytes_wrote,
220                          GNUNET_TIME_relative_multiply
221                          (GNUNET_TIME_UNIT_SECONDS, 5),
222                          &write_completion,
223                          peer);
224  
225   GNUNET_assert (NULL != peer->io_write_handle);
226  }
227
228
229 /**
230  * Close sockets and stop testing deamons nicely
231  */
232 static void
233 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
234 {
235   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
236     GNUNET_SCHEDULER_cancel (abort_task);
237   if (NULL != peer1.socket)
238     GNUNET_STREAM_close (peer1.socket);
239   if (NULL != peer1.op)
240     GNUNET_TESTBED_operation_done (peer1.op);
241   else
242     GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
243 }
244
245
246 /**
247  * Completion callback for shutdown
248  *
249  * @param cls the closure from GNUNET_STREAM_shutdown call
250  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
251  *          SHUT_RDWR) 
252  */
253 static void 
254 shutdown_completion (void *cls,
255                      int operation)
256 {
257   static int shutdowns;
258
259   if (++shutdowns == 1)
260   {
261     peer1.shutdown_handle = NULL;
262     peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
263                                                     &shutdown_completion, cls);
264     return;
265   }  
266   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
267   GNUNET_SCHEDULER_add_now (&do_close, cls);
268 }
269
270
271 /**
272  * Shutdown sockets gracefully
273  */
274 static void
275 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
276 {
277   result = GNUNET_OK;
278   peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
279                                                   &shutdown_completion, cls);
280 }
281
282
283 /**
284  * Something went wrong and timed out. Kill everything and set error flag
285  */
286 static void
287 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
288 {
289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
290   result = GNUNET_SYSERR;
291   abort_task = 0;
292   do_close (cls, tc);  
293 }
294
295
296 /**
297  * The write completion function; called upon writing some data to stream or
298  * upon error
299  *
300  * @param cls the closure from GNUNET_STREAM_write/read
301  * @param status the status of the stream at the time this function is called
302  * @param size the number of bytes read or written
303  */
304 static void 
305 write_completion (void *cls,
306                   enum GNUNET_STREAM_Status status,
307                   size_t size)
308 {
309   struct PeerData *peer=cls;
310
311   GNUNET_assert (GNUNET_STREAM_OK == status);
312   GNUNET_assert (size <= strlen (data));
313   peer->bytes_wrote += size;
314
315   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
316     {
317       GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
318     }
319   else
320     {
321       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
322                   "Writing completed\n");
323
324       if (&peer2 == peer)   /* Peer1 has finished writing; should read now */
325         {
326           peer->bytes_read = 0;
327           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
328         }
329       else
330         {
331           writing_success = GNUNET_YES;
332           if (GNUNET_YES == reading_success)
333             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
334         }
335     }
336 }
337
338
339 /**
340  * Function executed after stream has been established
341  *
342  * @param cls the closure from GNUNET_STREAM_open
343  * @param socket socket to use to communicate with the other side (read/write)
344  */
345 static void 
346 stream_open_cb (void *cls,
347                 struct GNUNET_STREAM_Socket *socket)
348 {
349   struct PeerData *peer=cls;
350   
351   GNUNET_assert (&peer2 == peer);
352   GNUNET_assert (socket == peer2.socket);
353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Stream established from peer2\n",
354               GNUNET_i2s (&peer1.our_id));
355   peer->bytes_wrote = 0;
356   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
357 }
358
359
360 /**
361  * Input processor
362  *
363  * @param cls the closure from GNUNET_STREAM_write/read
364  * @param status the status of the stream at the time this function is called
365  * @param data traffic from the other side
366  * @param size the number of bytes available in data read 
367  * @return number of bytes of processed from 'data' (any data remaining should be
368  *         given to the next time the read processor is called).
369  */
370 static size_t
371 input_processor (void *cls,
372                  enum GNUNET_STREAM_Status status,
373                  const void *input_data,
374                  size_t size)
375 {
376   struct PeerData *peer;
377
378   peer = (struct PeerData *) cls;
379
380   if (GNUNET_STREAM_TIMEOUT == status)
381     {
382       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
383                   "Read operation timedout - reading again!\n");
384       GNUNET_assert (0 == size);
385       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
386       return 0;
387     }
388
389   GNUNET_assert (GNUNET_STREAM_OK == status);
390   GNUNET_assert (size <= strlen (data));
391   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
392                                (const char *) input_data,
393                                size));
394   peer->bytes_read += size;
395   
396   if (peer->bytes_read < strlen (data))
397     {
398       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
399     }
400   else 
401     {
402       if (&peer1 == peer)    /* Peer2 has completed reading; should write */
403         {
404           peer->bytes_wrote = 0;
405           GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
406         }
407       else                      /* Peer1 has completed reading. End of tests */
408         {
409           reading_success = GNUNET_YES;
410           if (GNUNET_YES == writing_success)
411             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
412         }
413     }
414   return size;
415 }
416
417   
418 /**
419  * Functions of this type are called upon new stream connection from other peers
420  *
421  * @param cls the closure from GNUNET_STREAM_listen
422  * @param socket the socket representing the stream
423  * @param initiator the identity of the peer who wants to establish a stream
424  *            with us
425  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
426  *             stream (the socket will be invalid after the call)
427  */
428 static int
429 stream_listen_cb (void *cls,
430                   struct GNUNET_STREAM_Socket *socket,
431                   const struct GNUNET_PeerIdentity *initiator)
432 {
433   if ((NULL == socket) || (NULL == initiator))
434   {
435     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
436     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
437       GNUNET_SCHEDULER_cancel (abort_task);
438     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
439     return GNUNET_OK;
440   }
441   GNUNET_assert (NULL != initiator);
442   GNUNET_assert (socket != peer2.socket);
443   GNUNET_assert (0 == memcmp (initiator, &peer2.our_id, 
444                               sizeof (struct GNUNET_PeerIdentity)));
445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Peer connected: %s\n",
446               GNUNET_i2s (&peer1.our_id), GNUNET_i2s (initiator));  
447   peer1.socket = socket;
448   peer1.bytes_read = 0;
449   GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
450   return GNUNET_OK;
451 }
452
453
454 /**
455  * Listen success callback; connects a peer to stream as client
456  */
457 static void stream_connect (void);
458
459
460 /**
461  * Adapter function called to destroy a connection to
462  * a service.
463  * 
464  * @param cls closure
465  * @param op_result service handle returned from the connect adapter
466  */
467 static void
468 stream_da (void *cls, void *op_result)
469 {
470   struct GNUNET_STREAM_ListenSocket *lsocket;
471   struct GNUNET_STREAM_Socket *socket;
472
473   if (&peer1 == cls)
474   {
475     lsocket = op_result;
476     GNUNET_STREAM_listen_close (lsocket);
477     if (NULL != peer2.op)
478       GNUNET_TESTBED_operation_done (peer2.op);
479     else
480       GNUNET_SCHEDULER_shutdown ();
481     return;
482   }
483   if (&peer2 == cls)
484   {
485     socket = op_result;
486     GNUNET_STREAM_close (socket);
487     GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
488     return;
489   }
490   GNUNET_assert (0);
491 }
492
493
494 /**
495  * Adapter function called to establish a connection to
496  * a service.
497  * 
498  * @param cls closure
499  * @param cfg configuration of the peer to connect to; will be available until
500  *          GNUNET_TESTBED_operation_done() is called on the operation returned
501  *          from GNUNET_TESTBED_service_connect()
502  * @return service handle to return in 'op_result', NULL on error
503  */
504 static void * 
505 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
506 {  
507   struct GNUNET_STREAM_ListenSocket *lsocket;
508   
509   switch (setup_state)
510   {
511   case PEER1_STREAM_CONNECT:
512     lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
513                                     GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
514                                     &stream_connect, GNUNET_STREAM_OPTION_END);
515     return lsocket;
516   case PEER2_STREAM_CONNECT:
517     peer2.socket = GNUNET_STREAM_open (cfg, &peer1.our_id, 10, &stream_open_cb,
518                                        &peer2, GNUNET_STREAM_OPTION_END);
519     return peer2.socket;
520   default:
521     GNUNET_assert (0);
522   }
523 }
524
525
526 /**
527  * Listen success callback; connects a peer to stream as client
528  */
529 static void
530 stream_connect (void)
531
532   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
533   peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
534                                              NULL, NULL,
535                                              stream_ca, stream_da, &peer2);
536   setup_state = PEER2_STREAM_CONNECT;
537 }
538
539
540 /**
541  * Callback to be called when the requested peer information is available
542  *
543  * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
544  * @param op the operation this callback corresponds to
545  * @param pinfo the result; will be NULL if the operation has failed
546  * @param emsg error message if the operation has failed; will be NULL if the
547  *          operation is successfull
548  */
549 static void 
550 peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
551              const struct GNUNET_TESTBED_PeerInformation *pinfo,
552              const char *emsg)
553 {
554   GNUNET_assert (NULL == emsg);
555   GNUNET_assert (op == op_);
556   switch (setup_state)
557     {
558     case PEER1_GET_IDENTITY:
559       memcpy (&peer1.our_id, pinfo->result.id, 
560               sizeof (struct GNUNET_PeerIdentity));
561       GNUNET_TESTBED_operation_done (op);
562       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
563                   (&peer1.our_id));
564       op = GNUNET_TESTBED_peer_get_information (peer2.peer,
565                                                 GNUNET_TESTBED_PIT_IDENTITY,
566                                                 &peerinfo_cb, NULL);
567       setup_state = PEER2_GET_IDENTITY;
568       break;
569     case PEER2_GET_IDENTITY:
570       memcpy (&peer2.our_id, pinfo->result.id,
571               sizeof (struct GNUNET_PeerIdentity));
572       GNUNET_TESTBED_operation_done (op);
573       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
574                   (&peer2.our_id));
575       peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
576                                                  NULL, NULL, stream_ca,
577                                                  stream_da, &peer1);
578       setup_state = PEER1_STREAM_CONNECT;
579       break;
580     default:
581       GNUNET_assert (0);
582     }
583 }
584
585
586 /**
587  * Controller event callback
588  *
589  * @param cls NULL
590  * @param event the controller event
591  */
592 static void
593 controller_event_cb (void *cls,
594                      const struct GNUNET_TESTBED_EventInformation *event)
595 {
596   switch (event->type)
597   {
598   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
599     switch (setup_state)
600     {    
601     case PEER1_STREAM_CONNECT:
602     case PEER2_STREAM_CONNECT:
603       GNUNET_assert (NULL == event->details.operation_finished.emsg);
604       break;
605     default:
606       GNUNET_assert (0);
607     }
608     break;
609   default:
610     GNUNET_assert (0);
611   }
612 }
613
614
615 /**
616  * Signature of a main function for a testcase.
617  *
618  * @param cls closure
619  * @param num_peers number of peers in 'peers'
620  * @param peers handle to peers run in the testbed
621  */
622 static void
623 test_master (void *cls, unsigned int num_peers,
624              struct GNUNET_TESTBED_Peer **peers)
625 {
626   GNUNET_assert (NULL != peers);
627   GNUNET_assert (NULL != peers[0]);
628   GNUNET_assert (NULL != peers[1]);
629   peer1.peer = peers[0];
630   peer2.peer = peers[1];
631   /* Get the peer identity and configuration of peers */
632   op = GNUNET_TESTBED_peer_get_information (peer1.peer,
633                                             GNUNET_TESTBED_PIT_IDENTITY,
634                                             &peerinfo_cb, NULL);
635   setup_state = PEER1_GET_IDENTITY;
636   abort_task =
637     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
638                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
639                                   NULL);
640 }
641
642
643 /**
644  * Main function
645  */
646 int main (int argc, char **argv)
647 {
648   uint64_t event_mask;  
649
650   result = GNUNET_NO;
651   event_mask = 0;
652   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
653   (void) GNUNET_TESTBED_test_run ("test_stream_2peers",
654                                   "test_stream_local.conf",
655                                   NUM_PEERS, event_mask, &controller_event_cb,
656                                   NULL,
657                                   &test_master, NULL);
658   if (GNUNET_SYSERR == result)
659     return 1;
660   return 0;
661 }