*/
struct GNUNET_STREAM_IOHandle *read_handle;
+ /**
+ * Buffer for storing received messages
+ */
+ void *receive_buffer;
+
/**
* The state of the protocol associated with this socket
*/
/**
* The session id associated with this stream connection
+ * FIXME: Not used currently, may be removed
*/
uint32_t session_id;
/**
- * Write sequence number. Start at random upon reaching ESTABLISHED state
+ * Write sequence number. Set to random when sending HELLO(client) and
+ * HELLO_ACK(server)
*/
uint32_t write_sequence_number;
/**
* Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
*
+ * @param bitmap address of the bitmap that has to be checked
* @param bit the bit number to check
* @return GNUNET_YES if the bit is set; GNUNET_NO if not
*/
}
+/**
+ * Handler for DATA messages; Same for both client and server
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param ack the acknowledgment message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_data (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_DataMessage *msg,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ uint16_t size;
+ const void *payload;
+
+ size = msg->header.header.size;
+ if (size < sizeof (struct GNUNET_STREAM_DataMessage))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ GNUNET_assert (NULL != socket->receive_buffer);
+ /* check if the message's sequence number is greater than the one we are
+ expecting */
+ if (ntohl (msg->sequence_number) - socket->read_sequence_number < 64)
+ {
+
+ }
+ else /* We are receiving a retransmitted message */
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message with sequence number %d retransmitted\n",
+ ntohl (socket->read_sequence_number));
+ return GNUNET_YES;
+ }
+ /* Copy Data to buffer and send acknowledgements */
+ size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ payload = &msg[1];
+ memcpy (socket->receive_buffer
+ + (ntohl (msg->sequence_number) - socket->read_sequence_number)
+ * MAX_PACKET_SIZE,
+ payload,
+ size);
+ /* FIXME: We have to send GNUNET_STREAM_AckMessage */
+ break;
+
+ default:
+ /* FIXME: call statistics */
+ break;
+ }
+ return GNUNET_YES;
+}
+
/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
- uint16_t size;
- const struct GNUNET_STREAM_DataMessage *data_msg;
- const void *payload;
- size = ntohs (message->size);
- if (size < sizeof (struct GNUNET_STREAM_DataMessage))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
- size -= sizeof (struct GNUNET_STREAM_DataMessage);
- payload = &data_msg[1];
- /* ... */
-
- return GNUNET_OK;
+ return handle_data (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_DataMessage *) message,
+ atsi);
}
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
+ /* Initialize the receive buffer */
+ socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
socket->state = STATE_ESTABLISHED;
}
}
+/**
+ * Common message handler for handling TRANSMIT_CLOSE messages
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param ack the acknowledgment message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *msg,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *reply;
+
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ socket->state = STATE_RECEIVE_CLOSED;
+
+ /* Send TRANSMIT_CLOSE_ACK */
+ reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
+ reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket, reply, NULL, NULL);
+ break;
+
+ default:
+ /* FIXME: Call statistics? */
+ break;
+ }
+ return GNUNET_YES;
+}
+
+
/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
*
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
-
- return GNUNET_OK;
+
+ return handle_transmit_close (socket,
+ tunnel,
+ sender,
+ (struct GNUNET_STREAM_MessageHeader *)message,
+ atsi);
}
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_transmit_close (socket,
+ tunnel,
+ sender,
+ (struct GNUNET_STREAM_MessageHeader *)message,
+ atsi);
}
/**
* Function called when our target peer is connected to our tunnel
*
+ * @param cls the socket for which this tunnel is created
* @param peer the peer identity of the target
* @param atsi performance data for the connection
*/
/**
* Function called when our target peer is disconnected from our tunnel
*
+ * @param cls the socket associated which this tunnel
* @param peer the peer identity of the target
*/
static void
GNUNET_MESH_disconnect (socket->mesh);
socket->mesh = NULL;
}
+
+ /* Release receive buffer */
+ if (NULL != socket->receive_buffer)
+ {
+ GNUNET_free (socket->receive_buffer);
+ }
+
GNUNET_free (socket);
}
/**
* Closes the listen socket
*
- * @param socket the listen socket
+ * @param lsocket the listen socket
*/
void
GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
|| STATE_RECEIVE_CLOSED == socket->state))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Attempting to write on a closed/not-yet-established stream\n");
+ "Attempting to write on a closed (OR) not-yet-established"
+ "stream\n");
return NULL;
}