From 32039d08a7fd13dfcbe5d5d1548a1482afc29187 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Tue, 18 Sep 2012 15:46:24 +0000 Subject: [PATCH] 1 hop performance testing --- src/stream/perf_stream_api.c | 305 ++++++++++++++++++++++++++++++++--- 1 file changed, 280 insertions(+), 25 deletions(-) diff --git a/src/stream/perf_stream_api.c b/src/stream/perf_stream_api.c index f9f91b5f9..96507834c 100644 --- a/src/stream/perf_stream_api.c +++ b/src/stream/perf_stream_api.c @@ -102,8 +102,6 @@ struct PeerData */ struct GNUNET_STREAM_Socket *socket; - struct GNUNET_PeerIdentity self; - /** * Peer's io write handle */ @@ -114,6 +112,21 @@ struct PeerData */ struct GNUNET_STREAM_IOReadHandle *io_read_handle; + /** + * The peer handle when we use the testbed servie + */ + struct GNUNET_TESTBED_Peer *peer; + + /** + * Handle to peer specific opearations while using testbed service + */ + struct GNUNET_TESTBED_Operation *op; + + /** + * The identity of this peer + */ + struct GNUNET_PeerIdentity id; + /** * Bytes the peer has written */ @@ -151,6 +164,11 @@ static struct ProgressMeter *meter; */ static struct PeerData peer_data[3]; +/** + * Handle to common operations while using testbed + */ +static struct GNUNET_TESTBED_Operation *common_op; + /** * Task ID for abort task */ @@ -185,12 +203,7 @@ static uint32_t data[DATA_SIZE / 4]; * Payload sizes to test each major test with */ static uint16_t payload_size[] = -{ 20, 500, 2000, 7000, 13000, 25000, 50000, 60000, 63000, 64000 }; - -/** - * Handle for the progress meter - */ -static struct ProgressMeter *meter; +{ 50000 }; //{ 20, 500, 2000, 7000, 13000, 25000, 50000, 60000, 63000, 64000 }; /** * Current step of testing @@ -202,6 +215,11 @@ static enum TestStep test_step; */ static unsigned int payload_size_index; +/** + * Number of peers we want to create while using the testbed service + */ +static int num_peers; + /** * Testing result of a major test */ @@ -316,11 +334,27 @@ free_meter (struct ProgressMeter *meter) static void do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_STREAM_close (peer_data[1].socket); - if (NULL != peer_data[2].socket) - GNUNET_STREAM_close (peer_data[2].socket); - if (NULL != peer2_listen_socket) - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ + switch (test_step) + { + case TEST_STEP_1_HOP: + if (NULL != peer_data[0].socket) + GNUNET_STREAM_close (peer_data[0].socket); + if (NULL != peer_data[1].socket) + GNUNET_STREAM_close (peer_data[1].socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ + break; + case TEST_STEP_2_HOP: + if (NULL != peer_data[1].socket) + GNUNET_STREAM_close (peer_data[1].socket); + if (NULL != peer_data[0].op) + GNUNET_TESTBED_operation_done (peer_data[0].op); + if (NULL != peer_data[1].op) + GNUNET_TESTBED_operation_done (peer_data[1].op); + break; + case TEST_STEP_3_HOP: + GNUNET_break (0); + } if (GNUNET_SCHEDULER_NO_TASK != abort_task) GNUNET_SCHEDULER_cancel (abort_task); if (GNUNET_SCHEDULER_NO_TASK != write_task) @@ -482,8 +516,8 @@ stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, struct PeerData *pdata = cls; GNUNET_assert (NULL != socket); - GNUNET_assert (pdata == &peer_data[2]); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n", + GNUNET_assert (pdata == &peer_data[1]); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n", GNUNET_i2s(initiator)); pdata->socket = socket; pdata->bytes_read = 0; @@ -515,13 +549,13 @@ stream_open_cb (void *cls, static void stream_connect (void) { - peer_data[1].socket = - GNUNET_STREAM_open (config, &peer_data[2].self, 10, &stream_open_cb, - &peer_data[1], + peer_data[0].socket = + GNUNET_STREAM_open (config, &peer_data[1].id, 10, &stream_open_cb, + &peer_data[0], GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, payload_size[payload_size_index], GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer_data[1].socket); + GNUNET_assert (NULL != peer_data[0].socket); } @@ -537,17 +571,17 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_TESTING_Peer *peer) { - struct GNUNET_PeerIdentity self; + struct GNUNET_PeerIdentity id; - GNUNET_TESTING_peer_get_identity (peer, &self); + GNUNET_TESTING_peer_get_identity (peer, &id); config = cfg; peer2_listen_socket = - GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[2], + GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[1], GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, &stream_connect, GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer2_listen_socket); - peer_data[1].self = self; - peer_data[2].self = self; + peer_data[0].id = id; + peer_data[1].id = id; abort_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort, @@ -555,6 +589,208 @@ run (void *cls, } +/** + * Adapter function called to establish a connection to + * a service. + * + * @param cls closure + * @param cfg configuration of the peer to connect to; will be available until + * GNUNET_TESTBED_operation_done() is called on the operation returned + * from GNUNET_TESTBED_service_connect() + * @return service handle to return in 'op_result', NULL on error + */ +static void * +stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg); + + +/** + * Adapter function called to destroy a connection to + * a service. + * + * @param cls closure + * @param op_result service handle returned from the connect adapter + */ +static void +stream_da (void *cls, void *op_result) +{ + if (&peer_data[1] == cls) + { + GNUNET_STREAM_listen_close (op_result); + return; + } + else if (&peer_data[0] == cls) + { + GNUNET_STREAM_close (op_result); + return; + } + GNUNET_assert (0); +} + + +/** + * Listen success callback; connects a peer to stream as client. Called from + * testbed stream_ca + */ +static void +stream_connect2 (void) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n"); + peer_data[0].op = + GNUNET_TESTBED_service_connect (&peer_data[0], peer_data[0].peer, + "stream", NULL, NULL, stream_ca, + stream_da, &peer_data[0]); +} + + +/** + * Adapter function called to establish a connection to + * a service. + * + * @param cls closure + * @param cfg configuration of the peer to connect to; will be available until + * GNUNET_TESTBED_operation_done() is called on the operation returned + * from GNUNET_TESTBED_service_connect() + * @return service handle to return in 'op_result', NULL on error + */ +static void * +stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct PeerData *pdata = cls; + + if (&peer_data[1] == pdata) + { + peer2_listen_socket = NULL; + peer2_listen_socket = + GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, &peer_data[1], + GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, + &stream_connect2, GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer2_listen_socket); + return peer2_listen_socket; + } + if (&peer_data[0] == pdata) + { + pdata->socket = + GNUNET_STREAM_open (cfg, &peer_data[1].id, 10, &stream_open_cb, + &peer_data[0], + GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, + payload_size[payload_size_index], + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != pdata->socket); + return pdata->socket; + } + GNUNET_assert (0); + return NULL; +} + + +/** + * Callback to be called when the requested peer information is available + * + * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information() + * @param op the operation this callback corresponds to + * @param pinfo the result; will be NULL if the operation has failed + * @param emsg error message if the operation has failed; will be NULL if the + * operation is successfull + */ +static void +peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op, + const struct GNUNET_TESTBED_PeerInformation *pinfo, + const char *emsg) +{ + struct PeerData *pdata = cb_cls; + + GNUNET_assert (NULL == emsg); + GNUNET_assert (common_op == op); + GNUNET_assert (NULL != pdata); + memcpy (&pdata->id, pinfo->result.id, sizeof (struct GNUNET_PeerIdentity)); + GNUNET_TESTBED_operation_done (op); + if (pdata == &peer_data[0]) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", + GNUNET_i2s (&pdata->id)); + common_op = GNUNET_TESTBED_peer_get_information (peer_data[1].peer, + GNUNET_TESTBED_PIT_IDENTITY, + &peerinfo_cb, &peer_data[1]); + } + else if (pdata == &peer_data[1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", + GNUNET_i2s (&pdata->id)); + if (TEST_STEP_2_HOP == test_step) + peer_data[1].op = + GNUNET_TESTBED_service_connect (&peer_data[1], peer_data[1].peer, + "stream", NULL, NULL, stream_ca, + stream_da, &peer_data[1]); + else + GNUNET_break (0); /* FIXME: 3 hop test case here... */ + } +} + + +/** + * Controller event callback + * + * @param cls NULL + * @param event the controller event + */ +static void +controller_event_cb (void *cls, + const struct GNUNET_TESTBED_EventInformation *event) +{ + switch (event->type) + { + case GNUNET_TESTBED_ET_OPERATION_FINISHED: + if (NULL != event->details.operation_finished.emsg) + { + FPRINTF (stderr, "Error while expecting an operation to succeed:%s \n", + event->details.operation_finished.emsg); + GNUNET_assert (0); + } + break; + case GNUNET_TESTBED_ET_CONNECT: + GNUNET_TESTBED_operation_done (common_op); + /* Get the peer identity and configuration of peers */ + common_op = + GNUNET_TESTBED_peer_get_information (peer_data[0].peer, + GNUNET_TESTBED_PIT_IDENTITY, + &peerinfo_cb, &peer_data[0]); + break; + default: + GNUNET_assert (0); + } +} + + +/** + * Signature of a main function for a testcase. + * + * @param cls closure + * @param num_peers number of peers in 'peers' + * @param peers handle to peers run in the testbed + */ +static void +test_master (void *cls, unsigned int num_peers_, + struct GNUNET_TESTBED_Peer **peers) +{ + GNUNET_assert (NULL != peers); + GNUNET_assert (NULL != peers[0]); + GNUNET_assert (NULL != peers[1]); + GNUNET_assert (num_peers_ == num_peers); + peer_data[0].peer = peers[0]; + peer_data[1].peer = peers[1]; + if (2 == num_peers) + common_op = GNUNET_TESTBED_overlay_connect (NULL, NULL, NULL, + peer_data[0].peer, + peer_data[1].peer); + else + GNUNET_break (0); + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort, + NULL); +} + + /** * Main function */ @@ -563,6 +799,7 @@ int main (int argc, char **argv) char *pmsg; char *test_name = "perf_stream_api"; char *cfg_file = "test_stream_local.conf"; + uint64_t event_mask; double throughput; double prof_time_sec; unsigned int count; @@ -584,6 +821,7 @@ int main (int argc, char **argv) { GNUNET_asprintf (&pmsg, "\nTesting over loopback with payload size %hu\n", payload_size[payload_size_index]); + (void) memset (peer_data, 0, sizeof (peer_data)); meter = create_meter (sizeof (data), pmsg, GNUNET_YES); GNUNET_free (pmsg); result = GNUNET_SYSERR; @@ -597,11 +835,28 @@ int main (int argc, char **argv) PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00); } test_step = TEST_STEP_2_HOP; + num_peers = 2; + event_mask = 0; + event_mask |= (1LL << GNUNET_TESTBED_ET_CONNECT); + event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED); for (payload_size_index = 0; payload_size_index < (sizeof (payload_size) / sizeof (uint16_t)); payload_size_index++) { - /* Initialize testbed here */ + (void) memset (peer_data, 0, sizeof (peer_data)); + GNUNET_asprintf (&pmsg, "\nTesting over 1 hop with payload size %hu\n", + payload_size[payload_size_index]); + meter = create_meter (sizeof (data), pmsg, GNUNET_YES); + GNUNET_free (pmsg); + result = GNUNET_SYSERR; + GNUNET_TESTBED_test_run (test_name, cfg_file, num_peers, event_mask, + &controller_event_cb, NULL, &test_master, NULL); + free_meter (meter); + if (GNUNET_OK != result) + goto return_fail; + prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000)); + throughput = (((float) sizeof (data)) / prof_time_sec); + PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00); } test_step = TEST_STEP_3_HOP; for (payload_size_index = 0; -- 2.25.1