/**
* The write IO_handle associated with this socket
*/
- struct GNUNET_STREAM_IOHandle *write_handle;
+ struct GNUNET_STREAM_IOWriteHandle *write_handle;
/**
* The read IO_handle associated with this socket
*/
- struct GNUNET_STREAM_IOHandle *read_handle;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
/**
* Buffer for storing received messages
*/
void *receive_buffer;
+ /**
+ * Copy buffer pointer; Used during read operations
+ */
+ void *copy_buffer;
+
/**
* The state of the protocol associated with this socket
*/
*/
uint32_t receive_buffer_size;
+ /**
+ * The receiver buffer boundaries
+ */
+ uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
/**
* receiver's available buffer after the last acknowledged packet
*/
* The offset after which we are expecting data
*/
uint32_t read_offset;
+
+ /**
+ * The size of the copy buffer
+ */
+ uint32_t copy_buffer_size;
+
+ /**
+ * The read offset of copy buffer
+ */
+ uint32_t copy_buffer_read_offset;
};
/**
- * The IO Handle
+ * The IO Write Handle
*/
-struct GNUNET_STREAM_IOHandle
+struct GNUNET_STREAM_IOWriteHandle
{
/**
* The packet_buffers associated with this Handle
*/
GNUNET_STREAM_AckBitmap ack_bitmap;
- /**
- * receiver's available buffer
- */
- uint32_t receive_window_available;
-
/**
* Number of packets sent before waiting for an ack
*
};
+/**
+ * The IO Read Handle
+ */
+struct GNUNET_STREAM_IOReadHandle
+{
+ /**
+ * Callback for the read processor
+ */
+ GNUNET_STREAM_DataProcessor proc;
+
+ /**
+ * The closure pointer for the read processor callback
+ */
+ void *proc_cls;
+};
+
+
/**
* Default value in seconds for various timeouts
*/
ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
- ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
+ ack_msg->receive_window_remaining =
+ htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
/* Request MESH for sending ACK */
GNUNET_MESH_notify_transmit_ready (socket->tunnel,
write_data_finish_cb (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
- struct GNUNET_STREAM_IOHandle *io_handle = cls;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
io_handle->sent_packets++;
}
static void
write_data (struct GNUNET_STREAM_Socket *socket)
{
- struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
unsigned int packet;
int ack_packet;
packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
while ( (NULL != io_handle->messages[packet]) &&
- (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
+ (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
{
- io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
+ socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
queue_message (socket,
&io_handle->messages[packet]->header,
&write_data_finish_cb,
const void *payload;
uint32_t bytes_needed;
uint32_t relative_offset;
+ uint32_t relative_sequence_number;
uint16_t size;
size = htons (msg->header.header.size);
case STATE_TRANSMIT_CLOSED:
case STATE_TRANSMIT_CLOSE_WAIT:
- /* check if the message's sequence number is greater than the one we are
+ /* check if the message's sequence number is in the range we are
expecting */
- if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64)
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number > 64)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring received message with sequence number %d",
socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
bytes_needed);
socket->receive_buffer_size = bytes_needed;
- socket->receive_window_available =
- RECEIVE_BUFFER_SIZE - socket->receive_buffer_size;
}
else
{
}
}
- /* Copy Data to buffer and send acknowledgement for this packet */
+ /* Copy Data to buffer */
payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
memcpy (socket->receive_buffer + relative_offset,
payload,
size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
/* Modify the ACK bitmap */
ackbitmap_modify_bit (&socket->ack_bitmap,
- ntohl (msg->sequence_number) -
- socket->read_sequence_number,
+ relative_sequence_number,
GNUNET_YES);
/* Start ACK sending task if one is not already present */
}
socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
- socket->write_handle->receive_window_available =
+ socket->receive_window_available =
ntohl (ack->receive_window_remaining);
write_data (socket);
break;
}
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ */
+static void
+call_read_processor_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ size_t read_size;
+ size_t valid_read_size;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
+
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (NULL != socket->read_handle->proc);
+ GNUNET_assert (NULL != socket->copy_buffer);
+ GNUNET_assert (0 != socket->copy_buffer_size);
+
+ valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
+ GNUNET_assert (0 != valid_read_size);
+
+ read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->copy_buffer
+ + socket->copy_buffer_read_offset,
+ valid_read_size);
+
+ GNUNET_assert (read_size <= valid_read_size);
+ socket->copy_buffer_read_offset += read_size;
+
+ /* Free the copy buffer once it has been read entirely */
+ if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
+ {
+ GNUNET_free (socket->copy_buffer);
+ socket->copy_buffer = NULL;
+ socket->copy_buffer_size = 0;
+ socket->copy_buffer_read_offset = 0;
+ }
+
+ /* Free the read handle */
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+}
+
+
/*****************/
/* API functions */
/*****************/
* @param write_cont_cls the closure
* @return handle to cancel the operation
*/
-struct GNUNET_STREAM_IOHandle *
+struct GNUNET_STREAM_IOWriteHandle *
GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
const void *data,
size_t size,
{
unsigned int num_needed_packets;
unsigned int packet;
- struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle;
uint32_t packet_size;
uint32_t payload_size;
struct GNUNET_STREAM_DataMessage *data_msg;
if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
- io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
- io_handle->receive_window_available = socket->receive_window_available;
+ io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
sweep = data;
/* Divide the given buffer into packets for sending */
for (packet=0; packet < num_needed_packets; packet++)
* @param proc_cls the closure for proc
* @return handle to cancel the operation
*/
-struct GNUNET_STREAM_IOHandle *
-GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket,
+struct GNUNET_STREAM_IOReadHandle *
+GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_TIME_Relative timeout,
GNUNET_STREAM_DataProcessor proc,
void *proc_cls)
{
+ unsigned int packet;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+
+ /* Return NULL if there is already a read handle; the user has to cancel that
+ first before continuing or has to wait until it is completed */
+ if (NULL != socket->read_handle) return NULL;
+
+ read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
+ read_handle->proc = proc;
+ socket->read_handle = read_handle;
+
+ /* if previous copy buffer is still not read call the data processor on it */
+ if (NULL != socket->copy_buffer)
+ {
+ GNUNET_SCHEDULER_add_now (&call_read_processor_task,
+ socket);
+ }
+
/* Check the bitmap for any holes */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
- /* Deem the data from the starting of the bitmap upto a hole as available
- data */
+ if (0 == packet) /* The first packet is still missing */
+ {
+ /* We can't do anything until it arrives */
+ }
+ else
+ {
+ /* Copy data to copy buffer */
+ socket->copy_buffer =
+ GNUNET_malloc (socket->receive_buffer_boundaries[packet-1]);
+
+ /* Shift the bitmap */
+ socket->ack_bitmap << packet;
- /* Create an IO handle */
+ /* Set read_sequence_number */
+ socket->read_sequence_number += packet;
- /* Call the Data processor with this available data */
-
- /* Update the read_sequence_number to the first hole in the bitmap */
+ /* Set read_offset */
+ socket->read_offset += packet;
+
+ /* FIXME: Fix relative calucations in receive buffer management */
+ }
- /* Shift the bitmap so that the first hole is now at the start */
+ return read_handle;
}