From 04fe186c8825f1eba376a092772095bc20bf353a Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Tue, 11 Dec 2012 10:51:28 +0000 Subject: [PATCH] allow calling GNUNET_STREAM_read() from DataProcessor callback --- src/include/gnunet_stream_lib.h | 10 ++-- src/stream/stream_api.c | 102 +++++++++++++++++++++++--------- 2 files changed, 78 insertions(+), 34 deletions(-) diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index 7097f8a5b..909c659d7 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -345,18 +345,18 @@ typedef size_t (*GNUNET_STREAM_DataProcessor) (void *cls, /** - * Tries to read data from the stream. + * Tries to read data from the stream. Should not be called when another read + * handle is present; the existing read handle should be canceled with + * GNUNET_STREAM_io_read_cancel(). Only one read handle per socket is present at + * any time * * @param socket the socket representing a stream * @param timeout the timeout period * @param proc function to call with data (once only) * @param proc_cls the closure for proc - * * @return handle to cancel the operation; NULL is returned if: the stream has * been shutdown for this type of opeartion (the DataProcessor is - * immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another - * read handle is present (only one read handle per socket is present - * at any time) + * immediately called with GNUNET_STREAM_SHUTDOWN as status) */ struct GNUNET_STREAM_IOReadHandle * GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 9c6056c01..1ca4031f2 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -537,6 +537,12 @@ struct GNUNET_STREAM_IOReadHandle * Task scheduled to continue a read operation. */ GNUNET_SCHEDULER_TaskIdentifier read_task_id; + + /** + * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call + * the read processor task + */ + GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id; }; @@ -944,6 +950,32 @@ write_data (struct GNUNET_STREAM_Socket *socket) } +/** + * Cleansup the sockets read handle + * + * @param socket the socket whose read handle has to be cleanedup + */ +static void +cleanup_read_handle (struct GNUNET_STREAM_Socket *socket) +{ + struct GNUNET_STREAM_IOReadHandle *read_handle; + + read_handle = socket->read_handle; + /* Read io time task should be there; if it is already executed then this + read handle is not valid; However upon scheduler shutdown the read io task + may be executed before */ + if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id) + GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id); + /* reading task may be present; if so we have to stop it */ + if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id) + GNUNET_SCHEDULER_cancel (read_handle->read_task_id); + if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id) + GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id); + GNUNET_free (read_handle); + socket->read_handle = NULL; +} + + /** * Task for calling the read processor * @@ -956,6 +988,8 @@ call_read_processor (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; struct GNUNET_STREAM_IOReadHandle *read_handle; + GNUNET_STREAM_DataProcessor proc; + void *proc_cls; size_t read_size; size_t valid_read_size; unsigned int packet; @@ -969,8 +1003,7 @@ call_read_processor (void *cls, return; if (NULL == socket->receive_buffer) return; - GNUNET_assert (NULL != socket->read_handle); - GNUNET_assert (NULL != socket->read_handle->proc); + GNUNET_assert (NULL != read_handle->proc); /* Check the bitmap for any holes */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) { @@ -983,24 +1016,19 @@ call_read_processor (void *cls, valid_read_size = socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; GNUNET_assert (0 != valid_read_size); - /* Cancel the read_io_timeout_task */ - GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id); - read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + proc = read_handle->proc; + proc_cls = read_handle->proc_cls; + cleanup_read_handle (socket); /* Call the data processor */ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", GNUNET_i2s (&socket->other_peer)); - read_size = - socket->read_handle->proc (socket->read_handle->proc_cls, - socket->status, - socket->receive_buffer + socket->copy_offset, - valid_read_size); + read_size = proc (proc_cls, socket->status, + socket->receive_buffer + socket->copy_offset, + valid_read_size); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n", GNUNET_i2s (&socket->other_peer), read_size); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n", GNUNET_i2s (&socket->other_peer)); - /* Free the read handle */ - GNUNET_free (socket->read_handle); - socket->read_handle = NULL; GNUNET_assert (read_size <= valid_read_size); socket->copy_offset += read_size; /* Determine upto which packet we can remove from the buffer */ @@ -3554,7 +3582,34 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, /** - * Tries to read data from the stream. + * Function to check the ACK bitmap for any received messages and call the data processor + * + * @param cls the socket + * @param tc the scheduler task context + */ +static void +probe_data_availability (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + + GNUNET_assert (NULL != socket->read_handle); + socket->read_handle->probe_data_availability_task_id = + GNUNET_SCHEDULER_NO_TASK; + if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id) + return; /* A task to call read processor is present */ + if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, + 0)) + socket->read_handle->read_task_id + = GNUNET_SCHEDULER_add_now (&call_read_processor, socket); +} + + + +/** + * Tries to read data from the stream. Should not be called when another read + * handle is present; the existing read handle should be canceled with + * GNUNET_STREAM_io_read_cancel(). Only one read handle per socket is present at any time * * @param socket the socket representing a stream * @param timeout the timeout period @@ -3583,7 +3638,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, first before continuing or has to wait until it is completed */ if (NULL != socket->read_handle) { - GNUNET_break (0); + GNUNET_assert (0); return NULL; } GNUNET_assert (NULL != proc); @@ -3607,10 +3662,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, read_handle->proc_cls = proc_cls; read_handle->socket = socket; socket->read_handle = read_handle; - if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, - 0)) - read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, - socket); + read_handle->probe_data_availability_task_id = + GNUNET_SCHEDULER_add_now (&probe_data_availability, socket); read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n", @@ -3660,16 +3713,7 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh) socket = ioh->socket; GNUNET_assert (NULL != socket->read_handle); GNUNET_assert (ioh == socket->read_handle); - /* Read io time task should be there; if it is already executed then this - read handle is not valid; However upon scheduler shutdown the read io task - may be executed before */ - if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id) - GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id); - /* reading task may be present; if so we have to stop it */ - if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id) - GNUNET_SCHEDULER_cancel (ioh->read_task_id); - GNUNET_free (ioh); - socket->read_handle = NULL; + cleanup_read_handle (socket); } /* end of stream_api.c */ -- 2.25.1