-fix double-sending in stream if finish_cb behaves in a certain way
[oweals/gnunet.git] / src / stream / test_stream_2peers.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_2peers.c
23  * @brief Stream API testing between 2 peers using testing API
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testbed_service.h"
34
35 /**
36  * Number of peers; Do NOT change this
37  */
38 #define NUM_PEERS 2
39
40 /**
41  * Shorthand for Relative time in seconds
42  */
43 #define TIME_REL_SECS(sec) \
44   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
45
46 /**
47  * Structure for holding peer's sockets and IO Handles
48  */
49 struct PeerData
50 {
51   /**
52    * Handle to testbed peer
53    */
54   struct GNUNET_TESTBED_Peer *peer;
55
56   /**
57    * Peer's stream socket
58    */
59   struct GNUNET_STREAM_Socket *socket;
60
61   /**
62    * Peer's io write handle
63    */
64   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
65
66   /**
67    * Peer's io read handle
68    */
69   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
70
71   /**
72    * Peer's shutdown handle
73    */
74   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
75
76   /**
77    * The service connect operation to stream
78    */
79   struct GNUNET_TESTBED_Operation *op;
80
81   /**
82    * Our Peer id
83    */
84   struct GNUNET_PeerIdentity our_id;
85
86   /**
87    * Bytes the peer has written
88    */
89   unsigned int bytes_wrote;
90
91   /**
92    * Byte the peer has read
93    */
94   unsigned int bytes_read;
95 };
96
97
98 /**
99  * Different states in test setup
100  */
101 enum SetupState
102 {
103   /**
104    * The initial state
105    */
106   INIT,
107
108   /**
109    * Get the identity of peer 1
110    */
111   PEER1_GET_IDENTITY,
112
113   /**
114    * Get the identity of peer 2
115    */
116   PEER2_GET_IDENTITY,
117   
118   /**
119    * Connect to stream service of peer 1
120    */
121   PEER1_STREAM_CONNECT,
122
123   /**
124    * Connect to stream service of peer 2
125    */
126   PEER2_STREAM_CONNECT
127
128 };
129
130 /**
131  * Various states during test setup
132  */
133 static enum SetupState setup_state;
134
135 /**
136  * Data context for peer 1
137  */
138 static struct PeerData peer1;
139
140 /**
141  * Data context for peer 2
142  */
143 static struct PeerData peer2;
144
145 /**
146  * Testbed operation handle
147  */
148 static struct GNUNET_TESTBED_Operation *op;
149
150 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
151
152 static char *data = "ABCD";
153 static int result;
154
155 static int writing_success;
156 static int reading_success;
157
158
159 /**
160  * Input processor
161  *
162  * @param cls the closure from GNUNET_STREAM_write/read
163  * @param status the status of the stream at the time this function is called
164  * @param data traffic from the other side
165  * @param size the number of bytes available in data read 
166  * @return number of bytes of processed from 'data' (any data remaining should be
167  *         given to the next time the read processor is called).
168  */
169 static size_t
170 input_processor (void *cls,
171                  enum GNUNET_STREAM_Status status,
172                  const void *input_data,
173                  size_t size);
174
175 /**
176  * Task for calling STREAM_read
177  *
178  * @param cls the peer data entity
179  * @param tc the task context
180  */
181 static void
182 stream_read_task (void *cls,
183                   const struct GNUNET_SCHEDULER_TaskContext *tc)
184 {
185   struct PeerData *peer = cls;
186   
187   peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
188                                              GNUNET_TIME_relative_multiply
189                                              (GNUNET_TIME_UNIT_SECONDS, 5),
190                                              &input_processor,
191                                              peer);
192   GNUNET_assert (NULL != peer->io_read_handle);
193 }
194
195 /**
196  * The write completion function; called upon writing some data to stream or
197  * upon error
198  *
199  * @param cls the closure from GNUNET_STREAM_write/read
200  * @param status the status of the stream at the time this function is called
201  * @param size the number of bytes read or written
202  */
203 static void 
204 write_completion (void *cls,
205                   enum GNUNET_STREAM_Status status,
206                   size_t size);
207
208
209 /**
210  * Task for calling STREAM_write
211  *
212  * @param cls the peer data entity
213  * @param tc the task context
214  */
215 static void
216 stream_write_task (void *cls,
217                    const struct GNUNET_SCHEDULER_TaskContext *tc)
218 {
219   struct PeerData *peer = cls;
220   
221   peer->io_write_handle = 
222     GNUNET_STREAM_write (peer->socket,
223                          (void *) data,
224                          strlen(data) - peer->bytes_wrote,
225                          GNUNET_TIME_relative_multiply
226                          (GNUNET_TIME_UNIT_SECONDS, 5),
227                          &write_completion,
228                          peer);
229  
230   GNUNET_assert (NULL != peer->io_write_handle);
231  }
232
233
234 /**
235  * Close sockets and stop testing deamons nicely
236  */
237 static void
238 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
239 {
240   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
241     GNUNET_SCHEDULER_cancel (abort_task);
242   if (NULL != peer1.socket)
243     GNUNET_STREAM_close (peer1.socket);
244   if (NULL != peer1.op)
245     GNUNET_TESTBED_operation_done (peer1.op);
246   else
247     GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
248 }
249
250
251 /**
252  * Completion callback for shutdown
253  *
254  * @param cls the closure from GNUNET_STREAM_shutdown call
255  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
256  *          SHUT_RDWR) 
257  */
258 static void 
259 shutdown_completion (void *cls,
260                      int operation)
261 {
262   static int shutdowns;
263
264   if (++shutdowns == 1)
265   {
266     peer1.shutdown_handle = NULL;
267     peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
268                                                     &shutdown_completion, cls);
269     return;
270   }  
271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
272   GNUNET_SCHEDULER_add_now (&do_close, cls);
273 }
274
275
276 /**
277  * Shutdown sockets gracefully
278  */
279 static void
280 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
281 {
282   result = GNUNET_OK;
283   peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
284                                                   &shutdown_completion, cls);
285 }
286
287
288 /**
289  * Something went wrong and timed out. Kill everything and set error flag
290  */
291 static void
292 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
293 {
294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
295   result = GNUNET_SYSERR;
296   abort_task = 0;
297   do_close (cls, tc);  
298 }
299
300
301 /**
302  * The write completion function; called upon writing some data to stream or
303  * upon error
304  *
305  * @param cls the closure from GNUNET_STREAM_write/read
306  * @param status the status of the stream at the time this function is called
307  * @param size the number of bytes read or written
308  */
309 static void 
310 write_completion (void *cls,
311                   enum GNUNET_STREAM_Status status,
312                   size_t size)
313 {
314   struct PeerData *peer=cls;
315
316   GNUNET_assert (GNUNET_STREAM_OK == status);
317   GNUNET_assert (size <= strlen (data));
318   peer->bytes_wrote += size;
319
320   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
321     {
322       GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
323     }
324   else
325     {
326       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327                   "Writing completed\n");
328
329       if (&peer2 == peer)   /* Peer1 has finished writing; should read now */
330         {
331           peer->bytes_read = 0;
332           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
333         }
334       else
335         {
336           writing_success = GNUNET_YES;
337           if (GNUNET_YES == reading_success)
338             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
339         }
340     }
341 }
342
343
344 /**
345  * Function executed after stream has been established
346  *
347  * @param cls the closure from GNUNET_STREAM_open
348  * @param socket socket to use to communicate with the other side (read/write)
349  */
350 static void 
351 stream_open_cb (void *cls,
352                 struct GNUNET_STREAM_Socket *socket)
353 {
354   struct PeerData *peer=cls;
355   
356   GNUNET_assert (&peer2 == peer);
357   GNUNET_assert (socket == peer2.socket);
358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Stream established from peer2\n",
359               GNUNET_i2s (&peer1.our_id));
360   peer->bytes_wrote = 0;
361   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
362 }
363
364
365 /**
366  * Input processor
367  *
368  * @param cls the closure from GNUNET_STREAM_write/read
369  * @param status the status of the stream at the time this function is called
370  * @param data traffic from the other side
371  * @param size the number of bytes available in data read 
372  * @return number of bytes of processed from 'data' (any data remaining should be
373  *         given to the next time the read processor is called).
374  */
375 static size_t
376 input_processor (void *cls,
377                  enum GNUNET_STREAM_Status status,
378                  const void *input_data,
379                  size_t size)
380 {
381   struct PeerData *peer;
382
383   peer = (struct PeerData *) cls;
384
385   if (GNUNET_STREAM_TIMEOUT == status)
386     {
387       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
388                   "Read operation timedout - reading again!\n");
389       GNUNET_assert (0 == size);
390       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
391       return 0;
392     }
393
394   GNUNET_assert (GNUNET_STREAM_OK == status);
395   GNUNET_assert (size <= strlen (data));
396   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
397                                (const char *) input_data,
398                                size));
399   peer->bytes_read += size;
400   
401   if (peer->bytes_read < strlen (data))
402     {
403       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
404     }
405   else 
406     {
407       if (&peer1 == peer)    /* Peer2 has completed reading; should write */
408         {
409           peer->bytes_wrote = 0;
410           GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
411         }
412       else                      /* Peer1 has completed reading. End of tests */
413         {
414           reading_success = GNUNET_YES;
415           if (GNUNET_YES == writing_success)
416             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
417         }
418     }
419   return size;
420 }
421
422   
423 /**
424  * Functions of this type are called upon new stream connection from other peers
425  *
426  * @param cls the closure from GNUNET_STREAM_listen
427  * @param socket the socket representing the stream
428  * @param initiator the identity of the peer who wants to establish a stream
429  *            with us
430  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
431  *             stream (the socket will be invalid after the call)
432  */
433 static int
434 stream_listen_cb (void *cls,
435                   struct GNUNET_STREAM_Socket *socket,
436                   const struct GNUNET_PeerIdentity *initiator)
437 {
438   if ((NULL == socket) || (NULL == initiator))
439   {
440     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
441     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
442       GNUNET_SCHEDULER_cancel (abort_task);
443     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
444     return GNUNET_OK;
445   }
446   GNUNET_assert (NULL != initiator);
447   GNUNET_assert (socket != peer2.socket);
448   GNUNET_assert (0 == memcmp (initiator, &peer2.our_id, 
449                               sizeof (struct GNUNET_PeerIdentity)));
450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Peer connected: %s\n",
451               GNUNET_i2s (&peer1.our_id), GNUNET_i2s (initiator));  
452   peer1.socket = socket;
453   peer1.bytes_read = 0;
454   GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
455   return GNUNET_OK;
456 }
457
458
459 /**
460  * Listen success callback; connects a peer to stream as client
461  */
462 static void stream_connect (void);
463
464
465 /**
466  * Adapter function called to destroy a connection to
467  * a service.
468  * 
469  * @param cls closure
470  * @param op_result service handle returned from the connect adapter
471  */
472 static void
473 stream_da (void *cls, void *op_result)
474 {
475   struct GNUNET_STREAM_ListenSocket *lsocket;
476   struct GNUNET_STREAM_Socket *socket;
477
478   if (&peer1 == cls)
479   {
480     lsocket = op_result;
481     GNUNET_STREAM_listen_close (lsocket);
482     if (NULL != peer2.op)
483       GNUNET_TESTBED_operation_done (peer2.op);
484     else
485       GNUNET_SCHEDULER_shutdown ();
486     return;
487   }
488   if (&peer2 == cls)
489   {
490     socket = op_result;
491     GNUNET_STREAM_close (socket);
492     GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
493     return;
494   }
495   GNUNET_assert (0);
496 }
497
498
499 /**
500  * Adapter function called to establish a connection to
501  * a service.
502  * 
503  * @param cls closure
504  * @param cfg configuration of the peer to connect to; will be available until
505  *          GNUNET_TESTBED_operation_done() is called on the operation returned
506  *          from GNUNET_TESTBED_service_connect()
507  * @return service handle to return in 'op_result', NULL on error
508  */
509 static void * 
510 stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
511 {  
512   struct GNUNET_STREAM_ListenSocket *lsocket;
513   
514   switch (setup_state)
515   {
516   case PEER1_STREAM_CONNECT:
517     lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
518                                     GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
519                                     &stream_connect, GNUNET_STREAM_OPTION_END);
520     return lsocket;
521   case PEER2_STREAM_CONNECT:
522     peer2.socket = GNUNET_STREAM_open (cfg, &peer1.our_id, 10, &stream_open_cb,
523                                        &peer2, GNUNET_STREAM_OPTION_END);
524     return peer2.socket;
525   default:
526     GNUNET_assert (0);
527   }
528 }
529
530
531 /**
532  * Listen success callback; connects a peer to stream as client
533  */
534 static void
535 stream_connect (void)
536
537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
538   peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
539                                              NULL, NULL,
540                                              stream_ca, stream_da, &peer2);
541   setup_state = PEER2_STREAM_CONNECT;
542 }
543
544
545 /**
546  * Callback to be called when the requested peer information is available
547  *
548  * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
549  * @param op the operation this callback corresponds to
550  * @param pinfo the result; will be NULL if the operation has failed
551  * @param emsg error message if the operation has failed; will be NULL if the
552  *          operation is successfull
553  */
554 static void 
555 peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
556              const struct GNUNET_TESTBED_PeerInformation *pinfo,
557              const char *emsg)
558 {
559   GNUNET_assert (NULL == emsg);
560   GNUNET_assert (op == op_);
561   switch (setup_state)
562     {
563     case PEER1_GET_IDENTITY:
564       memcpy (&peer1.our_id, pinfo->result.id, 
565               sizeof (struct GNUNET_PeerIdentity));
566       GNUNET_TESTBED_operation_done (op);
567       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
568                   (&peer1.our_id));
569       op = GNUNET_TESTBED_peer_get_information (peer2.peer,
570                                                 GNUNET_TESTBED_PIT_IDENTITY,
571                                                 &peerinfo_cb, NULL);
572       setup_state = PEER2_GET_IDENTITY;
573       break;
574     case PEER2_GET_IDENTITY:
575       memcpy (&peer2.our_id, pinfo->result.id,
576               sizeof (struct GNUNET_PeerIdentity));
577       GNUNET_TESTBED_operation_done (op);
578       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
579                   (&peer2.our_id));
580       peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
581                                                  NULL, NULL, stream_ca,
582                                                  stream_da, &peer1);
583       setup_state = PEER1_STREAM_CONNECT;
584       break;
585     default:
586       GNUNET_assert (0);
587     }
588 }
589
590
591 /**
592  * Controller event callback
593  *
594  * @param cls NULL
595  * @param event the controller event
596  */
597 static void
598 controller_event_cb (void *cls,
599                      const struct GNUNET_TESTBED_EventInformation *event)
600 {
601   switch (event->type)
602   {
603   case GNUNET_TESTBED_ET_CONNECT:
604     GNUNET_assert (INIT == setup_state);
605     GNUNET_TESTBED_operation_done (op);
606     /* Get the peer identity and configuration of peers */
607     op = GNUNET_TESTBED_peer_get_information (peer1.peer,
608                                               GNUNET_TESTBED_PIT_IDENTITY,
609                                               &peerinfo_cb, NULL);
610     setup_state = PEER1_GET_IDENTITY;
611     break;
612   case GNUNET_TESTBED_ET_OPERATION_FINISHED:
613     switch (setup_state)
614     {    
615     case PEER1_STREAM_CONNECT:
616     case PEER2_STREAM_CONNECT:
617       GNUNET_assert (NULL == event->details.operation_finished.emsg);
618       break;
619     default:
620       GNUNET_assert (0);
621     }
622     break;
623   default:
624     GNUNET_assert (0);
625   }
626 }
627
628
629 /**
630  * Signature of a main function for a testcase.
631  *
632  * @param cls closure
633  * @param num_peers number of peers in 'peers'
634  * @param peers handle to peers run in the testbed
635  */
636 static void
637 test_master (void *cls, unsigned int num_peers,
638              struct GNUNET_TESTBED_Peer **peers)
639 {
640   GNUNET_assert (NULL != peers);
641   GNUNET_assert (NULL != peers[0]);
642   GNUNET_assert (NULL != peers[1]);
643   peer1.peer = peers[0];
644   peer2.peer = peers[1];
645   op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL, peer2.peer, peer1.peer);
646   setup_state = INIT;
647   abort_task =
648     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
649                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
650                                   NULL);
651 }
652
653
654 /**
655  * Main function
656  */
657 int main (int argc, char **argv)
658 {
659   uint64_t event_mask;  
660
661   result = GNUNET_NO;
662   event_mask = 0;
663   event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT);
664   event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
665   GNUNET_TESTBED_test_run ("test_stream_2peers", "test_stream_local.conf",
666                            NUM_PEERS, event_mask, &controller_event_cb, NULL,
667                            &test_master, NULL);
668   if (GNUNET_SYSERR == result)
669     return 1;
670   return 0;
671 }