/**
* Functions of this type are called when a message is written
*
+ * @param cls the closure from queue_message
* @param socket the socket the written message was bound to
*/
typedef void (*SendFinishCallback) (void *cls,
GNUNET_STREAM_AckMessage));
ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
- ack_msg->base_sequence_number = htonl (socket->write_sequence_number);
+ ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
/* Request MESH for sending ACK */
GNUNET_assert (NULL != socket->receive_buffer);
/* check if the message's sequence number is greater than the one we are
expecting */
- if (ntohl (msg->sequence_number) - socket->read_sequence_number < 64)
- {
-
- }
- else /* We are receiving a retransmitted message */
- {
+ if (ntohl (msg->sequence_number) - socket->read_sequence_number >= 64)
+ {/* We are receiving a retransmitted message */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Message with sequence number %d retransmitted\n",
ntohl (socket->read_sequence_number));
return GNUNET_YES;
}
- /* Copy Data to buffer and send acknowledgements */
+
+ /* Copy Data to buffer and send acknowledgement for this packet */
size -= sizeof (struct GNUNET_STREAM_DataMessage);
payload = &msg[1];
memcpy (socket->receive_buffer
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_data (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_DataMessage *)message,
+ atsi);
}
struct GNUNET_STREAM_DataMessage *data_msg;
const void *sweep;
- /* There is already a write request pending */
+ /* Return NULL if there is already a write request pending */
if (NULL != socket->write_handle)
{
GNUNET_break (0);
htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
io_handle->messages[packet]->sequence_number =
htons (socket->write_sequence_number++);
+
+ /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
+ determined from RTT */
+ io_handle->messages[packet]->ack_deadline =
+ GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5));
data_msg = io_handle->messages[packet];
+ /* Copy data from given buffer to the packet */
memcpy (&data_msg[1],
sweep,
packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
}
-
+ socket->write_handle = io_handle;
write_data (socket);
return io_handle;
}
+
+
+/**
+ * Tries to read data from the stream
+ *
+ * @param socket the socket representing a stream
+ * @param timeout the timeout period
+ * @param proc function to call with data (once only)
+ * @param proc_cls the closure for proc
+ * @return handle to cancel the operation
+ */
+struct GNUNET_STREAM_IOHandle *
+GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_STREAM_DataProcessor proc,
+ void *proc_cls)
+{
+ /* Check the bitmap for any holes */
+
+ /* Deem the data from the starting of the bitmap upto a hole as available
+ data */
+
+ /* Create an IO handle */
+
+ /* Call the Data processor with this available data */
+
+ /* Update the read_sequence_number to the first hole in the bitmap */
+
+ /* Shift the bitmap so that the first hole is now at the start */
+}