-fix double-sending in stream if finish_cb behaves in a certain way
[oweals/gnunet.git] / src / stream / test_stream_local.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/test_stream_local.c
23  * @brief Stream API testing between local peers
24  * @author Sree Harsha Totakura
25  */
26
27 #include <string.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_mesh_service.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_testing_lib-new.h"
34
35 /**
36  * Relative seconds shorthand
37  */
38 #define TIME_REL_SECS(sec) \
39   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
40
41
42 /**
43  * Structure for holding peer's sockets and IO Handles
44  */
45 struct PeerData
46 {
47   /**
48    * Peer's stream socket
49    */
50   struct GNUNET_STREAM_Socket *socket;
51
52   /**
53    * Peer's io write handle
54    */
55   struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
56
57   /**
58    * Peer's io read handle
59    */
60   struct GNUNET_STREAM_IOReadHandle *io_read_handle;
61
62   /**
63    * Peer's shutdown handle
64    */
65   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
66
67   /**
68    * Bytes the peer has written
69    */
70   unsigned int bytes_wrote;
71
72   /**
73    * Byte the peer has read
74    */
75   unsigned int bytes_read;
76 };
77
78 static struct PeerData peer1;
79 static struct PeerData peer2;
80 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
81 static const struct GNUNET_CONFIGURATION_Handle *config;
82 static struct GNUNET_TESTING_Peer *self;
83 static struct GNUNET_PeerIdentity self_id;
84
85 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
86
87 static char *data = "ABCD";
88 static int result;
89
90 static int writing_success;
91 static int reading_success;
92
93
94 /**
95  * Input processor
96  *
97  * @param cls the closure from GNUNET_STREAM_write/read
98  * @param status the status of the stream at the time this function is called
99  * @param data traffic from the other side
100  * @param size the number of bytes available in data read 
101  * @return number of bytes of processed from 'data' (any data remaining should be
102  *         given to the next time the read processor is called).
103  */
104 static size_t
105 input_processor (void *cls,
106                  enum GNUNET_STREAM_Status status,
107                  const void *input_data,
108                  size_t size);
109
110 /**
111  * Task for calling STREAM_read
112  *
113  * @param cls the peer data entity
114  * @param tc the task context
115  */
116 static void
117 stream_read_task (void *cls,
118                   const struct GNUNET_SCHEDULER_TaskContext *tc)
119 {
120   struct PeerData *peer = cls;
121   
122   peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
123                                              GNUNET_TIME_relative_multiply
124                                              (GNUNET_TIME_UNIT_SECONDS, 5),
125                                              &input_processor,
126                                              peer);
127   GNUNET_assert (NULL != peer->io_read_handle);
128 }
129
130
131 /**
132  * The write completion function; called upon writing some data to stream or
133  * upon error
134  *
135  * @param cls the closure from GNUNET_STREAM_write/read
136  * @param status the status of the stream at the time this function is called
137  * @param size the number of bytes read or written
138  */
139 static void 
140 write_completion (void *cls,
141                   enum GNUNET_STREAM_Status status,
142                   size_t size);
143
144
145 /**
146  * Task for calling STREAM_write
147  *
148  * @param cls the peer data entity
149  * @param tc the task context
150  */
151 static void
152 stream_write_task (void *cls,
153                    const struct GNUNET_SCHEDULER_TaskContext *tc)
154 {
155   struct PeerData *peer = cls;
156   
157   peer->io_write_handle = 
158     GNUNET_STREAM_write (peer->socket,
159                          (void *) data,
160                          strlen(data) - peer->bytes_wrote,
161                          GNUNET_TIME_relative_multiply
162                          (GNUNET_TIME_UNIT_SECONDS, 5),
163                          &write_completion,
164                          peer);
165  
166   GNUNET_assert (NULL != peer->io_write_handle);
167  }
168
169
170 /**
171  * Shutdown nicely
172  */
173 static void
174 do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
175 {
176   if (GNUNET_SCHEDULER_NO_TASK != abort_task)
177     GNUNET_SCHEDULER_cancel (abort_task);
178   if (NULL != peer1.socket)
179     GNUNET_STREAM_close (peer1.socket);
180   if (NULL != peer2.socket)
181     GNUNET_STREAM_close (peer2.socket);
182   if (NULL != peer2_listen_socket)
183     GNUNET_STREAM_listen_close (peer2_listen_socket);
184 }
185
186
187 /**
188  * Something went wrong and timed out. Kill everything and set error flag
189  */
190 static void
191 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
192 {
193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
194   result = GNUNET_SYSERR;
195   abort_task = GNUNET_SCHEDULER_NO_TASK;
196   do_close (cls, tc);
197 }
198
199
200 /**
201  * Completion callback for shutdown
202  *
203  * @param cls the closure from GNUNET_STREAM_shutdown call
204  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
205  *          SHUT_RDWR) 
206  */
207 static void 
208 shutdown_completion (void *cls,
209                      int operation)
210 {
211   static int shutdowns;
212
213   if (++shutdowns == 1)
214   {
215     peer1.shutdown_handle = NULL;
216     peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
217                                                     &shutdown_completion, cls);
218     return;
219   }  
220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
221   GNUNET_SCHEDULER_add_now (&do_close, cls);
222 }
223
224
225 /**
226  * Shutdown sockets gracefully
227  */
228 static void
229 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
230 {
231   result = GNUNET_OK;
232   peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
233                                                   &shutdown_completion, cls);
234 }
235
236
237 /**
238  * The write completion function; called upon writing some data to stream or
239  * upon error
240  *
241  * @param cls the closure from GNUNET_STREAM_write/read
242  * @param status the status of the stream at the time this function is called
243  * @param size the number of bytes read or written
244  */
245 static void 
246 write_completion (void *cls,
247                   enum GNUNET_STREAM_Status status,
248                   size_t size)
249 {
250   struct PeerData *peer=cls;
251
252   GNUNET_assert (GNUNET_STREAM_OK == status);
253   GNUNET_assert (size <= strlen (data));
254   peer->bytes_wrote += size;
255   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
256     {
257       GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
258     }
259   else
260     {
261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
262                   "Writing completed\n");
263       if (&peer1 == peer)  /* Peer1 has finished writing; should read now */
264         {
265           peer->bytes_read = 0;
266           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
267         }
268       else
269         {
270           writing_success = GNUNET_YES;
271           if (GNUNET_YES == reading_success)
272             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
273         }
274     }
275 }
276
277
278 /**
279  * Function executed after stream has been established
280  *
281  * @param cls the closure from GNUNET_STREAM_open
282  * @param socket socket to use to communicate with the other side (read/write)
283  */
284 static void 
285 stream_open_cb (void *cls,
286                 struct GNUNET_STREAM_Socket *socket)
287 {
288   struct PeerData *peer=cls;
289
290   GNUNET_assert (&peer1 == peer);
291   GNUNET_assert (socket == peer1.socket);
292   GNUNET_assert (socket == peer->socket);
293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
294   peer->bytes_wrote = 0;
295   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
296 }
297
298
299 /**
300  * Input processor
301  *
302  * @param cls the closure from GNUNET_STREAM_write/read
303  * @param status the status of the stream at the time this function is called
304  * @param data traffic from the other side
305  * @param size the number of bytes available in data read 
306  * @return number of bytes of processed from 'data' (any data remaining should be
307  *         given to the next time the read processor is called).
308  */
309 static size_t
310 input_processor (void *cls,
311                  enum GNUNET_STREAM_Status status,
312                  const void *input_data,
313                  size_t size)
314 {
315   struct PeerData *peer = cls;
316
317   GNUNET_assert (GNUNET_STREAM_OK == status);
318   GNUNET_assert (size <= strlen (data));
319   GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
320                                (const char *) input_data,
321                                size));
322   peer->bytes_read += size;  
323   if (peer->bytes_read < strlen (data))
324     {
325       GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
326     }
327   else 
328     {
329       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
330         {
331           peer->bytes_wrote = 0;
332           GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
333         }
334       else                      /* Peer1 has completed reading. End of tests */
335         {
336           reading_success = GNUNET_YES;
337           if (GNUNET_YES == writing_success)
338             GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
339         }
340     } 
341   return size;
342 }
343
344   
345 /**
346  * Functions of this type are called upon new stream connection from other peers
347  *
348  * @param cls the PeerData of peer2
349  * @param socket the socket representing the stream
350  * @param initiator the identity of the peer who wants to establish a stream
351  *            with us
352  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
353  *             stream (the socket will be invalid after the call)
354  */
355 static int
356 stream_listen_cb (void *cls,
357            struct GNUNET_STREAM_Socket *socket,
358            const struct GNUNET_PeerIdentity *initiator)
359 {
360   struct PeerData *peer=cls;
361
362   if ((NULL == socket) || (NULL == initiator))
363   {
364     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
365     if (GNUNET_SCHEDULER_NO_TASK != abort_task)
366       GNUNET_SCHEDULER_cancel (abort_task);
367     abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
368     return GNUNET_OK;
369   }
370   GNUNET_assert (NULL != socket);
371   GNUNET_assert (socket != peer1.socket);
372   GNUNET_assert (&peer2 == peer);
373   GNUNET_assert (0 == memcmp (&self_id,
374                               initiator,
375                               sizeof (struct GNUNET_PeerIdentity)));
376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377               "Peer connected: %s\n", GNUNET_i2s(initiator));
378   peer->socket = socket;
379   peer->bytes_read = 0;
380   GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
381   return GNUNET_OK;
382 }
383
384
385 /**
386  * Listen success callback; connects a peer to stream as client
387  */
388 static void
389 stream_connect (void)
390 {
391   peer1.socket = GNUNET_STREAM_open (config,
392                                      &self_id,
393                                      10,           /* App port */
394                                      &stream_open_cb,
395                                      &peer1,
396                                      GNUNET_STREAM_OPTION_END);
397   GNUNET_assert (NULL != peer1.socket);
398 }
399
400
401 /**
402  * Initialize framework and start test
403  */
404 static void
405 run (void *cls,
406      const struct GNUNET_CONFIGURATION_Handle *cfg,
407      struct GNUNET_TESTING_Peer *peer)
408 {
409   config = cfg;
410   self = peer;
411   GNUNET_TESTING_peer_get_identity (peer, &self_id);
412   peer2_listen_socket = 
413     GNUNET_STREAM_listen (config,
414                           10, /* App port */
415                           &stream_listen_cb,
416                           &peer2,
417                           GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
418                           &stream_connect,
419                           GNUNET_STREAM_OPTION_END);
420   GNUNET_assert (NULL != peer2_listen_socket);
421   abort_task =
422     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
423                                   (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort,
424                                   NULL);
425 }
426
427 /**
428  * Main function
429  */
430 int main (int argc, char **argv)
431 {
432   if (0 != GNUNET_TESTING_peer_run ("test_stream_local",
433                                     "test_stream_local.conf",
434                                     &run, NULL))
435     return 1;
436   return (GNUNET_SYSERR == result) ? 1 : 0;
437 }
438
439 /* end of test_stream_local.c */