From 26e1543206629acaaf9ce135c91915c4e1431b46 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Wed, 22 Feb 2012 10:10:18 +0000 Subject: [PATCH] added ack sending --- src/stream/stream_api.c | 112 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 3 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index e412b679d..f7ab23145 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -160,6 +160,26 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_TIME_Relative retransmit_timeout; + /** + * The Acknowledgement Bitmap + */ + GNUNET_STREAM_AckBitmap ack_bitmap; + + /** + * Time when the Acknowledgement was queued + */ + struct GNUNET_TIME_Absolute ack_time_registered; + + /** + * Queued Acknowledgement deadline + */ + struct GNUNET_TIME_Relative ack_time_deadline; + + /** + * The task for sending timely Acks + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task_id; + /** * The mesh handle */ @@ -243,7 +263,7 @@ struct GNUNET_STREAM_Socket uint32_t read_sequence_number; /** - * receiver's available buffer + * receiver's available buffer after the last acknowledged packet */ uint32_t receive_window_available; }; @@ -421,6 +441,76 @@ queue_message (struct GNUNET_STREAM_Socket *socket, } +/** + * Callback function for sending ack message + * + * @param cls closure the ACK message created in ack_task + * @param size number of bytes available in buffer + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +send_ack_notify (void *cls, size_t size, void *buf) +{ + struct GNUNET_STREAM_AckMessage *ack_msg = cls; + + if (0 == size) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s called with size 0\n", __func__); + return 0; + } + GNUNET_assert (ack_msg->header.header.size <= size); + + size = ack_msg->header.header.size; + memcpy (buf, ack_msg, size); + return size; +} + + +/** + * Task for sending ACK message + * + * @param cls the socket + * @param tc the Task context + */ +static void +ack_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + struct GNUNET_STREAM_AckMessage *ack_msg; + + if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) + { + return; + } + + socket->ack_task_id = 0; + + /* Create the ACK Message */ + ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); + ack_msg->header.header.size = htons (sizeof (struct + GNUNET_STREAM_AckMessage)); + ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); + ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap); + ack_msg->base_sequence_number = htonl (socket->write_sequence_number); + ack_msg->receive_window_remaining = htonl (socket->receive_window_available); + + /* Request MESH for sending ACK */ + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (ack_msg->header.header.size), + &send_ack_notify, + ack_msg); + + +} + + /** * Function to modify a bit in GNUNET_STREAM_AckBitmap * @@ -577,7 +667,23 @@ handle_data (struct GNUNET_STREAM_Socket *socket, * MAX_PACKET_SIZE, payload, size); - /* FIXME: We have to send GNUNET_STREAM_AckMessage */ + + /* Modify the ACK bitmap */ + ackbitmap_modify_bit (&socket->bitmap, + ntohl (msg->sequence_number) - + socket->read_sequence_number, + GNUNET_YES); + + /* Start ACK sending task if one is not already present */ + if (0 == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + break; default: @@ -1740,8 +1846,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, unsigned int num_needed_packets; unsigned int packet; struct GNUNET_STREAM_IOHandle *io_handle; - struct GNUNET_STREAM_DataMessage *data_msg; size_t packet_size; + struct GNUNET_STREAM_DataMessage *data_msg; const void *sweep; /* There is already a write request pending */ -- 2.25.1