From c45d415b80be0e6fa4ca42c8f44a7902b9afda31 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Mon, 30 Jan 2012 20:42:53 +0000 Subject: [PATCH] -stream api implementation skeleton(incomplete) --- src/stream/stream_api.c | 660 +++++++++++++++++++++++ src/stream/stream_protocol.h | 27 - src/stream/test_stream_local.c | 7 +- src/stream/test_stream_local_halfclose.c | 7 +- 4 files changed, 668 insertions(+), 33 deletions(-) create mode 100644 src/stream/stream_api.c diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c new file mode 100644 index 000000000..25b876333 --- /dev/null +++ b/src/stream/stream_api.c @@ -0,0 +1,660 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file stream/stream_api.c + * @brief Implementation of the stream library + * @author Sree Harsha Totakura + */ + +#include "gnunet_common.h" +#include "gnunet_stream_lib.h" + + +/** + * states in the Protocol + */ +enum State + { + /** + * Client initialization state + */ + STATE_INIT, + + /** + * Listener initialization state + */ + STATE_LISTEN, + + /** + * Pre-connection establishment state + */ + STATE_HELLO_WAIT, + + /** + * State where a connection has been established + */ + STATE_ESTABLISHED, + + /** + * State where the socket is closed on our side and waiting to be ACK'ed + */ + STATE_RECEIVE_CLOSE_WAIT, + + /** + * State where the socket is closed for reading + */ + STATE_RECEIVE_CLOSED, + + /** + * State where the socket is closed on our side and waiting to be ACK'ed + */ + STATE_TRANSMIT_CLOSE_WAIT, + + /** + * State where the socket is closed for writing + */ + STATE_TRANSMIT_CLOSED, + + /** + * State where the socket is closed on our side and waiting to be ACK'ed + */ + STATE_CLOSE_WAIT, + + /** + * State where the socket is closed + */ + STATE_CLOSED + }; + + +/** + * The STREAM Socket Handler + */ +struct GNUNET_STREAM_Socket +{ + /** + * The mesh handle + */ + struct GNUNET_MESH_Handle *mesh; + + /** + * The mesh tunnel handle + */ + struct GNUNET_MESH_Tunnel *tunnel; + + /** + * The session id associated with this stream connection + */ + unint32_t session_id; + + /** + * The peer identity of the peer at the other end of the stream + */ + GNUNET_PeerIdentity *other_peer; + + /** + * Stream open closure + */ + void *open_cls; + + /** + * Stream open callback + */ + GNUNET_STREAM_OpenCallback open_cb; + + /** + * Retransmission timeout + */ + struct GNUNET_TIME_Relative retransmit_timeout; + + /** + * The state of the protocol associated with this socket + */ + enum State state; + + /** + * The status of the socket + */ + enum GNUNET_STREAM_Status status; + + /** + * The current transmit handle (if a pending transmit request exists) + */ + struct GNUNET_MESH_TransmitHandle *transmit_handle; + + /** + * The current message associated with the transmit handle + */ + struct GNUNET_MessageHeader *message; +}; + + +/** + * A socket for listening + */ +struct GNUNET_STREAM_ListenSocket +{ + + /** + * The mesh handle + */ + struct GNUNET_MESH_Handle *mesh; + + /** + * The service port + */ + GNUNET_MESH_ApplicationType port; + + /** + * The callback function which is called after successful opening socket + */ + GNUNET_STREAM_ListenCallback listen_cb; + + /** + * The call back closure + */ + void *listen_cb_cls; + +}; + +/** + * Default value in seconds for various timeouts + */ +static unsigned int default_timeout = 300; + + +/** + * Converts message fields from host byte order to network byte order + * + * @param msg the message to convert + */ +static void +GNUNET_STREAM_convert_message_h2n (struct GNUNET_STREAM_MessageHeader *msg) +{ + /* Add type specific message conversion here */ + + msg->size = htons (msg->size); + msg->type = htons (msg->type); +} + + +/** + * Converts message fields from network byte order to host byte order + * + * @param msg the messeage to convert + */ +static void +GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg) +{ + msg->size = ntohs (msg->size); + msg->type = ntohs (msg->type); + + /* Add type specific message conversion here */ +} + +/** + * Callback function from send_message + * + * @param cls closure the socket on which the send message was called + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +send_message_notify (void *cls, size_t size, void *buf) +{ + struct GNUNET_STREAM_Socket *socket; + + socket = (struct GNUNET_STREAM_Socket *) cls; + socket->transmit_handle = NULL; /* Remove the transmit handle */ + if (0 == size) /* Socket closed? */ + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Message not sent as tunnel was closed \n"); + } + else /* Size is more or equal to what was requested */ + { + size = socket->message->size; + GNUNET_STREAM_convert_message_h2n (socket->message) /* Convert h2n */ + memcpy (buf, socket->message, size); + } + GNUNET_free (socket->message); /* Free the message memory */ + socket->message = NULL; + return size; +} + + +/** + * Sends a message using the mesh connection of a socket + * + * @param socket the socket whose mesh connection is used + * @param message the message to be sent + */ +static void +send_message (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MessageHeader *message) +{ + socket->message = message; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + timeout, /* FIXME: Maxdelay */ + socket->other_peer, + message->size, + send_message_notify, + socket); +} + +/** + * Makes state transition dependending on the given state + * + * @param socket the socket whose state has to be transitioned + */ +static void +make_state_transition (struct GNUNET_STREAM_Socket *socket) +{ + +} + + +/** + * Message Handler for mesh + * + * @param cls closure (set from GNUNET_MESH_connect) + * @param tunnel connection to the other end + * @param tunnel_ctx place to store local state associated with the tunnel + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_data (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) +{ + uint16_t size; + struct GNUNET_STREAM_MessageHeader *message_copy; + + size = ntohs (message->size); + message_copy = GNUNET_malloc (size); + memcpy (message_copy, message, size); + GNUNET_STREAM_convert_message_n2h (message_copy); + + route_message (message_copy); +} + + +static struct GNUNET_MESH_MessageHandler message_handlers[] = { + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_ACK, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, 0}, + {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, 0}, + {NULL, 0, 0} +}; + + +/** + * Function called when our target peer is connected to our tunnel + * + * @param peer the peer identity of the target + * @param atsi performance data for the connection + */ +static void +mesh_peer_connect_callback (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_ATS_Information * atsi) +{ + const struct GNUNET_STREAM_Socket *socket; + + socket = (const struct GNUNET_STREAM_Socket *) cls; + if (0 != memcmp (socket->other_peer, + peer, + sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "A peer (%s) which is not our target has\ + connected to our tunnel", GNUNET_i2s (peer)); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Target peer %s connected\n", GNUNET_i2s (peer)); + + /* Set state to INIT */ + socket->state = STATE_INIT; + + /* Try to achieve ESTABLISHED state */ + make_state_transition (socket); + + /* Call open callback */ + if (NULL == open_cls) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "STREAM_open callback is NULL\n"); + } + if (NULL != socket->open_cb) + { + socket->open_cb (socket->open_cls, socket); + } +} + + +/** + * Function called when our target peer is disconnected from our tunnel + * + * @param peer the peer identity of the target + */ +static void +mesh_peer_disconnect_callback (void *cls, + const struct GNUNET_PeerIdentity *peer) +{ + +} + + +/** + * Function to find the mapped socket of a tunnel + * + * @param tunnel the tunnel whose associated socket has to be retrieved + * @return the socket corresponding to the tunnel + */ +static struct GNUNET_STREAM_Socket * +find_socket (const struct GNUNET_MESH_Tunnel *tunnel) +{ + /* Search tunnel in a list or hashtable and retrieve the socket */ +} + +/*****************/ +/* API functions */ +/*****************/ + + +/** + * Tries to open a stream to the target peer + * + * @param cfg configuration to use + * @param target the target peer to which the stream has to be opened + * @param app_port the application port number which uniquely identifies this + * stream + * @param open_cb this function will be called after stream has be established + * @param open_cb_cls the closure for open_cb + * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END + * @return if successful it returns the stream socket; NULL if stream cannot be + * opened + */ +struct GNUNET_STREAM_Socket * +GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_PeerIdentity *target, + GNUNET_MESH_ApplicationType app_port, + GNUNET_STREAM_OpenCallback open_cb, + void *open_cb_cls, + ...) +{ + struct GNUNET_STREAM_Socket *socket; + enum GNUNET_STREAM_Option option; + va_list vargs; /* Variable arguments */ + + socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); + if (NULL == socket) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unable to allocate memory\n"); + return NULL; + } + socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); + if (NULL == socket->other_peer) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unable to allocate memory \n"); + return NULL; + } + + /* Set defaults */ + socket->retransmit_timeout = + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); + + va_start (vargs, open_cb_cls); /* Parse variable args */ + do { + option = va_arg (vargs, enum GNUNET_STREAM_Option); + switch (option) + { + case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: + /* Expect struct GNUNET_TIME_Relative */ + socket->retransmit_timeout = va_arg (vargs, + struct GNUNET_TIME_Relative); + break; + case GNUNET_STREAM_OPTION_END: + break; + } + + } while (0 != option); + va_end (vargs); /* End of variable args parsing */ + + socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ + 10, /* QUEUE size as parameter? */ + NULL, /* cls */ + NULL, /* No inbound tunnel handler */ + NULL, /* No inbound tunnel cleaner */ + message_handlers, + NULL); /* We don't get inbound tunnels */ + + memcpy (socket->other_peer, target, sizeof (struct GNUNET_PeerIdentity)); + socket->open_cb = open_cb; + socket->open_cls = open_cb_cls; + + /* Now create the mesh tunnel to target */ + socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, + NULL, /* Tunnel context */ + &mesh_peer_connect_callback, + &mesh_peer_disconnect_callback, + (void *) socket); + + return socket; +} + + +/** + * Closes the stream + * + * @param socket the stream socket + */ +void +GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) +{ + /* Clear Transmit handles */ + if (NULL != socket->transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + } + /* Clear existing message queue message */ + if (NULL != socket->message) + { + GNUNET_free (socket->message); + } + /* Clear memory allocated for other peer's PeerIdentity */ + GNUNET_Free (socket->other_peer); + /* Close associated tunnel */ + if (NULL != socket->tunnel) + { + GNUNET_MESH_tunnel_destroy (socket->tunnel); + } + /* Close mesh connection */ + if (NULL != socket->mesh) + { + GNUNET_MESH_disconnect (socket->mesh); + } + GNUNET_free (socket); +} + + +/** + * Method called whenever a peer creates a tunnel to us + * + * @param cls closure + * @param tunnel new handle to the tunnel + * @param initiator peer that started the tunnel + * @param atsi performance information for the tunnel + * @return initial tunnel context for the tunnel + * (can be NULL -- that's not an error) + */ +void +new_tunnel_notify (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_ListenSocket *lsocket; + struct GNUNET_STREAM_Socket *socket; + + lsocket = (struct GNUNET_STREAM_ListenSocket *) cls; + socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); + socket->tunnel = tunnel; + socket->session_id = 0; /* FIXME */ + socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); + memcpy (socket->other_peer, initiator, sizeof (struct GNUNET_PeerIdentity)); + socket->state = STATE_LISTEN; + + if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, + socket, + socket->other_peer)) + { + socket->state = STATE_CLOSED; + make_state_transition (socket); + GNUNET_free (socket->other_peer); + GNUNET_free (socket); + GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */ + } + else + { + make_state_transition (socket); + } +} + + +/** + * Function called whenever an inbound tunnel is destroyed. Should clean up + * any associated state. This function is NOT called if the client has + * explicitly asked for the tunnel to be destroyed using + * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on + * the tunnel. + * + * @param cls closure (set from GNUNET_MESH_connect) + * @param tunnel connection to the other end (henceforth invalid) + * @param tunnel_ctx place where local state associated + * with the tunnel is stored + */ +void +tunnel_cleaner (void *cls, + const struct GNUNET_MESH_Tunnel *tunnel, + void *tunnel_ctx) +{ + struct GNUNET_STREAM_Socket *socket; + + socket = find_socket (tunnel); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Peer %s has terminated connection abruptly\n", + GNUNET_i2s (socket->other_peer)); + + socket->status = GNUNET_STREAM_SHUTDOWN; + /* Clear Transmit handles */ + if (NULL != socket->transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + + /* Clear existing message queue message */ + if (NULL != socket->message) + { + GNUNET_free (socket->message); + socket->message = NULL; + } +} + + +/** + * Listens for stream connections for a specific application ports + * + * @param cfg the configuration to use + * @param app_port the application port for which new streams will be accepted + * @param listen_cb this function will be called when a peer tries to establish + * a stream with us + * @param listen_cb_cls closure for listen_cb + * @return listen socket, NULL for any error + */ +struct GNUNET_STREAM_ListenSocket * +GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, + GNUNET_MESH_ApplicationType app_port, + GNUNET_STREAM_ListenCallback listen_cb, + void *listen_cb_cls) +{ + /* FIXME: Add variable args for passing configration options? */ + struct GNUNET_STREAM_ListenSocket *lsocket; + + lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); + if (NULL == lsocket) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unable to allocate memory\n"); + return NULL; + } + lsocket->port = app_port; + lsocket->listen_cb = listen_cb; + lsocket->listen_cb_cls = listen_cb_cls; + lsocket->mesh = GNUNET_MESH_connect (cfg, + 10, /* FIXME: QUEUE size as parameter? */ + lsocket, /* Closure */ + &new_tunnel_notify, + &tunnel_cleaner, + message_handlers, + {app_port, NULL}); + return lsocket; +} + + +/** + * Closes the listen socket + * + * @param socket the listen socket + */ +void +GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) +{ + /* Do house keeping */ + + /* Close MESH connection */ + GNUNET_MESH_disconnect (lsocket->mesh); + + GNUNET_free (lsocket); +} diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h index 3f6155eb1..b7fff7e5f 100644 --- a/src/stream/stream_protocol.h +++ b/src/stream/stream_protocol.h @@ -133,33 +133,6 @@ struct GNUNET_STREAM_AckMessage }; -/** - * states in the Protocol - */ -enum GNUNET_STREAM_State - { - GNUNET_STREAM_STATE_INIT, - - GNUNET_STREAM_STATE_LISTEN, - - GNUNET_STREAM_STATE_HANDSHAKE_WAIT, - - GNUNET_STREAM_STATE_ESTABLISHED, - - GNUNET_STREAM_STATE_RECEIVE_CLOSE_WAIT, - - GNUNET_STREAM_STATE_RECEIVE_CLOSED, - - GNUNET_STREAM_STATE_TRANSMIT_CLOSE_WAIT, - - GNUNET_STREAM_STATE_TRANSMIT_CLOSED, - - GNUNET_STREAM_STATE_CLOSE_WAIT, - - GNUNET_STREAM_STATE_CLOSED - } - - #if 0 /** keep Emacsens' auto-indent happy */ { #endif diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 27271faff..63347d826 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -124,9 +124,10 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param status the status of the stream at the time this function is called * @param size the number of bytes read or written */ -void write_completion (void *cls, - enum GNUNET_STREAM_Status status, - size_t size) +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) { struct PeerData *peer; diff --git a/src/stream/test_stream_local_halfclose.c b/src/stream/test_stream_local_halfclose.c index 44ae5787a..5acaf213a 100644 --- a/src/stream/test_stream_local_halfclose.c +++ b/src/stream/test_stream_local_halfclose.c @@ -125,9 +125,10 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param status the status of the stream at the time this function is called * @param size the number of bytes read or written */ -void write_completion (void *cls, - enum GNUNET_STREAM_Status status, - size_t size) +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) { struct PeerData *peer; -- 2.25.1