From 12f69de19d27c2f962e4c0fe8480591e0e0ac6cf Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sun, 29 Jul 2012 20:40:55 +0000 Subject: [PATCH] fixes --- src/stream/stream_api.c | 147 +++++++++++----------------------------- 1 file changed, 41 insertions(+), 106 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 49227ccec..4e5401c56 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -225,16 +225,6 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_MESH_TransmitHandle *transmit_handle; - /** - * The current act transmit handle (if a pending ack transmit request exists) - */ - struct GNUNET_MESH_TransmitHandle *ack_transmit_handle; - - /** - * Pointer to the current ack message using in ack_task - */ - struct GNUNET_STREAM_AckMessage *ack_msg; - /** * The current message associated with the transmit handle */ @@ -629,19 +619,21 @@ send_message_notify (void *cls, size_t size, void *buf) * @param message the message to be sent * @param finish_cb the callback to be called when the message is sent * @param finish_cb_cls the closure for the callback + * @param urgent set to GNUNET_YES to add the message to the beginning of the + * queue; GNUNET_NO to add at the tail */ static void queue_message (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_MessageHeader *message, SendFinishCallback finish_cb, - void *finish_cb_cls) + void *finish_cb_cls, + int urgent) { struct MessageQueue *queue_entity; GNUNET_assert ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); - LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Queueing message of type %d and size %d\n", GNUNET_i2s (&socket->other_peer), @@ -652,9 +644,20 @@ queue_message (struct GNUNET_STREAM_Socket *socket, queue_entity->message = message; queue_entity->finish_cb = finish_cb; queue_entity->finish_cb_cls = finish_cb_cls; - GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head, - socket->queue_tail, - queue_entity); + if (GNUNET_YES == urgent) + { + GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail, + queue_entity); + if (NULL != socket->transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + } + else + GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head, + socket->queue_tail, + queue_entity); if (NULL == socket->transmit_handle) { socket->retries = 0; @@ -691,38 +694,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, size = ntohs (message->header.size); msg_copy = GNUNET_malloc (size); memcpy (msg_copy, message, size); - queue_message (socket, msg_copy, finish_cb, finish_cb_cls); -} - - -/** - * Callback function for sending ack message - * - * @param cls closure the ACK message created in ack_task - * @param size number of bytes available in buffer - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -send_ack_notify (void *cls, size_t size, void *buf) -{ - struct GNUNET_STREAM_Socket *socket = cls; - - if (0 == size) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s called with size 0\n", __func__); - return 0; - } - GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); - - size = ntohs (socket->ack_msg->header.header.size); - memcpy (buf, socket->ack_msg, size); - - GNUNET_free (socket->ack_msg); - socket->ack_msg = NULL; - socket->ack_transmit_handle = NULL; - return size; + queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO); } @@ -785,16 +757,8 @@ ack_task (void *cls, ack_msg->base_sequence_number = htonl (socket->read_sequence_number); ack_msg->receive_window_remaining = htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); - socket->ack_msg = ack_msg; - /* Request MESH for sending ACK */ - socket->ack_transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - GNUNET_NO, /* Corking */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (ack_msg->header.header.size), - &send_ack_notify, - socket); + /* Queue up ACK for immediate sending */ + queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES); } @@ -834,7 +798,7 @@ close_msg_retransmission_task (void *cls, GNUNET_SCHEDULER_NO_TASK; return; } - queue_message (socket, msg, NULL, NULL); + queue_message (socket, msg, NULL, NULL, GNUNET_NO); shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, &close_msg_retransmission_task, @@ -1512,11 +1476,12 @@ control_retransmission_task (void *cls, break; case STATE_HELLO_WAIT: if (NULL == socket->lsocket) /* We are client */ - queue_message (socket, generate_hello (), NULL, NULL); + queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO); else queue_message (socket, (struct GNUNET_STREAM_MessageHeader *) - generate_hello_ack (socket, GNUNET_NO), NULL, NULL); + generate_hello_ack (socket, GNUNET_NO), NULL, NULL, + GNUNET_NO); socket->control_retransmission_task_id = GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, &control_retransmission_task, socket); @@ -1525,7 +1490,8 @@ control_retransmission_task (void *cls, if (NULL == socket->lsocket) queue_message (socket, (struct GNUNET_STREAM_MessageHeader *) - generate_hello_ack (socket, GNUNET_NO), NULL, NULL); + generate_hello_ack (socket, GNUNET_NO), NULL, NULL, + GNUNET_NO); else GNUNET_break (0); default: @@ -1584,10 +1550,8 @@ client_handle_hello_ack (void *cls, (unsigned int) socket->read_sequence_number); socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); reply = generate_hello_ack (socket, GNUNET_YES); - queue_message (socket, - &reply->header, - &set_state_established, - NULL); + queue_message (socket, &reply->header, &set_state_established, + NULL, GNUNET_NO); return GNUNET_OK; case STATE_ESTABLISHED: // call statistics (# ACKs ignored++) @@ -1663,7 +1627,7 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket, reply->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - queue_message (socket, reply, NULL, NULL); + queue_message (socket, reply, NULL, NULL, GNUNET_NO); break; default: @@ -1914,11 +1878,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, htons (sizeof (struct GNUNET_STREAM_MessageHeader)); receive_close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); - queue_message (socket, - receive_close_ack, - &set_state_closed, - NULL); - + queue_message (socket, receive_close_ack, &set_state_closed, + NULL, GNUNET_NO); /* FIXME: Handle the case where write handle is present; the write operation should be deemed as finised and the write continuation callback has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ @@ -2029,10 +1990,7 @@ handle_close (struct GNUNET_STREAM_Socket *socket, close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); - queue_message (socket, - close_ack, - &set_state_closed, - NULL); + queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO); if (socket->state == STATE_CLOSED) return GNUNET_OK; @@ -2177,7 +2135,8 @@ server_handle_hello (void *cls, { case STATE_INIT: reply = generate_hello_ack (socket, GNUNET_YES); - queue_message (socket, &reply->header, &set_state_hello_wait, NULL); + queue_message (socket, &reply->header, &set_state_hello_wait, NULL, + GNUNET_NO); GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == socket->control_retransmission_task_id); socket->control_retransmission_task_id = @@ -2753,10 +2712,7 @@ mesh_peer_connect_callback (void *cls, socket->state = STATE_INIT; /* Send HELLO message */ message = generate_hello (); - queue_message (socket, - message, - &set_state_hello_wait, - NULL); + queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO); GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == socket->control_retransmission_task_id); socket->control_retransmission_task_id = @@ -2873,13 +2829,6 @@ tunnel_cleaner (void *cls, GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); socket->transmit_handle = NULL; } - if (NULL != socket->ack_transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); - GNUNET_free (socket->ack_msg); - socket->ack_msg = NULL; - socket->ack_transmit_handle = NULL; - } /* Stop Tasks using socket->tunnel */ if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) { @@ -3096,10 +3045,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, "Existing read handle should be cancelled before shutting" " down reading\n"); msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); - queue_message (socket, - msg, - &set_state_receive_close_wait, - NULL); + queue_message (socket, msg, &set_state_receive_close_wait, NULL, + GNUNET_NO); break; case SHUT_WR: handle->operation = SHUT_WR; @@ -3108,10 +3055,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, "Existing write handle should be cancelled before shutting" " down writing\n"); msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); - queue_message (socket, - msg, - &set_state_transmit_close_wait, - NULL); + queue_message (socket, msg, &set_state_transmit_close_wait, NULL, + GNUNET_NO); break; case SHUT_RDWR: handle->operation = SHUT_RDWR; @@ -3124,10 +3069,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, "Existing read handle should be cancelled before shutting" " down reading\n"); msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); - queue_message (socket, - msg, - &set_state_close_wait, - NULL); + queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO); break; default: LOG (GNUNET_ERROR_TYPE_WARNING, @@ -3206,13 +3148,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); socket->transmit_handle = NULL; } - if (NULL != socket->ack_transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); - GNUNET_free (socket->ack_msg); - socket->ack_msg = NULL; - socket->ack_transmit_handle = NULL; - } /* Clear existing message queue */ while (NULL != (head = socket->queue_head)) { GNUNET_CONTAINER_DLL_remove (socket->queue_head, -- 2.25.1