From: Sree Harsha Totakura Date: Sun, 26 Feb 2012 15:50:19 +0000 (+0000) Subject: -receive buffer (re)allocation in handle_data X-Git-Tag: initial-import-from-subversion-38251~14635 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=9935335d2d685592c0dd3543a9ef7c37b06f67e8;p=oweals%2Fgnunet.git -receive buffer (re)allocation in handle_data --- diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 65836ce1c..0a8782c61 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -139,7 +139,7 @@ struct MessageQueue struct MessageQueue *next; /** - * The next message in queue. Should be NULL in the last message + * The next message in queue. Should be NULL in the first message */ struct MessageQueue *prev; }; @@ -263,10 +263,25 @@ struct GNUNET_STREAM_Socket */ uint32_t read_sequence_number; + /** + * The receiver buffer size + */ + uint32_t receive_buffer_size; + /** * receiver's available buffer after the last acknowledged packet */ uint32_t receive_window_available; + + /** + * The offset pointer used during write operation + */ + uint32_t write_offset; + + /** + * The offset after which we are expecting data + */ + uint32_t read_offset; }; @@ -298,8 +313,6 @@ struct GNUNET_STREAM_ListenSocket }; - - /** * The IO Handle */ @@ -635,10 +648,12 @@ handle_data (struct GNUNET_STREAM_Socket *socket, const struct GNUNET_STREAM_DataMessage *msg, const struct GNUNET_ATS_Information*atsi) { - uint16_t size; const void *payload; + uint32_t bytes_needed; + uint32_t relative_offset; + uint16_t size; - size = msg->header.header.size; + size = htons (msg->header.header.size); if (size < sizeof (struct GNUNET_STREAM_DataMessage)) { GNUNET_break_op (0); @@ -650,23 +665,44 @@ handle_data (struct GNUNET_STREAM_Socket *socket, case STATE_ESTABLISHED: case STATE_TRANSMIT_CLOSED: case STATE_TRANSMIT_CLOSE_WAIT: - GNUNET_assert (NULL != socket->receive_buffer); + /* check if the message's sequence number is greater than the one we are expecting */ - if (ntohl (msg->sequence_number) - socket->read_sequence_number >= 64) - {/* We are receiving a retransmitted message */ + if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64) + { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message with sequence number %d retransmitted\n", - ntohl (socket->read_sequence_number)); + "Ignoring received message with sequence number %d", + ntohl (msg->sequence_number)); return GNUNET_YES; } - /* Copy Data to buffer and send acknowledgement for this packet */ + /* Check if we have to allocate the buffer */ size -= sizeof (struct GNUNET_STREAM_DataMessage); + relative_offset = ntohl (msg->offset) - socket->read_offset; + bytes_needed = relative_offset + size; + + if (bytes_needed > socket->receive_buffer_size) + { + if (bytes_needed <= RECEIVE_BUFFER_SIZE) + { + socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, + bytes_needed); + socket->receive_buffer_size = bytes_needed; + socket->receive_window_available = + RECEIVE_BUFFER_SIZE - socket->receive_buffer_size; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cannot accommodate packet %d as buffer is full\n", + ntohl (msg->sequence_number)); + return GNUNET_YES; + } + } + + /* Copy Data to buffer and send acknowledgement for this packet */ payload = &msg[1]; - memcpy (socket->receive_buffer - + (ntohl (msg->sequence_number) - socket->read_sequence_number) - * MAX_PACKET_SIZE, + memcpy (socket->receive_buffer + relative_offset, payload, size); @@ -736,8 +772,8 @@ set_state_established (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n"); - /* Initialize the receive buffer */ - socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE); + socket->write_offset = 0; + socket->read_offset = 0; socket->state = STATE_ESTABLISHED; } @@ -1162,7 +1198,8 @@ server_handle_hello_ack (void *cls, socket->read_sequence_number = ntohl (ack_message->sequence_number); socket->receive_window_available = ntohl (ack_message->receive_window_size); - socket->state = STATE_ESTABLISHED; + /* Attain ESTABLISHED state */ + set_state_established (NULL, socket); } else { @@ -1852,7 +1889,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, unsigned int num_needed_packets; unsigned int packet; struct GNUNET_STREAM_IOHandle *io_handle; - size_t packet_size; + uint32_t packet_size; + uint32_t payload_size; struct GNUNET_STREAM_DataMessage *data_msg; const void *sweep; @@ -1882,12 +1920,14 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, { if ((packet + 1) * max_payload_size < size) { + payload_size = max_payload_size; packet_size = MAX_PACKET_SIZE; } else { - packet_size = size - packet * max_payload_size - + sizeof (struct GNUNET_STREAM_DataMessage); + payload_size = size - packet * max_payload_size; + packet_size = payload_size + sizeof (struct + GNUNET_STREAM_DataMessage); } io_handle->messages[packet] = GNUNET_malloc (packet_size); io_handle->messages[packet]->header.header.size = htons (packet_size); @@ -1895,6 +1935,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); io_handle->messages[packet]->sequence_number = htons (socket->write_sequence_number++); + io_handle->messages[packet]->offset = htons (socket->write_offset); /* FIXME: Remove the fixed delay for ack deadline; Set it to the value determined from RTT */ @@ -1905,8 +1946,9 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, /* Copy data from given buffer to the packet */ memcpy (&data_msg[1], sweep, - packet_size - sizeof (struct GNUNET_STREAM_DataMessage)); - sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage); + payload_size); + sweep += payload_size; + socket->write_offset += payload_size; } socket->write_handle = io_handle; write_data (socket); diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h index 0d427bfa2..0c1987e5d 100644 --- a/src/stream/stream_protocol.h +++ b/src/stream/stream_protocol.h @@ -89,9 +89,6 @@ struct GNUNET_STREAM_DataMessage * Offset of the packet in the overall stream, modulo 2^32; allows * the receiver to calculate where in the destination buffer the * message should be placed. In network byte order. - * - * FIXME: if all the packets except the last one are of constant size we - * don't need this anymore */ uint32_t offset GNUNET_PACKED;