From: Sree Harsha Totakura Date: Tue, 21 Feb 2012 17:28:55 +0000 (+0000) Subject: -added data message handling X-Git-Tag: initial-import-from-subversion-38251~14764 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=a4c9524076c56b21b5985f3b6b78b60db799287e;p=oweals%2Fgnunet.git -added data message handling --- diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 0f88af92b..e412b679d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -205,6 +205,11 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_STREAM_IOHandle *read_handle; + /** + * Buffer for storing received messages + */ + void *receive_buffer; + /** * The state of the protocol associated with this socket */ @@ -222,11 +227,13 @@ struct GNUNET_STREAM_Socket /** * The session id associated with this stream connection + * FIXME: Not used currently, may be removed */ uint32_t session_id; /** - * Write sequence number. Start at random upon reaching ESTABLISHED state + * Write sequence number. Set to random when sending HELLO(client) and + * HELLO_ACK(server) */ uint32_t write_sequence_number; @@ -437,6 +444,7 @@ ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, /** * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap * + * @param bitmap address of the bitmap that has to be checked * @param bit the bit number to check * @return GNUNET_YES if the bit is set; GNUNET_NO if not */ @@ -514,6 +522,71 @@ write_data (struct GNUNET_STREAM_Socket *socket) } +/** + * Handler for DATA messages; Same for both client and server + * + * @param socket the socket through which the ack was received + * @param tunnel connection to the other end + * @param sender who sent the message + * @param ack the acknowledgment message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_data (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_DataMessage *msg, + const struct GNUNET_ATS_Information*atsi) +{ + uint16_t size; + const void *payload; + + size = msg->header.header.size; + if (size < sizeof (struct GNUNET_STREAM_DataMessage)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + switch (socket->state) + { + case STATE_ESTABLISHED: + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + GNUNET_assert (NULL != socket->receive_buffer); + /* check if the message's sequence number is greater than the one we are + expecting */ + if (ntohl (msg->sequence_number) - socket->read_sequence_number < 64) + { + + } + else /* We are receiving a retransmitted message */ + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message with sequence number %d retransmitted\n", + ntohl (socket->read_sequence_number)); + return GNUNET_YES; + } + /* Copy Data to buffer and send acknowledgements */ + size -= sizeof (struct GNUNET_STREAM_DataMessage); + payload = &msg[1]; + memcpy (socket->receive_buffer + + (ntohl (msg->sequence_number) - socket->read_sequence_number) + * MAX_PACKET_SIZE, + payload, + size); + /* FIXME: We have to send GNUNET_STREAM_AckMessage */ + break; + + default: + /* FIXME: call statistics */ + break; + } + return GNUNET_YES; +} + /** * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA * @@ -535,22 +608,12 @@ client_handle_data (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; - uint16_t size; - const struct GNUNET_STREAM_DataMessage *data_msg; - const void *payload; - size = ntohs (message->size); - if (size < sizeof (struct GNUNET_STREAM_DataMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - data_msg = (const struct GNUNET_STREAM_DataMessage *) message; - size -= sizeof (struct GNUNET_STREAM_DataMessage); - payload = &data_msg[1]; - /* ... */ - - return GNUNET_OK; + return handle_data (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_DataMessage *) message, + atsi); } @@ -565,6 +628,8 @@ set_state_established (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n"); + /* Initialize the receive buffer */ + socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE); socket->state = STATE_ESTABLISHED; } @@ -673,6 +738,47 @@ client_handle_reset (void *cls, } +/** + * Common message handler for handling TRANSMIT_CLOSE messages + * + * @param socket the socket through which the ack was received + * @param tunnel connection to the other end + * @param sender who sent the message + * @param ack the acknowledgment message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_transmit_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *msg, + const struct GNUNET_ATS_Information*atsi) +{ + struct GNUNET_STREAM_MessageHeader *reply; + + switch (socket->state) + { + case STATE_ESTABLISHED: + socket->state = STATE_RECEIVE_CLOSED; + + /* Send TRANSMIT_CLOSE_ACK */ + reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); + reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + queue_message (socket, reply, NULL, NULL); + break; + + default: + /* FIXME: Call statistics? */ + break; + } + return GNUNET_YES; +} + + /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE * @@ -694,8 +800,12 @@ client_handle_transmit_close (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; - - return GNUNET_OK; + + return handle_transmit_close (socket, + tunnel, + sender, + (struct GNUNET_STREAM_MessageHeader *)message, + atsi); } @@ -1001,7 +1111,11 @@ server_handle_transmit_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_transmit_close (socket, + tunnel, + sender, + (struct GNUNET_STREAM_MessageHeader *)message, + atsi); } @@ -1290,6 +1404,7 @@ static struct GNUNET_MESH_MessageHandler server_message_handlers[] = { /** * Function called when our target peer is connected to our tunnel * + * @param cls the socket for which this tunnel is created * @param peer the peer identity of the target * @param atsi performance data for the connection */ @@ -1342,6 +1457,7 @@ mesh_peer_connect_callback (void *cls, /** * Function called when our target peer is disconnected from our tunnel * + * @param cls the socket associated which this tunnel * @param peer the peer identity of the target */ static void @@ -1467,6 +1583,13 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) GNUNET_MESH_disconnect (socket->mesh); socket->mesh = NULL; } + + /* Release receive buffer */ + if (NULL != socket->receive_buffer) + { + GNUNET_free (socket->receive_buffer); + } + GNUNET_free (socket); } @@ -1583,7 +1706,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Closes the listen socket * - * @param socket the listen socket + * @param lsocket the listen socket */ void GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) @@ -1628,7 +1751,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, || STATE_RECEIVE_CLOSED == socket->state)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Attempting to write on a closed/not-yet-established stream\n"); + "Attempting to write on a closed (OR) not-yet-established" + "stream\n"); return NULL; } diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h index 1b3a6838a..baaec2a4f 100644 --- a/src/stream/stream_protocol.h +++ b/src/stream/stream_protocol.h @@ -136,6 +136,9 @@ struct GNUNET_STREAM_AckMessage }; +/** + * Message for Acknowledging HELLO + */ struct GNUNET_STREAM_HelloAckMessage { /** @@ -151,10 +154,30 @@ struct GNUNET_STREAM_HelloAckMessage /** * The size(in bytes) of the receive window on the peer sending this message + * + * FIXME: Remove if not needed */ uint32_t receive_window_size; }; + +/** + * The Transmit close message(used to signal transmission is closed) + */ +struct GNUNET_STREAM_TransmitCloseMessage +{ + /** + * The stream message header + */ + struct GNUNET_STREAM_MessageHeader header; + + /** + * The last sequence number of the packet after which the transmission has + * ended + */ + uint32_t final_sequence_number GNUNET_PACKED; +}; + GNUNET_NETWORK_STRUCT_END