From bb613dd9d32c66e3eb6d87f583d2be137d4137e6 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sun, 25 Mar 2012 13:33:20 +0000 Subject: [PATCH] -added write io cancellation --- src/include/gnunet_stream_lib.h | 22 ++++++-- src/stream/README | 12 ++--- src/stream/stream_api.c | 95 ++++++++++++++++++--------------- src/stream/test_stream_local.c | 16 ++++-- 4 files changed, 91 insertions(+), 54 deletions(-) diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index 78b11d1c5..5a2f9b2e4 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -217,13 +217,17 @@ struct GNUNET_STREAM_IOWriteHandle; struct GNUNET_STREAM_IOReadHandle; /** - * Tries to write the given data to the stream + * Tries to write the given data to the stream. The maximum size of data that + * can be written as part of a write operation is (64 * (64000 - sizeof (struct + * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API + * violation, however only the said number of maximum bytes will be written. * * @param socket the socket representing a stream * @param data the data buffer from where the data is written into the stream * @param size the number of bytes to be written from the data buffer * @param timeout the timeout period - * @param write_cont the function to call upon writing some bytes into the stream + * @param write_cont the function to call upon writing some bytes into the + * stream * @param write_cont_cls the closure * @return handle to cancel the operation; NULL if a previous write is pending */ @@ -270,7 +274,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, /** - * Cancel pending write operation. + * Cancels pending write operation. Also cancels packet retransmissions which + * may have resulted otherwise. + * + * CAUTION: Normally a write operation is considered successful if the data + * given to it is sent and acknowledged by the receiver. As data is divided + * into packets, it is possible that not all packets are received by the + * receiver. Any missing packets are then retransmitted till the receiver + * acknowledges all packets or until a timeout . During this scenario if the + * write operation is cancelled all such retransmissions are also + * cancelled. This may leave the receiver's receive buffer incompletely filled + * as some missing packets are never retransmitted. So this operation should be + * used before shutting down transmission from our side or before closing the + * socket. * * @param ioh handle to operation to cancel */ diff --git a/src/stream/README b/src/stream/README index 9b550b09b..977ca2d49 100644 --- a/src/stream/README +++ b/src/stream/README @@ -1,11 +1,11 @@ -The aim of the stream library is to provide stream connections between peers in -GNUnet. This is a convenience library which hides the complexity of dividing -data stream into packets, transmitting them and retransmitting them in case of +Stream library provides stream connections between peers in GNUnet. This is a +convenience library which hides the complexity of dividing data stream into +packets, transmitting them and retransmitting them in case of communication errors. This library's API are similar to unix PIPE API. The user is expected to open a stream to a listening target peer. Once the stream is established, the user can -use it as a pipe. Any data written into the stream will be readable by the -target peer. +use it as a pipe. Any data written into the stream at one peer will be readable +by the other peer and vice versa. -This library uses mesh API for establishing streams between peers. \ No newline at end of file +This library uses mesh API for establishing tunnels between peers. \ No newline at end of file diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 336ba0204..0749d8c5f 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -25,6 +25,8 @@ * * Decrement PEER intern count during socket close and listen close to free the * memory used for PEER interning + * + * Add code for write io timeout **/ /** @@ -32,6 +34,8 @@ * @brief Implementation of the stream library * @author Sree Harsha Totakura */ + + #include "platform.h" #include "gnunet_common.h" #include "gnunet_crypto_lib.h" @@ -46,15 +50,15 @@ #define MAX_PACKET_SIZE 64000 /** - * The maximum payload a data message packet can carry + * Receive buffer */ -static size_t max_payload_size = - MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); +#define RECEIVE_BUFFER_SIZE 4096000 /** - * Receive buffer + * The maximum payload a data message packet can carry */ -#define RECEIVE_BUFFER_SIZE 4096000 +static size_t max_payload_size = + MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); /** * states in the Protocol @@ -239,7 +243,7 @@ struct GNUNET_STREAM_Socket /** * Task identifier for the read io timeout task */ - GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; + GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; /** * Task identifier for retransmission task after timeout @@ -373,6 +377,11 @@ struct GNUNET_STREAM_ListenSocket */ struct GNUNET_STREAM_IOWriteHandle { + /** + * The socket to which this write handle is associated + */ + struct GNUNET_STREAM_Socket *socket; + /** * The packet_buffers associated with this Handle */ @@ -398,13 +407,6 @@ struct GNUNET_STREAM_IOWriteHandle * Number of bytes in this write handle */ size_t size; - - /** - * Number of packets sent before waiting for an ack - * - * FIXME: Do we need this? - */ - unsigned int sent_packets; }; @@ -717,23 +719,6 @@ ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, } - -/** - * Function called when Data Message is sent - * - * @param cls the io_handle corresponding to the Data Message - * @param socket the socket which was used - */ -static void -write_data_finish_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) -{ - struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; - - io_handle->sent_packets++; -} - - /** * Writes data using the given socket. The amount of data written is limited by * the receiver_window_size @@ -788,15 +773,15 @@ write_data (struct GNUNET_STREAM_Socket *socket) ntohl (io_handle->messages[packet]->sequence_number)); copy_and_queue_message (socket, &io_handle->messages[packet]->header, - &write_data_finish_cb, - io_handle); + NULL, + NULL); packet++; } if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), + (GNUNET_TIME_UNIT_SECONDS, 8), &retransmission_timeout_task, socket); } @@ -810,7 +795,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) */ static void call_read_processor (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; size_t read_size; @@ -842,8 +827,8 @@ call_read_processor (void *cls, GNUNET_assert (0 != valid_read_size); /* Cancel the read_io_timeout_task */ - GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task); - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* Call the data processor */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -891,7 +876,8 @@ call_read_processor (void *cls, memmove (socket->receive_buffer, socket->receive_buffer + socket->receive_buffer_boundaries[sequence_increase-1], - socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); + socket->receive_buffer_size + - socket->receive_buffer_boundaries[sequence_increase-1]); /* Shift the bitmap */ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; @@ -936,7 +922,7 @@ read_io_timeout (void *cls, GNUNET_STREAM_DataProcessor proc; void *proc_cls; - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1129,6 +1115,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, return GNUNET_YES; } + /** * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA * @@ -2388,6 +2375,9 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) { struct MessageQueue *head; + GNUNET_break (NULL == socket->read_handle); + GNUNET_break (NULL == socket->write_handle); + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { /* socket closed with read task pending!? */ @@ -2548,6 +2538,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); + io_handle->socket = socket; io_handle->write_cont = write_cont; io_handle->write_cont_cls = write_cont_cls; io_handle->size = size; @@ -2642,9 +2633,10 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, } /* Setup the read timeout task */ - socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, - &read_io_timeout, - socket); + socket->read_io_timeout_task_id = + GNUNET_SCHEDULER_add_delayed (timeout, + &read_io_timeout, + socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%x: %s() END\n", socket->our_id, @@ -2661,7 +2653,26 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, void GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) { - /* FIXME: Should cancel the write retransmission task */ + struct GNUNET_STREAM_Socket *socket = ioh->socket; + unsigned int packet; + + GNUNET_assert (NULL != socket->write_handle); + GNUNET_assert (socket->write_handle == ioh); + + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } + + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == ioh->messages[packet]) break; + GNUNET_free (ioh->messages[packet]); + } + + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; return; } diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 535ee62a2..c3fcc6492 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -101,6 +101,9 @@ static GNUNET_SCHEDULER_TaskIdentifier read_task; static char *data = "ABCD"; static int result; +static int writing_success; +static int reading_success; + /** * Check whether peers successfully shut down. @@ -197,9 +200,8 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - struct PeerData *peer; + struct PeerData *peer = cls; - peer = (struct PeerData *) cls; GNUNET_assert (GNUNET_STREAM_OK == status); GNUNET_assert (size <= strlen (data)); peer->bytes_wrote += size; @@ -232,6 +234,12 @@ write_completion (void *cls, cls); GNUNET_assert (NULL!=peer->io_read_handle); } + else + { + writing_success = GNUNET_YES; + if (GNUNET_YES == reading_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + } } } @@ -335,7 +343,9 @@ input_processor (void *cls, } else /* Peer1 has completed reading. End of tests */ { - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + reading_success = GNUNET_YES; + if (GNUNET_YES == writing_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); } } return size; -- 2.25.1