* @brief Implementation of the stream library
* @author Sree Harsha Totakura
*/
-
+#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_stream_lib.h"
-
/**
* states in the Protocol
*/
/**
* The session id associated with this stream connection
*/
- unint32_t session_id;
+ uint32_t session_id;
/**
* The peer identity of the peer at the other end of the stream
*/
- GNUNET_PeerIdentity *other_peer;
+ GNUNET_PeerIdentity other_peer;
/**
* Stream open closure
static unsigned int default_timeout = 300;
-/**
- * Converts message fields from host byte order to network byte order
- *
- * @param msg the message to convert
- */
-static void
-GNUNET_STREAM_convert_message_h2n (struct GNUNET_STREAM_MessageHeader *msg)
-{
- /* Add type specific message conversion here */
-
- msg->size = htons (msg->size);
- msg->type = htons (msg->type);
-}
-
-
-/**
- * Converts message fields from network byte order to host byte order
- *
- * @param msg the messeage to convert
- */
-static void
-GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
-{
- msg->size = ntohs (msg->size);
- msg->type = ntohs (msg->type);
-
- /* Add type specific message conversion here */
-}
-
/**
* Callback function from send_message
*
static size_t
send_message_notify (void *cls, size_t size, void *buf)
{
- struct GNUNET_STREAM_Socket *socket;
+ struct GNUNET_STREAM_Socket *socket = cls;
+ size_t ret;
- socket = (struct GNUNET_STREAM_Socket *) cls;
socket->transmit_handle = NULL; /* Remove the transmit handle */
if (0 == size) /* Socket closed? */
{
+ // statistics ("message timeout")
+
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Message not sent as tunnel was closed \n");
+ "Message not sent as tunnel was closed \n");
+ ret = 0;
}
else /* Size is more or equal to what was requested */
{
- size = socket->message->size;
- GNUNET_STREAM_convert_message_h2n (socket->message) /* Convert h2n */
- memcpy (buf, socket->message, size);
+ ret = ntohs (socket->message->size);
+ GNUNET_assert (size >= ret);
+ memcpy (buf, socket->message, ret);
}
GNUNET_free (socket->message); /* Free the message memory */
socket->message = NULL;
- return size;
+ return ret;
}
0, /* Corking */
timeout, /* FIXME: Maxdelay */
socket->other_peer,
- message->size,
- send_message_notify,
+ ntohs (message->size),
+ &send_message_notify,
socket);
}
const struct GNUNET_MessageHeader *message,
const struct GNUNET_ATS_Information*atsi)
{
+ struct GNUNET_STREAM_Socket *socket = cls;
uint16_t size;
- struct GNUNET_STREAM_MessageHeader *message_copy;
-
+ const struct GNUNET_STREAM_DataMessage *data_msg;
+ const void *payload;
+
size = ntohs (message->size);
- message_copy = GNUNET_malloc (size);
- memcpy (message_copy, message, size);
- GNUNET_STREAM_convert_message_n2h (message_copy);
+ if (size < sizeof (struct GNUNET_STREAM_DataMessage))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
+ size -= sizeof (Struct GNUNET_STREAM_DataMessage);
+ payload = &data_msg[1];
+ /* ... */
- route_message (message_copy);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Message Handler for mesh
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
+
}
static struct GNUNET_MESH_MessageHandler message_handlers[] = {
{&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
- {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_ACK, 0},
- {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
- {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
- {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
+ {&handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) },
+ {&handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
+ {&handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
+ {&handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
{&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
{&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
{&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_ATS_Information * atsi)
{
- const struct GNUNET_STREAM_Socket *socket;
-
- socket = (const struct GNUNET_STREAM_Socket *) cls;
+ const struct GNUNET_STREAM_Socket *socket = cls;
+
if (0 != memcmp (socket->other_peer,
peer,
sizeof (struct GNUNET_PeerIdentity)))
va_list vargs; /* Variable arguments */
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
- if (NULL == socket)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Unable to allocate memory\n");
- return NULL;
- }
- socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
- if (NULL == socket->other_peer)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Unable to allocate memory \n");
- return NULL;
- }
+ socket->other_peer = *target;
+ socket->open_cb = open_cb;
+ socket->open_cls = open_cb_cls;
/* Set defaults */
socket->retransmit_timeout =
va_end (vargs); /* End of variable args parsing */
socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
- 10, /* QUEUE size as parameter? */
- NULL, /* cls */
+ 1, /* QUEUE size as parameter? */
+ socket, /* cls */
NULL, /* No inbound tunnel handler */
NULL, /* No inbound tunnel cleaner */
message_handlers,
NULL); /* We don't get inbound tunnels */
-
- memcpy (socket->other_peer, target, sizeof (struct GNUNET_PeerIdentity));
- socket->open_cb = open_cb;
- socket->open_cls = open_cb_cls;
+ // FIXME: if (NULL == socket->mesh) ...
/* Now create the mesh tunnel to target */
socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
&mesh_peer_connect_callback,
&mesh_peer_disconnect_callback,
(void *) socket);
+ // FIXME: if (NULL == socket->tunnel) ...
return socket;
}
{
GNUNET_free (socket->message);
}
- /* Clear memory allocated for other peer's PeerIdentity */
- GNUNET_Free (socket->other_peer);
/* Close associated tunnel */
if (NULL != socket->tunnel)
{
* @return initial tunnel context for the tunnel
* (can be NULL -- that's not an error)
*/
-void
+static void
new_tunnel_notify (void *cls,
struct GNUNET_MESH_Tunnel *tunnel,
const struct GNUNET_PeerIdentity *initiator,
const struct GNUNET_ATS_Information *atsi)
{
- struct GNUNET_STREAM_ListenSocket *lsocket;
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
struct GNUNET_STREAM_Socket *socket;
- lsocket = (struct GNUNET_STREAM_ListenSocket *) cls;
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->tunnel = tunnel;
socket->session_id = 0; /* FIXME */
* @param tunnel_ctx place where local state associated
* with the tunnel is stored
*/
-void
+static void
tunnel_cleaner (void *cls,
const struct GNUNET_MESH_Tunnel *tunnel,
void *tunnel_ctx)
struct GNUNET_STREAM_ListenSocket *lsocket;
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
- if (NULL == lsocket)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Unable to allocate memory\n");
- return NULL;
- }
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
lsocket->listen_cb_cls = listen_cb_cls;
#include "gnunet_util_lib.h"
+GNUNET_NETWORK_STRUCT_BEGIN
+
/**
* The stream message header
/**
* A number which identifies a session between the two peers.
*/
- uint32_t session_id;
+ uint32_t session_id GNUNET_PACKED;
};
*/
struct GNUNET_STREAM_MessageHeader header;
+ /**
+ * Sequence number; starts with a random value. (Just in case
+ * someone breaks mesh and is able to try to do a Sequence
+ * Prediction Attack on us.)
+ */
+ uint32_t sequence_number GNUNET_PACKED;
+
/**
* number of milliseconds to the soft deadline for sending acknowledgement
* measured from the time this message is received. It is optimal for the
*/
struct GNUNET_TIME_RelativeNBO ack_deadline;
- /**
- * Sequence number; starts with a random value. (Just in case
- * someone breaks mesh and is able to try to do a Sequence
- * Prediction Attack on us.)
- */
- uint32_t sequence_number;
-
/**
* Offset of the packet in the overall stream, modulo 2^32; allows
* the receiver to calculate where in the destination buffer the
- * message should be placed.
+ * message should be placed. In network byte order.
*/
- uint32_t offset;
+ uint32_t offset GNUNET_PACKED;
/**
* The data should be appended here
* The Selective Acknowledgement Bitmap. Computed relative to the base_seq
* (bit n corresponds to the Data message with sequence number base_seq+n)
*/
- GNUNET_STREAM_AckBitmap bitmap;
+ GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
/**
* The sequence number of the Data Message upto which the receiver has filled
* its buffer without any missing packets
*/
- uint32_t base_sequence_number;
+ uint32_t base_sequence_number GNUNET_PACKED;
/**
* Available buffer space past the last acknowledged buffer (for flow control),
* in bytes.
*/
- uint32_t receive_window_remaining;
+ uint32_t receive_window_remaining GNUNET_PACKED;
};
+GNUNET_NETWORK_STRUCT_END
+
#if 0 /** keep Emacsens' auto-indent happy */
{