- change
[oweals/gnunet.git] / src / stream / test_stream_2peers.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_2peers.c
23  * @brief Stream API testing between 2 peers using testing API
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testing_lib.h"
34
35 #define VERBOSE 1
36
37 /**
38  * Number of peers
39  */
40 #define NUM_PEERS 2
41
42 #define TIME_REL_SECS(sec) \
43   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
44
45 /**
46  * Structure for holding peer's sockets and IO Handles
47  */
48 struct PeerData
49 {
50   /**
51    * Peer's stream socket
52    */
53   struct GNUNET_STREAM_Socket *socket;
54
55   /**
56    * Peer's io write handle
57    */
58   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
59
60   /**
61    * Peer's io read handle
62    */
63   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
64
65   /**
66    * Peer's shutdown handle
67    */
68   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
69
70   /**
71    * Our Peer id
72    */
73   struct GNUNET_PeerIdentity our_id;
74
75   /**
76    * Bytes the peer has written
77    */
78   unsigned int bytes_wrote;
79
80   /**
81    * Byte the peer has read
82    */
83   unsigned int bytes_read;
84 };
85
86 /**
87  * The current peer group
88  */
89 static struct GNUNET_TESTING_PeerGroup *pg;
90
91 /**
92  * Peer 1 daemon
93  */
94 static struct GNUNET_TESTING_Daemon *d1;
95
96 /**
97  * Peer 2 daemon
98  */
99 static struct GNUNET_TESTING_Daemon *d2;
100
101 static struct PeerData peer1;
102 static struct PeerData peer2;
103 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
104 static struct GNUNET_CONFIGURATION_Handle *config;
105
106 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
107
108 static char *data = "ABCD";
109 static int result;
110
111 static int writing_success;
112 static int reading_success;
113
114
115 /**
116  * Input processor
117  *
118  * @param cls the closure from GNUNET_STREAM_write/read
119  * @param status the status of the stream at the time this function is called
120  * @param data traffic from the other side
121  * @param size the number of bytes available in data read 
122  * @return number of bytes of processed from 'data' (any data remaining should be
123  *         given to the next time the read processor is called).
124  */
125 static size_t
126 input_processor (void *cls,
127                  enum GNUNET_STREAM_Status status,
128                  const void *input_data,
129                  size_t size);
130
131 /**
132  * Task for calling STREAM_read
133  *
134  * @param cls the peer data entity
135  * @param tc the task context
136  */
137 static void
138 stream_read_task (void *cls,
139                   const struct GNUNET_SCHEDULER_TaskContext *tc)
140 {
141   struct PeerData *peer = cls;
142   
143   peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
144                                              GNUNET_TIME_relative_multiply
145                                              (GNUNET_TIME_UNIT_SECONDS, 5),
146                                              &input_processor,
147                                              peer);
148   GNUNET_assert (NULL != peer->io_read_handle);
149 }
150
151 /**
152  * The write completion function; called upon writing some data to stream or
153  * upon error
154  *
155  * @param cls the closure from GNUNET_STREAM_write/read
156  * @param status the status of the stream at the time this function is called
157  * @param size the number of bytes read or written
158  */
159 static void 
160 write_completion (void *cls,
161                   enum GNUNET_STREAM_Status status,
162                   size_t size);
163
164
165 /**
166  * Task for calling STREAM_write
167  *
168  * @param cls the peer data entity
169  * @param tc the task context
170  */
171 static void
172 stream_write_task (void *cls,
173                    const struct GNUNET_SCHEDULER_TaskContext *tc)
174 {
175   struct PeerData *peer = cls;
176   
177   peer->io_write_handle = 
178     GNUNET_STREAM_write (peer->socket,
179                          (void *) data,
180                          strlen(data) - peer->bytes_wrote,
181                          GNUNET_TIME_relative_multiply
182                          (GNUNET_TIME_UNIT_SECONDS, 5),
183                          &write_completion,
184                          peer);
185  
186   GNUNET_assert (NULL != peer->io_write_handle);
187  }
188
189
190 /**
191  * Check whether peers successfully shut down.
192  */
193 static void
194 peergroup_shutdown_callback (void *cls, const char *emsg)
195 {
196   if (emsg != NULL)
197   {
198     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
199                 "Shutdown of peers failed!\n");
200   }
201   else
202   {
203     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
204                 "All peers successfully shut down!\n");
205   }
206   GNUNET_CONFIGURATION_destroy (config);
207 }
208
209
210 /**
211  * Close sockets and stop testing deamons nicely
212  */
213 static void
214 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
215 {
216   if (NULL != peer1.socket)
217     GNUNET_STREAM_close (peer1.socket);
218   if (NULL != peer2.socket)
219     GNUNET_STREAM_close (peer2.socket);
220   if (NULL != peer2_listen_socket)
221     GNUNET_STREAM_listen_close (peer2_listen_socket);
222
223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
224   if (0 != abort_task)
225   {
226     GNUNET_SCHEDULER_cancel (abort_task);
227   }
228
229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
230
231   GNUNET_TESTING_daemons_stop (pg,
232                                GNUNET_TIME_relative_multiply
233                                (GNUNET_TIME_UNIT_SECONDS, 5),
234                                &peergroup_shutdown_callback,
235                                NULL);
236 }
237
238
239 /**
240  * Completion callback for shutdown
241  *
242  * @param cls the closure from GNUNET_STREAM_shutdown call
243  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
244  *          SHUT_RDWR) 
245  */
246 static void 
247 shutdown_completion (void *cls,
248                      int operation)
249 {
250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
251               "STREAM shutdown successful\n");
252   GNUNET_SCHEDULER_add_now (&do_close,
253                             cls);
254 }
255
256
257
258 /**
259  * Shutdown sockets gracefully
260  */
261 static void
262 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
263 {
264   peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, 
265                                                   SHUT_RDWR,
266                                                   &shutdown_completion,
267                                                   cls);
268 }
269
270
271 /**
272  * Something went wrong and timed out. Kill everything and set error flag
273  */
274 static void
275 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
276 {
277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
278   result = GNUNET_SYSERR;
279   abort_task = 0;
280   do_close (cls, tc);  
281 }
282
283
284 /**
285  * The write completion function; called upon writing some data to stream or
286  * upon error
287  *
288  * @param cls the closure from GNUNET_STREAM_write/read
289  * @param status the status of the stream at the time this function is called
290  * @param size the number of bytes read or written
291  */
292 static void 
293 write_completion (void *cls,
294                   enum GNUNET_STREAM_Status status,
295                   size_t size)
296 {
297   struct PeerData *peer=cls;
298
299   GNUNET_assert (GNUNET_STREAM_OK == status);
300   GNUNET_assert (size <= strlen (data));
301   peer->bytes_wrote += size;
302
303   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
304     {
305       GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
306     }
307   else
308     {
309       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
310                   "Writing completed\n");
311
312       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
313         {
314           peer->bytes_read = 0;
315           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
316         }
317       else
318         {
319           writing_success = GNUNET_YES;
320           if (GNUNET_YES == reading_success)
321             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
322         }
323     }
324 }
325
326
327 /**
328  * Function executed after stream has been established
329  *
330  * @param cls the closure from GNUNET_STREAM_open
331  * @param socket socket to use to communicate with the other side (read/write)
332  */
333 static void 
334 stream_open_cb (void *cls,
335                 struct GNUNET_STREAM_Socket *socket)
336 {
337   struct PeerData *peer=cls;
338   
339   GNUNET_assert (&peer1 == peer);
340   GNUNET_assert (socket == peer1.socket);
341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342               "%s: Stream established from peer1\n",
343               GNUNET_i2s (&peer1.our_id));
344   peer->bytes_wrote = 0;
345   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
346 }
347
348
349 /**
350  * Input processor
351  *
352  * @param cls the closure from GNUNET_STREAM_write/read
353  * @param status the status of the stream at the time this function is called
354  * @param data traffic from the other side
355  * @param size the number of bytes available in data read 
356  * @return number of bytes of processed from 'data' (any data remaining should be
357  *         given to the next time the read processor is called).
358  */
359 static size_t
360 input_processor (void *cls,
361                  enum GNUNET_STREAM_Status status,
362                  const void *input_data,
363                  size_t size)
364 {
365   struct PeerData *peer;
366
367   peer = (struct PeerData *) cls;
368
369   if (GNUNET_STREAM_TIMEOUT == status)
370     {
371       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
372                   "Read operation timedout - reading again!\n");
373       GNUNET_assert (0 == size);
374       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
375       return 0;
376     }
377
378   GNUNET_assert (GNUNET_STREAM_OK == status);
379   GNUNET_assert (size <= strlen (data));
380   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
381                                (const char *) input_data,
382                                size));
383   peer->bytes_read += size;
384   
385   if (peer->bytes_read < strlen (data))
386     {
387       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
388     }
389   else 
390     {
391       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
392         {
393           peer->bytes_wrote = 0;
394           GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
395         }
396       else                      /* Peer1 has completed reading. End of tests */
397         {
398           reading_success = GNUNET_YES;
399           if (GNUNET_YES == writing_success)
400             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
401         }
402     }
403   return size;
404 }
405
406   
407 /**
408  * Functions of this type are called upon new stream connection from other peers
409  *
410  * @param cls the closure from GNUNET_STREAM_listen
411  * @param socket the socket representing the stream
412  * @param initiator the identity of the peer who wants to establish a stream
413  *            with us
414  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
415  *             stream (the socket will be invalid after the call)
416  */
417 static int
418 stream_listen_cb (void *cls,
419                   struct GNUNET_STREAM_Socket *socket,
420                   const struct GNUNET_PeerIdentity *initiator)
421 {
422   GNUNET_assert (NULL != socket);
423   GNUNET_assert (NULL != initiator);
424   GNUNET_assert (socket != peer1.socket);
425
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427               "%s: Peer connected: %s\n",
428               GNUNET_i2s (&peer2.our_id),
429               GNUNET_i2s(initiator));
430
431   peer2.socket = socket;
432   peer2.bytes_read = 0;
433   GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
434   return GNUNET_OK;
435 }
436
437
438 /**
439  * Listen success callback; connects a peer to stream as client
440  */
441 static void
442 stream_connect (void)
443 {
444   peer1.socket = GNUNET_STREAM_open (d1->cfg,
445                                      &d2->id,         /* Null for local peer? */
446                                      10,           /* App port */
447                                      &stream_open_cb,
448                                      &peer1,
449                                      GNUNET_STREAM_OPTION_END);
450   GNUNET_assert (NULL != peer1.socket);
451 }
452
453
454 /**
455  * Callback to be called when testing peer group is ready
456  *
457  * @param cls NULL
458  * @param emsg NULL on success
459  */
460 void
461 peergroup_ready (void *cls, const char *emsg)
462 {
463   if (NULL != emsg)
464     {
465       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
466                   "Starting peer group failed: %s\n", emsg);
467       return;
468     }
469   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470               "Peer group is now ready\n");
471   
472   GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
473   
474   d1 = GNUNET_TESTING_daemon_get (pg, 0);
475   GNUNET_assert (NULL != d1);
476   
477   d2 = GNUNET_TESTING_daemon_get (pg, 1);
478   GNUNET_assert (NULL != d2);
479
480   GNUNET_TESTING_get_peer_identity (d1->cfg,
481                                     &peer1.our_id);
482   GNUNET_TESTING_get_peer_identity (d2->cfg,
483                                     &peer2.our_id);
484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
485               "%s : %s\n",
486               GNUNET_i2s (&peer1.our_id),
487               GNUNET_i2s (&d1->id));
488   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489               "%s : %s\n",
490               GNUNET_i2s (&peer2.our_id),
491               GNUNET_i2s (&d2->id));
492
493   peer2_listen_socket = 
494     GNUNET_STREAM_listen (d2->cfg,
495                           10, /* App port */
496                           &stream_listen_cb,
497                           NULL,
498                           GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
499                           &stream_connect,
500                           GNUNET_STREAM_OPTION_END);
501   GNUNET_assert (NULL != peer2_listen_socket);
502 }
503
504
505 /**
506  * Initialize framework and start test
507  */
508 static void
509 run (void *cls, char *const *args, const char *cfgfile,
510      const struct GNUNET_CONFIGURATION_Handle *cfg)
511 {
512   struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
513
514   GNUNET_log_setup ("test_stream_2peers",
515                     "DEBUG",
516                     NULL);
517
518   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519               "Starting test\n");
520   /* Duplicate the configuration */
521   config = GNUNET_CONFIGURATION_dup (cfg);
522
523   hosts = GNUNET_TESTING_hosts_load (config);
524   
525   pg = GNUNET_TESTING_peergroup_start (config,
526                                        2,
527                                        GNUNET_TIME_relative_multiply
528                                        (GNUNET_TIME_UNIT_SECONDS, 3),
529                                        NULL,
530                                        &peergroup_ready,
531                                        NULL,
532                                        hosts);
533   GNUNET_assert (NULL != pg);
534                                        
535   abort_task =
536     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
537                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
538                                   NULL);
539 }
540
541 /**
542  * Main function
543  */
544 int main (int argc, char **argv)
545 {
546   int ret;
547
548   char *argv2[] = { "test-stream-2peers",
549                     "-L", "DEBUG",
550                     "-c", "test_stream_local.conf",
551                     NULL};
552   
553   struct GNUNET_GETOPT_CommandLineOption options[] = {
554     GNUNET_GETOPT_OPTION_END
555   };
556
557   ret =
558       GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
559                           "test-stream-2peers", "nohelp", options, &run, NULL);
560
561   if (GNUNET_OK != ret)
562   {
563     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
564                 ret);
565     return 1;
566   }
567   if (GNUNET_SYSERR == result)
568   {
569     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
570     return 1;
571   }
572   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
573   return 0;
574 }