From cc8cf22281f1f9d77fb498664ad8f847dccee17e Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Wed, 18 Apr 2012 14:21:25 +0000 Subject: [PATCH] -fixed mesh application types array and makefile --- src/stream/Makefile.am | 7 +- src/stream/stream_api.c | 6 +- src/stream/test_stream_2peers.c | 194 ++++++++++++++---------------- src/stream/test_stream_local.c | 101 +++++++--------- src/stream/test_stream_local.conf | 2 +- 5 files changed, 146 insertions(+), 164 deletions(-) diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am index 5359c5d45..2fe70913e 100644 --- a/src/stream/Makefile.am +++ b/src/stream/Makefile.am @@ -21,13 +21,14 @@ libgnunetstream_la_LDFLAGS = \ check_PROGRAMS = \ test_stream_2peers \ - test_stream_2peers_halfclose -# test_stream_local + test_stream_2peers_halfclose \ + test_stream_local EXTRA_DIST = test_stream_local.conf if ENABLE_TEST_RUN -TESTS = $(check_PROGRAMS) +TESTS = test_stream_2peers \ + test_stream_2peers_halfclose endif test_stream_2peers_SOURCES = \ diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index a61437a2c..e20d43863 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -2753,6 +2753,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_STREAM_Socket *socket; struct GNUNET_PeerIdentity own_peer_id; enum GNUNET_STREAM_Option option; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; va_list vargs; /* Variable arguments */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2790,7 +2791,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, NULL, /* No inbound tunnel handler */ &tunnel_cleaner, /* FIXME: not required? */ client_message_handlers, - &app_port); /* We don't get inbound tunnels */ + ports); /* We don't get inbound tunnels */ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ { GNUNET_free (socket); @@ -3003,6 +3004,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { /* FIXME: Add variable args for passing configration options? */ struct GNUNET_STREAM_ListenSocket *lsocket; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; struct GNUNET_PeerIdentity our_peer_id; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); @@ -3017,7 +3019,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, &new_tunnel_notify, &tunnel_cleaner, server_message_handlers, - &app_port); + ports); GNUNET_assert (NULL != lsocket->mesh); return lsocket; } diff --git a/src/stream/test_stream_2peers.c b/src/stream/test_stream_2peers.c index bca0e595c..d10e12c5e 100644 --- a/src/stream/test_stream_2peers.c +++ b/src/stream/test_stream_2peers.c @@ -101,7 +101,6 @@ static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; static struct GNUNET_CONFIGURATION_Handle *config; static GNUNET_SCHEDULER_TaskIdentifier abort_task; -static GNUNET_SCHEDULER_TaskIdentifier read_task; static char *data = "ABCD"; static int result; @@ -110,6 +109,81 @@ static int writing_success; static int reading_success; +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + peer); + GNUNET_assert (NULL != peer->io_read_handle); +} + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + + GNUNET_assert (NULL != peer->io_write_handle); + } + + /** * Check whether peers successfully shut down. */ @@ -198,31 +272,11 @@ static void do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); - if (0 != read_task) - { - GNUNET_SCHEDULER_cancel (read_task); - } result = GNUNET_SYSERR; abort_task = 0; do_close (cls, tc); } -/** - * Signature for input processor - * - * @param cls the closure from GNUNET_STREAM_write/read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). - */ -static size_t -input_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *input_data, - size_t size); - /** * The write completion function; called upon writing some data to stream or @@ -237,7 +291,7 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - struct PeerData *peer = cls; + struct PeerData *peer=cls; GNUNET_assert (GNUNET_STREAM_OK == status); GNUNET_assert (size <= strlen (data)); @@ -245,15 +299,7 @@ write_completion (void *cls, if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ { - peer->io_write_handle = - GNUNET_STREAM_write (peer->socket, - (void *) data, - strlen(data) - peer->bytes_wrote, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_write_handle); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } else { @@ -262,14 +308,8 @@ write_completion (void *cls, if (&peer1 == peer) /* Peer1 has finished writing; should read now */ { - peer->io_read_handle = - GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL!=peer->io_read_handle); + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); } else { @@ -291,24 +331,15 @@ static void stream_open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) { - struct PeerData *peer; - + struct PeerData *peer=cls; + + GNUNET_assert (&peer1 == peer); GNUNET_assert (socket == peer1.socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Stream established from peer1\n", GNUNET_i2s (&peer1.our_id)); - peer = (struct PeerData *) cls; peer->bytes_wrote = 0; - GNUNET_assert (socket == peer1.socket); - GNUNET_assert (socket == peer->socket); - peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */ - (void *) data, /* data */ - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_write_handle); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } @@ -337,13 +368,7 @@ input_processor (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Read operation timedout - reading again!\n"); GNUNET_assert (0 == size); - peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_read_handle); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); return 0; } @@ -356,27 +381,14 @@ input_processor (void *cls, if (peer->bytes_read < strlen (data)) { - peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_read_handle); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); } else { if (&peer2 == peer) /* Peer2 has completed reading; should write */ { peer->bytes_wrote = 0; - peer->io_write_handle = - GNUNET_STREAM_write (peer->socket, - data, - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } else /* Peer1 has completed reading. End of tests */ { @@ -389,26 +401,6 @@ input_processor (void *cls, } -/** - * Scheduler call back; to be executed when a new stream is connected - * Called from listen connect for peer2 - */ -static void -stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - read_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (NULL != cls); - peer2.bytes_read = 0; - peer2.io_read_handle = - GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - (void *) &peer2); - GNUNET_assert (NULL != peer2.io_read_handle); -} - - /** * Functions of this type are called upon new stream connection from other peers * @@ -434,8 +426,8 @@ stream_listen_cb (void *cls, GNUNET_i2s(initiator)); peer2.socket = socket; - /* FIXME: reading should be done right now instead of a scheduled call */ - read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); + peer2.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); return GNUNET_OK; } @@ -504,9 +496,9 @@ run (void *cls, char *const *args, const char *cfgfile, { struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */ - /* GNUNET_log_setup ("test_stream_local", */ - /* "DEBUG", */ - /* NULL); */ + GNUNET_log_setup ("test_stream_2peers", + "DEBUG", + NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting test\n"); @@ -538,7 +530,7 @@ int main (int argc, char **argv) { int ret; - char *argv2[] = { "test-stream-local", + char *argv2[] = { "test-stream-2peers", "-L", "DEBUG", "-c", "test_stream_local.conf", NULL}; @@ -549,7 +541,7 @@ int main (int argc, char **argv) ret = GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, - "test-stream-local", "nohelp", options, &run, NULL); + "test-stream-2peers", "nohelp", options, &run, NULL); if (GNUNET_OK != ret) { diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 9289f9863..4f0f7629f 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -69,15 +69,18 @@ static struct GNUNET_OS_Process *arm_pid; static struct PeerData peer1; static struct PeerData peer2; static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; -static struct GNUNET_CONFIGURATION_Handle *config; +static struct GNUNET_CONFIGURATION_Handle *config_peer1; +static struct GNUNET_CONFIGURATION_Handle *config_peer2; static GNUNET_SCHEDULER_TaskIdentifier abort_task; static GNUNET_SCHEDULER_TaskIdentifier test_task; -static GNUNET_SCHEDULER_TaskIdentifier read_task; static char *data = "ABCD"; static int result; +static int writing_success; +static int reading_success; + /** * Input processor * @@ -175,7 +178,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); /* Free the duplicated configuration */ - GNUNET_CONFIGURATION_destroy (config); + GNUNET_CONFIGURATION_destroy (config_peer1); + GNUNET_CONFIGURATION_destroy (config_peer2); GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); GNUNET_OS_process_close (arm_pid); } @@ -192,31 +196,12 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_SCHEDULER_cancel (test_task); } - if (0 != read_task) - { - GNUNET_SCHEDULER_cancel (read_task); - } + result = GNUNET_SYSERR; abort_task = 0; do_shutdown (cls, tc); } -/** - * Signature for input processor - * - * @param cls the closure from GNUNET_STREAM_write/read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). - */ -static size_t -input_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *input_data, - size_t size); - /** * The write completion function; called upon writing some data to stream or @@ -231,10 +216,10 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - struct PeerData *peer=cls;; + struct PeerData *peer=cls; GNUNET_assert (GNUNET_STREAM_OK == status); - GNUNET_assert (size < strlen (data)); + GNUNET_assert (size <= strlen (data)); peer->bytes_wrote += size; if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ @@ -243,11 +228,20 @@ write_completion (void *cls, } else { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + if (&peer1 == peer) /* Peer1 has finished writing; should read now */ { peer->bytes_read = 0; GNUNET_SCHEDULER_add_now (&stream_read_task, peer); } + else + { + writing_success = GNUNET_YES; + if (GNUNET_YES == reading_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + } } } @@ -264,10 +258,12 @@ stream_open_cb (void *cls, { struct PeerData *peer=cls; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n"); - peer->bytes_wrote = 0; + GNUNET_assert (&peer1 == peer); GNUNET_assert (socket == peer1.socket); GNUNET_assert (socket == peer->socket); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n"); + peer->bytes_wrote = 0; GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } @@ -291,7 +287,7 @@ input_processor (void *cls, struct PeerData *peer = cls; GNUNET_assert (GNUNET_STREAM_OK == status); - GNUNET_assert (size < strlen (data)); + GNUNET_assert (size <= strlen (data)); GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, (const char *) input_data, size)); @@ -310,31 +306,19 @@ input_processor (void *cls, } else /* Peer1 has completed reading. End of tests */ { - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + reading_success = GNUNET_YES; + if (GNUNET_YES == writing_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); } } return size; } -/** - * Scheduler call back; to be executed when a new stream is connected - * Called from listen connect for peer2 - */ -static void -stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - read_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (NULL != cls); - peer2.bytes_read = 0; - GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); -} - - /** * Functions of this type are called upon new stream connection from other peers * - * @param cls the closure from GNUNET_STREAM_listen + * @param cls the PeerData of peer2 * @param socket the socket representing the stream * @param initiator the identity of the peer who wants to establish a stream * with us @@ -346,13 +330,15 @@ stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, const struct GNUNET_PeerIdentity *initiator) { + struct PeerData *peer=cls; struct GNUNET_PeerIdentity self; GNUNET_assert (NULL != socket); GNUNET_assert (socket != peer1.socket); + GNUNET_assert (&peer2 == peer); /* Get our identity */ - GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config, + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, &self)); GNUNET_assert (0 == memcmp (&self, initiator, @@ -361,8 +347,9 @@ stream_listen_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n", GNUNET_i2s(initiator)); - peer2.socket = socket; - read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); + peer->socket = socket; + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); return GNUNET_OK; } @@ -380,22 +367,22 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) test_task = GNUNET_SCHEDULER_NO_TASK; /* Get our identity */ - GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config, + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, &self)); - peer2_listen_socket = GNUNET_STREAM_listen (config, + peer2_listen_socket = GNUNET_STREAM_listen (config_peer2, 10, /* App port */ &stream_listen_cb, - NULL); + &peer2); GNUNET_assert (NULL != peer2_listen_socket); /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (config, + peer1.socket = GNUNET_STREAM_open (config_peer1, &self, /* Null for local peer? */ 10, /* App port */ &stream_open_cb, - (void *) &peer1); - GNUNET_assert (NULL != peer1.socket); + &peer1); + GNUNET_assert (NULL != peer1.socket); } /** @@ -413,7 +400,8 @@ run (void *cls, char *const *args, const char *cfgfile, #endif NULL); /* Duplicate the configuration */ - config = GNUNET_CONFIGURATION_dup (cfg); + config_peer1 = GNUNET_CONFIGURATION_dup (cfg); + config_peer2 = GNUNET_CONFIGURATION_dup (cfg); arm_pid = GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm", "gnunet-service-arm", @@ -428,7 +416,6 @@ run (void *cls, char *const *args, const char *cfgfile, NULL); test_task = GNUNET_SCHEDULER_add_now (&test, NULL); - } /** @@ -449,7 +436,7 @@ int main (int argc, char **argv) struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_OPTION_END }; - + ret = GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, "test-stream-local", "nohelp", options, &run, NULL); @@ -465,6 +452,6 @@ int main (int argc, char **argv) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); return 1; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n"); return 0; } diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf index db5db0bf0..c6d11c339 100644 --- a/src/stream/test_stream_local.conf +++ b/src/stream/test_stream_local.conf @@ -9,7 +9,7 @@ DEBUG = YES AUTOSTART = YES ACCEPT_FROM = 127.0.0.1; HOSTNAME = localhost -PORT = 10511 +PORT = 10700 # PREFIX = valgrind --leak-check=full # PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args -- 2.25.1