From e6d8c896432b661f709b8058cc76570c5382d814 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sat, 3 Mar 2012 14:57:24 +0000 Subject: [PATCH] -modified testcase according to modified interface --- src/stream/Makefile.am | 3 +- src/stream/stream_api.c | 209 +++++++++++++++++++----------- src/stream/test_stream_local.c | 160 +++++++++++++++-------- src/stream/test_stream_local.conf | 4 +- 4 files changed, 241 insertions(+), 135 deletions(-) diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am index 385c0cf4c..c2d2ac7f0 100644 --- a/src/stream/Makefile.am +++ b/src/stream/Makefile.am @@ -33,7 +33,8 @@ test_stream_local_SOURCES = \ test_stream_local.c test_stream_local_LDADD = \ $(top_builddir)/src/stream/libgnunetstream.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la #test_stream_halfclose_SOURCES = \ # test_stream_halfclose.c diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 84fcdfd6b..58f180c4d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -18,6 +18,10 @@ Boston, MA 02111-1307, USA. */ +/* TODO: + * Checks for matching the sender and socket->other_peer in server + * message handlers */ + /** * @file stream/stream_api.c * @brief Implementation of the stream library @@ -256,6 +260,11 @@ struct GNUNET_STREAM_Socket */ unsigned int retries; + /** + * The application port number (type: uint32_t) + */ + GNUNET_MESH_ApplicationType app_port; + /** * The session id associated with this stream connection * FIXME: Not used currently, may be removed @@ -328,6 +337,7 @@ struct GNUNET_STREAM_ListenSocket /** * The service port + * FIXME: Remove if not required! */ GNUNET_MESH_ApplicationType port; }; @@ -1314,6 +1324,12 @@ server_handle_hello (void *cls, struct GNUNET_STREAM_HelloAckMessage *reply; GNUNET_assert (socket->tunnel == tunnel); + + /* Catch possible protocol breaks */ + GNUNET_break_op (0 != memcmp (&socket->other_peer, + sender, + sizeof (struct GNUNET_PeerIdentity))); + if (STATE_INIT == socket->state) { /* Get the random sequence number */ @@ -1791,6 +1807,78 @@ mesh_peer_disconnect_callback (void *cls, } +/** + * Method called whenever a peer creates a tunnel to us + * + * @param cls closure + * @param tunnel new handle to the tunnel + * @param initiator peer that started the tunnel + * @param atsi performance information for the tunnel + * @return initial tunnel context for the tunnel + * (can be NULL -- that's not an error) + */ +static void * +new_tunnel_notify (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_ListenSocket *lsocket = cls; + struct GNUNET_STREAM_Socket *socket; + + socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); + socket->tunnel = tunnel; + socket->session_id = 0; /* FIXME */ + socket->other_peer = *initiator; + socket->state = STATE_INIT; + + if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, + socket, + &socket->other_peer)) + { + socket->state = STATE_CLOSED; + /* FIXME: Send CLOSE message and then free */ + GNUNET_free (socket); + GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */ + } + return socket; +} + + +/** + * Function called whenever an inbound tunnel is destroyed. Should clean up + * any associated state. This function is NOT called if the client has + * explicitly asked for the tunnel to be destroyed using + * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on + * the tunnel. + * + * @param cls closure (set from GNUNET_MESH_connect) + * @param tunnel connection to the other end (henceforth invalid) + * @param tunnel_ctx place where local state associated + * with the tunnel is stored + */ +static void +tunnel_cleaner (void *cls, + const struct GNUNET_MESH_Tunnel *tunnel, + void *tunnel_ctx) +{ + struct GNUNET_STREAM_Socket *socket = tunnel_ctx; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer %s has terminated connection abruptly\n", + GNUNET_i2s (&socket->other_peer)); + + socket->status = GNUNET_STREAM_SHUTDOWN; + /* Clear Transmit handles */ + if (NULL != socket->transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + socket->tunnel = NULL; +} + + /*****************/ /* API functions */ /*****************/ @@ -1820,6 +1908,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_STREAM_Socket *socket; enum GNUNET_STREAM_Option option; va_list vargs; /* Variable arguments */ + GNUNET_MESH_ApplicationType no_port; socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); socket->other_peer = *target; @@ -1845,15 +1934,19 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, } } while (GNUNET_STREAM_OPTION_END != option); va_end (vargs); /* End of variable args parsing */ - + no_port = 0; socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ 1, /* QUEUE size as parameter? */ socket, /* cls */ NULL, /* No inbound tunnel handler */ NULL, /* No inbound tunnel cleaner */ client_message_handlers, - NULL); /* We don't get inbound tunnels */ - // FIXME: if (NULL == socket->mesh) ... + &no_port); /* We don't get inbound tunnels */ + if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ + { + GNUNET_free (socket); + return NULL; + } /* Now create the mesh tunnel to target */ socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, @@ -1867,6 +1960,20 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, } +/** + * Shutdown the stream for reading or writing (man 2 shutdown). + * + * @param socket the stream socket + * @param how SHUT_RD, SHUT_WR or SHUT_RDWR + */ +void +GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, + int how) +{ + return; +} + + /** * Closes the stream * @@ -1925,78 +2032,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) } -/** - * Method called whenever a peer creates a tunnel to us - * - * @param cls closure - * @param tunnel new handle to the tunnel - * @param initiator peer that started the tunnel - * @param atsi performance information for the tunnel - * @return initial tunnel context for the tunnel - * (can be NULL -- that's not an error) - */ -static void * -new_tunnel_notify (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_ATS_Information *atsi) -{ - struct GNUNET_STREAM_ListenSocket *lsocket = cls; - struct GNUNET_STREAM_Socket *socket; - - socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); - socket->tunnel = tunnel; - socket->session_id = 0; /* FIXME */ - socket->other_peer = *initiator; - socket->state = STATE_INIT; - - if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, - socket, - &socket->other_peer)) - { - socket->state = STATE_CLOSED; - /* FIXME: Send CLOSE message and then free */ - GNUNET_free (socket); - GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */ - } - return socket; -} - - -/** - * Function called whenever an inbound tunnel is destroyed. Should clean up - * any associated state. This function is NOT called if the client has - * explicitly asked for the tunnel to be destroyed using - * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on - * the tunnel. - * - * @param cls closure (set from GNUNET_MESH_connect) - * @param tunnel connection to the other end (henceforth invalid) - * @param tunnel_ctx place where local state associated - * with the tunnel is stored - */ -static void -tunnel_cleaner (void *cls, - const struct GNUNET_MESH_Tunnel *tunnel, - void *tunnel_ctx) -{ - struct GNUNET_STREAM_Socket *socket = tunnel_ctx; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer %s has terminated connection abruptly\n", - GNUNET_i2s (&socket->other_peer)); - - socket->status = GNUNET_STREAM_SHUTDOWN; - /* Clear Transmit handles */ - if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } - socket->tunnel = NULL; -} - - /** * Listens for stream connections for a specific application ports * @@ -2178,3 +2213,27 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, socket); return read_handle; } + + +/** + * Cancel pending write operation. + * + * @param ioh handle to operation to cancel + */ +void +GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) +{ + return; +} + + +/** + * Cancel pending read operation. + * + * @param ioh handle to operation to cancel + */ +void +GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh) +{ + return; +} diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 4da1258fc..2eeedeec5 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -30,6 +30,7 @@ #include "gnunet_util_lib.h" #include "gnunet_mesh_service.h" #include "gnunet_stream_lib.h" +#include "gnunet_testing_lib.h" #define VERBOSE 1 @@ -44,9 +45,14 @@ struct PeerData struct GNUNET_STREAM_Socket *socket; /** - * Peer's io handle + * Peer's io write handle */ - struct GNUNET_STREAM_IOHandle *io_handle; + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; /** * Bytes the peer has written @@ -63,6 +69,7 @@ static struct GNUNET_OS_Process *arm_pid; static struct PeerData peer1; static struct PeerData peer2; static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; +static struct GNUNET_CONFIGURATION_Handle *config; static GNUNET_SCHEDULER_TaskIdentifier abort_task; static GNUNET_SCHEDULER_TaskIdentifier test_task; @@ -78,7 +85,8 @@ static void do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_STREAM_close (peer1.socket); - GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); if (0 != abort_task) { @@ -90,6 +98,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); + /* Free the duplicated configuration */ + GNUNET_CONFIGURATION_destroy (config); GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); GNUNET_OS_process_close (arm_pid); } @@ -115,6 +125,22 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) do_shutdown (cls, tc); } +/** + * Signature for input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + /** * The write completion function; called upon writing some data to stream or @@ -138,26 +164,28 @@ write_completion (void *cls, if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ { - peer->io_handle = GNUNET_STREAM_write (peer->socket, - (void *) data, - strlen(data) - peer->bytes_wrote, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + cls); + GNUNET_assert (NULL != peer->io_write_handle); } else { if (&peer1 == peer) /* Peer1 has finished writing; should read now */ { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL!=peer->io_handle); + peer->io_read_handle = + GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) + peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + cls); + GNUNET_assert (NULL!=peer->io_read_handle); } } } @@ -175,18 +203,19 @@ stream_open_cb (void *cls, { struct PeerData *peer; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n"); peer = (struct PeerData *) cls; peer->bytes_wrote = 0; GNUNET_assert (socket == peer1.socket); GNUNET_assert (socket == peer->socket); - peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */ - (void *) data, /* data */ - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, + peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */ + (void *) data, /* data */ + strlen(data), + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, cls); - GNUNET_assert (NULL != peer->io_handle); + GNUNET_assert (NULL != peer->io_write_handle); } @@ -210,7 +239,7 @@ input_processor (void *cls, peer = (struct PeerData *) cls; - GNUNET_assert (GNUNET_STERAM_OK == status); + GNUNET_assert (GNUNET_STREAM_OK == status); GNUNET_assert (size < strlen (data)); GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, (const char *) input_data, @@ -219,27 +248,28 @@ input_processor (void *cls, if (peer->bytes_read < strlen (data)) { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_handle); + peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) + peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + cls); + GNUNET_assert (NULL != peer->io_read_handle); } else { if (&peer2 == peer) /* Peer2 has completed reading; should write */ { peer->bytes_wrote = 0; - peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *) - peer->socket, - (void *) data, - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); + peer->io_write_handle = + GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *) + peer->socket, + (void *) data, + strlen(data), + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + cls); } else /* Peer1 has completed reading. End of tests */ { @@ -261,12 +291,13 @@ stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_assert (NULL != cls); peer2.bytes_read = 0; GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ - peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - (void *) &peer2); - GNUNET_assert (NULL != peer2.io_handle); + peer2.io_read_handle = + GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + (void *) &peer2); + GNUNET_assert (NULL != peer2.io_read_handle); } @@ -289,6 +320,9 @@ stream_listen_cb (void *cls, GNUNET_assert (NULL == initiator); /* Local peer=NULL? */ GNUNET_assert (socket != peer1.socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer connected: %s\n", GNUNET_i2s(initiator)); + peer2.socket = socket; read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); return GNUNET_OK; @@ -297,23 +331,33 @@ stream_listen_cb (void *cls, /** * Testing function + * + * @param cls NULL + * @param tc the task context */ static void test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct GNUNET_PeerIdentity self; + test_task = GNUNET_SCHEDULER_NO_TASK; + /* Get our identity */ + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config, + &self)); + + peer2_listen_socket = GNUNET_STREAM_listen (config, + 10, /* App port */ + &stream_listen_cb, + NULL); + GNUNET_assert (NULL != peer2_listen_socket); /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */ + peer1.socket = GNUNET_STREAM_open (config, + &self, /* Null for local peer? */ 10, /* App port */ &stream_open_cb, (void *) &peer1); - GNUNET_assert (NULL != peer1.socket); - peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */ - &stream_listen_cb, - NULL); - GNUNET_assert (NULL != peer2_listen_socket); - + GNUNET_assert (NULL != peer1.socket); } /** @@ -330,6 +374,8 @@ run (void *cls, char *const *args, const char *cfgfile, "WARNING", #endif NULL); + /* Duplicate the configuration */ + config = GNUNET_CONFIGURATION_dup (cfg); arm_pid = GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm", "gnunet-service-arm", @@ -342,8 +388,8 @@ run (void *cls, char *const *args, const char *cfgfile, GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort, NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg); + + test_task = GNUNET_SCHEDULER_add_now (&test, NULL); } @@ -355,7 +401,7 @@ int main (int argc, char **argv) int ret; char *const argv2[] = { "test-stream-local", - "-c", "test_stream.conf", + "-c", "test_stream_local.conf", #if VERBOSE "-L", "DEBUG", #endif diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf index 3d955f09e..3f229c302 100644 --- a/src/stream/test_stream_local.conf +++ b/src/stream/test_stream_local.conf @@ -59,8 +59,8 @@ WEAKRANDOM = YES HOSTKEY = $SERVICEHOME/.hostkey [PATHS] -DEFAULTCONFIG = test_mesh.conf -SERVICEHOME = /tmp/test-mesh/ +DEFAULTCONFIG = test_stream_local.conf +SERVICEHOME = /tmp/test-stream/ [dns] AUTOSTART = NO -- 2.25.1