*/
struct GNUNET_PeerIdentity other_peer;
- /**
- * Task identifier for the read io timeout task
- */
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
-
/**
* Task identifier for retransmission task after timeout
*/
*/
GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
- /**
- * Task scheduled to continue a read operation.
- */
- GNUNET_SCHEDULER_TaskIdentifier read_task_id;
-
/**
* The state of the protocol associated with this socket
*/
*/
uint32_t testing_set_write_sequence_number_value;
- /**
- * The session id associated with this stream connection
- * FIXME: Not used currently, may be removed
- */
- uint32_t session_id;
-
/**
* Write sequence number. Set to random when sending HELLO(client) and
* HELLO_ACK(server)
* The closure pointer for the read processor callback
*/
void *proc_cls;
+
+ /**
+ * Task identifier for the read io timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task_id;
};
{
socket->retries = 0;
socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- GNUNET_NO, /* Corking */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (message->header.size),
- &send_message_notify,
- socket);
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ GNUNET_NO, /* Corking */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (message->header.size),
+ &send_message_notify,
+ socket);
}
}
{
struct GNUNET_STREAM_Socket *socket = cls;
- if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
return;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
- socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
write_data (socket);
}
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_AckMessage *ack_msg;
- if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
- {
- return;
- }
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
/* Create the ACK Message */
ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
ack_msg->header.header.size = htons (sizeof (struct
struct GNUNET_STREAM_MessageHeader *msg;
struct GNUNET_STREAM_Socket *socket;
+ shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (NULL != shutdown_handle);
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
socket = shutdown_handle->socket;
-
msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
switch (shutdown_handle->operation)
}
}
/* Now send new packets if there is enough buffer space */
- while ( (NULL != io_handle->messages[packet]) &&
- (socket->receiver_window_available
- >= ntohs (io_handle->messages[packet]->header.header.size)) &&
- (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
+ (NULL != io_handle->messages[packet]) &&
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)))
{
socket->receiver_window_available -=
ntohs (io_handle->messages[packet]->header.header.size);
packet++;
}
io_handle->packets_sent = packet;
+ // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
socket->data_retransmission_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
size_t read_size;
size_t valid_read_size;
unsigned int packet;
uint32_t sequence_increase;
uint32_t offset_increase;
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
-
if (NULL == socket->receive_buffer)
return;
-
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (NULL != socket->read_handle->proc);
-
/* Check the bitmap for any holes */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
GNUNET_assert (0 != valid_read_size);
/* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Calling read processor\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
GNUNET_i2s (&socket->other_peer));
read_size =
- socket->read_handle->proc (socket->read_handle->proc_cls,
- socket->status,
- socket->receive_buffer + socket->copy_offset,
- valid_read_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read processor read %d bytes\n",
+ socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
GNUNET_i2s (&socket->other_peer), read_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read processor completed successfully\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
GNUNET_i2s (&socket->other_peer));
/* Free the read handle */
GNUNET_free (socket->read_handle);
for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
- { packet++; break; }
+ {
+ packet++;
+ break;
+ }
if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
break;
}
-
/* If no packets can be removed we can't move the buffer */
- if (0 == packet) return;
+ if (0 == packet)
+ return;
sequence_increase = packet;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Sequence increase after read processor completion: %u\n",
GNUNET_i2s (&socket->other_peer), sequence_increase);
-
/* Shift the data in the receive buffer */
socket->receive_buffer =
memmove (socket->receive_buffer,
/* Fix relative boundaries */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
- if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
{
uint32_t ahead_buffer_boundary;
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
GNUNET_STREAM_DataProcessor proc;
void *proc_cls;
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
- if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Read task timedout - Cancelling it\n",
GNUNET_i2s (&socket->other_peer));
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
- GNUNET_assert (NULL != socket->read_handle);
- proc = socket->read_handle->proc;
- proc_cls = socket->read_handle->proc_cls;
- GNUNET_free (socket->read_handle);
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ GNUNET_free (read_handle);
socket->read_handle = NULL;
/* Call the read processor to signal timeout */
proc (proc_cls,
expecting */
relative_sequence_number =
ntohl (msg->sequence_number) - socket->read_sequence_number;
- if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Ignoring received message with sequence number %u\n",
}
if ((NULL != socket->read_handle) /* A read handle is waiting */
/* There is no current read task */
- && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
/* We have the first packet */
&& (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
- GNUNET_i2s (&socket->other_peer));
- socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_handle->read_task_id =
+ GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
+ }
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
{
struct GNUNET_STREAM_Socket *socket = cls;
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
return;
- socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
LOG_DEBUG ("%s: Retransmitting a control message\n",
GNUNET_i2s (&socket->other_peer));
switch (socket->state)
GNUNET_NO);
else
GNUNET_break (0);
+ break;
default:
GNUNET_break (0);
}
GNUNET_assert (socket->tunnel == tunnel);
LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
- switch (socket->status)
+ switch (socket->state)
{
case STATE_INIT:
reply = generate_hello_ack (socket, GNUNET_YES);
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *initiator;
socket->tunnel = tunnel;
- socket->session_id = 0; /* FIXME */
socket->state = STATE_INIT;
socket->lsocket = lsocket;
socket->retransmit_timeout = lsocket->retransmit_timeout;
"%s: Peer %s initiated tunnel to us\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
- /* FIXME: Copy MESH handle from lsocket to socket */
return socket;
}
void *tunnel_ctx)
{
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+ struct MessageQueue *head;
- if (tunnel != socket->tunnel)
- return;
-
+ GNUNET_assert (tunnel == socket->tunnel);
GNUNET_break_op(0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer %s has terminated connection abruptly\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
socket->status = GNUNET_STREAM_SHUTDOWN;
-
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
{
{
GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* Terminate the control retransmission tasks */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ /* Clear existing message queue */
+ while (NULL != (head = socket->queue_head)) {
+ GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+ socket->queue_tail,
+ head);
+ GNUNET_free (head->message);
+ GNUNET_free (head);
}
- /* FIXME: Cancel all other tasks using socket->tunnel */
socket->tunnel = NULL;
}
GNUNET_STREAM_io_write_cancel (socket->write_handle);
//socket->write_handle = NULL;
}
- if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
- {
- /* socket closed with read task pending!? */
- GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
- /* Terminate the ack'ing tasks if they are still present */
+ /* Terminate the ack'ing task if they are still present */
if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (socket->ack_task_id);
* @param proc function to call with data (once only)
* @param proc_cls the closure for proc
*
- * @return handle to cancel the operation; if the stream has been shutdown for
- * this type of opeartion then the DataProcessor is immediately
- * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if: the stream has
+ * been shutdown for this type of opeartion (the DataProcessor is
+ * immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
+ * read handle is present (only one read handle per socket is present
+ * at any time)
*/
struct GNUNET_STREAM_IOReadHandle *
GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
read_handle->proc_cls = proc_cls;
read_handle->socket = socket;
socket->read_handle = read_handle;
- /* Check if we have a packet at bitmap 0 */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
0))
- {
- socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
- }
- /* Setup the read timeout task */
- socket->read_io_timeout_task_id =
- GNUNET_SCHEDULER_add_delayed (timeout,
- &read_io_timeout,
- socket);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: %s() END\n",
- GNUNET_i2s (&socket->other_peer),
- __func__);
+ read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
+ read_handle->read_io_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+ GNUNET_i2s (&socket->other_peer), __func__);
return read_handle;
}
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (ioh == socket->read_handle);
/* Read io time task should be there; if it is already executed then this
- read handle is not valid */
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ read handle is not valid; However upon scheduler shutdown the read io task
+ may be executed before */
+ if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
+ GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
/* reading task may be present; if so we have to stop it */
- if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
- {
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
+ if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
+ GNUNET_SCHEDULER_cancel (ioh->read_task_id);
GNUNET_free (ioh);
socket->read_handle = NULL;
}