From e444070fbb20f49bb55afd5d6158a9b42333557f Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Fri, 15 Jun 2012 15:19:08 +0000 Subject: [PATCH] stream misc fixing --- src/stream/stream_api.c | 66 ++++++++++++++++++++++++++---------- src/stream/test_stream_big.c | 55 ++++++++++++++++++------------ 2 files changed, 83 insertions(+), 38 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 5d22ff7b3..63e27ea98 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -470,6 +470,22 @@ struct GNUNET_STREAM_ShutdownHandle static unsigned int default_timeout = 10; +/** + * Function to print the contents of an address location. Used only for debugging + * + * @param ptr the address location; Should be more than 5 bytes long + */ +static void +debug_print_contents (const void *ptr) +{ + /* const char *c; */ + + /* c = ptr; */ + /* LOG (GNUNET_ERROR_TYPE_DEBUG, */ + /* "--- contents: %u %u %u %u %u\n", c[0], c[1], c[2], c[3], c[4]); */ +} + + /** * Callback function for sending queued message * @@ -830,6 +846,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) "%s: Placing DATA message with sequence %u in send queue\n", GNUNET_i2s (&socket->other_peer), ntohl (io_handle->messages[packet]->sequence_number)); + debug_print_contents(&(io_handle->messages[packet][1])); copy_and_queue_message (socket, &io_handle->messages[packet]->header, NULL, @@ -849,6 +866,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) "%s: Placing DATA message with sequence %u in send queue\n", GNUNET_i2s (&socket->other_peer), ntohl (io_handle->messages[packet]->sequence_number)); + debug_print_contents(&(io_handle->messages[packet][1])); copy_and_queue_message (socket, &io_handle->messages[packet]->header, NULL, @@ -910,6 +928,7 @@ call_read_processor (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", GNUNET_i2s (&socket->other_peer)); + debug_print_contents (socket->receive_buffer + socket->copy_offset); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, @@ -943,11 +962,12 @@ call_read_processor (void *cls, GNUNET_i2s (&socket->other_peer), sequence_increase); /* Shift the data in the receive buffer */ - memmove (socket->receive_buffer, - socket->receive_buffer - + socket->receive_buffer_boundaries[sequence_increase-1], - socket->receive_buffer_size - - socket->receive_buffer_boundaries[sequence_increase-1]); + socket->receive_buffer = + memmove (socket->receive_buffer, + socket->receive_buffer + + socket->receive_buffer_boundaries[sequence_increase-1], + socket->receive_buffer_size + - socket->receive_buffer_boundaries[sequence_increase-1]); /* Shift the bitmap */ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; /* Set read_sequence_number */ @@ -963,9 +983,18 @@ call_read_processor (void *cls, { if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) { - socket->receive_buffer_boundaries[packet] = - socket->receive_buffer_boundaries[packet + sequence_increase] - - offset_increase; + uint32_t ahead_buffer_boundary; + + ahead_buffer_boundary = + socket->receive_buffer_boundaries[packet + sequence_increase]; + if (0 == ahead_buffer_boundary) + socket->receive_buffer_boundaries[packet] = 0; + else + { + GNUNET_assert (offset_increase < ahead_buffer_boundary); + socket->receive_buffer_boundaries[packet] = + ahead_buffer_boundary - offset_increase; + } } else socket->receive_buffer_boundaries[packet] = 0; @@ -1130,6 +1159,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, /* Copy Data to buffer */ payload = &msg[1]; + debug_print_contents(payload); GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); memcpy (socket->receive_buffer + relative_offset, payload, @@ -2418,22 +2448,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, } else /* We have to call the write continuation callback now */ { + struct GNUNET_STREAM_IOWriteHandle *write_handle; + /* Free the packets */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) { GNUNET_free_non_null (socket->write_handle->messages[packet]); } - if (NULL != socket->write_handle->write_cont) - socket->write_handle->write_cont - (socket->write_handle->write_cont_cls, - socket->status, - socket->write_handle->size); + write_handle = socket->write_handle; + socket->write_handle = NULL; + if (NULL != write_handle->write_cont) + write_handle->write_cont (write_handle->write_cont_cls, + socket->status, + write_handle->size); + /* We are done with the write handle - Freeing it */ + GNUNET_free (write_handle); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Write completion callback completed\n", - GNUNET_i2s (&socket->other_peer)); - /* We are done with the write handle - Freeing it */ - GNUNET_free (socket->write_handle); - socket->write_handle = NULL; + GNUNET_i2s (&socket->other_peer)); } break; default: diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c index b3e6fb20c..001c4f67e 100644 --- a/src/stream/test_stream_big.c +++ b/src/stream/test_stream_big.c @@ -89,6 +89,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_STREAM_close (peer1.socket); if (NULL != peer2.socket) GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); if (0 != abort_task) { @@ -152,7 +154,7 @@ write_completion (void *cls, { peer->io_write_handle = GNUNET_STREAM_write (peer->socket, - (void *) data, + ((void *) data) + peer->bytes_wrote, DATA_SIZE - peer->bytes_wrote, GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), @@ -221,10 +223,18 @@ stream_open_cb (void *cls, } +/** + * Scheduler call back; to be executed when a new stream is connected + * Called from listen connect for peer2 + */ +static void +stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + /** * Input processor * - * @param cls the closure from GNUNET_STREAM_write/read + * @param cls peer2 * @param status the status of the stream at the time this function is called * @param data traffic from the other side * @param size the number of bytes available in data read @@ -240,21 +250,23 @@ input_processor (void *cls, struct PeerData *peer = cls; GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (&peer2 == peer); GNUNET_assert (size < DATA_SIZE); - GNUNET_assert (memcmp (data + peer->bytes_read, - input_data, - size)); + GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read, + input_data, size)); peer->bytes_read += size; if (peer->bytes_read < DATA_SIZE) { - peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_read_handle); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task); + read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); + /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */ + /* peer->socket, */ + /* GNUNET_TIME_relative_multiply */ + /* (GNUNET_TIME_UNIT_SECONDS, 5), */ + /* &input_processor, */ + /* cls); */ + /* GNUNET_assert (NULL != peer->io_read_handle); */ } else { @@ -274,18 +286,17 @@ static void stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; + struct PeerData *peer = cls; read_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (socket == peer2.socket); - peer2.bytes_read = 0; - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ - peer2.io_read_handle = - GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, + GNUNET_assert (&peer2 == peer); + peer->io_read_handle = + GNUNET_STREAM_read (peer->socket, GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), &input_processor, - cls); - GNUNET_assert (NULL != peer2.io_read_handle); + peer); + GNUNET_assert (NULL != peer->io_read_handle); } @@ -311,7 +322,8 @@ stream_listen_cb (void *cls, "Peer connected: %s\n", GNUNET_i2s(initiator)); peer2.socket = socket; - read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, socket); + peer2.bytes_read = 0; + read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); return GNUNET_OK; } @@ -347,6 +359,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_assert (NULL != peer1.socket); } + /** * Initialize framework and start test */ @@ -366,7 +379,7 @@ run (void *cls, char *const *args, const char *cfgfile, abort_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort, + (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, NULL); test_task = GNUNET_SCHEDULER_add_now (&test, NULL); -- 2.25.1