struct MessageQueue *next;
/**
- * The next message in queue. Should be NULL in the last message
+ * The next message in queue. Should be NULL in the first message
*/
struct MessageQueue *prev;
};
*/
uint32_t read_sequence_number;
+ /**
+ * The receiver buffer size
+ */
+ uint32_t receive_buffer_size;
+
/**
* receiver's available buffer after the last acknowledged packet
*/
uint32_t receive_window_available;
+
+ /**
+ * The offset pointer used during write operation
+ */
+ uint32_t write_offset;
+
+ /**
+ * The offset after which we are expecting data
+ */
+ uint32_t read_offset;
};
};
-
-
/**
* The IO Handle
*/
const struct GNUNET_STREAM_DataMessage *msg,
const struct GNUNET_ATS_Information*atsi)
{
- uint16_t size;
const void *payload;
+ uint32_t bytes_needed;
+ uint32_t relative_offset;
+ uint16_t size;
- size = msg->header.header.size;
+ size = htons (msg->header.header.size);
if (size < sizeof (struct GNUNET_STREAM_DataMessage))
{
GNUNET_break_op (0);
case STATE_ESTABLISHED:
case STATE_TRANSMIT_CLOSED:
case STATE_TRANSMIT_CLOSE_WAIT:
- GNUNET_assert (NULL != socket->receive_buffer);
+
/* check if the message's sequence number is greater than the one we are
expecting */
- if (ntohl (msg->sequence_number) - socket->read_sequence_number >= 64)
- {/* We are receiving a retransmitted message */
+ if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message with sequence number %d retransmitted\n",
- ntohl (socket->read_sequence_number));
+ "Ignoring received message with sequence number %d",
+ ntohl (msg->sequence_number));
return GNUNET_YES;
}
- /* Copy Data to buffer and send acknowledgement for this packet */
+ /* Check if we have to allocate the buffer */
size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ relative_offset = ntohl (msg->offset) - socket->read_offset;
+ bytes_needed = relative_offset + size;
+
+ if (bytes_needed > socket->receive_buffer_size)
+ {
+ if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+ {
+ socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+ bytes_needed);
+ socket->receive_buffer_size = bytes_needed;
+ socket->receive_window_available =
+ RECEIVE_BUFFER_SIZE - socket->receive_buffer_size;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cannot accommodate packet %d as buffer is full\n",
+ ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+ }
+
+ /* Copy Data to buffer and send acknowledgement for this packet */
payload = &msg[1];
- memcpy (socket->receive_buffer
- + (ntohl (msg->sequence_number) - socket->read_sequence_number)
- * MAX_PACKET_SIZE,
+ memcpy (socket->receive_buffer + relative_offset,
payload,
size);
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
- /* Initialize the receive buffer */
- socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
+ socket->write_offset = 0;
+ socket->read_offset = 0;
socket->state = STATE_ESTABLISHED;
}
socket->read_sequence_number = ntohl (ack_message->sequence_number);
socket->receive_window_available =
ntohl (ack_message->receive_window_size);
- socket->state = STATE_ESTABLISHED;
+ /* Attain ESTABLISHED state */
+ set_state_established (NULL, socket);
}
else
{
unsigned int num_needed_packets;
unsigned int packet;
struct GNUNET_STREAM_IOHandle *io_handle;
- size_t packet_size;
+ uint32_t packet_size;
+ uint32_t payload_size;
struct GNUNET_STREAM_DataMessage *data_msg;
const void *sweep;
{
if ((packet + 1) * max_payload_size < size)
{
+ payload_size = max_payload_size;
packet_size = MAX_PACKET_SIZE;
}
else
{
- packet_size = size - packet * max_payload_size
- + sizeof (struct GNUNET_STREAM_DataMessage);
+ payload_size = size - packet * max_payload_size;
+ packet_size = payload_size + sizeof (struct
+ GNUNET_STREAM_DataMessage);
}
io_handle->messages[packet] = GNUNET_malloc (packet_size);
io_handle->messages[packet]->header.header.size = htons (packet_size);
htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
io_handle->messages[packet]->sequence_number =
htons (socket->write_sequence_number++);
+ io_handle->messages[packet]->offset = htons (socket->write_offset);
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
/* Copy data from given buffer to the packet */
memcpy (&data_msg[1],
sweep,
- packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
- sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
+ payload_size);
+ sweep += payload_size;
+ socket->write_offset += payload_size;
}
socket->write_handle = io_handle;
write_data (socket);