* Functions of this type are called upon new stream connection from other peers
*
* @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream
+ * @param socket the socket representing the stream; NULL on binding error
* @param initiator the identity of the peer who wants to establish a stream
- * with us
+ * with us; NULL on binding error
* @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
* stream (the socket will be invalid after the call)
*/
* Listens for stream connections for a specific application ports
*
* @param cfg the configuration to use
- * @param app_port the application port for which new streams will be accepted
+ *
+ * @param app_port the application port for which new streams will be
+ * accepted. If another stream is listening on the same port the
+ * listen_cb will be called to signal binding error and the returned
+ * ListenSocket will be invalidated.
+ *
* @param listen_cb this function will be called when a peer tries to establish
* a stream with us
* @param listen_cb_cls closure for listen_cb
stream_api.c stream_protocol.h
libgnunetstream_la_LIBADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \
$(top_builddir)/src/util/libgnunetutil.la $(XLIB)
libgnunetstream_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
+#include "gnunet_lockmanager_service.h"
#include "gnunet_stream_lib.h"
#include "stream_protocol.h"
#define LOG(kind,...) \
GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* The maximum packet size of a stream packet
*/
*/
struct GNUNET_MESH_Handle *mesh;
+ /**
+ * Our configuration
+ */
+ struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Handle to the lock manager service
+ */
+ struct GNUNET_LOCKMANAGER_Handle *lockmanager;
+
+ /**
+ * The active LockingRequest from lockmanager
+ */
+ struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
+
/**
* The callback function which is called after successful opening socket
*/
/**
* The service port
- * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
+
+ /**
+ * The id of the lockmanager timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
/**
* The retransmit timeout
*/
struct GNUNET_TIME_Relative retransmit_timeout;
+ /**
+ * Listen enabled?
+ */
+ int listening;
+
/**
* Whether testing mode is active or not
*/
/**
* Default value in seconds for various timeouts
*/
-static unsigned int default_timeout = 10;
+static const unsigned int default_timeout = 10;
+
+/**
+ * The domain name for locks we use here
+ */
+static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
/**
/* FIXME: If a tunnel is already created, we should not accept new tunnels
from the same peer again until the socket is closed */
+ if (GNUNET_NO == lsocket->listening)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Destroying tunnel from peer %s as we don't have the lock\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ GNUNET_MESH_tunnel_destroy (tunnel);
+ return NULL;
+ }
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *initiator;
socket->tunnel = tunnel;
}
+/**
+ * Callback to signal timeout on lockmanager lock acquire
+ *
+ * @param cls the ListenSocket
+ * @param tc the scheduler task context
+ */
+static void
+lockmanager_acquire_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ GNUNET_STREAM_ListenCallback listen_cb;
+ void *listen_cb_cls;
+
+ lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ listen_cb = lsocket->listen_cb;
+ listen_cb_cls = lsocket->listen_cb_cls;
+ GNUNET_STREAM_listen_close (lsocket);
+ if (NULL != listen_cb)
+ listen_cb (listen_cb_cls, NULL, NULL);
+}
+
+
+/**
+ * Callback to notify us on the status changes on app_port lock
+ *
+ * @param cls the ListenSocket
+ * @param domain the domain name of the lock
+ * @param lock the app_port
+ * @param status the current status of the lock
+ */
+static void
+lock_status_change_cb (void *cls, const char *domain_name, uint32_t lock,
+ enum GNUNET_LOCKMANAGER_Status status)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+
+ GNUNET_assert (lock == (uint32_t) lsocket->port);
+ if (GNUNET_LOCKMANAGER_SUCCESS == status)
+ {
+ lsocket->listening = GNUNET_YES;
+ if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
+ lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL == lsocket->mesh)
+ {
+ GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
+
+ lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
+ RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
+ lsocket, /* Closure */
+ &new_tunnel_notify,
+ &tunnel_cleaner,
+ server_message_handlers,
+ ports);
+ GNUNET_assert (NULL != lsocket->mesh);
+ }
+ }
+ if (GNUNET_LOCKMANAGER_RELEASE == status)
+ lsocket->listening = GNUNET_NO;
+}
+
+
/*****************/
/* API functions */
/*****************/
{
/* FIXME: Add variable args for passing configration options? */
struct GNUNET_STREAM_ListenSocket *lsocket;
- GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
enum GNUNET_STREAM_Option option;
va_list vargs;
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
+ lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
+ lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
+ if (NULL == lsocket->lockmanager)
+ {
+ GNUNET_CONFIGURATION_destroy (lsocket->cfg);
+ GNUNET_free (lsocket);
+ return NULL;
+ }
+ lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
/* Set defaults */
lsocket->retransmit_timeout =
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
lsocket->listen_cb_cls = listen_cb_cls;
- lsocket->mesh = GNUNET_MESH_connect (cfg,
- RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
- lsocket, /* Closure */
- &new_tunnel_notify,
- &tunnel_cleaner,
- server_message_handlers,
- ports);
- GNUNET_assert (NULL != lsocket->mesh);
+ lsocket->locking_request =
+ GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
+ (uint32_t) lsocket->port,
+ &lock_status_change_cb, lsocket);
+ lsocket->lockmanager_acquire_timeout_task =
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS (20),
+ &lockmanager_acquire_timeout, lsocket);
return lsocket;
}
GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
{
/* Close MESH connection */
- GNUNET_assert (NULL != lsocket->mesh);
- GNUNET_MESH_disconnect (lsocket->mesh);
-
+ if (NULL != lsocket->mesh)
+ GNUNET_MESH_disconnect (lsocket->mesh);
+ GNUNET_CONFIGURATION_destroy (lsocket->cfg);
+ if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
+ GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
+ if (NULL != lsocket->locking_request)
+ GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
+ if (NULL != lsocket->lockmanager)
+ GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
GNUNET_free (lsocket);
}
*/
#define NUM_PEERS 2
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
}
+/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (d1->cfg,
+ &d2->id, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
/**
* Callback to be called when testing peer group is ready
*
NULL,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (d1->cfg,
- &d2->id, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, NULL);
}
*/
#define NUM_PEERS 2
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
}
+/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (d1->cfg,
+ &d2->id, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
/**
* Callback to be called when testing peer group is ready
*
&stream_listen_cb,
NULL);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (d1->cfg,
- &d2->id, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, NULL);
}
#define LOG(kind, ...) \
GNUNET_log (kind, __VA_ARGS__);
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+
/**
* Structure for holding peer's sockets and IO Handles
*/
}
+/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+ &self));
+ /* Connect to stream */
+ peer->socket = GNUNET_STREAM_open (config,
+ &self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer->socket);
+}
+
+
/**
* Testing function
*
NULL,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config,
- &self, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- (void *) &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1);
}
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(3), &test, NULL);
}
/**
#define VERBOSE 1
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+
/**
* Structure for holding peer's sockets and IO Handles
*/
}
+/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
+ &self));
+ /* Connect to stream library */
+ peer->socket = GNUNET_STREAM_open (config_peer1,
+ &self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
/**
* Testing function
*
&peer2,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config_peer1,
- &self, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1);
}
/**
(GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &test, NULL);
}
/**
+[lockmanager]
+AUTOSTART = NO
+ACCEPT_FROM = 127.0.0.1;
+HOSTNAME = localhost
+
[fs]
AUTOSTART = NO
PORT = 12092
[arm]
-DEFAULTSERVICES = core
+DEFAULTSERVICES = core lockmanager
PORT = 12366
DEBUG = NO
AUTOSTART = NO
[nse]
-AUTOSTART = NO
+AUTOSTART = NO
\ No newline at end of file
#define LOG(kind, ...) \
GNUNET_log (kind, __VA_ARGS__);
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
}
+/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+ &self));
+ /* Connect to stream */
+ peer->socket = GNUNET_STREAM_open (config,
+ &self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER,
+ UINT32_MAX - GNUNET_CRYPTO_random_u32
+ (GNUNET_CRYPTO_QUALITY_WEAK, 64),
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer->socket);
+}
+
+
/**
* Testing function
*
NULL,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config,
- &self, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- (void *) &peer1,
- GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER,
- UINT32_MAX - GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK, 64),
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1);
}
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(3), &test, NULL);
}
/**