X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fstream%2Ftest_stream_local.c;h=af1b8ff84de1f0ba5dff62d14e7b70a51732be63;hb=32566458738aa4d91d02616ef4f38c2614acf301;hp=32e3bf2e406a4bed1e369968b11e6764927c2a42;hpb=3c61c59e70103d21f9b3f645108f51f7a34b77fc;p=oweals%2Fgnunet.git diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 32e3bf2e4..af1b8ff84 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -25,38 +25,141 @@ */ #include -#include /* For SHUT_RD, SHUT_WR */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_mesh_service.h" #include "gnunet_stream_lib.h" +#include "gnunet_testing_lib-new.h" + +/** + * Relative seconds shorthand + */ +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) -#define VERBOSE 1 -static struct GNUNET_OS_Process *arm_pid; -static struct GNUNET_STREAM_Socket *peer1_socket; +/** + * Structure for holding peer's sockets and IO Handles + */ +struct PeerData +{ + /** + * Peer's stream socket + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Peer's io write handle + */ + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; + + /** + * Bytes the peer has written + */ + unsigned int bytes_wrote; + + /** + * Byte the peer has read + */ + unsigned int bytes_read; +}; + +static struct PeerData peer1; +static struct PeerData peer2; static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; -static struct GNUNET_STREAM_Socket *peer2_socket; +static const struct GNUNET_CONFIGURATION_Handle *config; +static struct GNUNET_TESTING_Peer *self; +static struct GNUNET_PeerIdentity self_id; static GNUNET_SCHEDULER_TaskIdentifier abort_task; -static GNUNET_SCHEDULER_TaskIdentifier test_task; -static GNUNET_SCHEDULER_TaskIdentifier read_task; - -static GNUNET_STREAM_IOHandle *peer1_IOHandle; -static GNUNET_STREAM_IOHandle *peer2_IOHandle; static char *data = "ABCD"; -static unsigned int data_pointer; -static unsigned int read_pointer; static int result; -static int peer1_write_pass; -static int peer2_read_pass; -static int peer1_half_closed_write_pass; -static int peer2_half_closed_read_pass; -static int peer1_write_shutdown_pass; -static int peer1_read_shutdown_pass; +static int writing_success; +static int reading_success; + + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + peer); + GNUNET_assert (NULL != peer->io_read_handle); +} + + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + + GNUNET_assert (NULL != peer->io_write_handle); + } /** @@ -65,21 +168,16 @@ static int peer1_read_shutdown_pass; static void do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_STREAM_close (peer1_socket); - GNUNET_STREAM_close (peer2_socket); + GNUNET_STREAM_close (peer1.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); if (0 != abort_task) { GNUNET_SCHEDULER_cancel (abort_task); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: arm\n"); - if (0 != GNUNET_OS_process_kill (arm_pid, SIGTERM)) - { - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); - GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); - GNUNET_OS_process_close (arm_pid); } @@ -90,14 +188,6 @@ static void do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); - if (0 != test_task) - { - GNUNET_SCHEDULER_cancel (test_task); - } - if (0 != read_task) - { - GNUNET_SCHEDULER_cancel (read_task); - } result = GNUNET_SYSERR; abort_task = 0; do_shutdown (cls, tc); @@ -112,56 +202,36 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param status the status of the stream at the time this function is called * @param size the number of bytes read or written */ -void write_completion (void *cls, - enum GNUNET_STREAM_Status status, - size_t size) +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) { + struct PeerData *peer=cls; - if (peer1_write_shutdown_pass) /* Called for peer2's write operation */ + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + peer->bytes_wrote += size; + if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ { - /* peer1 has shutdown reading */ - GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); - GNUNET_assert (0 == size); - peer1_read_shutdown_pass = 1; - - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } - GNUNET_assert (GNUNET_STREAM_OK == status); - if (data_pointer + size != strlen(data)) /* Have more data to send */ + else { - data_pointer += size; - peer1_IOHandle = GNUNET_STREAM_write (peer1_socket, - (void *) data, - strlen(data) - data_pointer, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - NULL); - GNUNET_assert (NULL != peer1_IOHandle); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + if (&peer1 == peer) /* Peer1 has finished writing; should read now */ + { + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + writing_success = GNUNET_YES; + if (GNUNET_YES == reading_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + } } - else{ - peer1_write_pass = 1; - /* If we are here and peer2_read_pass == 1 => we have send the data twice */ - if (peer2_read_pass) peer1_half_closed_write_pass = 1; - - if (! peer1_half_closed_write_pass) - { - GNUNET_STREAM_shutdown (peer1_socket, SHUT_RD); - /* Half closed write */ - data_pointer = 0; - peer1_IOHandle = GNUNET_STREAM_write (peer1_socket, - (void *) data, - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - NULL); - } - else - { - GNUNET_STREAM_shutdown (peer1_socket, SHUT_WR); - } - } } @@ -173,20 +243,16 @@ void write_completion (void *cls, */ static void stream_open_cb (void *cls, - struct GNUNET_STREAM_Socket - *socket) + struct GNUNET_STREAM_Socket *socket) { - data_pointer = 0; - GNUNET_assert (socket == peer1_socket); - peer1_IOHandle = GNUNET_STREAM_write (socket, /* socket */ - (void *) data, /* data */ - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - NULL); - GNUNET_assert (NULL != peer1_IOHandle); - + struct PeerData *peer=cls; + + GNUNET_assert (&peer1 == peer); + GNUNET_assert (socket == peer1.socket); + GNUNET_assert (socket == peer->socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n"); + peer->bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } @@ -206,83 +272,40 @@ input_processor (void *cls, const void *input_data, size_t size) { + struct PeerData *peer = cls; - if (peer2_half_closed_read_pass) + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); + peer->bytes_read += size; + if (peer->bytes_read < strlen (data)) { - GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); - GNUNET_assert (0 == size); - peer1_write_shutdown_pass = 1; - /* Now this should result in STREAM_SHUTDOWN */ - peer2_IOHandle = GNUNET_STREAM_write (peer2_socket, - (void *) data, - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - (void *) peer2_socket); - - return 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); } - - GNUNET_assert (GNUNET_STERAM_OK == status); - GNUNET_assert (size < strlen (data)); - GNUNET_assert (strncmp ((const char *) data, - (const char *) input_data, - size)); - read_pointer += size; - - if (read_pointer < strlen (data)) + else { - peer2_IOHandle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - NULL); - GNUNET_assert (NULL != peer2_IOHandle); - } - else { - /* If we are here and peer2_read_pass => we have finished reading twice */ - if (peer1_write_pass && peer2_read_pass) peer2_half_closed_read_pass = 1; - if (peer1_write_pass) peer2_read_pass = 1; - - /* Half closed read */ - read_pointer = 0; - peer2_IOHandle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - NULL); - - - } - + if (&peer2 == peer) /* Peer2 has completed reading; should write */ + { + peer->bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); + } + else /* Peer1 has completed reading. End of tests */ + { + reading_success = GNUNET_YES; + if (GNUNET_YES == writing_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + } + } return size; } - -/** - * Scheduler call back; to be executed when a new stream is connected - */ -static void -stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - read_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (NULL != cls); - read_pointer = 0; - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ - peer2_IOHandle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - NULL); - GNUNET_assert (NULL != peer2_IOHandle); -} - - + /** * Functions of this type are called upon new stream connection from other peers * - * @param cls the closure from GNUNET_STREAM_listen + * @param cls the PeerData of peer2 * @param socket the socket representing the stream * @param initiator the identity of the peer who wants to establish a stream * with us @@ -294,65 +317,63 @@ stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, const struct GNUNET_PeerIdentity *initiator) { - GNUNET_assert (NULL != socket); - GNUNET_assert (NULL == initiator); /* Local peer=NULL? */ - GNUNET_assert (socket != peer1_socket); + struct PeerData *peer=cls; - peer2_socket = socket; - read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); + GNUNET_assert (NULL != socket); + GNUNET_assert (socket != peer1.socket); + GNUNET_assert (&peer2 == peer); + GNUNET_assert (0 == memcmp (&self_id, + initiator, + sizeof (struct GNUNET_PeerIdentity))); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer connected: %s\n", GNUNET_i2s(initiator)); + peer->socket = socket; + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); return GNUNET_OK; } /** - * Testing function + * Listen success callback; connects a peer to stream as client */ static void -test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +stream_connect (void) { - test_task = GNUNET_SCHEDULER_NO_TASK; - - /* Connect to stream library */ - peer1_socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */ + peer1.socket = GNUNET_STREAM_open (config, + &self_id, 10, /* App port */ - open_cb); - GNUNET_assert (NULL != peer1_socket); - peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */ - &stream_listen_cb, - NULL); - GNUNET_assert (NULL != peer2_listen_socket); - + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer1.socket); } + /** * Initialize framework and start test */ static void -run (void *cls, char *const *args, const char *cfgfile, - const struct GNUNET_CONFIGURATION_Handle *cfg) +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) { - GNUNET_log_setup ("test_stream_local", -#if VERBOSE - "DEBUG", -#else - "WARNING", -#endif - NULL); - arm_pid = - GNUNET_OS_start_process (NULL, NULL, "gnunet-service-arm", - "gnunet-service-arm", -#if VERBOSE_ARM - "-L", "DEBUG", -#endif - "-c", "test_stream_local.conf", NULL); - - abort_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort, - NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg); - + config = cfg; + self = peer; + GNUNET_TESTING_peer_get_identity (peer, &self_id); + peer2_listen_socket = + GNUNET_STREAM_listen (config, + 10, /* App port */ + &stream_listen_cb, + &peer2, + GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, + &stream_connect, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer2_listen_socket); + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort, + NULL); } /** @@ -360,35 +381,11 @@ run (void *cls, char *const *args, const char *cfgfile, */ int main (int argc, char **argv) { - int ret; - - char *const argv2[] = { "test-stream-local", - "-c", "test_stream.conf", -#if VERBOSE - "-L", "DEBUG", -#endif - NULL - }; - - struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_OPTION_END - }; - - ret = - GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, - "test-stream-local", "nohelp", options, &run, NULL); - - if (GNUNET_OK != ret) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n", - ret); + if (0 != GNUNET_TESTING_peer_run ("test_stream_local", + "test_stream_local.conf", + &run, NULL)) return 1; - } - if (GNUNET_SYSERR == result) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); - return 1; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n"); - return 0; + return (GNUNET_SYSERR == result) ? 1 : 0; } + +/* end of test_stream_local.c */