*/
struct GNUNET_MESH_TransmitHandle *transmit_handle;
- /**
- * The current act transmit handle (if a pending ack transmit request exists)
- */
- struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
-
- /**
- * Pointer to the current ack message using in ack_task
- */
- struct GNUNET_STREAM_AckMessage *ack_msg;
-
/**
* The current message associated with the transmit handle
*/
* @param message the message to be sent
* @param finish_cb the callback to be called when the message is sent
* @param finish_cb_cls the closure for the callback
+ * @param urgent set to GNUNET_YES to add the message to the beginning of the
+ * queue; GNUNET_NO to add at the tail
*/
static void
queue_message (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_STREAM_MessageHeader *message,
SendFinishCallback finish_cb,
- void *finish_cb_cls)
+ void *finish_cb_cls,
+ int urgent)
{
struct MessageQueue *queue_entity;
GNUNET_assert
((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
&& (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Queueing message of type %d and size %d\n",
GNUNET_i2s (&socket->other_peer),
queue_entity->message = message;
queue_entity->finish_cb = finish_cb;
queue_entity->finish_cb_cls = finish_cb_cls;
- GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
- socket->queue_tail,
- queue_entity);
+ if (GNUNET_YES == urgent)
+ {
+ GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
+ queue_entity);
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ }
+ else
+ GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
+ socket->queue_tail,
+ queue_entity);
if (NULL == socket->transmit_handle)
{
socket->retries = 0;
size = ntohs (message->header.size);
msg_copy = GNUNET_malloc (size);
memcpy (msg_copy, message, size);
- queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
-}
-
-
-/**
- * Callback function for sending ack message
- *
- * @param cls closure the ACK message created in ack_task
- * @param size number of bytes available in buffer
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-send_ack_notify (void *cls, size_t size, void *buf)
-{
- struct GNUNET_STREAM_Socket *socket = cls;
-
- if (0 == size)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s called with size 0\n", __func__);
- return 0;
- }
- GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
-
- size = ntohs (socket->ack_msg->header.header.size);
- memcpy (buf, socket->ack_msg, size);
-
- GNUNET_free (socket->ack_msg);
- socket->ack_msg = NULL;
- socket->ack_transmit_handle = NULL;
- return size;
+ queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
}
ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
ack_msg->receive_window_remaining =
htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
- socket->ack_msg = ack_msg;
- /* Request MESH for sending ACK */
- socket->ack_transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- GNUNET_NO, /* Corking */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (ack_msg->header.header.size),
- &send_ack_notify,
- socket);
+ /* Queue up ACK for immediate sending */
+ queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
}
GNUNET_SCHEDULER_NO_TASK;
return;
}
- queue_message (socket, msg, NULL, NULL);
+ queue_message (socket, msg, NULL, NULL, GNUNET_NO);
shutdown_handle->close_msg_retransmission_task_id =
GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
&close_msg_retransmission_task,
break;
case STATE_HELLO_WAIT:
if (NULL == socket->lsocket) /* We are client */
- queue_message (socket, generate_hello (), NULL, NULL);
+ queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
else
queue_message (socket,
(struct GNUNET_STREAM_MessageHeader *)
- generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+ GNUNET_NO);
socket->control_retransmission_task_id =
GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
&control_retransmission_task, socket);
if (NULL == socket->lsocket)
queue_message (socket,
(struct GNUNET_STREAM_MessageHeader *)
- generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+ GNUNET_NO);
else
GNUNET_break (0);
default:
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
reply = generate_hello_ack (socket, GNUNET_YES);
- queue_message (socket,
- &reply->header,
- &set_state_established,
- NULL);
+ queue_message (socket, &reply->header, &set_state_established,
+ NULL, GNUNET_NO);
return GNUNET_OK;
case STATE_ESTABLISHED:
// call statistics (# ACKs ignored++)
reply->header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- queue_message (socket, reply, NULL, NULL);
+ queue_message (socket, reply, NULL, NULL, GNUNET_NO);
break;
default:
htons (sizeof (struct GNUNET_STREAM_MessageHeader));
receive_close_ack->header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
- queue_message (socket,
- receive_close_ack,
- &set_state_closed,
- NULL);
-
+ queue_message (socket, receive_close_ack, &set_state_closed,
+ NULL, GNUNET_NO);
/* FIXME: Handle the case where write handle is present; the write operation
should be deemed as finised and the write continuation callback
has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
- queue_message (socket,
- close_ack,
- &set_state_closed,
- NULL);
+ queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
if (socket->state == STATE_CLOSED)
return GNUNET_OK;
{
case STATE_INIT:
reply = generate_hello_ack (socket, GNUNET_YES);
- queue_message (socket, &reply->header, &set_state_hello_wait, NULL);
+ queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
+ GNUNET_NO);
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
socket->control_retransmission_task_id);
socket->control_retransmission_task_id =
socket->state = STATE_INIT;
/* Send HELLO message */
message = generate_hello ();
- queue_message (socket,
- message,
- &set_state_hello_wait,
- NULL);
+ queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
socket->control_retransmission_task_id);
socket->control_retransmission_task_id =
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
socket->transmit_handle = NULL;
}
- if (NULL != socket->ack_transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
- GNUNET_free (socket->ack_msg);
- socket->ack_msg = NULL;
- socket->ack_transmit_handle = NULL;
- }
/* Stop Tasks using socket->tunnel */
if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
{
"Existing read handle should be cancelled before shutting"
" down reading\n");
msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
- queue_message (socket,
- msg,
- &set_state_receive_close_wait,
- NULL);
+ queue_message (socket, msg, &set_state_receive_close_wait, NULL,
+ GNUNET_NO);
break;
case SHUT_WR:
handle->operation = SHUT_WR;
"Existing write handle should be cancelled before shutting"
" down writing\n");
msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
- queue_message (socket,
- msg,
- &set_state_transmit_close_wait,
- NULL);
+ queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
+ GNUNET_NO);
break;
case SHUT_RDWR:
handle->operation = SHUT_RDWR;
"Existing read handle should be cancelled before shutting"
" down reading\n");
msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
- queue_message (socket,
- msg,
- &set_state_close_wait,
- NULL);
+ queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
break;
default:
LOG (GNUNET_ERROR_TYPE_WARNING,
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
socket->transmit_handle = NULL;
}
- if (NULL != socket->ack_transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
- GNUNET_free (socket->ack_msg);
- socket->ack_msg = NULL;
- socket->ack_transmit_handle = NULL;
- }
/* Clear existing message queue */
while (NULL != (head = socket->queue_head)) {
GNUNET_CONTAINER_DLL_remove (socket->queue_head,