Boston, MA 02111-1307, USA.
*/
+/* TODO:
+ * Checks for matching the sender and socket->other_peer in server
+ * message handlers */
+
/**
* @file stream/stream_api.c
* @brief Implementation of the stream library
*/
unsigned int retries;
+ /**
+ * The application port number (type: uint32_t)
+ */
+ GNUNET_MESH_ApplicationType app_port;
+
/**
* The session id associated with this stream connection
* FIXME: Not used currently, may be removed
/**
* The service port
+ * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
};
struct GNUNET_STREAM_HelloAckMessage *reply;
GNUNET_assert (socket->tunnel == tunnel);
+
+ /* Catch possible protocol breaks */
+ GNUNET_break_op (0 != memcmp (&socket->other_peer,
+ sender,
+ sizeof (struct GNUNET_PeerIdentity)));
+
if (STATE_INIT == socket->state)
{
/* Get the random sequence number */
}
+/**
+ * Method called whenever a peer creates a tunnel to us
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ * (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel_notify (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *initiator,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ struct GNUNET_STREAM_Socket *socket;
+
+ socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+ socket->tunnel = tunnel;
+ socket->session_id = 0; /* FIXME */
+ socket->other_peer = *initiator;
+ socket->state = STATE_INIT;
+
+ if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
+ socket,
+ &socket->other_peer))
+ {
+ socket->state = STATE_CLOSED;
+ /* FIXME: Send CLOSE message and then free */
+ GNUNET_free (socket);
+ GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
+ }
+ return socket;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed. Should clean up
+ * any associated state. This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ * with the tunnel is stored
+ */
+static void
+tunnel_cleaner (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel,
+ void *tunnel_ctx)
+{
+ struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer %s has terminated connection abruptly\n",
+ GNUNET_i2s (&socket->other_peer));
+
+ socket->status = GNUNET_STREAM_SHUTDOWN;
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ socket->tunnel = NULL;
+}
+
+
/*****************/
/* API functions */
/*****************/
struct GNUNET_STREAM_Socket *socket;
enum GNUNET_STREAM_Option option;
va_list vargs; /* Variable arguments */
+ GNUNET_MESH_ApplicationType no_port;
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *target;
}
} while (GNUNET_STREAM_OPTION_END != option);
va_end (vargs); /* End of variable args parsing */
-
+ no_port = 0;
socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1, /* QUEUE size as parameter? */
socket, /* cls */
NULL, /* No inbound tunnel handler */
NULL, /* No inbound tunnel cleaner */
client_message_handlers,
- NULL); /* We don't get inbound tunnels */
- // FIXME: if (NULL == socket->mesh) ...
+ &no_port); /* We don't get inbound tunnels */
+ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
+ {
+ GNUNET_free (socket);
+ return NULL;
+ }
/* Now create the mesh tunnel to target */
socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
}
+/**
+ * Shutdown the stream for reading or writing (man 2 shutdown).
+ *
+ * @param socket the stream socket
+ * @param how SHUT_RD, SHUT_WR or SHUT_RDWR
+ */
+void
+GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
+ int how)
+{
+ return;
+}
+
+
/**
* Closes the stream
*
}
-/**
- * Method called whenever a peer creates a tunnel to us
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- * (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel_notify (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- const struct GNUNET_PeerIdentity *initiator,
- const struct GNUNET_ATS_Information *atsi)
-{
- struct GNUNET_STREAM_ListenSocket *lsocket = cls;
- struct GNUNET_STREAM_Socket *socket;
-
- socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
- socket->tunnel = tunnel;
- socket->session_id = 0; /* FIXME */
- socket->other_peer = *initiator;
- socket->state = STATE_INIT;
-
- if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
- socket,
- &socket->other_peer))
- {
- socket->state = STATE_CLOSED;
- /* FIXME: Send CLOSE message and then free */
- GNUNET_free (socket);
- GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
- }
- return socket;
-}
-
-
-/**
- * Function called whenever an inbound tunnel is destroyed. Should clean up
- * any associated state. This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- * with the tunnel is stored
- */
-static void
-tunnel_cleaner (void *cls,
- const struct GNUNET_MESH_Tunnel *tunnel,
- void *tunnel_ctx)
-{
- struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer %s has terminated connection abruptly\n",
- GNUNET_i2s (&socket->other_peer));
-
- socket->status = GNUNET_STREAM_SHUTDOWN;
- /* Clear Transmit handles */
- if (NULL != socket->transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
- socket->transmit_handle = NULL;
- }
- socket->tunnel = NULL;
-}
-
-
/**
* Listens for stream connections for a specific application ports
*
socket);
return read_handle;
}
+
+
+/**
+ * Cancel pending write operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
+{
+ return;
+}
+
+
+/**
+ * Cancel pending read operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
+{
+ return;
+}
#include "gnunet_util_lib.h"
#include "gnunet_mesh_service.h"
#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
#define VERBOSE 1
struct GNUNET_STREAM_Socket *socket;
/**
- * Peer's io handle
+ * Peer's io write handle
*/
- struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_IOReadHandle *io_read_handle;
/**
* Bytes the peer has written
static struct PeerData peer1;
static struct PeerData peer2;
static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config;
static GNUNET_SCHEDULER_TaskIdentifier abort_task;
static GNUNET_SCHEDULER_TaskIdentifier test_task;
do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_STREAM_close (peer1.socket);
- GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
if (0 != abort_task)
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+ /* Free the duplicated configuration */
+ GNUNET_CONFIGURATION_destroy (config);
GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
GNUNET_OS_process_close (arm_pid);
}
do_shutdown (cls, tc);
}
+/**
+ * Signature for input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size);
+
/**
* The write completion function; called upon writing some data to stream or
if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
{
- peer->io_handle = GNUNET_STREAM_write (peer->socket,
- (void *) data,
- strlen(data) - peer->bytes_wrote,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ (void *) data,
+ strlen(data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ GNUNET_assert (NULL != peer->io_write_handle);
}
else
{
if (&peer1 == peer) /* Peer1 has finished writing; should read now */
{
- peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL!=peer->io_handle);
+ peer->io_read_handle =
+ GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL!=peer->io_read_handle);
}
}
}
{
struct PeerData *peer;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
peer = (struct PeerData *) cls;
peer->bytes_wrote = 0;
GNUNET_assert (socket == peer1.socket);
GNUNET_assert (socket == peer->socket);
- peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */
- (void *) data, /* data */
- strlen(data),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
+ peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
+ (void *) data, /* data */
+ strlen(data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
cls);
- GNUNET_assert (NULL != peer->io_handle);
+ GNUNET_assert (NULL != peer->io_write_handle);
}
peer = (struct PeerData *) cls;
- GNUNET_assert (GNUNET_STERAM_OK == status);
+ GNUNET_assert (GNUNET_STREAM_OK == status);
GNUNET_assert (size < strlen (data));
GNUNET_assert (strncmp ((const char *) data + peer->bytes_read,
(const char *) input_data,
if (peer->bytes_read < strlen (data))
{
- peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+ peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL != peer->io_read_handle);
}
else
{
if (&peer2 == peer) /* Peer2 has completed reading; should write */
{
peer->bytes_wrote = 0;
- peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- (void *) data,
- strlen(data),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
+ peer->io_write_handle =
+ GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ (void *) data,
+ strlen(data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
}
else /* Peer1 has completed reading. End of tests */
{
GNUNET_assert (NULL != cls);
peer2.bytes_read = 0;
GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
- peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- (void *) &peer2);
- GNUNET_assert (NULL != peer2.io_handle);
+ peer2.io_read_handle =
+ GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ (void *) &peer2);
+ GNUNET_assert (NULL != peer2.io_read_handle);
}
GNUNET_assert (NULL == initiator); /* Local peer=NULL? */
GNUNET_assert (socket != peer1.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer connected: %s\n", GNUNET_i2s(initiator));
+
peer2.socket = socket;
read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
return GNUNET_OK;
/**
* Testing function
+ *
+ * @param cls NULL
+ * @param tc the task context
*/
static void
test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GNUNET_PeerIdentity self;
+
test_task = GNUNET_SCHEDULER_NO_TASK;
+ /* Get our identity */
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+ &self));
+
+ peer2_listen_socket = GNUNET_STREAM_listen (config,
+ 10, /* App port */
+ &stream_listen_cb,
+ NULL);
+ GNUNET_assert (NULL != peer2_listen_socket);
/* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */
+ peer1.socket = GNUNET_STREAM_open (config,
+ &self, /* Null for local peer? */
10, /* App port */
&stream_open_cb,
(void *) &peer1);
- GNUNET_assert (NULL != peer1.socket);
- peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
- &stream_listen_cb,
- NULL);
- GNUNET_assert (NULL != peer2_listen_socket);
-
+ GNUNET_assert (NULL != peer1.socket);
}
/**
"WARNING",
#endif
NULL);
+ /* Duplicate the configuration */
+ config = GNUNET_CONFIGURATION_dup (cfg);
arm_pid =
GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
"gnunet-service-arm",
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
+
+ test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
}
int ret;
char *const argv2[] = { "test-stream-local",
- "-c", "test_stream.conf",
+ "-c", "test_stream_local.conf",
#if VERBOSE
"-L", "DEBUG",
#endif