From: Sree Harsha Totakura Date: Thu, 8 Mar 2012 07:45:52 +0000 (+0000) Subject: Data message retransmissions X-Git-Tag: initial-import-from-subversion-38251~14359 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=3e03fc63c4d17feccd9ecc15acf79bd59b547a13;p=oweals%2Fgnunet.git Data message retransmissions --- diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index f3266cb13..d694d43bb 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -157,7 +157,6 @@ struct MessageQueue */ struct GNUNET_STREAM_Socket { - /** * The peer identity of the peer at the other end of the stream */ @@ -253,6 +252,11 @@ struct GNUNET_STREAM_Socket */ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; + /** + * Task identifier for retransmission task after timeout + */ + GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; + /** * The state of the protocol associated with this socket */ @@ -371,12 +375,27 @@ struct GNUNET_STREAM_IOWriteHandle */ struct GNUNET_STREAM_DataMessage *messages[64]; + /** + * The write continuation callback + */ + GNUNET_STREAM_CompletionContinuation write_cont; + + /** + * Write continuation closure + */ + void *write_cont_cls; + /** * The bitmap of this IOHandle; Corresponding bit for a message is set when * it has been acknowledged by the receiver */ GNUNET_STREAM_AckBitmap ack_bitmap; + /** + * Number of bytes in this write handle + */ + size_t size; + /** * Number of packets sent before waiting for an ack * @@ -406,7 +425,7 @@ struct GNUNET_STREAM_IOReadHandle /** * Default value in seconds for various timeouts */ -static unsigned int default_timeout = 300; +static unsigned int default_timeout = 10; /** @@ -494,10 +513,16 @@ queue_message (struct GNUNET_STREAM_Socket *socket, { struct MessageQueue *queue_entity; + GNUNET_assert + ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) + && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %d and size %d\n", + "%s: Queueing message of type %d and size %d\n", + GNUNET_i2s (&socket->our_id), ntohs (message->header.type), ntohs (message->header.size)); + GNUNET_assert (NULL != message); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->message = message; queue_entity->finish_cb = finish_cb; @@ -521,6 +546,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket, } +/** + * Copies a message and queues it for sending using the mesh connection of + * given socket + * + * @param socket the socket whose mesh connection is used + * @param message the message to be sent + * @param finish_cb the callback to be called when the message is sent + * @param finish_cb_cls the closure for the callback + */ +static void +copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_STREAM_MessageHeader *message, + SendFinishCallback finish_cb, + void *finish_cb_cls) +{ + struct GNUNET_STREAM_MessageHeader *msg_copy; + uint16_t size; + + size = ntohs (message->header.size); + msg_copy = GNUNET_malloc (size); + memcpy (msg_copy, message, size); + queue_message (socket, msg_copy, finish_cb, finish_cb_cls); +} + + /** * Callback function for sending ack message * @@ -547,6 +597,37 @@ send_ack_notify (void *cls, size_t size, void *buf) return size; } +/** + * Writes data using the given socket. The amount of data written is limited by + * the receive_window_size + * + * @param socket the socket to use + */ +static void +write_data (struct GNUNET_STREAM_Socket *socket); + +/** + * Task for retransmitting data messages if they aren't ACK before their ack + * deadline + * + * @param cls the socket + * @param tc the Task context + */ +static void +retransmission_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + + if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->our_id)); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + write_data (socket); +} + /** * Task for sending ACK message @@ -674,10 +755,17 @@ write_data (struct GNUNET_STREAM_Socket *socket) if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, packet)) { - queue_message (socket, - &io_handle->messages[packet]->header, - NULL, - NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send", + "queue\n", + GNUNET_i2s (&socket->our_id), + (unsigned int) + io_handle->messages[packet]->sequence_number); + + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); } } packet = ack_packet + 1; @@ -685,13 +773,25 @@ write_data (struct GNUNET_STREAM_Socket *socket) while ( (NULL != io_handle->messages[packet]) && (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) { - socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); - queue_message (socket, - &io_handle->messages[packet]->header, - &write_data_finish_cb, - io_handle); + socket->receive_window_available -= + ntohs (io_handle->messages[packet]->header.header.size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send", + "queue\n", + GNUNET_i2s (&socket->our_id), + (unsigned int) + io_handle->messages[packet]->sequence_number); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + &write_data_finish_cb, + io_handle); packet++; } + + GNUNET_SCHEDULER_add_delayed + (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), + &retransmission_timeout_task, + socket); } @@ -865,7 +965,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket, if ( relative_sequence_number > 64) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring received message with sequence number %d", + "%s: Ignoring received message with sequence number %u\n", + GNUNET_i2s (&socket->our_id), ntohl (msg->sequence_number)); return GNUNET_YES; } @@ -1032,33 +1133,44 @@ client_handle_hello_ack (void *cls, const struct GNUNET_STREAM_HelloAckMessage *ack_msg; struct GNUNET_STREAM_HelloAckMessage *reply; + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == + ntohs (message->type)); + ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n", GNUNET_i2s (&socket->our_id), GNUNET_i2s (sender)); - ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; + GNUNET_assert (socket->tunnel == tunnel); switch (socket->state) { case STATE_HELLO_WAIT: - socket->read_sequence_number = ntohl (ack_msg->sequence_number); - socket->receive_window_available = ntohl (ack_msg->receive_window_size); - /* Get the random sequence number */ - socket->write_sequence_number = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - reply->header.header.size = - htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); - reply->sequence_number = htonl (socket->write_sequence_number); - reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE); - queue_message (socket, - &reply->header, - &set_state_established, - NULL); - return GNUNET_OK; + socket->read_sequence_number = ntohl (ack_msg->sequence_number); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read sequence number %u\n", + GNUNET_i2s (&socket->our_id), + (unsigned int) socket->read_sequence_number); + socket->receive_window_available = ntohl (ack_msg->receive_window_size); + /* Get the random sequence number */ + socket->write_sequence_number = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Generated write sequence number %u\n", + GNUNET_i2s (&socket->our_id), + (unsigned int) socket->write_sequence_number); + reply = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + reply->header.header.size = + htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + reply->sequence_number = htonl (socket->write_sequence_number); + reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE); + queue_message (socket, + &reply->header, + &set_state_established, + NULL); + return GNUNET_OK; case STATE_ESTABLISHED: case STATE_RECEIVE_CLOSE_WAIT: // call statistics (# ACKs ignored++) @@ -1359,9 +1471,13 @@ server_handle_hello (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; struct GNUNET_STREAM_HelloAckMessage *reply; + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == + ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received HELLO from %s\n", GNUNET_i2s(sender)); + "%s: Received HELLO from %s\n", + GNUNET_i2s (&socket->our_id), + GNUNET_i2s(sender)); /* Catch possible protocol breaks */ GNUNET_break_op (0 == memcmp (&socket->other_peer, @@ -1373,6 +1489,10 @@ server_handle_hello (void *cls, /* Get the random sequence number */ socket->write_sequence_number = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Generated write sequence number %u\n", + GNUNET_i2s (&socket->our_id), + (unsigned int) socket->write_sequence_number); reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); reply->header.header.size = @@ -1419,11 +1539,17 @@ server_handle_hello_ack (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; const struct GNUNET_STREAM_HelloAckMessage *ack_message; - ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == + ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); + ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; if (STATE_HELLO_WAIT == socket->state) { socket->read_sequence_number = ntohl (ack_message->sequence_number); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read sequence number %u\n", + GNUNET_i2s (&socket->our_id), + (unsigned int) socket->read_sequence_number); socket->receive_window_available = ntohl (ack_message->receive_window_size); /* Attain ESTABLISHED state */ @@ -1644,20 +1770,76 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, const struct GNUNET_STREAM_AckMessage *ack, const struct GNUNET_ATS_Information*atsi) { + unsigned int packet; + int need_retransmission; + switch (socket->state) { case (STATE_ESTABLISHED): if (NULL == socket->write_handle) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA ACK when write_handle is NULL\n"); + "Received DATA_ACK when write_handle is NULL\n"); return GNUNET_OK; } - + + if (!((socket->write_sequence_number + - htonl (ack->base_sequence_number)) < 64)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK with unexpected base sequence", + "number\n", + GNUNET_i2s (&socket->our_id)); + return GNUNET_OK; + } + /* FIXME: include the case when write_handle is cancelled - ignore the + acks */ + + /* Cancel the retransmission task */ + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); socket->receive_window_available = ntohl (ack->receive_window_remaining); - write_data (socket); + + /* Check if we have received all acknowledgements */ + need_retransmission = GNUNET_NO; + for (packet=0; packet < 64; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (GNUNET_YES != ackbitmap_is_bit_set + (&socket->write_handle->ack_bitmap,packet)) + { + need_retransmission = GNUNET_YES; + break; + } + } + if (GNUNET_YES == need_retransmission) + { + write_data (socket); + } + else /* We have to call the write continuation callback now */ + { + + /* Free the packets */ + for (packet=0; packet < 64; packet++) + { + GNUNET_free_non_null (socket->write_handle->messages[packet]); + } + if (NULL != socket->write_handle->write_cont) + socket->write_handle->write_cont + (socket->write_handle->write_cont_cls, + socket->status, + socket->write_handle->size); + /* We are done with the write handle - Freeing it */ + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; + } break; default: break; @@ -1806,7 +1988,9 @@ mesh_peer_connect_callback (void *cls, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Target peer %s connected\n", GNUNET_i2s (peer)); + "%s: Target peer %s connected\n", + GNUNET_i2s (&socket->our_id), + GNUNET_i2s (peer)); /* Set state to INIT */ socket->state = STATE_INIT; @@ -1987,7 +2171,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, 10, /* QUEUE size as parameter? */ socket, /* cls */ NULL, /* No inbound tunnel handler */ - &tunnel_cleaner, + &tunnel_cleaner, /* FIXME: not required? */ client_message_handlers, &app_port); /* We don't get inbound tunnels */ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ @@ -2164,6 +2348,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, uint32_t payload_size; struct GNUNET_STREAM_DataMessage *data_msg; const void *sweep; + struct GNUNET_TIME_Relative ack_deadline; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s\n", __func__); @@ -2188,7 +2373,13 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); + io_handle->write_cont = write_cont; + io_handle->write_cont_cls = write_cont_cls; + io_handle->size = size; sweep = data; + /* FIXME: Remove the fixed delay for ack deadline; Set it to the value + determined from RTT */ + ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); /* Divide the given buffer into packets for sending */ for (packet=0; packet < num_needed_packets; packet++) { @@ -2213,9 +2404,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, /* FIXME: Remove the fixed delay for ack deadline; Set it to the value determined from RTT */ - io_handle->messages[packet]->ack_deadline = - GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5)); + io_handle->messages[packet]->ack_deadline = + GNUNET_TIME_relative_hton (ack_deadline); data_msg = io_handle->messages[packet]; /* Copy data from given buffer to the packet */ memcpy (&data_msg[1], @@ -2227,10 +2417,10 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, socket->write_handle = io_handle; write_data (socket); - return io_handle; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); + + return io_handle; } diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index aaecc2394..9cf608164 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -380,7 +380,7 @@ peergroup_ready (void *cls, const char *emsg) return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer group is now read \n"); + "Peer group is now ready\n"); GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));