From 804486fd4779aee6b3b92ad11c0261c663b27fe4 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Tue, 10 Apr 2012 20:56:54 +0000 Subject: [PATCH] -retransmission of close messages --- src/stream/stream_api.c | 135 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 126 insertions(+), 9 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 8fd22e8cb..9ae34752c 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -444,11 +444,6 @@ struct GNUNET_STREAM_ShutdownHandle */ struct GNUNET_STREAM_Socket *socket; - /** - * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR - */ - int operation; - /** * Shutdown completion callback */ @@ -458,6 +453,16 @@ struct GNUNET_STREAM_ShutdownHandle * Closure for completion callback */ void *completion_cls; + + /** + * Close message retransmission task id + */ + GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id; + + /** + * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR + */ + int operation; }; @@ -718,6 +723,50 @@ ack_task (void *cls, } +/** + * Retransmission task for shutdown messages + * + * @param cls the shutdown handle + * @param tc the Task Context + */ +static void +close_msg_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls; + struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_STREAM_Socket *socket; + + GNUNET_assert (NULL != shutdown_handle); + socket = shutdown_handle->socket; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (shutdown_handle->operation) + { + case SHUT_RDWR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + break; + case SHUT_RD: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + break; + case SHUT_WR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + break; + default: + GNUNET_free (msg); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + return; + } + queue_message (socket, msg, NULL, NULL); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + shutdown_handle); +} + + /** * Function to modify a bit in GNUNET_STREAM_AckBitmap * @@ -1263,6 +1312,43 @@ set_state_close_wait (void *cls, } +/** + * Callback to set state to RECEIVE_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_receive_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Attaing RECEIVE_CLOSE_WAIT state\n", + socket->our_id); + socket->state = STATE_RECEIVE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to TRANSMIT_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_transmit_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Attaing TRANSMIT_CLOSE_WAIT state\n", + socket->our_id); + socket->state = STATE_TRANSMIT_CLOSE_WAIT; +} + + /** * Callback to set state to CLOSED * @@ -1640,7 +1726,7 @@ handle_close_ack (struct GNUNET_STREAM_Socket *socket, struct GNUNET_MESH_Tunnel *tunnel, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_STREAM_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + const struct GNUNET_ATS_Information *atsi) { struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; @@ -1648,7 +1734,6 @@ handle_close_ack (struct GNUNET_STREAM_Socket *socket, switch (socket->state) { case STATE_CLOSE_WAIT: - socket->state = STATE_CLOSED; if ( (NULL == shutdown_handle) || (SHUT_RDWR != shutdown_handle->operation) ) { @@ -1658,10 +1743,21 @@ handle_close_ack (struct GNUNET_STREAM_Socket *socket, socket->our_id); return GNUNET_OK; } + + if (GNUNET_SCHEDULER_NO_TASK + != shutdown_handle->close_msg_retransmission_task_id) + { + GNUNET_SCHEDULER_cancel + (shutdown_handle->close_msg_retransmission_task_id); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%x: Received CLOSE_ACK from %x\n", socket->our_id, socket->other_peer); + socket->state = STATE_CLOSED; if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ shutdown_handle->completion_cb(shutdown_handle->completion_cls, SHUT_RDWR); @@ -1695,7 +1791,7 @@ client_handle_close_ack (void *cls, void **tunnel_ctx, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + const struct GNUNET_ATS_Information *atsi) { struct GNUNET_STREAM_Socket *socket = cls; @@ -2571,10 +2667,23 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Existing read handle should be cancelled before shutting" " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + queue_message (socket, + msg, + &set_state_receive_close_wait, + NULL); break; case SHUT_WR: handle->operation = SHUT_WR; - + if (NULL != socket->write_handle) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + queue_message (socket, + msg, + &set_state_transmit_close_wait, + NULL); break; case SHUT_RDWR: handle->operation = SHUT_RDWR; @@ -2596,9 +2705,14 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "GNUNET_STREAM_shutdown called with invalid value for " "parameter operation -- Ignoring\n"); + GNUNET_free (msg); GNUNET_free (handle); return NULL; } + handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + handle); return handle; } @@ -2611,6 +2725,9 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, void GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle) { + if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id) + GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id); + GNUNET_free (handle); return; } -- 2.25.1