use statistics
[oweals/gnunet.git] / src / stream / test_stream_2peers.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_2peers.c
23  * @brief Stream API testing between 2 peers using testing API
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testbed_service.h"
34
35 /**
36  * Number of peers; Do NOT change this
37  */
38 #define NUM_PEERS 2
39
40 /**
41  * Shorthand for Relative time in seconds
42  */
43 #define TIME_REL_SECS(sec) \
44   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
45
46 /**
47  * Structure for holding peer's sockets and IO Handles
48  */
49 struct PeerData
50 {
51   /**
52    * Handle to testbed peer
53    */
54   struct GNUNET_TESTBED_Peer *peer;
55
56   /**
57    * Peer's stream socket
58    */
59   struct GNUNET_STREAM_Socket *socket;
60
61   /**
62    * Peer's io write handle
63    */
64   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
65
66   /**
67    * Peer's io read handle
68    */
69   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
70
71   /**
72    * Peer's shutdown handle
73    */
74   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
75
76   /**
77    * The service connect operation to stream
78    */
79   struct GNUNET_TESTBED_Operation *op;
80
81   /**
82    * Our Peer id
83    */
84   struct GNUNET_PeerIdentity our_id;
85
86   /**
87    * Bytes the peer has written
88    */
89   unsigned int bytes_wrote;
90
91   /**
92    * Byte the peer has read
93    */
94   unsigned int bytes_read;
95 };
96
97
98 /**
99  * Different states in test setup
100  */
101 enum SetupState
102 {
103   /**
104    * The initial state
105    */
106   INIT,
107
108   /**
109    * Get the identity of peer 1
110    */
111   PEER1_GET_IDENTITY,
112
113   /**
114    * Get the identity of peer 2
115    */
116   PEER2_GET_IDENTITY,
117   
118   /**
119    * Connect to stream service of peer 1
120    */
121   PEER1_STREAM_CONNECT,
122
123   /**
124    * Connect to stream service of peer 2
125    */
126   PEER2_STREAM_CONNECT
127
128 };
129
130 /**
131  * Various states during test setup
132  */
133 static enum SetupState setup_state;
134
135 /**
136  * Data context for peer 1
137  */
138 static struct PeerData peer1;
139
140 /**
141  * Data context for peer 2
142  */
143 static struct PeerData peer2;
144
145 /**
146  * Testbed operation handle
147  */
148 static struct GNUNET_TESTBED_Operation *op;
149
150 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
151
152 static char *data = "ABCD";
153 static int result;
154
155 static int writing_success;
156 static int reading_success;
157
158
159 /**
160  * Input processor
161  *
162  * @param cls the closure from GNUNET_STREAM_write/read
163  * @param status the status of the stream at the time this function is called
164  * @param data traffic from the other side
165  * @param size the number of bytes available in data read 
166  * @return number of bytes of processed from 'data' (any data remaining should be
167  *         given to the next time the read processor is called).
168  */
169 static size_t
170 input_processor (void *cls,
171                  enum GNUNET_STREAM_Status status,
172                  const void *input_data,
173                  size_t size);
174
175 /**
176  * Task for calling STREAM_read
177  *
178  * @param cls the peer data entity
179  * @param tc the task context
180  */
181 static void
182 stream_read_task (void *cls,
183                   const struct GNUNET_SCHEDULER_TaskContext *tc)
184 {
185   struct PeerData *peer = cls;
186   
187   peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
188                                              GNUNET_TIME_relative_multiply
189                                              (GNUNET_TIME_UNIT_SECONDS, 5),
190                                              &input_processor,
191                                              peer);
192   GNUNET_assert (NULL != peer->io_read_handle);
193 }
194
195 /**
196  * The write completion function; called upon writing some data to stream or
197  * upon error
198  *
199  * @param cls the closure from GNUNET_STREAM_write/read
200  * @param status the status of the stream at the time this function is called
201  * @param size the number of bytes read or written
202  */
203 static void 
204 write_completion (void *cls,
205                   enum GNUNET_STREAM_Status status,
206                   size_t size);
207
208
209 /**
210  * Task for calling STREAM_write
211  *
212  * @param cls the peer data entity
213  * @param tc the task context
214  */
215 static void
216 stream_write_task (void *cls,
217                    const struct GNUNET_SCHEDULER_TaskContext *tc)
218 {
219   struct PeerData *peer = cls;
220   
221   peer->io_write_handle = 
222     GNUNET_STREAM_write (peer->socket,
223                          (void *) data,
224                          strlen(data) - peer->bytes_wrote,
225                          GNUNET_TIME_relative_multiply
226                          (GNUNET_TIME_UNIT_SECONDS, 5),
227                          &write_completion,
228                          peer);
229  
230   GNUNET_assert (NULL != peer->io_write_handle);
231  }
232
233
234 /**
235  * Close sockets and stop testing deamons nicely
236  */
237 static void
238 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
239 {
240   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
241     GNUNET_SCHEDULER_cancel (abort_task);
242   if (NULL != peer1.socket)
243     GNUNET_STREAM_close (peer1.socket);
244   if (NULL != peer1.op)
245     GNUNET_TESTBED_operation_done (peer1.op);
246   else
247     GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
248 }
249
250
251 /**
252  * Completion callback for shutdown
253  *
254  * @param cls the closure from GNUNET_STREAM_shutdown call
255  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
256  *          SHUT_RDWR) 
257  */
258 static void 
259 shutdown_completion (void *cls,
260                      int operation)
261 {
262   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
263   GNUNET_SCHEDULER_add_now (&do_close, cls);
264 }
265
266
267
268 /**
269  * Shutdown sockets gracefully
270  */
271 static void
272 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
273 {
274   result = GNUNET_OK;
275   peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
276                                                   &shutdown_completion, cls);
277 }
278
279
280 /**
281  * Something went wrong and timed out. Kill everything and set error flag
282  */
283 static void
284 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
285 {
286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
287   result = GNUNET_SYSERR;
288   abort_task = 0;
289   do_close (cls, tc);  
290 }
291
292
293 /**
294  * The write completion function; called upon writing some data to stream or
295  * upon error
296  *
297  * @param cls the closure from GNUNET_STREAM_write/read
298  * @param status the status of the stream at the time this function is called
299  * @param size the number of bytes read or written
300  */
301 static void 
302 write_completion (void *cls,
303                   enum GNUNET_STREAM_Status status,
304                   size_t size)
305 {
306   struct PeerData *peer=cls;
307
308   GNUNET_assert (GNUNET_STREAM_OK == status);
309   GNUNET_assert (size <= strlen (data));
310   peer->bytes_wrote += size;
311
312   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
313     {
314       GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
315     }
316   else
317     {
318       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
319                   "Writing completed\n");
320
321       if (&peer2 == peer)   /* Peer1 has finished writing; should read now */
322         {
323           peer->bytes_read = 0;
324           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
325         }
326       else
327         {
328           writing_success = GNUNET_YES;
329           if (GNUNET_YES == reading_success)
330             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
331         }
332     }
333 }
334
335
336 /**
337  * Function executed after stream has been established
338  *
339  * @param cls the closure from GNUNET_STREAM_open
340  * @param socket socket to use to communicate with the other side (read/write)
341  */
342 static void 
343 stream_open_cb (void *cls,
344                 struct GNUNET_STREAM_Socket *socket)
345 {
346   struct PeerData *peer=cls;
347   
348   GNUNET_assert (&peer2 == peer);
349   GNUNET_assert (socket == peer2.socket);
350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Stream established from peer2\n",
351               GNUNET_i2s (&peer1.our_id));
352   peer->bytes_wrote = 0;
353   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
354 }
355
356
357 /**
358  * Input processor
359  *
360  * @param cls the closure from GNUNET_STREAM_write/read
361  * @param status the status of the stream at the time this function is called
362  * @param data traffic from the other side
363  * @param size the number of bytes available in data read 
364  * @return number of bytes of processed from 'data' (any data remaining should be
365  *         given to the next time the read processor is called).
366  */
367 static size_t
368 input_processor (void *cls,
369                  enum GNUNET_STREAM_Status status,
370                  const void *input_data,
371                  size_t size)
372 {
373   struct PeerData *peer;
374
375   peer = (struct PeerData *) cls;
376
377   if (GNUNET_STREAM_TIMEOUT == status)
378     {
379       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
380                   "Read operation timedout - reading again!\n");
381       GNUNET_assert (0 == size);
382       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
383       return 0;
384     }
385
386   GNUNET_assert (GNUNET_STREAM_OK == status);
387   GNUNET_assert (size <= strlen (data));
388   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
389                                (const char *) input_data,
390                                size));
391   peer->bytes_read += size;
392   
393   if (peer->bytes_read < strlen (data))
394     {
395       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
396     }
397   else 
398     {
399       if (&peer1 == peer)    /* Peer2 has completed reading; should write */
400         {
401           peer->bytes_wrote = 0;
402           GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
403         }
404       else                      /* Peer1 has completed reading. End of tests */
405         {
406           reading_success = GNUNET_YES;
407           if (GNUNET_YES == writing_success)
408             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
409         }
410     }
411   return size;
412 }
413
414   
415 /**
416  * Functions of this type are called upon new stream connection from other peers
417  *
418  * @param cls the closure from GNUNET_STREAM_listen
419  * @param socket the socket representing the stream
420  * @param initiator the identity of the peer who wants to establish a stream
421  *            with us
422  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
423  *             stream (the socket will be invalid after the call)
424  */
425 static int
426 stream_listen_cb (void *cls,
427                   struct GNUNET_STREAM_Socket *socket,
428                   const struct GNUNET_PeerIdentity *initiator)
429 {
430   if ((NULL == socket) || (NULL == initiator))
431   {
432     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
433     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
434       GNUNET_SCHEDULER_cancel (abort_task);
435     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
436     return GNUNET_OK;
437   }
438   GNUNET_assert (NULL != initiator);
439   GNUNET_assert (socket != peer2.socket);
440   GNUNET_assert (0 == memcmp (initiator, &peer2.our_id, 
441                               sizeof (struct GNUNET_PeerIdentity)));
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Peer connected: %s\n",
443               GNUNET_i2s (&peer1.our_id), GNUNET_i2s (initiator));  
444   peer1.socket = socket;
445   peer1.bytes_read = 0;
446   GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
447   return GNUNET_OK;
448 }
449
450
451 /**
452  * Listen success callback; connects a peer to stream as client
453  */
454 static void stream_connect (void);
455
456
457 /**
458  * Adapter function called to destroy a connection to
459  * a service.
460  * 
461  * @param cls closure
462  * @param op_result service handle returned from the connect adapter
463  */
464 static void
465 stream_da (void *cls, void *op_result)
466 {
467   struct GNUNET_STREAM_ListenSocket *lsocket;
468   struct GNUNET_STREAM_Socket *socket;
469
470   if (&peer1 == cls)
471   {
472     lsocket = op_result;
473     GNUNET_STREAM_listen_close (lsocket);
474     if (NULL != peer2.op)
475       GNUNET_TESTBED_operation_done (peer2.op);
476     else
477       GNUNET_SCHEDULER_shutdown ();
478     return;
479   }
480   if (&peer2 == cls)
481   {
482     socket = op_result;
483     GNUNET_STREAM_close (socket);
484     GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
485     return;
486   }
487   GNUNET_assert (0);
488 }
489
490
491 /**
492  * Adapter function called to establish a connection to
493  * a service.
494  * 
495  * @param cls closure
496  * @param cfg configuration of the peer to connect to; will be available until
497  *          GNUNET_TESTBED_operation_done() is called on the operation returned
498  *          from GNUNET_TESTBED_service_connect()
499  * @return service handle to return in 'op_result', NULL on error
500  */
501 static void * 
502 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
503 {  
504   struct GNUNET_STREAM_ListenSocket *lsocket;
505   
506   switch (setup_state)
507   {
508   case PEER1_STREAM_CONNECT:
509     lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
510                                     GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
511                                     &stream_connect, GNUNET_STREAM_OPTION_END);
512     return lsocket;
513   case PEER2_STREAM_CONNECT:
514     peer2.socket = GNUNET_STREAM_open (cfg, &peer1.our_id, 10, &stream_open_cb,
515                                        &peer2, GNUNET_STREAM_OPTION_END);
516     return peer2.socket;
517   default:
518     GNUNET_assert (0);
519   }
520 }
521
522
523 /**
524  * Listen success callback; connects a peer to stream as client
525  */
526 static void
527 stream_connect (void)
528
529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
530   peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
531                                              NULL, NULL,
532                                              stream_ca, stream_da, &peer2);
533   setup_state = PEER2_STREAM_CONNECT;
534 }
535
536
537 /**
538  * Callback to be called when the requested peer information is available
539  *
540  * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
541  * @param op the operation this callback corresponds to
542  * @param pinfo the result; will be NULL if the operation has failed
543  * @param emsg error message if the operation has failed; will be NULL if the
544  *          operation is successfull
545  */
546 static void 
547 peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
548              const struct GNUNET_TESTBED_PeerInformation *pinfo,
549              const char *emsg)
550 {
551   GNUNET_assert (NULL == emsg);
552   GNUNET_assert (op == op_);
553   switch (setup_state)
554     {
555     case PEER1_GET_IDENTITY:
556       memcpy (&peer1.our_id, pinfo->result.id, 
557               sizeof (struct GNUNET_PeerIdentity));
558       GNUNET_TESTBED_operation_done (op);
559       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
560                   (&peer1.our_id));
561       op = GNUNET_TESTBED_peer_get_information (peer2.peer,
562                                                 GNUNET_TESTBED_PIT_IDENTITY,
563                                                 &peerinfo_cb, NULL);
564       setup_state = PEER2_GET_IDENTITY;
565       break;
566     case PEER2_GET_IDENTITY:
567       memcpy (&peer2.our_id, pinfo->result.id,
568               sizeof (struct GNUNET_PeerIdentity));
569       GNUNET_TESTBED_operation_done (op);
570       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
571                   (&peer2.our_id));
572       peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
573                                                  NULL, NULL, stream_ca,
574                                                  stream_da, &peer1);
575       setup_state = PEER1_STREAM_CONNECT;
576       break;
577     default:
578       GNUNET_assert (0);
579     }
580 }
581
582
583 /**
584  * Controller event callback
585  *
586  * @param cls NULL
587  * @param event the controller event
588  */
589 static void
590 controller_event_cb (void *cls,
591                      const struct GNUNET_TESTBED_EventInformation *event)
592 {
593   switch (event->type)
594   {
595   case GNUNET_TESTBED_ET_CONNECT:
596     GNUNET_assert (INIT == setup_state);
597     GNUNET_TESTBED_operation_done (op);
598     /* Get the peer identity and configuration of peers */
599     op = GNUNET_TESTBED_peer_get_information (peer1.peer,
600                                               GNUNET_TESTBED_PIT_IDENTITY,
601                                               &peerinfo_cb, NULL);
602     setup_state = PEER1_GET_IDENTITY;
603     break;
604   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
605     switch (setup_state)
606     {    
607     case PEER1_STREAM_CONNECT:
608     case PEER2_STREAM_CONNECT:
609       GNUNET_assert (NULL == event->details.operation_finished.emsg);
610       break;
611     default:
612       GNUNET_assert (0);
613     }
614     break;
615   default:
616     GNUNET_assert (0);
617   }
618 }
619
620
621 /**
622  * Signature of a main function for a testcase.
623  *
624  * @param cls closure
625  * @param num_peers number of peers in 'peers'
626  * @param peers handle to peers run in the testbed
627  */
628 static void
629 test_master (void *cls, unsigned int num_peers,
630              struct GNUNET_TESTBED_Peer **peers)
631 {
632   GNUNET_assert (NULL != peers);
633   GNUNET_assert (NULL != peers[0]);
634   GNUNET_assert (NULL != peers[1]);
635   peer1.peer = peers[0];
636   peer2.peer = peers[1];
637   op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL, peer2.peer, peer1.peer);
638   setup_state = INIT;
639   abort_task =
640     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
641                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
642                                   NULL);
643 }
644
645
646 /**
647  * Main function
648  */
649 int main (int argc, char **argv)
650 {
651   uint64_t event_mask;  
652
653   result = GNUNET_NO;
654   event_mask = 0;
655   event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT);
656   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
657   GNUNET_TESTBED_test_run ("test_stream_2peers", "test_stream_local.conf",
658                            NUM_PEERS, event_mask, &controller_event_cb, NULL,
659                            &test_master, NULL);
660   if (GNUNET_SYSERR == result)
661     return 1;
662   return 0;
663 }