*/
#include "platform.h"
#include "gnunet_common.h"
+#include "gnunet_crypto_lib.h"
#include "gnunet_stream_lib.h"
#include "stream_protocol.h"
+
+#define MAX_PACKET_SIZE 64000
+
/**
* states in the Protocol
*/
* The number of previous timeouts
*/
unsigned int retries;
+
+ /**
+ * The write IO_handle associated with this socket
+ */
+ struct GNUNET_STREAM_IOHandle *write_handle;
+
+ /**
+ * The read IO_handle associated with this socket
+ */
+ struct GNUNET_STREAM_IOHandle *read_handle;
+
+ /**
+ * Write sequence number. Start at random upon reaching ESTABLISHED state
+ */
+ uint32_t write_sequence_number;
+
+ /**
+ * Read sequence number. This number's value is determined during handshake
+ */
+ uint32_t read_sequence_number;
};
* The call back closure
*/
void *listen_cb_cls;
+};
+
+/**
+ * The IO Handle
+ */
+struct GNUNET_STREAM_IOHandle
+{
+ /**
+ * The packet_buffers associated with this Handle
+ */
+ struct GNUNET_STREAM_DataMessage *messages[64];
+
+ /**
+ * The bitmap of this IOHandle; Corresponding bit for a message is set when
+ * it has been acknowledged by the receiver
+ */
+ GNUNET_STREAM_AckBitmap bitmap;
};
}
+/**
+ * Function to modify a bit in GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap the bitmap to modify
+ * @param bit the bit number to modify
+ * @param GNUNET_YES to on, GNUNET_NO to off
+ */
+static void
+AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ uint8_t bit,
+ uint8_t value)
+{
+ uint64_t val;
+
+ val = value;
+ *bitmap = *bitmap | (val << bit);
+}
+
+
+/**
+ * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
+ *
+ * @param bit the bit number to check
+ * @return GNUNET_YES if the bit is set; GNUNET_NO if not
+ */
+static uint8_t
+AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ uint8_t bit)
+{
+ return (*bitmap & (0x0000000000000001 << bit)) >> bit;
+}
/**
set_state_established (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
socket->state = STATE_ESTABLISHED;
}
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_assert (STATE_INIT == socket->state);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
socket->state = STATE_HELLO_WAIT;
}
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
- struct GNUNET_STREAM_MessageHeader *reply;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
+ ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_HELLO_WAIT == socket->state)
{
+ socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff);
reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.size =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.type =
+ reply->header.header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
queue_message (socket,
- reply,
+ (struct GNUNET_STREAM_MessageHeader *) reply,
&set_state_established,
NULL);
}
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- struct GNUNET_STREAM_MessageHeader *reply;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_INIT == socket->state)
{
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 0xffffffff);
reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.size =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.type =
+ reply->header.header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
queue_message (socket,
- reply,
+ (struct GNUNET_STREAM_MessageHeader *)reply,
&set_state_hello_wait,
NULL);
}
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- struct GNUNET_STREAM_MessageHeader *reply;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_message;
+ ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_assert (socket->tunnel == tunnel);
if (STATE_HELLO_WAIT == socket->state)
{
+ socket->read_sequence_number = ntohs (ack_message->sequence_number);
socket->state = STATE_ESTABLISHED;
}
else
{&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
sizeof (struct GNUNET_STREAM_AckMessage) },
{&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
- sizeof (struct GNUNET_STREAM_MessageHeader)},
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
{&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
{&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
- sizeof (struct GNUNET_STREAM_MessageHeader)},
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
{&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
if (NULL != socket->transmit_handle)
{
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
}
/* Clear existing message queue */
if (NULL != socket->tunnel)
{
GNUNET_MESH_tunnel_destroy (socket->tunnel);
+ socket->tunnel = NULL;
}
/* Close mesh connection */
if (NULL != socket->mesh)
{
GNUNET_MESH_disconnect (socket->mesh);
+ socket->mesh = NULL;
}
GNUNET_free (socket);
}
void *tunnel_ctx)
{
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
- struct MessageQueue *head;
-
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer %s has terminated connection abruptly\n",
GNUNET_i2s (&socket->other_peer));
GNUNET_free (lsocket);
}
+
+
+/**
+ * Tries to write the given data to the stream
+ *
+ * @param socket the socket representing a stream
+ * @param data the data buffer from where the data is written into the stream
+ * @param size the number of bytes to be written from the data buffer
+ * @param timeout the timeout period
+ * @param write_cont the function to call upon writing some bytes into the stream
+ * @param write_cont_cls the closure
+ * @return handle to cancel the operation
+ */
+struct GNUNET_STREAM_IOHandle *
+GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
+ const void *data,
+ size_t size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_STREAM_CompletionContinuation write_cont,
+ void *write_cont_cls)
+{
+ unsigned int num_needed_packets;
+ unsigned int packet;
+ struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_DataMessage *data_msg;
+ size_t max_payload_size;
+ size_t packet_size;
+ const void *sweep;
+
+ /* There is already a write request pending */
+ if (NULL != socket->write_handle) return NULL;
+ if (!(STATE_ESTABLISHED == socket->state
+ || STATE_RECEIVE_CLOSE_WAIT == socket->state
+ || STATE_RECEIVE_CLOSED == socket->state))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Attempting to write on a closed/not-yet-established stream\n");
+ return NULL;
+ }
+
+ max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+ num_needed_packets = ceil (size / max_payload_size);
+ if (64 < num_needed_packets)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Given buffer cannot be accommodated in 64 packets\n");
+ num_needed_packets = 64;
+ }
+
+ io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
+ sweep = data;
+ /* Divide the given area into packets for sending */
+ for (packet=0; packet < num_needed_packets; packet++)
+ {
+ if ((packet + 1) * max_payload_size < size)
+ {
+ packet_size = MAX_PACKET_SIZE;
+ }
+ else
+ {
+ packet_size = size - packet * max_payload_size
+ + sizeof (struct GNUNET_STREAM_DataMessage);
+ }
+ io_handle->messages[packet] = GNUNET_malloc (packet_size);
+ io_handle->messages[packet]->header.header.size = htons (packet_size);
+ io_handle->messages[packet]->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+ data_msg = io_handle->messages[packet];
+ memcpy (&data_msg[1],
+ sweep,
+ packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
+ sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
+ }
+
+ return io_handle;
+}