X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fstream%2Fperf_stream_api.c;h=fdce1c5c1035e4601f6b8d5b33cd803bdf1a58b5;hb=9a38c9af01fadf13c594126dcd2c9e74d586bfbd;hp=2aec2be578a54cfa3f41164dd9b85c7003182a89;hpb=ec2206e33164fe6a8067bd634dbb647faec432c1;p=oweals%2Fgnunet.git diff --git a/src/stream/perf_stream_api.c b/src/stream/perf_stream_api.c index 2aec2be57..fdce1c5c1 100644 --- a/src/stream/perf_stream_api.c +++ b/src/stream/perf_stream_api.c @@ -1,4 +1,4 @@ -/* + /* This file is part of GNUnet. (C) 2011, 2012 Christian Grothoff (and other contributing authors) @@ -102,8 +102,6 @@ struct PeerData */ struct GNUNET_STREAM_Socket *socket; - struct GNUNET_PeerIdentity self; - /** * Peer's io write handle */ @@ -114,6 +112,21 @@ struct PeerData */ struct GNUNET_STREAM_IOReadHandle *io_read_handle; + /** + * The peer handle when we use the testbed servie + */ + struct GNUNET_TESTBED_Peer *peer; + + /** + * Handle to peer specific opearations while using testbed service + */ + struct GNUNET_TESTBED_Operation *op; + + /** + * The identity of this peer + */ + struct GNUNET_PeerIdentity id; + /** * Bytes the peer has written */ @@ -126,6 +139,28 @@ struct PeerData }; +/** + * Enumeration of stages in this testing + */ +enum TestStage +{ + /** + * The initial stage + */ + INIT, + + /** + * Uplink testing stage + */ + UPLINK_OK, + + /** + * Downlink testing stage + */ + DOWNLINK_OK +}; + + /** * Maximum size of the data which we will transfer during tests */ @@ -151,6 +186,11 @@ static struct ProgressMeter *meter; */ static struct PeerData peer_data[3]; +/** + * Handle to common operations while using testbed + */ +static struct GNUNET_TESTBED_Operation *common_op; + /** * Task ID for abort task */ @@ -185,12 +225,7 @@ static uint32_t data[DATA_SIZE / 4]; * Payload sizes to test each major test with */ static uint16_t payload_size[] = -{ 20, 500, 2000, 7000, 13000, 25000, 56000, 60000 }; - -/** - * Handle for the progress meter - */ -static struct ProgressMeter *meter; +{ 20, 500, 2000, 7000, 13000, 25000, 50000, 60000, 63000, 64000 }; /** * Current step of testing @@ -202,10 +237,15 @@ static enum TestStep test_step; */ static unsigned int payload_size_index; +/** + * Number of peers we want to create while using the testbed service + */ +static int num_peers; + /** * Testing result of a major test */ -static int result; +static enum TestStage result; /** * Create a meter to keep track of the progress of some task. @@ -316,16 +356,37 @@ free_meter (struct ProgressMeter *meter) static void do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_STREAM_close (peer_data[1].socket); - if (NULL != peer_data[2].socket) - GNUNET_STREAM_close (peer_data[2].socket); - if (NULL != peer2_listen_socket) - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ + switch (test_step) + { + case TEST_STEP_1_HOP: + if (NULL != peer_data[0].socket) + GNUNET_STREAM_close (peer_data[0].socket); + if (NULL != peer_data[1].socket) + GNUNET_STREAM_close (peer_data[1].socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ + break; + case TEST_STEP_2_HOP: + if (NULL != peer_data[1].socket) + GNUNET_STREAM_close (peer_data[1].socket); + if (NULL != peer_data[0].op) + GNUNET_TESTBED_operation_done (peer_data[0].op); + if (NULL != peer_data[1].op) + GNUNET_TESTBED_operation_done (peer_data[1].op); + break; + case TEST_STEP_3_HOP: + GNUNET_break (0); + } if (GNUNET_SCHEDULER_NO_TASK != abort_task) GNUNET_SCHEDULER_cancel (abort_task); if (GNUNET_SCHEDULER_NO_TASK != write_task) GNUNET_SCHEDULER_cancel (write_task); GNUNET_SCHEDULER_shutdown (); /* Shutdown this testcase */ + if (NULL != meter) + { + free_meter (meter); + meter = NULL; + } } @@ -335,14 +396,32 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); + abort_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test: ABORT\n"); if (GNUNET_SCHEDULER_NO_TASK != read_task) GNUNET_SCHEDULER_cancel (read_task); result = GNUNET_SYSERR; - abort_task = GNUNET_SCHEDULER_NO_TASK; do_shutdown (cls, tc); } + +/** + * Scheduler call back; to be executed when a new stream is connected + * Called from listen connect for peer2 + */ +static void +stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Task for calling STREAM_write with a chunk of random data + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + /** * The write completion function; called upon writing some data to stream or @@ -356,12 +435,29 @@ static void write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { struct PeerData *pdata = cls; + double throughput; + double prof_time_sec; - GNUNET_assert (GNUNET_STREAM_OK == status); + if (GNUNET_STREAM_OK != status) + { + GNUNET_SCHEDULER_cancel (abort_task); + abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL); + return; + } GNUNET_assert (size <= DATA_SIZE); - pdata->bytes_wrote += size; + pdata->bytes_wrote += size; + for (;size > 0; size--) + update_meter (meter); if (pdata->bytes_wrote < DATA_SIZE) /* Have more data to send */ - { + { + if (GNUNET_SCHEDULER_NO_TASK != abort_task) + { + GNUNET_SCHEDULER_cancel (abort_task); + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort, + NULL); + } pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, ((void *) data) + pdata->bytes_wrote, @@ -372,12 +468,31 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) } else { + free_meter (meter); + meter = NULL; prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time); - result = GNUNET_OK; - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000)); + throughput = (((float) sizeof (data)) / prof_time_sec); + PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00); + switch (result) + { + case INIT: + result = UPLINK_OK; + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == write_task); + pdata->bytes_read = 0; + meter = create_meter (sizeof (data), "Testing Downlink\n", GNUNET_YES); + read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer_data[0]); + write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, &peer_data[1]); + break; + case UPLINK_OK: + result = DOWNLINK_OK; + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + break; + case DOWNLINK_OK: + GNUNET_assert (0); + } } - for (;size > 0; size--) - update_meter (meter); } @@ -388,11 +503,18 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) * @param tc the task context */ static void -stream_write_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerData *pdata = cls; + if (GNUNET_SCHEDULER_NO_TASK != abort_task) + { + GNUNET_SCHEDULER_cancel (abort_task); + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort, + NULL); + } write_task = GNUNET_SCHEDULER_NO_TASK; prof_start_time = GNUNET_TIME_absolute_get (); pdata->bytes_wrote = 0; @@ -428,12 +550,16 @@ input_processor (void *cls, enum GNUNET_STREAM_Status status, { struct PeerData *pdata = cls; - GNUNET_assert (GNUNET_STREAM_OK == status); + if (GNUNET_STREAM_OK != status) + { + GNUNET_SCHEDULER_cancel (abort_task); + abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL); + return 0; + } GNUNET_assert (size < DATA_SIZE); GNUNET_assert (0 == memcmp (((void *)data ) + pdata->bytes_read, input_data, size)); - pdata->bytes_read += size; - + pdata->bytes_read += size; if (pdata->bytes_read < DATA_SIZE) { GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task); @@ -441,7 +567,6 @@ input_processor (void *cls, enum GNUNET_STREAM_Status status, } else { - /* Peer2 has completed reading*/ LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n"); } return size; @@ -481,9 +606,18 @@ stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, { struct PeerData *pdata = cls; + + if ((NULL == socket) || (NULL == initiator)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n"); + if (GNUNET_SCHEDULER_NO_TASK != abort_task) + GNUNET_SCHEDULER_cancel (abort_task); + abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL); + return GNUNET_OK; + } GNUNET_assert (NULL != socket); - GNUNET_assert (pdata == &peer_data[2]); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n", + GNUNET_assert (pdata == &peer_data[1]); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n", GNUNET_i2s(initiator)); pdata->socket = socket; pdata->bytes_read = 0; @@ -505,6 +639,7 @@ stream_open_cb (void *cls, struct PeerData *pdata = cls; GNUNET_assert (socket == pdata->socket); + meter = create_meter (sizeof (data), "Testing Uplink\n", GNUNET_YES); write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, pdata); } @@ -515,13 +650,13 @@ stream_open_cb (void *cls, static void stream_connect (void) { - peer_data[1].socket = - GNUNET_STREAM_open (config, &peer_data[2].self, 10, &stream_open_cb, - &peer_data[1], + peer_data[0].socket = + GNUNET_STREAM_open (config, &peer_data[1].id, 10, &stream_open_cb, + &peer_data[0], GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, payload_size[payload_size_index], GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer_data[1].socket); + GNUNET_assert (NULL != peer_data[0].socket); } @@ -537,21 +672,229 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_TESTING_Peer *peer) { - struct GNUNET_PeerIdentity self; + struct GNUNET_PeerIdentity id; - GNUNET_TESTING_peer_get_identity (peer, &self); + GNUNET_TESTING_peer_get_identity (peer, &id); config = cfg; peer2_listen_socket = - GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[2], + GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[1], GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, - &stream_connect, GNUNET_STREAM_OPTION_END); + &stream_connect, + GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, + payload_size[payload_size_index], + GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer2_listen_socket); - peer_data[1].self = self; - peer_data[2].self = self; + peer_data[0].id = id; + peer_data[1].id = id; abort_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, - NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort, + NULL); +} + + +/** + * Adapter function called to establish a connection to + * a service. + * + * @param cls closure + * @param cfg configuration of the peer to connect to; will be available until + * GNUNET_TESTBED_operation_done() is called on the operation returned + * from GNUNET_TESTBED_service_connect() + * @return service handle to return in 'op_result', NULL on error + */ +static void * +stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg); + + +/** + * Adapter function called to destroy a connection to + * a service. + * + * @param cls closure + * @param op_result service handle returned from the connect adapter + */ +static void +stream_da (void *cls, void *op_result) +{ + if (&peer_data[1] == cls) + { + GNUNET_STREAM_listen_close (op_result); + return; + } + else if (&peer_data[0] == cls) + { + GNUNET_STREAM_close (op_result); + return; + } + GNUNET_assert (0); +} + + +/** + * Listen success callback; connects a peer to stream as client. Called from + * testbed stream_ca + */ +static void +stream_connect2 (void) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n"); + peer_data[0].op = + GNUNET_TESTBED_service_connect (&peer_data[0], peer_data[0].peer, + "stream", NULL, NULL, stream_ca, + stream_da, &peer_data[0]); +} + + +/** + * Adapter function called to establish a connection to + * a service. + * + * @param cls closure + * @param cfg configuration of the peer to connect to; will be available until + * GNUNET_TESTBED_operation_done() is called on the operation returned + * from GNUNET_TESTBED_service_connect() + * @return service handle to return in 'op_result', NULL on error + */ +static void * +stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct PeerData *pdata = cls; + + if (&peer_data[1] == pdata) + { + peer2_listen_socket = NULL; + peer2_listen_socket = + GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, &peer_data[1], + GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, + &stream_connect2, + GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, + payload_size[payload_size_index], + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer2_listen_socket); + return peer2_listen_socket; + } + if (&peer_data[0] == pdata) + { + pdata->socket = + GNUNET_STREAM_open (cfg, &peer_data[1].id, 10, &stream_open_cb, + &peer_data[0], + GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, + payload_size[payload_size_index], + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != pdata->socket); + return pdata->socket; + } + GNUNET_assert (0); + return NULL; +} + + +/** + * Callback to be called when the requested peer information is available + * + * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information() + * @param op the operation this callback corresponds to + * @param pinfo the result; will be NULL if the operation has failed + * @param emsg error message if the operation has failed; will be NULL if the + * operation is successfull + */ +static void +peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op, + const struct GNUNET_TESTBED_PeerInformation *pinfo, + const char *emsg) +{ + struct PeerData *pdata = cb_cls; + + GNUNET_assert (NULL == emsg); + GNUNET_assert (common_op == op); + GNUNET_assert (NULL != pdata); + memcpy (&pdata->id, pinfo->result.id, sizeof (struct GNUNET_PeerIdentity)); + GNUNET_TESTBED_operation_done (op); + if (pdata == &peer_data[0]) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", + GNUNET_i2s (&pdata->id)); + common_op = GNUNET_TESTBED_peer_get_information (peer_data[1].peer, + GNUNET_TESTBED_PIT_IDENTITY, + &peerinfo_cb, &peer_data[1]); + } + else if (pdata == &peer_data[1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", + GNUNET_i2s (&pdata->id)); + if (TEST_STEP_2_HOP == test_step) + peer_data[1].op = + GNUNET_TESTBED_service_connect (&peer_data[1], peer_data[1].peer, + "stream", NULL, NULL, stream_ca, + stream_da, &peer_data[1]); + else + GNUNET_break (0); /* FIXME: 3 hop test case here... */ + } +} + + +/** + * Controller event callback + * + * @param cls NULL + * @param event the controller event + */ +static void +controller_event_cb (void *cls, + const struct GNUNET_TESTBED_EventInformation *event) +{ + switch (event->type) + { + case GNUNET_TESTBED_ET_OPERATION_FINISHED: + if (NULL != event->details.operation_finished.emsg) + { + FPRINTF (stderr, "Error while expecting an operation to succeed:%s \n", + event->details.operation_finished.emsg); + GNUNET_assert (0); + } + break; + case GNUNET_TESTBED_ET_CONNECT: + GNUNET_TESTBED_operation_done (common_op); + /* Get the peer identity and configuration of peers */ + common_op = + GNUNET_TESTBED_peer_get_information (peer_data[0].peer, + GNUNET_TESTBED_PIT_IDENTITY, + &peerinfo_cb, &peer_data[0]); + break; + default: + GNUNET_assert (0); + } +} + + +/** + * Signature of a main function for a testcase. + * + * @param cls closure + * @param num_peers number of peers in 'peers' + * @param peers handle to peers run in the testbed + */ +static void +test_master (void *cls, unsigned int num_peers_, + struct GNUNET_TESTBED_Peer **peers) +{ + GNUNET_assert (NULL != peers); + GNUNET_assert (NULL != peers[0]); + GNUNET_assert (NULL != peers[1]); + GNUNET_assert (num_peers_ == num_peers); + peer_data[0].peer = peers[0]; + peer_data[1].peer = peers[1]; + if (2 == num_peers) + common_op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL, + peer_data[0].peer, + peer_data[1].peer); + else + GNUNET_break (0); + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 120), &do_abort, + NULL); } @@ -560,11 +903,9 @@ run (void *cls, */ int main (int argc, char **argv) { - char *pmsg; char *test_name = "perf_stream_api"; char *cfg_file = "test_stream_local.conf"; - double throughput; - double prof_time_sec; + uint64_t event_mask; unsigned int count; int ret; @@ -577,31 +918,37 @@ int main (int argc, char **argv) } reset_meter (meter); free_meter (meter); + meter = NULL; test_step = TEST_STEP_1_HOP; - for (payload_size_index = 0; + for (payload_size_index = 0; payload_size_index < (sizeof (payload_size) / sizeof (uint16_t)); payload_size_index++) { - GNUNET_asprintf (&pmsg, "\nTesting over loopback with payload size %hu\n", - payload_size[payload_size_index]); - meter = create_meter (sizeof (data), pmsg, GNUNET_YES); - GNUNET_free (pmsg); - result = GNUNET_SYSERR; + PRINTF ("\nTesting over loopback with payload size %hu\n", + payload_size[payload_size_index]); + (void) memset (peer_data, 0, sizeof (peer_data)); + result = INIT; ret = GNUNET_TESTING_peer_run (test_name, cfg_file, &run, NULL); - free_meter (meter); - if ((0 != ret) || (GNUNET_OK != result)) + if ((0 != ret) || (DOWNLINK_OK != result)) goto return_fail; - prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000)); - throughput = (((float) sizeof (data)) / prof_time_sec); - //PRINTF ("Profiling time %llu ms = %.2f sec\n", prof_time.rel_value, prof_time_sec); - PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00); } test_step = TEST_STEP_2_HOP; - for (payload_size_index = 0; + num_peers = 2; + event_mask = 0; + event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT); + event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED); + for (payload_size_index = 0; payload_size_index < (sizeof (payload_size) / sizeof (uint16_t)); payload_size_index++) { - /* Initialize testbed here */ + PRINTF ("\nTesting over 1 hop with payload size %hu\n", + payload_size[payload_size_index]); + (void) memset (peer_data, 0, sizeof (peer_data)); + result = INIT; + GNUNET_TESTBED_test_run (test_name, cfg_file, num_peers, event_mask, + &controller_event_cb, NULL, &test_master, NULL); + if (DOWNLINK_OK != result) + goto return_fail; } test_step = TEST_STEP_3_HOP; for (payload_size_index = 0; @@ -610,9 +957,11 @@ int main (int argc, char **argv) { /* Initialize testbed here */ } - return ret; + return 0; return_fail: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test failed\n"); return 1; } + +/* end of perf_stream_api.c */