From: Sree Harsha Totakura Date: Mon, 27 Feb 2012 11:10:56 +0000 (+0000) Subject: -added prepare_buffer_for_read X-Git-Tag: initial-import-from-subversion-38251~14617 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=a48e053620c5081570710ff072fd857ba73b0f8d;p=oweals%2Fgnunet.git -added prepare_buffer_for_read --- diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 6ae3dbefe..2ccf4e892 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -663,6 +663,128 @@ write_data (struct GNUNET_STREAM_Socket *socket) } +/** + * Task for calling the read processor + * + * @param cls the socket + * @param tc the task context + */ +static void +call_read_processor_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + size_t read_size; + size_t valid_read_size; + + if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; + + GNUNET_assert (NULL != socket->read_handle); + GNUNET_assert (NULL != socket->read_handle->proc); + GNUNET_assert (NULL != socket->copy_buffer); + GNUNET_assert (0 != socket->copy_buffer_size); + + valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset; + GNUNET_assert (0 != valid_read_size); + + read_size = socket->read_handle->proc (socket->read_handle->proc_cls, + socket->status, + socket->copy_buffer + + socket->copy_buffer_read_offset, + valid_read_size); + + GNUNET_assert (read_size <= valid_read_size); + socket->copy_buffer_read_offset += read_size; + + /* Free the copy buffer once it has been read entirely */ + if (socket->copy_buffer_read_offset == socket->copy_buffer_size) + { + GNUNET_free (socket->copy_buffer); + socket->copy_buffer = NULL; + socket->copy_buffer_size = 0; + socket->copy_buffer_read_offset = 0; + } + + /* Free the read handle */ + GNUNET_free (socket->read_handle); + socket->read_handle = NULL; +} + + +/** + * Prepares the receive buffer for possible reads; Should only be called when + * there is a valid READ io request pending and socket->copy_buffer is empty + * + * @param socket the socket pointer + */ +static void +prepare_buffer_for_read (struct GNUNET_STREAM_Socket *socket) +{ + unsigned int packet; + uint32_t offset_increase; + uint32_t sequence_increase; + + GNUNET_assert (NULL == socket->copy_buffer); + GNUNET_assert (NULL != socket->read_handle); + + /* Check the bitmap for any holes */ + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, + packet)) + break; + } + + sequence_increase = packet; + + if (0 == sequence_increase) /* The first packet is still missing */ + { + return; + } + + /* Copy data to copy buffer */ + GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]); + socket->copy_buffer = + GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]); + memcpy (socket->copy_buffer, + socket->receive_buffer, + socket->receive_buffer_boundaries[sequence_increase-1]); + + /* Shift the data in the receive buffer */ + memmove (socket->receive_buffer, + socket->receive_buffer + + socket->receive_buffer_boundaries[sequence_increase-1], + socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); + + /* Shift the bitmap */ + socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; + + /* Set read_sequence_number */ + socket->read_sequence_number += sequence_increase; + + /* Set read_offset */ + offset_increase = socket->receive_buffer_boundaries[sequence_increase-1]; + socket->read_offset += offset_increase; + + /* Fix relative boundaries */ + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) + { + socket->receive_buffer_boundaries[packet] = + socket->receive_buffer_boundaries[packet + sequence_increase] + - offset_increase; + } + else + socket->receive_buffer_boundaries[packet] = 0; + } + + GNUNET_SCHEDULER_add_continuation (&call_read_processor_task, + socket, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); +} + + /** * Handler for DATA messages; Same for both client and server * @@ -757,6 +879,12 @@ handle_data (struct GNUNET_STREAM_Socket *socket, &ack_task, socket); } + + if ((NULL != socket->read_handle) /* A read handle is waiting */ + && (NULL == socket->copy_buffer)) /* And the copy buffer is empty */ + { + prepare_buffer_for_read (socket); + } break; @@ -1653,53 +1781,6 @@ mesh_peer_disconnect_callback (void *cls, } -/** - * Task for calling the read processor - * - * @param cls the socket - */ -static void -call_read_processor_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_STREAM_Socket *socket = cls; - size_t read_size; - size_t valid_read_size; - - if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; - - GNUNET_assert (NULL != socket->read_handle); - GNUNET_assert (NULL != socket->read_handle->proc); - GNUNET_assert (NULL != socket->copy_buffer); - GNUNET_assert (0 != socket->copy_buffer_size); - - valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset; - GNUNET_assert (0 != valid_read_size); - - read_size = socket->read_handle->proc (socket->read_handle->proc_cls, - socket->status, - socket->copy_buffer - + socket->copy_buffer_read_offset, - valid_read_size); - - GNUNET_assert (read_size <= valid_read_size); - socket->copy_buffer_read_offset += read_size; - - /* Free the copy buffer once it has been read entirely */ - if (socket->copy_buffer_read_offset == socket->copy_buffer_size) - { - GNUNET_free (socket->copy_buffer); - socket->copy_buffer = NULL; - socket->copy_buffer_size = 0; - socket->copy_buffer_read_offset = 0; - } - - /* Free the read handle */ - GNUNET_free (socket->read_handle); - socket->read_handle = NULL; -} - - /*****************/ /* API functions */ /*****************/ @@ -2054,10 +2135,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, GNUNET_STREAM_DataProcessor proc, void *proc_cls) { - unsigned int packet; struct GNUNET_STREAM_IOReadHandle *read_handle; - uint32_t offset_increase; - uint32_t sequence_increase; /* Return NULL if there is already a read handle; the user has to cancel that first before continuing or has to wait until it is completed */ @@ -2069,61 +2147,11 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, /* if previous copy buffer is still not read call the data processor on it */ if (NULL != socket->copy_buffer) - { - GNUNET_SCHEDULER_add_now (&call_read_processor_task, - socket); - } - - /* Check the bitmap for any holes */ - for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, - packet)) - break; - } - - sequence_increase = packet; - - if (0 == sequence_increase) /* The first packet is still missing */ - { - /* We can't do anything until it arrives */ - } + GNUNET_SCHEDULER_add_continuation (&call_read_processor_task, + socket, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); else - { - /* Copy data to copy buffer */ - GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]); - socket->copy_buffer = - GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]); - - /* Shift the data in the receive buffer */ - memmove (socket->receive_buffer, - socket->receive_buffer - + socket->receive_buffer_boundaries[sequence_increase-1], - socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); - - /* Shift the bitmap */ - socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; - - /* Set read_sequence_number */ - socket->read_sequence_number += sequence_increase; - - /* Set read_offset */ - offset_increase = socket->receive_buffer_boundaries[sequence_increase-1]; - socket->read_offset += offset_increase; - - /* Fix relative boundaries */ - for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) - { - socket->receive_buffer_boundaries[packet] = - socket->receive_buffer_boundaries[packet + sequence_increase] - - offset_increase; - } - else - socket->receive_buffer_boundaries[packet] = 0; - } - } + prepare_buffer_for_read (socket); return read_handle; }