c3fcc649264194b4c75ac9c469d9ff29e6519e05
[oweals/gnunet.git] / src / stream / test_stream_local.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_local.c
23  * @brief Stream API testing between local peers
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testing_lib.h"
34
35 #define VERBOSE 1
36
37 /**
38  * Number of peers
39  */
40 #define NUM_PEERS 2
41
42 /**
43  * Structure for holding peer's sockets and IO Handles
44  */
45 struct PeerData
46 {
47   /**
48    * Peer's stream socket
49    */
50   struct GNUNET_STREAM_Socket *socket;
51
52   /**
53    * Peer's io write handle
54    */
55   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
56
57   /**
58    * Peer's io read handle
59    */
60   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
61
62   /**
63    * Our Peer id
64    */
65   struct GNUNET_PeerIdentity our_id;
66
67   /**
68    * Bytes the peer has written
69    */
70   unsigned int bytes_wrote;
71
72   /**
73    * Byte the peer has read
74    */
75   unsigned int bytes_read;
76 };
77
78 /**
79  * The current peer group
80  */
81 static struct GNUNET_TESTING_PeerGroup *pg;
82
83 /**
84  * Peer 1 daemon
85  */
86 static struct GNUNET_TESTING_Daemon *d1;
87
88 /**
89  * Peer 2 daemon
90  */
91 static struct GNUNET_TESTING_Daemon *d2;
92
93 static struct PeerData peer1;
94 static struct PeerData peer2;
95 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
96 static struct GNUNET_CONFIGURATION_Handle *config;
97
98 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
99 static GNUNET_SCHEDULER_TaskIdentifier read_task;
100
101 static char *data = "ABCD";
102 static int result;
103
104 static int writing_success;
105 static int reading_success;
106
107
108 /**
109  * Check whether peers successfully shut down.
110  */
111 static void
112 shutdown_callback (void *cls, const char *emsg)
113 {
114   if (emsg != NULL)
115   {
116     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
117                 "Shutdown of peers failed!\n");
118   }
119   else
120   {
121     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
122                 "All peers successfully shut down!\n");
123   }
124   GNUNET_CONFIGURATION_destroy (config);
125 }
126
127
128 /**
129  * Shutdown nicely
130  */
131 static void
132 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
133 {
134   if (NULL != peer1.socket)
135     GNUNET_STREAM_close (peer1.socket);
136   if (NULL != peer2.socket)
137     GNUNET_STREAM_close (peer2.socket);
138   if (NULL != peer2_listen_socket)
139     GNUNET_STREAM_listen_close (peer2_listen_socket);
140
141   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
142   if (0 != abort_task)
143   {
144     GNUNET_SCHEDULER_cancel (abort_task);
145   }
146
147   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
148
149   GNUNET_TESTING_daemons_stop (pg,
150                                GNUNET_TIME_relative_multiply
151                                (GNUNET_TIME_UNIT_SECONDS, 5),
152                                &shutdown_callback,
153                                NULL);
154 }
155
156
157 /**
158  * Something went wrong and timed out. Kill everything and set error flag
159  */
160 static void
161 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
162 {
163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
164   if (0 != read_task)
165     {
166       GNUNET_SCHEDULER_cancel (read_task);
167     }
168   result = GNUNET_SYSERR;
169   abort_task = 0;
170   do_shutdown (cls, tc);
171 }
172
173 /**
174  * Signature for input processor 
175  *
176  * @param cls the closure from GNUNET_STREAM_write/read
177  * @param status the status of the stream at the time this function is called
178  * @param data traffic from the other side
179  * @param size the number of bytes available in data read 
180  * @return number of bytes of processed from 'data' (any data remaining should be
181  *         given to the next time the read processor is called).
182  */
183 static size_t
184 input_processor (void *cls,
185                  enum GNUNET_STREAM_Status status,
186                  const void *input_data,
187                  size_t size);
188
189
190 /**
191  * The write completion function; called upon writing some data to stream or
192  * upon error
193  *
194  * @param cls the closure from GNUNET_STREAM_write/read
195  * @param status the status of the stream at the time this function is called
196  * @param size the number of bytes read or written
197  */
198 static void 
199 write_completion (void *cls,
200                   enum GNUNET_STREAM_Status status,
201                   size_t size)
202 {
203   struct PeerData *peer = cls;
204
205   GNUNET_assert (GNUNET_STREAM_OK == status);
206   GNUNET_assert (size <= strlen (data));
207   peer->bytes_wrote += size;
208
209   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
210     {
211       peer->io_write_handle =
212         GNUNET_STREAM_write (peer->socket,
213                              (void *) data,
214                              strlen(data) - peer->bytes_wrote,
215                              GNUNET_TIME_relative_multiply
216                              (GNUNET_TIME_UNIT_SECONDS, 5),
217                              &write_completion,
218                              cls);
219       GNUNET_assert (NULL != peer->io_write_handle);
220     }
221   else
222     {
223       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
224                   "Writing completed\n");
225
226       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
227         {
228           peer->io_read_handle =
229             GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
230                                 peer->socket,
231                                 GNUNET_TIME_relative_multiply
232                                 (GNUNET_TIME_UNIT_SECONDS, 5),
233                                 &input_processor,
234                                 cls);
235           GNUNET_assert (NULL!=peer->io_read_handle);
236         }
237       else
238         {
239           writing_success = GNUNET_YES;
240           if (GNUNET_YES == reading_success) 
241             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
242         }
243     }
244 }
245
246
247 /**
248  * Function executed after stream has been established
249  *
250  * @param cls the closure from GNUNET_STREAM_open
251  * @param socket socket to use to communicate with the other side (read/write)
252  */
253 static void 
254 stream_open_cb (void *cls,
255                 struct GNUNET_STREAM_Socket *socket)
256 {
257   struct PeerData *peer;
258
259   GNUNET_assert (socket == peer1.socket);
260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
261               "%s: Stream established from peer1\n",
262               GNUNET_i2s (&peer1.our_id));
263   peer = (struct PeerData *) cls;
264   peer->bytes_wrote = 0;
265   GNUNET_assert (socket == peer1.socket);
266   GNUNET_assert (socket == peer->socket);
267   peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
268                                                (void *) data, /* data */
269                                                strlen(data),
270                                                GNUNET_TIME_relative_multiply
271                                                (GNUNET_TIME_UNIT_SECONDS, 5),
272                                                &write_completion,
273                                          cls);
274   GNUNET_assert (NULL != peer->io_write_handle);
275 }
276
277
278 /**
279  * Input processor
280  *
281  * @param cls the closure from GNUNET_STREAM_write/read
282  * @param status the status of the stream at the time this function is called
283  * @param data traffic from the other side
284  * @param size the number of bytes available in data read 
285  * @return number of bytes of processed from 'data' (any data remaining should be
286  *         given to the next time the read processor is called).
287  */
288 static size_t
289 input_processor (void *cls,
290                  enum GNUNET_STREAM_Status status,
291                  const void *input_data,
292                  size_t size)
293 {
294   struct PeerData *peer;
295
296   peer = (struct PeerData *) cls;
297
298   if (GNUNET_STREAM_TIMEOUT == status)
299     {
300       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301                   "Read operation timedout - reading again!\n");
302       GNUNET_assert (0 == size);
303       peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
304                                                  peer->socket,
305                                                  GNUNET_TIME_relative_multiply
306                                                  (GNUNET_TIME_UNIT_SECONDS, 5),
307                                                  &input_processor,
308                                                  cls);
309       GNUNET_assert (NULL != peer->io_read_handle);
310       return 0;
311     }
312
313   GNUNET_assert (GNUNET_STREAM_OK == status);
314   GNUNET_assert (size <= strlen (data));
315   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
316                                (const char *) input_data,
317                                size));
318   peer->bytes_read += size;
319   
320   if (peer->bytes_read < strlen (data))
321     {
322       peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
323                                                  peer->socket,
324                                                  GNUNET_TIME_relative_multiply
325                                                  (GNUNET_TIME_UNIT_SECONDS, 5),
326                                                  &input_processor,
327                                                  cls);
328       GNUNET_assert (NULL != peer->io_read_handle);
329     }
330   else 
331     {
332       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
333         {
334           peer->bytes_wrote = 0;
335           peer->io_write_handle = 
336             GNUNET_STREAM_write (peer->socket,
337                                  data,
338                                  strlen(data),
339                                  GNUNET_TIME_relative_multiply
340                                  (GNUNET_TIME_UNIT_SECONDS, 5),
341                                  &write_completion,
342                                  cls);
343         }
344       else                      /* Peer1 has completed reading. End of tests */
345         {
346           reading_success = GNUNET_YES;
347           if (GNUNET_YES == writing_success)
348             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
349         }
350     }
351   return size;
352 }
353
354   
355 /**
356  * Scheduler call back; to be executed when a new stream is connected
357  * Called from listen connect for peer2
358  */
359 static void
360 stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
361 {
362   read_task = GNUNET_SCHEDULER_NO_TASK;
363   GNUNET_assert (NULL != cls);
364   peer2.bytes_read = 0;
365   peer2.io_read_handle =
366     GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
367                         GNUNET_TIME_relative_multiply
368                         (GNUNET_TIME_UNIT_SECONDS, 5),
369                         &input_processor,
370                         (void *) &peer2);
371   GNUNET_assert (NULL != peer2.io_read_handle);
372 }
373
374
375 /**
376  * Functions of this type are called upon new stream connection from other peers
377  *
378  * @param cls the closure from GNUNET_STREAM_listen
379  * @param socket the socket representing the stream
380  * @param initiator the identity of the peer who wants to establish a stream
381  *            with us
382  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
383  *             stream (the socket will be invalid after the call)
384  */
385 static int
386 stream_listen_cb (void *cls,
387                   struct GNUNET_STREAM_Socket *socket,
388                   const struct GNUNET_PeerIdentity *initiator)
389 {
390   GNUNET_assert (NULL != socket);
391   GNUNET_assert (NULL != initiator);
392   GNUNET_assert (socket != peer1.socket);
393
394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395               "%s: Peer connected: %s\n",
396               GNUNET_i2s (&peer2.our_id),
397               GNUNET_i2s(initiator));
398
399   peer2.socket = socket;
400   /* FIXME: reading should be done right now instead of a scheduled call */
401   read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
402   return GNUNET_OK;
403 }
404
405
406 /**
407  * Callback to be called when testing peer group is ready
408  *
409  * @param cls NULL
410  * @param emsg NULL on success
411  */
412 void
413 peergroup_ready (void *cls, const char *emsg)
414 {
415   if (NULL != emsg)
416     {
417       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
418                   "Starting peer group failed: %s\n", emsg);
419       return;
420     }
421   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
422               "Peer group is now ready\n");
423   
424   GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
425   
426   d1 = GNUNET_TESTING_daemon_get (pg, 0);
427   GNUNET_assert (NULL != d1);
428   
429   d2 = GNUNET_TESTING_daemon_get (pg, 1);
430   GNUNET_assert (NULL != d2);
431
432   GNUNET_TESTING_get_peer_identity (d1->cfg,
433                                     &peer1.our_id);
434   GNUNET_TESTING_get_peer_identity (d2->cfg,
435                                     &peer2.our_id);
436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437               "%s : %s\n",
438               GNUNET_i2s (&peer1.our_id),
439               GNUNET_i2s (&d1->id));
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441               "%s : %s\n",
442               GNUNET_i2s (&peer2.our_id),
443               GNUNET_i2s (&d2->id));
444
445   peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
446                                               10, /* App port */
447                                               &stream_listen_cb,
448                                               NULL);
449   GNUNET_assert (NULL != peer2_listen_socket);
450
451   /* Connect to stream library */
452   peer1.socket = GNUNET_STREAM_open (d1->cfg,
453                                      &d2->id,         /* Null for local peer? */
454                                      10,           /* App port */
455                                      &stream_open_cb,
456                                      &peer1);
457   GNUNET_assert (NULL != peer1.socket);
458 }
459
460
461 /**
462  * Initialize framework and start test
463  */
464 static void
465 run (void *cls, char *const *args, const char *cfgfile,
466      const struct GNUNET_CONFIGURATION_Handle *cfg)
467 {
468   struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
469
470   /* GNUNET_log_setup ("test_stream_local", */
471   /*                   "DEBUG", */
472   /*                   NULL); */
473
474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475               "Starting test\n");
476   /* Duplicate the configuration */
477   config = GNUNET_CONFIGURATION_dup (cfg);
478
479   hosts = GNUNET_TESTING_hosts_load (config);
480   
481   pg = GNUNET_TESTING_peergroup_start (config,
482                                        2,
483                                        GNUNET_TIME_relative_multiply
484                                        (GNUNET_TIME_UNIT_SECONDS, 3),
485                                        NULL,
486                                        &peergroup_ready,
487                                        NULL,
488                                        hosts);
489   GNUNET_assert (NULL != pg);
490                                        
491   abort_task =
492     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
493                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
494                                   NULL);
495 }
496
497 /**
498  * Main function
499  */
500 int main (int argc, char **argv)
501 {
502   int ret;
503
504   char *argv2[] = { "test-stream-local",
505                     "-L", "DEBUG",
506                     "-c", "test_stream_local.conf",
507                     NULL};
508   
509   struct GNUNET_GETOPT_CommandLineOption options[] = {
510     GNUNET_GETOPT_OPTION_END
511   };
512
513   ret =
514       GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
515                           "test-stream-local", "nohelp", options, &run, NULL);
516
517   if (GNUNET_OK != ret)
518   {
519     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
520                 ret);
521     return 1;
522   }
523   if (GNUNET_SYSERR == result)
524   {
525     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
526     return 1;
527   }
528   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
529   return 0;
530 }