From: Sree Harsha Totakura Date: Sun, 26 Feb 2012 22:32:23 +0000 (+0000) Subject: -copy buffer and STREAM_read(incomplete) X-Git-Tag: initial-import-from-subversion-38251~14627 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=b231b33e0a4da703c962a9a3e02ca684d27dcc75;p=oweals%2Fgnunet.git -copy buffer and STREAM_read(incomplete) --- diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 0a8782c61..41bae2da5 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -219,18 +219,23 @@ struct GNUNET_STREAM_Socket /** * The write IO_handle associated with this socket */ - struct GNUNET_STREAM_IOHandle *write_handle; + struct GNUNET_STREAM_IOWriteHandle *write_handle; /** * The read IO_handle associated with this socket */ - struct GNUNET_STREAM_IOHandle *read_handle; + struct GNUNET_STREAM_IOReadHandle *read_handle; /** * Buffer for storing received messages */ void *receive_buffer; + /** + * Copy buffer pointer; Used during read operations + */ + void *copy_buffer; + /** * The state of the protocol associated with this socket */ @@ -268,6 +273,11 @@ struct GNUNET_STREAM_Socket */ uint32_t receive_buffer_size; + /** + * The receiver buffer boundaries + */ + uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; + /** * receiver's available buffer after the last acknowledged packet */ @@ -282,6 +292,16 @@ struct GNUNET_STREAM_Socket * The offset after which we are expecting data */ uint32_t read_offset; + + /** + * The size of the copy buffer + */ + uint32_t copy_buffer_size; + + /** + * The read offset of copy buffer + */ + uint32_t copy_buffer_read_offset; }; @@ -314,9 +334,9 @@ struct GNUNET_STREAM_ListenSocket /** - * The IO Handle + * The IO Write Handle */ -struct GNUNET_STREAM_IOHandle +struct GNUNET_STREAM_IOWriteHandle { /** * The packet_buffers associated with this Handle @@ -329,11 +349,6 @@ struct GNUNET_STREAM_IOHandle */ GNUNET_STREAM_AckBitmap ack_bitmap; - /** - * receiver's available buffer - */ - uint32_t receive_window_available; - /** * Number of packets sent before waiting for an ack * @@ -343,6 +358,23 @@ struct GNUNET_STREAM_IOHandle }; +/** + * The IO Read Handle + */ +struct GNUNET_STREAM_IOReadHandle +{ + /** + * Callback for the read processor + */ + GNUNET_STREAM_DataProcessor proc; + + /** + * The closure pointer for the read processor callback + */ + void *proc_cls; +}; + + /** * Default value in seconds for various timeouts */ @@ -511,7 +543,8 @@ ack_task (void *cls, ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap); ack_msg->base_sequence_number = htonl (socket->read_sequence_number); - ack_msg->receive_window_remaining = htonl (socket->receive_window_available); + ack_msg->receive_window_remaining = + htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); /* Request MESH for sending ACK */ GNUNET_MESH_notify_transmit_ready (socket->tunnel, @@ -574,7 +607,7 @@ static void write_data_finish_cb (void *cls, struct GNUNET_STREAM_Socket *socket) { - struct GNUNET_STREAM_IOHandle *io_handle = cls; + struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; io_handle->sent_packets++; } @@ -589,7 +622,7 @@ write_data_finish_cb (void *cls, static void write_data (struct GNUNET_STREAM_Socket *socket) { - struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle; + struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle; unsigned int packet; int ack_packet; @@ -618,9 +651,9 @@ write_data (struct GNUNET_STREAM_Socket *socket) packet = ack_packet + 1; /* Now send new packets if there is enough buffer space */ while ( (NULL != io_handle->messages[packet]) && - (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) + (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) { - io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); + socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); queue_message (socket, &io_handle->messages[packet]->header, &write_data_finish_cb, @@ -651,6 +684,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, const void *payload; uint32_t bytes_needed; uint32_t relative_offset; + uint32_t relative_sequence_number; uint16_t size; size = htons (msg->header.header.size); @@ -666,9 +700,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket, case STATE_TRANSMIT_CLOSED: case STATE_TRANSMIT_CLOSE_WAIT: - /* check if the message's sequence number is greater than the one we are + /* check if the message's sequence number is in the range we are expecting */ - if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64) + relative_sequence_number = + ntohl (msg->sequence_number) - socket->read_sequence_number; + if ( relative_sequence_number > 64) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ignoring received message with sequence number %d", @@ -688,8 +724,6 @@ handle_data (struct GNUNET_STREAM_Socket *socket, socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, bytes_needed); socket->receive_buffer_size = bytes_needed; - socket->receive_window_available = - RECEIVE_BUFFER_SIZE - socket->receive_buffer_size; } else { @@ -700,16 +734,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket, } } - /* Copy Data to buffer and send acknowledgement for this packet */ + /* Copy Data to buffer */ payload = &msg[1]; + GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); memcpy (socket->receive_buffer + relative_offset, payload, size); + socket->receive_buffer_boundaries[relative_sequence_number] = + relative_offset + size; /* Modify the ACK bitmap */ ackbitmap_modify_bit (&socket->ack_bitmap, - ntohl (msg->sequence_number) - - socket->read_sequence_number, + relative_sequence_number, GNUNET_YES); /* Start ACK sending task if one is not already present */ @@ -1427,7 +1463,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, } socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); - socket->write_handle->receive_window_available = + socket->receive_window_available = ntohl (ack->receive_window_remaining); write_data (socket); break; @@ -1617,6 +1653,53 @@ mesh_peer_disconnect_callback (void *cls, } +/** + * Task for calling the read processor + * + * @param cls the socket + */ +static void +call_read_processor_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + size_t read_size; + size_t valid_read_size; + + if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; + + GNUNET_assert (NULL != socket->read_handle); + GNUNET_assert (NULL != socket->read_handle->proc); + GNUNET_assert (NULL != socket->copy_buffer); + GNUNET_assert (0 != socket->copy_buffer_size); + + valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset; + GNUNET_assert (0 != valid_read_size); + + read_size = socket->read_handle->proc (socket->read_handle->proc_cls, + socket->status, + socket->copy_buffer + + socket->copy_buffer_read_offset, + valid_read_size); + + GNUNET_assert (read_size <= valid_read_size); + socket->copy_buffer_read_offset += read_size; + + /* Free the copy buffer once it has been read entirely */ + if (socket->copy_buffer_read_offset == socket->copy_buffer_size) + { + GNUNET_free (socket->copy_buffer); + socket->copy_buffer = NULL; + socket->copy_buffer_size = 0; + socket->copy_buffer_read_offset = 0; + } + + /* Free the read handle */ + GNUNET_free (socket->read_handle); + socket->read_handle = NULL; +} + + /*****************/ /* API functions */ /*****************/ @@ -1878,7 +1961,7 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) * @param write_cont_cls the closure * @return handle to cancel the operation */ -struct GNUNET_STREAM_IOHandle * +struct GNUNET_STREAM_IOWriteHandle * GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, const void *data, size_t size, @@ -1888,7 +1971,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, { unsigned int num_needed_packets; unsigned int packet; - struct GNUNET_STREAM_IOHandle *io_handle; + struct GNUNET_STREAM_IOWriteHandle *io_handle; uint32_t packet_size; uint32_t payload_size; struct GNUNET_STREAM_DataMessage *data_msg; @@ -1912,8 +1995,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; - io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); - io_handle->receive_window_available = socket->receive_window_available; + io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); sweep = data; /* Divide the given buffer into packets for sending */ for (packet=0; packet < num_needed_packets; packet++) @@ -1966,22 +2048,59 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, * @param proc_cls the closure for proc * @return handle to cancel the operation */ -struct GNUNET_STREAM_IOHandle * -GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket, +struct GNUNET_STREAM_IOReadHandle * +GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, struct GNUNET_TIME_Relative timeout, GNUNET_STREAM_DataProcessor proc, void *proc_cls) { + unsigned int packet; + struct GNUNET_STREAM_IOReadHandle *read_handle; + + /* Return NULL if there is already a read handle; the user has to cancel that + first before continuing or has to wait until it is completed */ + if (NULL != socket->read_handle) return NULL; + + read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); + read_handle->proc = proc; + socket->read_handle = read_handle; + + /* if previous copy buffer is still not read call the data processor on it */ + if (NULL != socket->copy_buffer) + { + GNUNET_SCHEDULER_add_now (&call_read_processor_task, + socket); + } + /* Check the bitmap for any holes */ + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, + packet)) + break; + } - /* Deem the data from the starting of the bitmap upto a hole as available - data */ + if (0 == packet) /* The first packet is still missing */ + { + /* We can't do anything until it arrives */ + } + else + { + /* Copy data to copy buffer */ + socket->copy_buffer = + GNUNET_malloc (socket->receive_buffer_boundaries[packet-1]); + + /* Shift the bitmap */ + socket->ack_bitmap << packet; - /* Create an IO handle */ + /* Set read_sequence_number */ + socket->read_sequence_number += packet; - /* Call the Data processor with this available data */ - - /* Update the read_sequence_number to the first hole in the bitmap */ + /* Set read_offset */ + socket->read_offset += packet; + + /* FIXME: Fix relative calucations in receive buffer management */ + } - /* Shift the bitmap so that the first hole is now at the start */ + return read_handle; }