From 7ff8930703ab120dcf5ac42a1d501c92423c9482 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sun, 5 Feb 2012 10:44:42 +0000 Subject: [PATCH] -added message sequencing --- src/stream/stream_api.c | 279 +++++++++++++++++++++++++++------------- 1 file changed, 188 insertions(+), 91 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index a7085c0af..d1b4dc299 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -85,6 +85,42 @@ enum State }; +/** + * Functions of this type are called when a message is written + * + * @param socket the socket the written message was bound to + */ +typedef void (*SendFinishCallback) (void *cls, + struct GNUNET_STREAM_Socket *socket); + + +/** + * The send message queue + */ +struct MessageQueue +{ + /** + * The message + */ + struct GNUNET_STREAM_MessageHeader *message; + + /** + * Callback to be called when the message is sent + */ + SendFinishCallback finish_cb; + + /** + * The closure for finish_cb + */ + void *finish_cb_cls; + + /** + * The next message in queue. Should be NULL in the last message + */ + struct MessageQueue *next; +}; + + /** * The STREAM Socket Handler */ @@ -143,7 +179,17 @@ struct GNUNET_STREAM_Socket /** * The current message associated with the transmit handle */ - struct GNUNET_MessageHeader *message; + struct MessageQueue *queue; + + /** + * The queue tail, should always point to the last message in queue + */ + struct MessageQueue *queue_tail; + + /** + * The number of previous timeouts + */ + unsigned int retries; }; @@ -175,6 +221,7 @@ struct GNUNET_STREAM_ListenSocket }; + /** * Default value in seconds for various timeouts */ @@ -182,9 +229,9 @@ static unsigned int default_timeout = 300; /** - * Callback function from send_message + * Callback function for sending hello message * - * @param cls closure the socket on which the send message was called + * @param cls closure the socket * @param size number of bytes available in buf * @param buf where the callee should write the message * @return number of bytes written to buf @@ -193,64 +240,108 @@ static size_t send_message_notify (void *cls, size_t size, void *buf) { struct GNUNET_STREAM_Socket *socket = cls; + struct MessageQueue *head; size_t ret; + head = socket->queue; socket->transmit_handle = NULL; /* Remove the transmit handle */ if (0 == size) /* request timed out */ { - // statistics ("message timeout") - - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Message not sent as tunnel was closed \n"); - ret = 0; + socket->retries++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message sending timed out. Retry %d \n", + socket->retries); + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + return 0; + } + + ret = ntohs (head->message->header.size); + GNUNET_assert (size >= ret); + memcpy (buf, head->message, ret); + if (NULL != head->finish_cb) + { + head->finish_cb (socket, head->finish_cb_cls); } - else /* Size is more or equal to what was requested */ + + socket->queue = head->next; /* Will be NULL if the queue is empty */ + GNUNET_free (head->message); + GNUNET_free (head); + head = socket->queue; + if (NULL != head) /* more pending messages to send */ { - ret = ntohs (socket->message->size); - GNUNET_assert (size >= ret); - memcpy (buf, socket->message, ret); + socket->retries = 0; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); } - GNUNET_free (socket->message); /* Free the message memory */ - socket->message = NULL; return ret; } /** - * Sends a message using the mesh connection of a socket + * Queues a message for sending using the mesh connection of a socket * * @param socket the socket whose mesh connection is used * @param message the message to be sent + * @param finish_cb the callback to be called when the message is sent + * @param finish_cb_cls the closure for the callback */ static void -send_message (struct GNUNET_STREAM_Socket *socket, - struct GNUNET_MessageHeader *message) +queue_message (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_STREAM_MessageHeader *message, + SendFinishCallback finish_cb, + void *finish_cb_cls) { - socket->message = message; - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (message->size), - &send_message_notify, - socket); -} + struct MessageQueue *msg_info; + + msg_info = GNUNET_malloc (sizeof (struct MessageQueue)); + msg_info->message = message; + msg_info->finish_cb = finish_cb; + msg_info->finish_cb_cls = finish_cb_cls; + msg_info->next = NULL; -/** - * Makes state transition dependending on the given state - * - * @param socket the socket whose state has to be transitioned - */ -static void -make_state_transition (struct GNUNET_STREAM_Socket *socket) -{ + if (NULL == socket->queue) + { + socket->queue = msg_info; + socket->queue_tail = msg_info; + socket->retries = 0; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (message->header.size), + &send_message_notify, + socket); + } + else /* There is a pending message in queue */ + { + socket->queue_tail->next = msg_info; /* Add to tail */ + socket->queue_tail = msg_info; + } } + + /** * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA * @@ -290,29 +381,33 @@ client_handle_data (void *cls, return GNUNET_OK; } + /** - * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO + * Callback to set state to ESTABLISHED * - * @param cls the socket (set from GNUNET_MESH_connect) - * @param tunnel connection to the other end - * @param tunnel_ctx this is NULL - * @param sender who sent the message - * @param message the actual message - * @param atsi performance data for the connection - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) + * @param cls the closure from queue_message + * @param socket the socket to requiring state change */ -static int -client_handle_hello (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - void **tunnel_ctx, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) +static void +set_state_established (void *cls, + struct GNUNET_STREAM_Socket *socket) { - struct GNUNET_STREAM_Socket *socket = cls; + socket->state = STATE_ESTABLISHED; +} - return GNUNET_OK; + +/** + * Callback to set state to HELLO_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket to requiring state change + */ +static void +set_state_hello_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + GNUNET_assert (STATE_INIT == socket->state); + socket->state = STATE_HELLO_WAIT; } @@ -337,6 +432,22 @@ client_handle_hello_ack (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; + struct GNUNET_STREAM_MessageHeader *reply; + + GNUNET_assert (socket->tunnel == tunnel); + if (STATE_HELLO_WAIT == socket->state) + { + reply = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.size = + htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + queue_message (socket, + reply, + &set_state_established, + NULL); + } return GNUNET_OK; } @@ -872,8 +983,6 @@ static struct GNUNET_MESH_MessageHandler client_message_handlers[] = { {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0}, {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) }, - {&client_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, - sizeof (struct GNUNET_STREAM_MessageHeader)}, {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, sizeof (struct GNUNET_STREAM_MessageHeader)}, {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, @@ -936,6 +1045,7 @@ mesh_peer_connect_callback (void *cls, const struct GNUNET_ATS_Information * atsi) { struct GNUNET_STREAM_Socket *socket = cls; + struct GNUNET_STREAM_MessageHeader *message; if (0 != memcmp (&socket->other_peer, peer, @@ -953,13 +1063,19 @@ mesh_peer_connect_callback (void *cls, /* Set state to INIT */ socket->state = STATE_INIT; - /* Try to achieve ESTABLISHED state */ - make_state_transition (socket); + /* Send HELLO message */ + message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO); + message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + queue_message (socket, + message, + &set_state_hello_wait, + NULL); /* Call open callback */ if (NULL == socket->open_cls) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM_open callback is NULL\n"); } if (NULL != socket->open_cb) @@ -982,18 +1098,6 @@ mesh_peer_disconnect_callback (void *cls, } -/** - * Function to find the mapped socket of a tunnel - * - * @param tunnel the tunnel whose associated socket has to be retrieved - * @return the socket corresponding to the tunnel - */ -static struct GNUNET_STREAM_Socket * -find_socket (const struct GNUNET_MESH_Tunnel *tunnel) -{ - /* Search tunnel in a list or hashtable and retrieve the socket */ -} - /*****************/ /* API functions */ /*****************/ @@ -1084,11 +1188,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) { GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); } - /* Clear existing message queue message */ - if (NULL != socket->message) - { - GNUNET_free (socket->message); - } + /* FIXME: Clear message queue */ /* Close associated tunnel */ if (NULL != socket->tunnel) { @@ -1126,21 +1226,17 @@ new_tunnel_notify (void *cls, socket->tunnel = tunnel; socket->session_id = 0; /* FIXME */ socket->other_peer = *initiator; - socket->state = STATE_LISTEN; + socket->state = STATE_INIT; if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, socket, &socket->other_peer)) { socket->state = STATE_CLOSED; - make_state_transition (socket); + /* FIXME: Send CLOSE message and then free */ GNUNET_free (socket); GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */ } - else - { - make_state_transition (socket); - } return socket; } @@ -1164,8 +1260,8 @@ tunnel_cleaner (void *cls, { struct GNUNET_STREAM_ListenSocket *lsocket = cls; struct GNUNET_STREAM_Socket *socket = tunnel_ctx; + struct MessageQueue *head; - socket = find_socket (tunnel); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer %s has terminated connection abruptly\n", GNUNET_i2s (&socket->other_peer)); @@ -1178,12 +1274,13 @@ tunnel_cleaner (void *cls, socket->transmit_handle = NULL; } - /* Clear existing message queue message */ - if (NULL != socket->message) - { - GNUNET_free (socket->message); - socket->message = NULL; - } + /* Clear existing message queue */ + while (NULL != socket->queue) { + head = socket->queue; + socket->queue = head->next; + GNUNET_free (head->message); + GNUNET_free (head); + } } @@ -1208,7 +1305,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_MESH_ApplicationType app_types[2]; app_types[0] = app_port; - app_types[1] = NULL; + app_types[1] = 0; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); lsocket->port = app_port; lsocket->listen_cb = listen_cb; -- 2.25.1