From 954517d8f65fa3a228ee66ecf6f7fefbf1967d81 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sun, 12 Feb 2012 12:28:31 +0000 Subject: [PATCH] -added write operation --- src/stream/stream_api.c | 140 ++++++++++++++++++++++++++++++++--- src/stream/stream_protocol.h | 6 ++ 2 files changed, 135 insertions(+), 11 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index faceac297..c25cb6c9b 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -30,8 +30,21 @@ #include "stream_protocol.h" +/** + * The maximum packet size of a stream packet + */ #define MAX_PACKET_SIZE 64000 +/** + * The maximum payload a data message packet can carry + */ +static size_t max_payload_size = + MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); + +/** + * Receive buffer + */ +#define RECEIVE_BUFFER_SIZE 4096000 /** * states in the Protocol @@ -221,6 +234,11 @@ struct GNUNET_STREAM_Socket * Read sequence number. This number's value is determined during handshake */ uint32_t read_sequence_number; + + /** + * receiver's available buffer + */ + uint32_t receive_window_available; }; @@ -266,7 +284,19 @@ struct GNUNET_STREAM_IOHandle * The bitmap of this IOHandle; Corresponding bit for a message is set when * it has been acknowledged by the receiver */ - GNUNET_STREAM_AckBitmap bitmap; + GNUNET_STREAM_AckBitmap ack_bitmap; + + /** + * receiver's available buffer + */ + uint32_t receive_window_available; + + /** + * Number of packets sent before waiting for an ack + * + * FIXME: Do we need this? + */ + unsigned int sent_packets; }; @@ -392,7 +422,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket, * @param value GNUNET_YES to on, GNUNET_NO to off */ static void -AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, +ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit, int value) { @@ -411,7 +441,7 @@ AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, * @return GNUNET_YES if the bit is set; GNUNET_NO if not */ static uint8_t -AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, +ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit) { GNUNET_assert (bit < 64); @@ -419,6 +449,71 @@ AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, } + +/** + * Function called when Data Message is sent + * + * @param cls the io_handle corresponding to the Data Message + * @param socket the socket which was used + */ +static void +write_data_finish_cb (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + struct GNUNET_STREAM_IOHandle *io_handle = cls; + + io_handle->sent_packets++; +} + + +/** + * Writes data using the given socket. The amount of data written is limited by + * the receive_window_size + * + * @param socket the socket to use + */ +static void +write_data (struct GNUNET_STREAM_Socket *socket) +{ + struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle; + unsigned int packet; + int ack_packet; + + ack_packet = -1; + /* Find the last acknowledged packet */ + for (packet=0; packet < 64; packet++) + { + if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) + { + ack_packet = packet; + } + } + /* Resend packets which weren't ack'ed */ + for (packet=0; packet < ack_packet; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) + { + queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); + } + } + packet = ack_packet + 1; + /* Now send new packets if there is enough buffer space */ + while (io_handle->receive_window_available -= + io_handle->messages[packet]->header.header.size > 0) + { + queue_message (socket, + &io_handle->messages[packet]->header, + &write_data_finish_cb, + io_handle); + } +} + + /** * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA * @@ -520,6 +615,7 @@ client_handle_hello_ack (void *cls, { case STATE_HELLO_WAIT: socket->read_sequence_number = ntohl (ack_msg->sequence_number); + socket->receive_window_available = ntohl (ack_msg->receive_window_size); /* Get the random sequence number */ socket->write_sequence_number = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); @@ -530,6 +626,7 @@ client_handle_hello_ack (void *cls, reply->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); reply->sequence_number = htonl (socket->write_sequence_number); + reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE); queue_message (socket, &reply->header, &set_state_established, @@ -840,7 +937,9 @@ server_handle_hello_ack (void *cls, GNUNET_assert (socket->tunnel == tunnel); if (STATE_HELLO_WAIT == socket->state) { - socket->read_sequence_number = ntohs (ack_message->sequence_number); + socket->read_sequence_number = ntohl (ack_message->sequence_number); + socket->receive_window_available = + ntohl (ack_message->receive_window_size); socket->state = STATE_ESTABLISHED; } else @@ -1039,11 +1138,10 @@ server_handle_close_ack (void *cls, /** * Message Handler for mesh * - * @param cls closure (set from GNUNET_MESH_connect) + * @param socket the socket through which the ack was received * @param tunnel connection to the other end - * @param tunnel_ctx place to store local state associated with the tunnel * @param sender who sent the message - * @param ack the actual message + * @param ack the acknowledgment message * @param atsi performance data for the connection * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) @@ -1055,6 +1153,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, const struct GNUNET_STREAM_AckMessage *ack, const struct GNUNET_ATS_Information*atsi) { + switch (socket->state) + { + case (STATE_ESTABLISHED): + if (NULL == socket->write_handle) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA ACK when write_handle is NULL\n"); + return GNUNET_OK; + } + + socket->write_handle->ack_bitmap = ntoh64 (ack->bitmap); + socket->write_handle->receive_window_available = + ntohl (ack->receive_window_remaining); + write_data (socket); + break; + default: + break; + } return GNUNET_OK; } @@ -1502,7 +1618,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, unsigned int packet; struct GNUNET_STREAM_IOHandle *io_handle; struct GNUNET_STREAM_DataMessage *data_msg; - size_t max_payload_size; size_t packet_size; const void *sweep; @@ -1517,8 +1632,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, return NULL; } - max_payload_size = - MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); num_needed_packets = ceil (size / max_payload_size); if (64 < num_needed_packets) { @@ -1528,8 +1641,9 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, } io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); + io_handle->receive_window_available = socket->receive_window_available; sweep = data; - /* Divide the given area into packets for sending */ + /* Divide the given buffer into packets for sending */ for (packet=0; packet < num_needed_packets; packet++) { if ((packet + 1) * max_payload_size < size) @@ -1545,6 +1659,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, io_handle->messages[packet]->header.header.size = htons (packet_size); io_handle->messages[packet]->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); + io_handle->messages[packet]->sequence_number = + htons (socket->write_sequence_number++); data_msg = io_handle->messages[packet]; memcpy (&data_msg[1], sweep, @@ -1552,5 +1668,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage); } + write_data (socket); + return io_handle; } diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h index 426cd345b..1b3a6838a 100644 --- a/src/stream/stream_protocol.h +++ b/src/stream/stream_protocol.h @@ -123,6 +123,8 @@ struct GNUNET_STREAM_AckMessage /** * The sequence number of the Data Message upto which the receiver has filled * its buffer without any missing packets + * + * FIXME: Do we need this? */ uint32_t base_sequence_number GNUNET_PACKED; @@ -147,6 +149,10 @@ struct GNUNET_STREAM_HelloAckMessage */ uint32_t sequence_number; + /** + * The size(in bytes) of the receive window on the peer sending this message + */ + uint32_t receive_window_size; }; GNUNET_NETWORK_STRUCT_END -- 2.25.1