/**
* The maximum packet size of a stream packet
*/
-#define MAX_PACKET_SIZE 512//64000
+#define DEFAULT_MAX_PAYLOAD_SIZE 64000
/**
* Receive buffer
*/
#define RECEIVE_BUFFER_SIZE 4096000
-/**
- * The maximum payload a data message packet can carry
- */
-static const size_t max_payload_size =
- MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
-
/**
* states in the Protocol
*/
*/
struct GNUNET_PeerIdentity other_peer;
- /**
- * Task identifier for the read io timeout task
- */
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
-
/**
* Task identifier for retransmission task after timeout
*/
*/
GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
- /**
- * Task scheduled to continue a read operation.
- */
- GNUNET_SCHEDULER_TaskIdentifier read_task_id;
-
/**
* The state of the protocol associated with this socket
*/
*/
uint32_t testing_set_write_sequence_number_value;
- /**
- * The session id associated with this stream connection
- * FIXME: Not used currently, may be removed
- */
- uint32_t session_id;
-
/**
* Write sequence number. Set to random when sending HELLO(client) and
* HELLO_ACK(server)
* The offset upto which user has read from the received buffer
*/
uint32_t copy_offset;
+
+ /**
+ * The maximum size of the data message payload this stream handle can send
+ */
+ uint16_t max_payload_size;
};
* The write sequence number to be set incase of testing
*/
uint32_t testing_set_write_sequence_number_value;
+
+ /**
+ * The maximum size of the data message payload this stream handle can send
+ */
+ uint16_t max_payload_size;
+
};
* Number of bytes in this write handle
*/
size_t size;
+
+ /**
+ * Number of packets already transmitted from this IO handle. Retransmitted
+ * packets are not taken into account here. This is used to determine which
+ * packets account for retransmission and which packets occupy buffer space at
+ * the receiver.
+ */
+ unsigned int packets_sent;
};
* The closure pointer for the read processor callback
*/
void *proc_cls;
+
+ /**
+ * Task identifier for the read io timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task_id;
};
{
socket->retries = 0;
socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- GNUNET_NO, /* Corking */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (message->header.size),
- &send_message_notify,
- socket);
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ GNUNET_NO, /* Corking */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (message->header.size),
+ &send_message_notify,
+ socket);
}
}
{
struct GNUNET_STREAM_Socket *socket = cls;
- if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
return;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
- socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
write_data (socket);
}
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_AckMessage *ack_msg;
- if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
- {
- return;
- }
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
/* Create the ACK Message */
ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
ack_msg->header.header.size = htons (sizeof (struct
struct GNUNET_STREAM_MessageHeader *msg;
struct GNUNET_STREAM_Socket *socket;
+ shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (NULL != shutdown_handle);
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
socket = shutdown_handle->socket;
-
msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
switch (shutdown_handle->operation)
write_data (struct GNUNET_STREAM_Socket *socket)
{
struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
- int packet; /* Although an int, should never be negative */
- int ack_packet;
-
- ack_packet = -1;
- /* Find the last acknowledged packet */
- for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
- ack_packet = packet;
- else if (NULL == io_handle->messages[packet])
- break;
- }
- /* Resend packets which weren't ack'ed */
- for (packet=0; packet < ack_packet; packet++)
+ unsigned int packet;
+
+ for (packet=0; packet < io_handle->packets_sent; packet++)
{
if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
+ packet))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Placing DATA message with sequence %u in send queue\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (io_handle->messages[packet]->sequence_number));
+ "%s: Retransmitting DATA message with sequence %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
copy_and_queue_message (socket,
- &io_handle->messages[packet]->header,
- NULL,
- NULL);
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
}
}
- packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
- while ( (NULL != io_handle->messages[packet]) &&
- (socket->receiver_window_available
- >= ntohs (io_handle->messages[packet]->header.header.size)) &&
- (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
+ (NULL != io_handle->messages[packet]) &&
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)))
{
socket->receiver_window_available -=
ntohs (io_handle->messages[packet]->header.header.size);
NULL);
packet++;
}
+ io_handle->packets_sent = packet;
+ // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
socket->data_retransmission_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
size_t read_size;
size_t valid_read_size;
unsigned int packet;
uint32_t sequence_increase;
uint32_t offset_increase;
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
-
if (NULL == socket->receive_buffer)
return;
-
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (NULL != socket->read_handle->proc);
-
/* Check the bitmap for any holes */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
GNUNET_assert (0 != valid_read_size);
/* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Calling read processor\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
GNUNET_i2s (&socket->other_peer));
read_size =
- socket->read_handle->proc (socket->read_handle->proc_cls,
- socket->status,
- socket->receive_buffer + socket->copy_offset,
- valid_read_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read processor read %d bytes\n",
+ socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
GNUNET_i2s (&socket->other_peer), read_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read processor completed successfully\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
GNUNET_i2s (&socket->other_peer));
/* Free the read handle */
GNUNET_free (socket->read_handle);
for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
- { packet++; break; }
+ {
+ packet++;
+ break;
+ }
if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
break;
}
-
/* If no packets can be removed we can't move the buffer */
- if (0 == packet) return;
+ if (0 == packet)
+ return;
sequence_increase = packet;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Sequence increase after read processor completion: %u\n",
GNUNET_i2s (&socket->other_peer), sequence_increase);
-
/* Shift the data in the receive buffer */
socket->receive_buffer =
memmove (socket->receive_buffer,
/* Fix relative boundaries */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
- if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
{
uint32_t ahead_buffer_boundary;
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
GNUNET_STREAM_DataProcessor proc;
void *proc_cls;
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
- if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Read task timedout - Cancelling it\n",
GNUNET_i2s (&socket->other_peer));
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
- GNUNET_assert (NULL != socket->read_handle);
- proc = socket->read_handle->proc;
- proc_cls = socket->read_handle->proc_cls;
- GNUNET_free (socket->read_handle);
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ GNUNET_free (read_handle);
socket->read_handle = NULL;
/* Call the read processor to signal timeout */
proc (proc_cls,
const struct GNUNET_ATS_Information*atsi)
{
const void *payload;
+ struct GNUNET_TIME_Relative ack_deadline_rel;
uint32_t bytes_needed;
uint32_t relative_offset;
uint32_t relative_sequence_number;
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
-
- if (0 != memcmp (sender,
- &socket->other_peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received DATA from non-confirming peer\n",
- GNUNET_i2s (&socket->other_peer));
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_YES;
}
-
switch (socket->state)
{
case STATE_ESTABLISHED:
case STATE_TRANSMIT_CLOSED:
- case STATE_TRANSMIT_CLOSE_WAIT:
-
+ case STATE_TRANSMIT_CLOSE_WAIT:
/* check if the message's sequence number is in the range we are
expecting */
relative_sequence_number =
ntohl (msg->sequence_number) - socket->read_sequence_number;
- if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Ignoring received message with sequence number %u\n",
socket);
}
return GNUNET_YES;
- }
-
+ }
/* Check if we have already seen this message */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
relative_sequence_number))
{
socket->ack_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
+ (msg->ack_deadline), &ack_task, socket);
}
return GNUNET_YES;
}
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (msg->sequence_number),
- ntohs (msg->header.header.size),
- GNUNET_i2s (&socket->other_peer));
-
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
/* Check if we have to allocate the buffer */
size -= sizeof (struct GNUNET_STREAM_DataMessage);
relative_offset = ntohl (msg->offset) - socket->read_offset;
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Cannot accommodate packet %d as buffer is full\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (msg->sequence_number));
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
return GNUNET_YES;
}
}
-
/* Copy Data to buffer */
payload = &msg[1];
GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
- memcpy (socket->receive_buffer + relative_offset,
- payload,
- size);
+ memcpy (socket->receive_buffer + relative_offset, payload, size);
socket->receive_buffer_boundaries[relative_sequence_number] =
- relative_offset + size;
-
+ relative_offset + size;
/* Modify the ACK bitmap */
- ackbitmap_modify_bit (&socket->ack_bitmap,
- relative_sequence_number,
- GNUNET_YES);
-
+ ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+ GNUNET_YES);
/* Start ACK sending task if one is not already present */
+ ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
{
+ ack_deadline_rel =
+ GNUNET_TIME_relative_min (ack_deadline_rel,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300));
socket->ack_task_id =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
+ else
+ {
+ struct GNUNET_TIME_Relative ack_time_past;
+ struct GNUNET_TIME_Relative ack_time_remaining;
+ struct GNUNET_TIME_Relative ack_time_min;
+ ack_time_past =
+ GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+ ack_time_remaining = GNUNET_TIME_relative_subtract
+ (socket->ack_time_deadline, ack_time_past);
+ ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+ ack_deadline_rel);
+ if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+ sizeof (struct GNUNET_TIME_Relative)))
+ {
+ ack_deadline_rel = ack_time_min;
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+ &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
}
-
if ((NULL != socket->read_handle) /* A read handle is waiting */
/* There is no current read task */
- && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
/* We have the first packet */
- && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
- 0)))
+ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Scheduling read processor\n",
- GNUNET_i2s (&socket->other_peer));
-
- socket->read_task_id =
- GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_handle->read_task_id =
+ GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
}
-
break;
-
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received data message when it cannot be handled\n",
{
struct GNUNET_STREAM_Socket *socket = cls;
- return handle_data (socket,
- tunnel,
- sender,
- (const struct GNUNET_STREAM_DataMessage *) message,
- atsi);
+ return handle_data (socket, tunnel, sender,
+ (const struct GNUNET_STREAM_DataMessage *) message, atsi);
}
{
struct GNUNET_STREAM_Socket *socket = cls;
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
return;
- socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
LOG_DEBUG ("%s: Retransmitting a control message\n",
GNUNET_i2s (&socket->other_peer));
switch (socket->state)
GNUNET_NO);
else
GNUNET_break (0);
+ break;
default:
GNUNET_break (0);
}
const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
struct GNUNET_STREAM_HelloAckMessage *reply;
- if (0 != memcmp (sender,
- &socket->other_peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received HELLO_ACK from non-confirming peer\n",
return GNUNET_YES;
}
ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
GNUNET_assert (socket->tunnel == tunnel);
switch (socket->state)
{
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
reply = generate_hello_ack (socket, GNUNET_YES);
- queue_message (socket, &reply->header, &set_state_established,
+ queue_message (socket, &reply->header, &set_state_established,
NULL, GNUNET_NO);
return GNUNET_OK;
case STATE_ESTABLISHED:
default:
LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n",
GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer),
- socket->state);
+ GNUNET_i2s (&socket->other_peer), socket->state);
socket->state = STATE_CLOSED; // introduce STATE_ERROR?
return GNUNET_SYSERR;
}
{
case STATE_ESTABLISHED:
socket->state = STATE_RECEIVE_CLOSED;
-
/* Send TRANSMIT_CLOSE_ACK */
reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
reply->header.type =
reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
queue_message (socket, reply, NULL, NULL, GNUNET_NO);
break;
-
default:
/* FIXME: Call statistics? */
break;
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
switch (operation)
{
case SHUT_RDWR:
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received CLOSE_ACK when shutdown handle is not for "
- "SHUT_RDWR\n",
- GNUNET_i2s (&socket->other_peer));
+ "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_CLOSED;
break;
default:
return GNUNET_OK;
}
break;
-
case SHUT_RD:
switch (socket->state)
{
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
- "is not for SHUT_RD\n",
- GNUNET_i2s (&socket->other_peer));
+ "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received RECEIVE_CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_RECEIVE_CLOSED;
break;
default:
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
break;
case SHUT_WR:
switch (socket->state)
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_TRANSMIT_CLOSED;
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
- GNUNET_i2s (&socket->other_peer));
-
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
break;
default:
GNUNET_assert (0);
}
-
if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
shutdown_handle->completion_cb(shutdown_handle->completion_cls,
operation);
GNUNET_SCHEDULER_cancel
(shutdown_handle->close_msg_retransmission_task_id);
shutdown_handle->close_msg_retransmission_task_id =
- GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_free (shutdown_handle); /* Free shutdown handle */
socket->shutdown_handle = NULL;
GNUNET_assert (socket->tunnel == tunnel);
LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
- switch (socket->status)
+ switch (socket->state)
{
case STATE_INIT:
reply = generate_hello_ack (socket, GNUNET_YES);
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
- if (!((socket->write_sequence_number
- - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ sequence_difference =
+ socket->write_sequence_number - ntohl (ack->base_sequence_number);
+ if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received DATA_ACK with unexpected base sequence number\n",
}
/* FIXME: include the case when write_handle is cancelled - ignore the
acks */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received DATA_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
/* Cancel the retransmission task */
if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
{
GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
- socket->data_retransmission_task_id =
- GNUNET_SCHEDULER_NO_TASK;
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
}
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
/* BS: Base sequence from ack; PS: sequence num of current packet */
sequence_difference = ntohl (ack->base_sequence_number)
- ntohl (socket->write_handle->messages[packet]->sequence_number);
+ if ((0 == sequence_difference) ||
+ (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
+ continue; /* The message in our handle is not yet received */
/* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
- if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
- || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
- && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
- {
- ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
- GNUNET_YES);
- continue;
- }
- if (GNUNET_YES ==
- ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
- -sequence_difference))/*inversion as PS >= BS */
- {
- ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
- GNUNET_YES);
- }
+ /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet, GNUNET_YES);
}
/* Update the receive window remaining
FIXME : Should update with the value from a data ack with greater
if (GNUNET_NO == lsocket->listening)
{
-// FIXME: socket uninitalized
-// FIXME: cannot use GNUNET_i2s twice in same call (static buffer)
-// LOG (GNUNET_ERROR_TYPE_DEBUG,
-// "%s: Destroying tunnel from peer %s as we don't have the lock\n",
-// GNUNET_i2s (&socket->other_peer),
-// GNUNET_i2s (&socket->other_peer));
GNUNET_MESH_tunnel_destroy (tunnel);
return NULL;
}
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *initiator;
socket->tunnel = tunnel;
- socket->session_id = 0; /* FIXME */
socket->state = STATE_INIT;
socket->lsocket = lsocket;
socket->retransmit_timeout = lsocket->retransmit_timeout;
socket->testing_active = lsocket->testing_active;
socket->testing_set_write_sequence_number_value =
- lsocket->testing_set_write_sequence_number_value;
+ lsocket->testing_set_write_sequence_number_value;
+ socket->max_payload_size = lsocket->max_payload_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer %s initiated tunnel to us\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
- /* FIXME: Copy MESH handle from lsocket to socket */
return socket;
}
void *tunnel_ctx)
{
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+ struct MessageQueue *head;
- if (tunnel != socket->tunnel)
- return;
-
+ GNUNET_assert (tunnel == socket->tunnel);
GNUNET_break_op(0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer %s has terminated connection abruptly\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
socket->status = GNUNET_STREAM_SHUTDOWN;
-
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
{
{
GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* Terminate the control retransmission tasks */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ /* Clear existing message queue */
+ while (NULL != (head = socket->queue_head)) {
+ GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+ socket->queue_tail,
+ head);
+ GNUNET_free (head->message);
+ GNUNET_free (head);
}
- /* FIXME: Cancel all other tasks using socket->tunnel */
socket->tunnel = NULL;
}
lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
listen_cb = lsocket->listen_cb;
listen_cb_cls = lsocket->listen_cb_cls;
- GNUNET_STREAM_listen_close (lsocket);
if (NULL != listen_cb)
- listen_cb (listen_cb_cls, NULL, NULL);
+ listen_cb (listen_cb_cls, NULL, NULL);
}
enum GNUNET_STREAM_Option option;
GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
va_list vargs;
+ uint16_t payload_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
/* Set defaults */
socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
socket->testing_active = GNUNET_NO;
+ socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
va_start (vargs, open_cb_cls); /* Parse variable args */
do {
option = va_arg (vargs, enum GNUNET_STREAM_Option);
case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
GNUNET_break (0); /* Option irrelevant in STREAM_open */
break;
+ case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
+ payload_size = (uint16_t) va_arg (vargs, unsigned int);
+ GNUNET_assert (0 != payload_size);
+ if (payload_size < socket->max_payload_size)
+ socket->max_payload_size = payload_size;
+ break;
case GNUNET_STREAM_OPTION_END:
break;
}
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Closing STREAM socket when a read handle is pending\n");
+ GNUNET_STREAM_io_read_cancel (socket->read_handle);
}
if (NULL != socket->write_handle)
{
GNUNET_STREAM_io_write_cancel (socket->write_handle);
//socket->write_handle = NULL;
}
- if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
- {
- /* socket closed with read task pending!? */
- GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
- /* Terminate the ack'ing tasks if they are still present */
+ /* Terminate the ack'ing task if they are still present */
if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (socket->ack_task_id);
GNUNET_free (head->message);
GNUNET_free (head);
}
-
/* Close associated tunnel */
if (NULL != socket->tunnel)
{
GNUNET_MESH_tunnel_destroy (socket->tunnel);
socket->tunnel = NULL;
}
-
/* Close mesh connection */
if (NULL != socket->mesh && NULL == socket->lsocket)
{
GNUNET_MESH_disconnect (socket->mesh);
socket->mesh = NULL;
- }
-
+ }
/* Release receive buffer */
if (NULL != socket->receive_buffer)
{
GNUNET_free (socket->receive_buffer);
}
-
GNUNET_free (socket);
}
struct GNUNET_TIME_Relative listen_timeout;
enum GNUNET_STREAM_Option option;
va_list vargs;
+ uint16_t payload_size;
GNUNET_assert (NULL != listen_cb);
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
lsocket->testing_active = GNUNET_NO;
lsocket->listen_ok_cb = NULL;
+ lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
va_start (vargs, listen_cb_cls);
do {
lsocket->listen_ok_cb = va_arg (vargs,
GNUNET_STREAM_ListenSuccessCallback);
break;
+ case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
+ payload_size = (uint16_t) va_arg (vargs, unsigned int);
+ GNUNET_assert (0 != payload_size);
+ if (payload_size < lsocket->max_payload_size)
+ lsocket->max_payload_size = payload_size;
+ break;
case GNUNET_STREAM_OPTION_END:
break;
}
GNUNET_STREAM_CompletionContinuation write_cont,
void *write_cont_cls)
{
- unsigned int num_needed_packets;
- unsigned int packet;
struct GNUNET_STREAM_IOWriteHandle *io_handle;
- uint32_t packet_size;
- uint32_t payload_size;
struct GNUNET_STREAM_DataMessage *data_msg;
const void *sweep;
struct GNUNET_TIME_Relative ack_deadline;
+ unsigned int num_needed_packets;
+ unsigned int packet;
+ uint32_t packet_size;
+ uint32_t payload_size;
+ uint16_t max_data_packet_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
-
- /* Return NULL if there is already a write request pending */
if (NULL != socket->write_handle)
{
GNUNET_break (0);
return NULL;
}
-
switch (socket->state)
{
case STATE_TRANSMIT_CLOSED:
case STATE_RECEIVE_CLOSE_WAIT:
break;
}
-
- if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
- size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
- num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
+ if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
+ size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size;
+ num_needed_packets =
+ (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
io_handle->socket = socket;
io_handle->write_cont = write_cont;
io_handle->write_cont_cls = write_cont_cls;
io_handle->size = size;
+ io_handle->packets_sent = 0;
sweep = data;
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
/* Divide the given buffer into packets for sending */
+ max_data_packet_size =
+ socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
for (packet=0; packet < num_needed_packets; packet++)
{
- if ((packet + 1) * max_payload_size < size)
+ if ((packet + 1) * socket->max_payload_size < size)
{
- payload_size = max_payload_size;
- packet_size = MAX_PACKET_SIZE;
+ payload_size = socket->max_payload_size;
+ packet_size = max_data_packet_size;
}
else
{
- payload_size = size - packet * max_payload_size;
- packet_size = payload_size + sizeof (struct
- GNUNET_STREAM_DataMessage);
+ payload_size = size - packet * socket->max_payload_size;
+ packet_size =
+ payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
}
io_handle->messages[packet] = GNUNET_malloc (packet_size);
io_handle->messages[packet]->header.header.size = htons (packet_size);
io_handle->messages[packet]->sequence_number =
htonl (socket->write_sequence_number++);
io_handle->messages[packet]->offset = htonl (socket->write_offset);
-
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
io_handle->messages[packet]->ack_deadline =
GNUNET_TIME_relative_hton (ack_deadline);
data_msg = io_handle->messages[packet];
/* Copy data from given buffer to the packet */
- memcpy (&data_msg[1],
- sweep,
- payload_size);
+ memcpy (&data_msg[1], sweep, payload_size);
sweep += payload_size;
socket->write_offset += payload_size;
}
+ /* ack the last data message. FIXME: remove when we figure out how to do this
+ using RTT */
+ io_handle->messages[num_needed_packets - 1]->ack_deadline =
+ GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
socket->write_handle = io_handle;
write_data (socket);
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s() END\n", __func__);
-
return io_handle;
}
* @param proc function to call with data (once only)
* @param proc_cls the closure for proc
*
- * @return handle to cancel the operation; if the stream has been shutdown for
- * this type of opeartion then the DataProcessor is immediately
- * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if: the stream has
+ * been shutdown for this type of opeartion (the DataProcessor is
+ * immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
+ * read handle is present (only one read handle per socket is present
+ * at any time)
*/
struct GNUNET_STREAM_IOReadHandle *
GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
read_handle->proc_cls = proc_cls;
read_handle->socket = socket;
socket->read_handle = read_handle;
- /* Check if we have a packet at bitmap 0 */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
0))
- {
- socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
- }
- /* Setup the read timeout task */
- socket->read_io_timeout_task_id =
- GNUNET_SCHEDULER_add_delayed (timeout,
- &read_io_timeout,
- socket);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: %s() END\n",
- GNUNET_i2s (&socket->other_peer),
- __func__);
+ read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
+ read_handle->read_io_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+ GNUNET_i2s (&socket->other_peer), __func__);
return read_handle;
}
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (ioh == socket->read_handle);
/* Read io time task should be there; if it is already executed then this
- read handle is not valid */
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ read handle is not valid; However upon scheduler shutdown the read io task
+ may be executed before */
+ if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
+ GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
/* reading task may be present; if so we have to stop it */
- if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
- {
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
+ if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
+ GNUNET_SCHEDULER_cancel (ioh->read_task_id);
GNUNET_free (ioh);
socket->read_handle = NULL;
}