From 3d1275946f7264fbd3baecfecddb3fd2e3a4fe57 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Thu, 28 Jun 2012 19:42:01 +0000 Subject: [PATCH] -control retransmission for HELLO and HELLO_ACK --- src/include/gnunet_stream_lib.h | 3 +- src/stream/stream_api.c | 221 ++++++++++++++++++++------------ 2 files changed, 141 insertions(+), 83 deletions(-) diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index fd44ccfa0..a134470c7 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -140,7 +140,8 @@ enum GNUNET_STREAM_Option * @param target the target peer to which the stream has to be opened * @param app_port the application port number which uniquely identifies this * stream - * @param open_cb this function will be called after stream has be established + * @param open_cb this function will be called after stream has be established; + * cannot be NULL * @param open_cb_cls the closure for open_cb * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END * @return if successful it returns the stream socket; NULL if stream cannot be diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index df0710e80..168929b01 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -272,7 +272,12 @@ struct GNUNET_STREAM_Socket /** * Task identifier for retransmission task after timeout */ - GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; + GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id; + + /** + * Task identifier for retransmission of control messages + */ + GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id; /** * The task for sending timely Acks @@ -576,7 +581,6 @@ send_message_notify (void *cls, size_t size, void *buf) socket); return 0; } - ret = ntohs (head->message->header.size); GNUNET_assert (size >= ret); memcpy (buf, head->message, ret); @@ -731,17 +735,16 @@ write_data (struct GNUNET_STREAM_Socket *socket); * @param tc the Task context */ static void -retransmission_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +data_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) return; - LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer)); - socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; write_data (socket); } @@ -925,11 +928,11 @@ write_data (struct GNUNET_STREAM_Socket *socket) NULL); packet++; } - if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) - socket->retransmission_timeout_task_id = + if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id) + socket->data_retransmission_task_id = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 8), - &retransmission_timeout_task, + &data_retransmission_task, socket); } @@ -1292,7 +1295,7 @@ client_handle_data (void *cls, /** * Callback to set state to ESTABLISHED * - * @param cls the closure from queue_message FIXME: document + * @param cls the closure NULL; * @param socket the socket to requiring state change */ static void @@ -1305,6 +1308,10 @@ set_state_established (void *cls, socket->write_offset = 0; socket->read_offset = 0; socket->state = STATE_ESTABLISHED; + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != + socket->control_retransmission_task_id); + GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id); + socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; if (NULL != socket->lsocket) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1321,7 +1328,7 @@ set_state_established (void *cls, GNUNET_free (socket); } } - else if (NULL != socket->open_cb) + else socket->open_cb (socket->open_cls, socket); } @@ -1337,7 +1344,7 @@ set_state_hello_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_assert (STATE_INIT == socket->state); - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Attaining HELLO_WAIT state\n", GNUNET_i2s (&socket->other_peer)); socket->state = STATE_HELLO_WAIT; @@ -1415,41 +1422,102 @@ set_state_closed (void *cls, } +/** + * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO + * + * @return the generate hello message + */ +static struct GNUNET_STREAM_MessageHeader * +generate_hello (void) +{ + struct GNUNET_STREAM_MessageHeader *msg; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + return msg; +} + + /** * Returns a new HelloAckMessage. Also sets the write sequence number for the * socket * * @param socket the socket for which this HelloAckMessage has to be generated + * @param generate_seq GNUNET_YES to generate the write sequence number, + * GNUNET_NO to use the existing sequence number * @return the HelloAckMessage */ static struct GNUNET_STREAM_HelloAckMessage * -generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket) +generate_hello_ack (struct GNUNET_STREAM_Socket *socket, + int generate_seq) { struct GNUNET_STREAM_HelloAckMessage *msg; - /* Get the random sequence number */ - if (GNUNET_YES == socket->testing_active) - socket->write_sequence_number = - socket->testing_set_write_sequence_number_value; - else - socket->write_sequence_number = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s: write sequence number %u\n", - GNUNET_i2s (&socket->other_peer), - (unsigned int) socket->write_sequence_number); - + if (GNUNET_YES == generate_seq) + { + if (GNUNET_YES == socket->testing_active) + socket->write_sequence_number = + socket->testing_set_write_sequence_number_value; + else + socket->write_sequence_number = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: write sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->write_sequence_number); + } msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); msg->header.header.size = htons (sizeof (struct GNUNET_STREAM_HelloAckMessage)); msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); msg->sequence_number = htonl (socket->write_sequence_number); msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE); - return msg; } +/** + * Task for retransmitting control messages if they aren't ACK'ed before a + * deadline + * + * @param cls the socket + * @param tc the Task context + */ +static void +control_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + + if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) + return; + socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; + switch (socket->status) + { + case STATE_INIT: + GNUNET_break (0); + break; + case STATE_LISTEN: + GNUNET_break (0); + break; + case STATE_HELLO_WAIT: + if (NULL == socket->lsocket) /* We are client */ + queue_message (socket, generate_hello (), NULL, NULL); + else + queue_message (socket, + (struct GNUNET_STREAM_MessageHeader *) + generate_hello_ack (socket, GNUNET_NO), NULL, NULL); + break; + default: + GNUNET_break (0); + } + socket->control_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &control_retransmission_task, socket); +} + + /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK * @@ -1499,11 +1567,11 @@ client_handle_hello_ack (void *cls, GNUNET_i2s (&socket->other_peer), (unsigned int) socket->read_sequence_number); socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); - reply = generate_hello_ack_msg (socket); + reply = generate_hello_ack (socket, GNUNET_YES); queue_message (socket, &reply->header, - &set_state_established, - NULL); + &set_state_established, + NULL); return GNUNET_OK; case STATE_ESTABLISHED: case STATE_RECEIVE_CLOSE_WAIT: @@ -2087,31 +2155,34 @@ server_handle_hello (void *cls, return GNUNET_YES; } - GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == - ntohs (message->type)); + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); - if (STATE_INIT == socket->state) + switch (socket->status) { - reply = generate_hello_ack_msg (socket); + case STATE_INIT: + reply = generate_hello_ack (socket, GNUNET_YES); queue_message (socket, &reply->header, &set_state_hello_wait, NULL); - } - else - { + break; + default: LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Client sent HELLO when in state %d\n", GNUNET_i2s (&socket->other_peer), socket->state); /* FIXME: Send RESET? */ - } + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == + socket->control_retransmission_task_id); + socket->control_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &control_retransmission_task, socket); return GNUNET_OK; } @@ -2427,8 +2498,6 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, GNUNET_i2s (&socket->other_peer)); return GNUNET_OK; } - /* FIXME: increment in the base sequence number is breaking current flow - */ if (!((socket->write_sequence_number - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) { @@ -2450,10 +2519,10 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, GNUNET_i2s (&socket->other_peer)); /* Cancel the retransmission task */ - if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id) { - GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); - socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id); + socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; } for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) @@ -2665,30 +2734,23 @@ mesh_peer_connect_callback (void *cls, GNUNET_i2s(peer)); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Target peer %s connected\n", GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); - /* Set state to INIT */ socket->state = STATE_INIT; - /* Send HELLO message */ - message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO); - message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + message = generate_hello (); queue_message (socket, message, &set_state_hello_wait, NULL); - - /* Call open callback */ - if (NULL == socket->open_cb) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "STREAM_open callback is NULL\n"); - } + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == + socket->control_retransmission_task_id); + socket->control_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &control_retransmission_task, socket); } @@ -2752,15 +2814,12 @@ new_tunnel_notify (void *cls, socket->retransmit_timeout = lsocket->retransmit_timeout; socket->testing_active = lsocket->testing_active; socket->testing_set_write_sequence_number_value = - lsocket->testing_set_write_sequence_number_value; - + lsocket->testing_set_write_sequence_number_value; LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Peer %s initiated tunnel to us\n", GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); - /* FIXME: Copy MESH handle from lsocket to socket */ - return socket; } @@ -2814,10 +2873,10 @@ tunnel_cleaner (void *cls, GNUNET_SCHEDULER_cancel (socket->ack_task_id); socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; } - if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id) { - GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); - socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id); + socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; } /* FIXME: Cancel all other tasks using socket->tunnel */ socket->tunnel = NULL; @@ -2905,7 +2964,8 @@ lock_status_change_cb (void *cls, const char *domain, uint32_t lock, * @param target the target peer to which the stream has to be opened * @param app_port the application port number which uniquely identifies this * stream - * @param open_cb this function will be called after stream has be established + * @param open_cb this function will be called after stream has be established; + * cannot be NULL * @param open_cb_cls the closure for open_cb * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END * @return if successful it returns the stream socket; NULL if stream cannot be @@ -2922,17 +2982,17 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_STREAM_Socket *socket; enum GNUNET_STREAM_Option option; GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; - va_list vargs; /* Variable arguments */ + va_list vargs; LOG (GNUNET_ERROR_TYPE_DEBUG, "%s\n", __func__); + GNUNET_assert (NULL != open_cb); socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); socket->other_peer = *target; socket->open_cb = open_cb; socket->open_cls = open_cb_cls; /* Set defaults */ - socket->retransmit_timeout = - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); + socket->retransmit_timeout = TIME_REL_SECS (default_timeout); socket->testing_active = GNUNET_NO; va_start (vargs, open_cb_cls); /* Parse variable args */ do { @@ -2972,10 +3032,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_free (socket); return NULL; } - /* Now create the mesh tunnel to target */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Creating MESH Tunnel\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n"); socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, NULL, /* Tunnel context */ &mesh_peer_connect_callback, @@ -2984,9 +3042,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_assert (NULL != socket->tunnel); GNUNET_MESH_peer_request_connect_add (socket->tunnel, &socket->other_peer); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); return socket; } @@ -3088,6 +3144,7 @@ GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle) { if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id) GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id); + handle->socket->shutdown_handle = NULL; GNUNET_free (handle); } @@ -3114,22 +3171,24 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) GNUNET_STREAM_io_write_cancel (socket->write_handle); //socket->write_handle = NULL; } - if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { /* socket closed with read task pending!? */ GNUNET_break (0); GNUNET_SCHEDULER_cancel (socket->read_task_id); socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; - } - + } /* Terminate the ack'ing tasks if they are still present */ if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (socket->ack_task_id); socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; } - + /* Terminate the control retransmission tasks */ + if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id) + { + GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id); + } /* Clear Transmit handles */ if (NULL != socket->transmit_handle) { @@ -3143,7 +3202,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) socket->ack_msg = NULL; socket->ack_transmit_handle = NULL; } - /* Clear existing message queue */ while (NULL != (head = socket->queue_head)) { GNUNET_CONTAINER_DLL_remove (socket->queue_head, @@ -3213,8 +3271,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, } lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */ /* Set defaults */ - lsocket->retransmit_timeout = - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); + lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout); lsocket->testing_active = GNUNET_NO; lsocket->listen_ok_cb = NULL; listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */ @@ -3491,10 +3548,10 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) GNUNET_assert (NULL != socket->write_handle); GNUNET_assert (socket->write_handle == ioh); - if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id) { - GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); - socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id); + socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; } for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) -- 2.25.1