/**
* Task identifier for retransmission task after timeout
*/
- GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+ GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
+
+ /**
+ * Task identifier for retransmission of control messages
+ */
+ GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
/**
* The task for sending timely Acks
socket);
return 0;
}
-
ret = ntohs (head->message->header.size);
GNUNET_assert (size >= ret);
memcpy (buf, head->message, ret);
* @param tc the Task context
*/
static void
-retransmission_timeout_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+data_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
return;
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
- socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
write_data (socket);
}
NULL);
packet++;
}
- if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
- socket->retransmission_timeout_task_id =
+ if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+ socket->data_retransmission_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 8),
- &retransmission_timeout_task,
+ &data_retransmission_task,
socket);
}
/**
* Callback to set state to ESTABLISHED
*
- * @param cls the closure from queue_message FIXME: document
+ * @param cls the closure NULL;
* @param socket the socket to requiring state change
*/
static void
socket->write_offset = 0;
socket->read_offset = 0;
socket->state = STATE_ESTABLISHED;
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+ socket->control_retransmission_task_id);
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
if (NULL != socket->lsocket)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_free (socket);
}
}
- else if (NULL != socket->open_cb)
+ else
socket->open_cb (socket->open_cls, socket);
}
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_assert (STATE_INIT == socket->state);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Attaining HELLO_WAIT state\n",
GNUNET_i2s (&socket->other_peer));
socket->state = STATE_HELLO_WAIT;
}
+/**
+ * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @return the generate hello message
+ */
+static struct GNUNET_STREAM_MessageHeader *
+generate_hello (void)
+{
+ struct GNUNET_STREAM_MessageHeader *msg;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ return msg;
+}
+
+
/**
* Returns a new HelloAckMessage. Also sets the write sequence number for the
* socket
*
* @param socket the socket for which this HelloAckMessage has to be generated
+ * @param generate_seq GNUNET_YES to generate the write sequence number,
+ * GNUNET_NO to use the existing sequence number
* @return the HelloAckMessage
*/
static struct GNUNET_STREAM_HelloAckMessage *
-generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
+ int generate_seq)
{
struct GNUNET_STREAM_HelloAckMessage *msg;
- /* Get the random sequence number */
- if (GNUNET_YES == socket->testing_active)
- socket->write_sequence_number =
- socket->testing_set_write_sequence_number_value;
- else
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: write sequence number %u\n",
- GNUNET_i2s (&socket->other_peer),
- (unsigned int) socket->write_sequence_number);
-
+ if (GNUNET_YES == generate_seq)
+ {
+ if (GNUNET_YES == socket->testing_active)
+ socket->write_sequence_number =
+ socket->testing_set_write_sequence_number_value;
+ else
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: write sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->write_sequence_number);
+ }
msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
msg->header.header.size =
htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
msg->sequence_number = htonl (socket->write_sequence_number);
msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
-
return msg;
}
+/**
+ * Task for retransmitting control messages if they aren't ACK'ed before a
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+control_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ return;
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ switch (socket->status)
+ {
+ case STATE_INIT:
+ GNUNET_break (0);
+ break;
+ case STATE_LISTEN:
+ GNUNET_break (0);
+ break;
+ case STATE_HELLO_WAIT:
+ if (NULL == socket->lsocket) /* We are client */
+ queue_message (socket, generate_hello (), NULL, NULL);
+ else
+ queue_message (socket,
+ (struct GNUNET_STREAM_MessageHeader *)
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+ break;
+ default:
+ GNUNET_break (0);
+ }
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
+}
+
+
/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
*
GNUNET_i2s (&socket->other_peer),
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
- reply = generate_hello_ack_msg (socket);
+ reply = generate_hello_ack (socket, GNUNET_YES);
queue_message (socket,
&reply->header,
- &set_state_established,
- NULL);
+ &set_state_established,
+ NULL);
return GNUNET_OK;
case STATE_ESTABLISHED:
case STATE_RECEIVE_CLOSE_WAIT:
return GNUNET_YES;
}
- GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
- ntohs (message->type));
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received HELLO from %s\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
- if (STATE_INIT == socket->state)
+ switch (socket->status)
{
- reply = generate_hello_ack_msg (socket);
+ case STATE_INIT:
+ reply = generate_hello_ack (socket, GNUNET_YES);
queue_message (socket,
&reply->header,
&set_state_hello_wait,
NULL);
- }
- else
- {
+ break;
+ default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Client sent HELLO when in state %d\n",
GNUNET_i2s (&socket->other_peer),
socket->state);
/* FIXME: Send RESET? */
-
}
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
return GNUNET_OK;
}
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
- /* FIXME: increment in the base sequence number is breaking current flow
- */
if (!((socket->write_sequence_number
- ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
{
GNUNET_i2s (&socket->other_peer));
/* Cancel the retransmission task */
- if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
{
- GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
- socket->retransmission_timeout_task_id =
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id =
GNUNET_SCHEDULER_NO_TASK;
}
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
GNUNET_i2s(peer));
return;
}
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Target peer %s connected\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
/* Set state to INIT */
socket->state = STATE_INIT;
-
/* Send HELLO message */
- message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
- message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ message = generate_hello ();
queue_message (socket,
message,
&set_state_hello_wait,
NULL);
-
- /* Call open callback */
- if (NULL == socket->open_cb)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "STREAM_open callback is NULL\n");
- }
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
}
socket->retransmit_timeout = lsocket->retransmit_timeout;
socket->testing_active = lsocket->testing_active;
socket->testing_set_write_sequence_number_value =
- lsocket->testing_set_write_sequence_number_value;
-
+ lsocket->testing_set_write_sequence_number_value;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer %s initiated tunnel to us\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
/* FIXME: Copy MESH handle from lsocket to socket */
-
return socket;
}
GNUNET_SCHEDULER_cancel (socket->ack_task_id);
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
}
- if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
{
- GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
- socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
}
/* FIXME: Cancel all other tasks using socket->tunnel */
socket->tunnel = NULL;
* @param target the target peer to which the stream has to be opened
* @param app_port the application port number which uniquely identifies this
* stream
- * @param open_cb this function will be called after stream has be established
+ * @param open_cb this function will be called after stream has be established;
+ * cannot be NULL
* @param open_cb_cls the closure for open_cb
* @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
* @return if successful it returns the stream socket; NULL if stream cannot be
struct GNUNET_STREAM_Socket *socket;
enum GNUNET_STREAM_Option option;
GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
- va_list vargs; /* Variable arguments */
+ va_list vargs;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
+ GNUNET_assert (NULL != open_cb);
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *target;
socket->open_cb = open_cb;
socket->open_cls = open_cb_cls;
/* Set defaults */
- socket->retransmit_timeout =
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+ socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
socket->testing_active = GNUNET_NO;
va_start (vargs, open_cb_cls); /* Parse variable args */
do {
GNUNET_free (socket);
return NULL;
}
-
/* Now create the mesh tunnel to target */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating MESH Tunnel\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
NULL, /* Tunnel context */
&mesh_peer_connect_callback,
GNUNET_assert (NULL != socket->tunnel);
GNUNET_MESH_peer_request_connect_add (socket->tunnel,
&socket->other_peer);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s() END\n", __func__);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
return socket;
}
{
if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+ handle->socket->shutdown_handle = NULL;
GNUNET_free (handle);
}
GNUNET_STREAM_io_write_cancel (socket->write_handle);
//socket->write_handle = NULL;
}
-
if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
/* socket closed with read task pending!? */
GNUNET_break (0);
GNUNET_SCHEDULER_cancel (socket->read_task_id);
socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
-
+ }
/* Terminate the ack'ing tasks if they are still present */
if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (socket->ack_task_id);
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
}
-
+ /* Terminate the control retransmission tasks */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ }
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
{
socket->ack_msg = NULL;
socket->ack_transmit_handle = NULL;
}
-
/* Clear existing message queue */
while (NULL != (head = socket->queue_head)) {
GNUNET_CONTAINER_DLL_remove (socket->queue_head,
}
lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
/* Set defaults */
- lsocket->retransmit_timeout =
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+ lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
lsocket->testing_active = GNUNET_NO;
lsocket->listen_ok_cb = NULL;
listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
GNUNET_assert (NULL != socket->write_handle);
GNUNET_assert (socket->write_handle == ioh);
- if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
{
- GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
- socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
}
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)