From 25392eeb76fe54b71bdf5b855eeaafb29560e899 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Wed, 8 Feb 2012 10:51:55 +0000 Subject: [PATCH] -added socket write handle --- src/stream/stream_api.c | 196 ++++++++++++++++++++++++++++++++--- src/stream/stream_protocol.h | 19 +++- 2 files changed, 198 insertions(+), 17 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index bd89320ec..c16d1f54f 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -25,9 +25,13 @@ */ #include "platform.h" #include "gnunet_common.h" +#include "gnunet_crypto_lib.h" #include "gnunet_stream_lib.h" #include "stream_protocol.h" + +#define MAX_PACKET_SIZE 64000 + /** * states in the Protocol */ @@ -190,6 +194,26 @@ struct GNUNET_STREAM_Socket * The number of previous timeouts */ unsigned int retries; + + /** + * The write IO_handle associated with this socket + */ + struct GNUNET_STREAM_IOHandle *write_handle; + + /** + * The read IO_handle associated with this socket + */ + struct GNUNET_STREAM_IOHandle *read_handle; + + /** + * Write sequence number. Start at random upon reaching ESTABLISHED state + */ + uint32_t write_sequence_number; + + /** + * Read sequence number. This number's value is determined during handshake + */ + uint32_t read_sequence_number; }; @@ -218,7 +242,24 @@ struct GNUNET_STREAM_ListenSocket * The call back closure */ void *listen_cb_cls; +}; + +/** + * The IO Handle + */ +struct GNUNET_STREAM_IOHandle +{ + /** + * The packet_buffers associated with this Handle + */ + struct GNUNET_STREAM_DataMessage *messages[64]; + + /** + * The bitmap of this IOHandle; Corresponding bit for a message is set when + * it has been acknowledged by the receiver + */ + GNUNET_STREAM_AckBitmap bitmap; }; @@ -339,6 +380,37 @@ queue_message (struct GNUNET_STREAM_Socket *socket, } +/** + * Function to modify a bit in GNUNET_STREAM_AckBitmap + * + * @param bitmap the bitmap to modify + * @param bit the bit number to modify + * @param GNUNET_YES to on, GNUNET_NO to off + */ +static void +AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, + uint8_t bit, + uint8_t value) +{ + uint64_t val; + + val = value; + *bitmap = *bitmap | (val << bit); +} + + +/** + * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap + * + * @param bit the bit number to check + * @return GNUNET_YES if the bit is set; GNUNET_NO if not + */ +static uint8_t +AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, + uint8_t bit) +{ + return (*bitmap & (0x0000000000000001 << bit)) >> bit; +} /** @@ -391,6 +463,7 @@ static void set_state_established (void *cls, struct GNUNET_STREAM_Socket *socket) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n"); socket->state = STATE_ESTABLISHED; } @@ -406,6 +479,7 @@ set_state_hello_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_assert (STATE_INIT == socket->state); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n"); socket->state = STATE_HELLO_WAIT; } @@ -431,19 +505,26 @@ client_handle_hello_ack (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; - struct GNUNET_STREAM_MessageHeader *reply; + const struct GNUNET_STREAM_HelloAckMessage *ack_msg; + struct GNUNET_STREAM_HelloAckMessage *reply; + ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; GNUNET_assert (socket->tunnel == tunnel); if (STATE_HELLO_WAIT == socket->state) { + socket->read_sequence_number = ntohl (ack_msg->sequence_number); + /* Get the random sequence number */ + socket->write_sequence_number = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff); reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.size = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + reply->header.header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.type = + reply->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + reply->sequence_number = htonl (socket->write_sequence_number); queue_message (socket, - reply, + (struct GNUNET_STREAM_MessageHeader *) reply, &set_state_established, NULL); } @@ -690,19 +771,23 @@ server_handle_hello (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - struct GNUNET_STREAM_MessageHeader *reply; + struct GNUNET_STREAM_HelloAckMessage *reply; GNUNET_assert (socket->tunnel == tunnel); if (STATE_INIT == socket->state) { + /* Get the random sequence number */ + socket->write_sequence_number = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff); reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.size = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + reply->header.header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.type = + reply->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + reply->sequence_number = htonl (socket->write_sequence_number); queue_message (socket, - reply, + (struct GNUNET_STREAM_MessageHeader *)reply, &set_state_hello_wait, NULL); } @@ -738,11 +823,13 @@ server_handle_hello_ack (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - struct GNUNET_STREAM_MessageHeader *reply; + const struct GNUNET_STREAM_HelloAckMessage *ack_message; + ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; GNUNET_assert (socket->tunnel == tunnel); if (STATE_HELLO_WAIT == socket->state) { + socket->read_sequence_number = ntohs (ack_message->sequence_number); socket->state = STATE_ESTABLISHED; } else @@ -1024,7 +1111,7 @@ static struct GNUNET_MESH_MessageHandler client_message_handlers[] = { {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) }, {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_STREAM_HelloAckMessage)}, {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, sizeof (struct GNUNET_STREAM_MessageHeader)}, {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, @@ -1054,7 +1141,7 @@ static struct GNUNET_MESH_MessageHandler server_message_handlers[] = { {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, sizeof (struct GNUNET_STREAM_MessageHeader)}, {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_STREAM_HelloAckMessage)}, {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, sizeof (struct GNUNET_STREAM_MessageHeader)}, {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, @@ -1228,6 +1315,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) if (NULL != socket->transmit_handle) { GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; } /* Clear existing message queue */ @@ -1242,12 +1330,14 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) if (NULL != socket->tunnel) { GNUNET_MESH_tunnel_destroy (socket->tunnel); + socket->tunnel = NULL; } /* Close mesh connection */ if (NULL != socket->mesh) { GNUNET_MESH_disconnect (socket->mesh); + socket->mesh = NULL; } GNUNET_free (socket); } @@ -1309,8 +1399,7 @@ tunnel_cleaner (void *cls, void *tunnel_ctx) { struct GNUNET_STREAM_Socket *socket = tunnel_ctx; - struct MessageQueue *head; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer %s has terminated connection abruptly\n", GNUNET_i2s (&socket->other_peer)); @@ -1376,3 +1465,80 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) GNUNET_free (lsocket); } + + +/** + * Tries to write the given data to the stream + * + * @param socket the socket representing a stream + * @param data the data buffer from where the data is written into the stream + * @param size the number of bytes to be written from the data buffer + * @param timeout the timeout period + * @param write_cont the function to call upon writing some bytes into the stream + * @param write_cont_cls the closure + * @return handle to cancel the operation + */ +struct GNUNET_STREAM_IOHandle * +GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, + const void *data, + size_t size, + struct GNUNET_TIME_Relative timeout, + GNUNET_STREAM_CompletionContinuation write_cont, + void *write_cont_cls) +{ + unsigned int num_needed_packets; + unsigned int packet; + struct GNUNET_STREAM_IOHandle *io_handle; + struct GNUNET_STREAM_DataMessage *data_msg; + size_t max_payload_size; + size_t packet_size; + const void *sweep; + + /* There is already a write request pending */ + if (NULL != socket->write_handle) return NULL; + if (!(STATE_ESTABLISHED == socket->state + || STATE_RECEIVE_CLOSE_WAIT == socket->state + || STATE_RECEIVE_CLOSED == socket->state)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Attempting to write on a closed/not-yet-established stream\n"); + return NULL; + } + + max_payload_size = + MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); + num_needed_packets = ceil (size / max_payload_size); + if (64 < num_needed_packets) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Given buffer cannot be accommodated in 64 packets\n"); + num_needed_packets = 64; + } + + io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); + sweep = data; + /* Divide the given area into packets for sending */ + for (packet=0; packet < num_needed_packets; packet++) + { + if ((packet + 1) * max_payload_size < size) + { + packet_size = MAX_PACKET_SIZE; + } + else + { + packet_size = size - packet * max_payload_size + + sizeof (struct GNUNET_STREAM_DataMessage); + } + io_handle->messages[packet] = GNUNET_malloc (packet_size); + io_handle->messages[packet]->header.header.size = htons (packet_size); + io_handle->messages[packet]->header.header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); + data_msg = io_handle->messages[packet]; + memcpy (&data_msg[1], + sweep, + packet_size - sizeof (struct GNUNET_STREAM_DataMessage)); + sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage); + } + + return io_handle; +} diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h index 9c7964e33..cc08dbe8e 100644 --- a/src/stream/stream_protocol.h +++ b/src/stream/stream_protocol.h @@ -42,8 +42,7 @@ GNUNET_NETWORK_STRUCT_BEGIN /** * The stream message header - * - * The message can be of Data, Acknowledgement or both + * All messages of STREAM should commonly have this as header */ struct GNUNET_STREAM_MessageHeader { @@ -134,6 +133,22 @@ struct GNUNET_STREAM_AckMessage uint32_t receive_window_remaining GNUNET_PACKED; }; + +struct GNUNET_STREAM_HelloAckMessage +{ + /** + * The stream message header + */ + struct GNUNET_STREAM_MessageHeader header; + + /** + * The selected sequence number. Following data tranmissions from the sender + * start with this sequence + */ + uint32_t sequence_number; + +}; + GNUNET_NETWORK_STRUCT_END -- 2.25.1