-regex
[oweals/gnunet.git] / src / stream / test_stream_local.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_local.c
23  * @brief Stream API testing between local peers
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testing_lib.h"
34
35 #define VERBOSE 1
36
37 /**
38  * Number of peers
39  */
40 #define NUM_PEERS 2
41
42 /**
43  * Structure for holding peer's sockets and IO Handles
44  */
45 struct PeerData
46 {
47   /**
48    * Peer's stream socket
49    */
50   struct GNUNET_STREAM_Socket *socket;
51
52   /**
53    * Peer's io write handle
54    */
55   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
56
57   /**
58    * Peer's io read handle
59    */
60   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
61
62   /**
63    * Our Peer id
64    */
65   struct GNUNET_PeerIdentity our_id;
66
67   /**
68    * Bytes the peer has written
69    */
70   unsigned int bytes_wrote;
71
72   /**
73    * Byte the peer has read
74    */
75   unsigned int bytes_read;
76 };
77
78 /**
79  * The current peer group
80  */
81 static struct GNUNET_TESTING_PeerGroup *pg;
82
83 /**
84  * Peer 1 daemon
85  */
86 static struct GNUNET_TESTING_Daemon *d1;
87
88 /**
89  * Peer 2 daemon
90  */
91 static struct GNUNET_TESTING_Daemon *d2;
92
93 static struct PeerData peer1;
94 static struct PeerData peer2;
95 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
96 static struct GNUNET_CONFIGURATION_Handle *config;
97
98 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
99 static GNUNET_SCHEDULER_TaskIdentifier read_task;
100
101 static char *data = "ABCD";
102 static int result;
103
104
105 /**
106  * Check whether peers successfully shut down.
107  */
108 static void
109 shutdown_callback (void *cls, const char *emsg)
110 {
111   if (emsg != NULL)
112   {
113     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
114                 "Shutdown of peers failed!\n");
115   }
116   else
117   {
118     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
119                 "All peers successfully shut down!\n");
120   }
121   GNUNET_CONFIGURATION_destroy (config);
122 }
123
124
125 /**
126  * Shutdown nicely
127  */
128 static void
129 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
130 {
131   if (NULL != peer1.socket)
132     GNUNET_STREAM_close (peer1.socket);
133   if (NULL != peer2.socket)
134     GNUNET_STREAM_close (peer2.socket);
135   if (NULL != peer2_listen_socket)
136     GNUNET_STREAM_listen_close (peer2_listen_socket);
137
138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
139   if (0 != abort_task)
140   {
141     GNUNET_SCHEDULER_cancel (abort_task);
142   }
143
144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
145
146   GNUNET_TESTING_daemons_stop (pg,
147                                GNUNET_TIME_relative_multiply
148                                (GNUNET_TIME_UNIT_SECONDS, 5),
149                                &shutdown_callback,
150                                NULL);
151 }
152
153
154 /**
155  * Something went wrong and timed out. Kill everything and set error flag
156  */
157 static void
158 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
159 {
160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
161   if (0 != read_task)
162     {
163       GNUNET_SCHEDULER_cancel (read_task);
164     }
165   result = GNUNET_SYSERR;
166   abort_task = 0;
167   do_shutdown (cls, tc);
168 }
169
170 /**
171  * Signature for input processor 
172  *
173  * @param cls the closure from GNUNET_STREAM_write/read
174  * @param status the status of the stream at the time this function is called
175  * @param data traffic from the other side
176  * @param size the number of bytes available in data read 
177  * @return number of bytes of processed from 'data' (any data remaining should be
178  *         given to the next time the read processor is called).
179  */
180 static size_t
181 input_processor (void *cls,
182                  enum GNUNET_STREAM_Status status,
183                  const void *input_data,
184                  size_t size);
185
186
187 /**
188  * The write completion function; called upon writing some data to stream or
189  * upon error
190  *
191  * @param cls the closure from GNUNET_STREAM_write/read
192  * @param status the status of the stream at the time this function is called
193  * @param size the number of bytes read or written
194  */
195 static void 
196 write_completion (void *cls,
197                   enum GNUNET_STREAM_Status status,
198                   size_t size)
199 {
200   struct PeerData *peer;
201
202   peer = (struct PeerData *) cls;
203   GNUNET_assert (GNUNET_STREAM_OK == status);
204   GNUNET_assert (size <= strlen (data));
205   peer->bytes_wrote += size;
206
207   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
208     {
209       peer->io_write_handle =
210         GNUNET_STREAM_write (peer->socket,
211                              (void *) data,
212                              strlen(data) - peer->bytes_wrote,
213                              GNUNET_TIME_relative_multiply
214                              (GNUNET_TIME_UNIT_SECONDS, 5),
215                              &write_completion,
216                              cls);
217       GNUNET_assert (NULL != peer->io_write_handle);
218     }
219   else
220     {
221       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
222                   "Writing completed\n");
223
224       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
225         {
226           peer->io_read_handle =
227             GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
228                                 peer->socket,
229                                 GNUNET_TIME_relative_multiply
230                                 (GNUNET_TIME_UNIT_SECONDS, 5),
231                                 &input_processor,
232                                 cls);
233           GNUNET_assert (NULL!=peer->io_read_handle);
234         }
235     }
236 }
237
238
239 /**
240  * Function executed after stream has been established
241  *
242  * @param cls the closure from GNUNET_STREAM_open
243  * @param socket socket to use to communicate with the other side (read/write)
244  */
245 static void 
246 stream_open_cb (void *cls,
247                 struct GNUNET_STREAM_Socket *socket)
248 {
249   struct PeerData *peer;
250
251   GNUNET_assert (socket == peer1.socket);
252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253               "%s: Stream established from peer1\n",
254               GNUNET_i2s (&peer1.our_id));
255   peer = (struct PeerData *) cls;
256   peer->bytes_wrote = 0;
257   GNUNET_assert (socket == peer1.socket);
258   GNUNET_assert (socket == peer->socket);
259   peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
260                                                (void *) data, /* data */
261                                                strlen(data),
262                                                GNUNET_TIME_relative_multiply
263                                                (GNUNET_TIME_UNIT_SECONDS, 5),
264                                                &write_completion,
265                                          cls);
266   GNUNET_assert (NULL != peer->io_write_handle);
267 }
268
269
270 /**
271  * Input processor
272  *
273  * @param cls the closure from GNUNET_STREAM_write/read
274  * @param status the status of the stream at the time this function is called
275  * @param data traffic from the other side
276  * @param size the number of bytes available in data read 
277  * @return number of bytes of processed from 'data' (any data remaining should be
278  *         given to the next time the read processor is called).
279  */
280 static size_t
281 input_processor (void *cls,
282                  enum GNUNET_STREAM_Status status,
283                  const void *input_data,
284                  size_t size)
285 {
286   struct PeerData *peer;
287
288   peer = (struct PeerData *) cls;
289
290   if (GNUNET_STREAM_TIMEOUT == status)
291     {
292       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
293                   "Read operation timedout - reading again!\n");
294       GNUNET_assert (0 == size);
295       peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
296                                                  peer->socket,
297                                                  GNUNET_TIME_relative_multiply
298                                                  (GNUNET_TIME_UNIT_SECONDS, 5),
299                                                  &input_processor,
300                                                  cls);
301       GNUNET_assert (NULL != peer->io_read_handle);
302       return 0;
303     }
304
305   GNUNET_assert (GNUNET_STREAM_OK == status);
306   GNUNET_assert (size <= strlen (data));
307   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
308                                (const char *) input_data,
309                                size));
310   peer->bytes_read += size;
311   
312   if (peer->bytes_read < strlen (data))
313     {
314       peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
315                                                  peer->socket,
316                                                  GNUNET_TIME_relative_multiply
317                                                  (GNUNET_TIME_UNIT_SECONDS, 5),
318                                                  &input_processor,
319                                                  cls);
320       GNUNET_assert (NULL != peer->io_read_handle);
321     }
322   else 
323     {
324       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
325         {
326           peer->bytes_wrote = 0;
327           peer->io_write_handle = 
328             GNUNET_STREAM_write (peer->socket,
329                                  data,
330                                  strlen(data),
331                                  GNUNET_TIME_relative_multiply
332                                  (GNUNET_TIME_UNIT_SECONDS, 5),
333                                  &write_completion,
334                                  cls);
335         }
336       else                      /* Peer1 has completed reading. End of tests */
337         {
338           GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
339         }
340     }
341   return size;
342 }
343
344   
345 /**
346  * Scheduler call back; to be executed when a new stream is connected
347  * Called from listen connect for peer2
348  */
349 static void
350 stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
351 {
352   read_task = GNUNET_SCHEDULER_NO_TASK;
353   GNUNET_assert (NULL != cls);
354   peer2.bytes_read = 0;
355   peer2.io_read_handle =
356     GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
357                         GNUNET_TIME_relative_multiply
358                         (GNUNET_TIME_UNIT_SECONDS, 5),
359                         &input_processor,
360                         (void *) &peer2);
361   GNUNET_assert (NULL != peer2.io_read_handle);
362 }
363
364
365 /**
366  * Functions of this type are called upon new stream connection from other peers
367  *
368  * @param cls the closure from GNUNET_STREAM_listen
369  * @param socket the socket representing the stream
370  * @param initiator the identity of the peer who wants to establish a stream
371  *            with us
372  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
373  *             stream (the socket will be invalid after the call)
374  */
375 static int
376 stream_listen_cb (void *cls,
377                   struct GNUNET_STREAM_Socket *socket,
378                   const struct GNUNET_PeerIdentity *initiator)
379 {
380   GNUNET_assert (NULL != socket);
381   GNUNET_assert (NULL != initiator);
382   GNUNET_assert (socket != peer1.socket);
383
384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385               "%s: Peer connected: %s\n",
386               GNUNET_i2s (&peer2.our_id),
387               GNUNET_i2s(initiator));
388
389   peer2.socket = socket;
390   /* FIXME: reading should be done right now instead of a scheduled call */
391   read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
392   return GNUNET_OK;
393 }
394
395
396 /**
397  * Callback to be called when testing peer group is ready
398  *
399  * @param cls NULL
400  * @param emsg NULL on success
401  */
402 void
403 peergroup_ready (void *cls, const char *emsg)
404 {
405   if (NULL != emsg)
406     {
407       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
408                   "Starting peer group failed: %s\n", emsg);
409       return;
410     }
411   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
412               "Peer group is now ready\n");
413   
414   GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
415   
416   d1 = GNUNET_TESTING_daemon_get (pg, 0);
417   GNUNET_assert (NULL != d1);
418   
419   d2 = GNUNET_TESTING_daemon_get (pg, 1);
420   GNUNET_assert (NULL != d2);
421
422   GNUNET_TESTING_get_peer_identity (d1->cfg,
423                                     &peer1.our_id);
424   GNUNET_TESTING_get_peer_identity (d2->cfg,
425                                     &peer2.our_id);
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427               "%s : %s\n",
428               GNUNET_i2s (&peer1.our_id),
429               GNUNET_i2s (&d1->id));
430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
431               "%s : %s\n",
432               GNUNET_i2s (&peer2.our_id),
433               GNUNET_i2s (&d2->id));
434
435   peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
436                                               10, /* App port */
437                                               &stream_listen_cb,
438                                               NULL);
439   GNUNET_assert (NULL != peer2_listen_socket);
440
441   /* Connect to stream library */
442   peer1.socket = GNUNET_STREAM_open (d1->cfg,
443                                      &d2->id,         /* Null for local peer? */
444                                      10,           /* App port */
445                                      &stream_open_cb,
446                                      &peer1);
447   GNUNET_assert (NULL != peer1.socket);
448 }
449
450
451 /**
452  * Initialize framework and start test
453  */
454 static void
455 run (void *cls, char *const *args, const char *cfgfile,
456      const struct GNUNET_CONFIGURATION_Handle *cfg)
457 {
458   struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
459
460   /* GNUNET_log_setup ("test_stream_local", */
461   /*                   "DEBUG", */
462   /*                   NULL); */
463
464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465               "Starting test\n");
466   /* Duplicate the configuration */
467   config = GNUNET_CONFIGURATION_dup (cfg);
468
469   hosts = GNUNET_TESTING_hosts_load (config);
470   
471   pg = GNUNET_TESTING_peergroup_start (config,
472                                        2,
473                                        GNUNET_TIME_relative_multiply
474                                        (GNUNET_TIME_UNIT_SECONDS, 3),
475                                        NULL,
476                                        &peergroup_ready,
477                                        NULL,
478                                        hosts);
479   GNUNET_assert (NULL != pg);
480                                        
481   abort_task =
482     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
483                                   (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
484                                   NULL);
485 }
486
487 /**
488  * Main function
489  */
490 int main (int argc, char **argv)
491 {
492   int ret;
493
494   char *argv2[] = { "test-stream-local",
495                     "-L", "DEBUG",
496                     "-c", "test_stream_local.conf",
497                     NULL};
498   
499   struct GNUNET_GETOPT_CommandLineOption options[] = {
500     GNUNET_GETOPT_OPTION_END
501   };
502
503   ret =
504       GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
505                           "test-stream-local", "nohelp", options, &run, NULL);
506
507   if (GNUNET_OK != ret)
508   {
509     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
510                 ret);
511     return 1;
512   }
513   if (GNUNET_SYSERR == result)
514   {
515     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
516     return 1;
517   }
518   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
519   return 0;
520 }