#include "stream_protocol.h"
+/**
+ * The maximum packet size of a stream packet
+ */
#define MAX_PACKET_SIZE 64000
+/**
+ * The maximum payload a data message packet can carry
+ */
+static size_t max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+
+/**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
/**
* states in the Protocol
* Read sequence number. This number's value is determined during handshake
*/
uint32_t read_sequence_number;
+
+ /**
+ * receiver's available buffer
+ */
+ uint32_t receive_window_available;
};
* The bitmap of this IOHandle; Corresponding bit for a message is set when
* it has been acknowledged by the receiver
*/
- GNUNET_STREAM_AckBitmap bitmap;
+ GNUNET_STREAM_AckBitmap ack_bitmap;
+
+ /**
+ * receiver's available buffer
+ */
+ uint32_t receive_window_available;
+
+ /**
+ * Number of packets sent before waiting for an ack
+ *
+ * FIXME: Do we need this?
+ */
+ unsigned int sent_packets;
};
* @param value GNUNET_YES to on, GNUNET_NO to off
*/
static void
-AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
unsigned int bit,
int value)
{
* @return GNUNET_YES if the bit is set; GNUNET_NO if not
*/
static uint8_t
-AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
unsigned int bit)
{
GNUNET_assert (bit < 64);
}
+
+/**
+ * Function called when Data Message is sent
+ *
+ * @param cls the io_handle corresponding to the Data Message
+ * @param socket the socket which was used
+ */
+static void
+write_data_finish_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_IOHandle *io_handle = cls;
+
+ io_handle->sent_packets++;
+}
+
+
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receive_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
+ unsigned int packet;
+ int ack_packet;
+
+ ack_packet = -1;
+ /* Find the last acknowledged packet */
+ for (packet=0; packet < 64; packet++)
+ {
+ if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ {
+ ack_packet = packet;
+ }
+ }
+ /* Resend packets which weren't ack'ed */
+ for (packet=0; packet < ack_packet; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ {
+ queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ }
+ }
+ packet = ack_packet + 1;
+ /* Now send new packets if there is enough buffer space */
+ while (io_handle->receive_window_available -=
+ io_handle->messages[packet]->header.header.size > 0)
+ {
+ queue_message (socket,
+ &io_handle->messages[packet]->header,
+ &write_data_finish_cb,
+ io_handle);
+ }
+}
+
+
/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
{
case STATE_HELLO_WAIT:
socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ socket->receive_window_available = ntohl (ack_msg->receive_window_size);
/* Get the random sequence number */
socket->write_sequence_number =
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
reply->header.header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
reply->sequence_number = htonl (socket->write_sequence_number);
+ reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
queue_message (socket,
&reply->header,
&set_state_established,
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_HELLO_WAIT == socket->state)
{
- socket->read_sequence_number = ntohs (ack_message->sequence_number);
+ socket->read_sequence_number = ntohl (ack_message->sequence_number);
+ socket->receive_window_available =
+ ntohl (ack_message->receive_window_size);
socket->state = STATE_ESTABLISHED;
}
else
/**
* Message Handler for mesh
*
- * @param cls closure (set from GNUNET_MESH_connect)
+ * @param socket the socket through which the ack was received
* @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
* @param sender who sent the message
- * @param ack the actual message
+ * @param ack the acknowledgment message
* @param atsi performance data for the connection
* @return GNUNET_OK to keep the connection open,
* GNUNET_SYSERR to close it (signal serious error)
const struct GNUNET_STREAM_AckMessage *ack,
const struct GNUNET_ATS_Information*atsi)
{
+ switch (socket->state)
+ {
+ case (STATE_ESTABLISHED):
+ if (NULL == socket->write_handle)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DATA ACK when write_handle is NULL\n");
+ return GNUNET_OK;
+ }
+
+ socket->write_handle->ack_bitmap = ntoh64 (ack->bitmap);
+ socket->write_handle->receive_window_available =
+ ntohl (ack->receive_window_remaining);
+ write_data (socket);
+ break;
+ default:
+ break;
+ }
return GNUNET_OK;
}
unsigned int packet;
struct GNUNET_STREAM_IOHandle *io_handle;
struct GNUNET_STREAM_DataMessage *data_msg;
- size_t max_payload_size;
size_t packet_size;
const void *sweep;
return NULL;
}
- max_payload_size =
- MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
num_needed_packets = ceil (size / max_payload_size);
if (64 < num_needed_packets)
{
}
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
+ io_handle->receive_window_available = socket->receive_window_available;
sweep = data;
- /* Divide the given area into packets for sending */
+ /* Divide the given buffer into packets for sending */
for (packet=0; packet < num_needed_packets; packet++)
{
if ((packet + 1) * max_payload_size < size)
io_handle->messages[packet]->header.header.size = htons (packet_size);
io_handle->messages[packet]->header.header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+ io_handle->messages[packet]->sequence_number =
+ htons (socket->write_sequence_number++);
data_msg = io_handle->messages[packet];
memcpy (&data_msg[1],
sweep,
sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
}
+ write_data (socket);
+
return io_handle;
}