From a31b6d486baee50c455914f292c2f14e52e398f8 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sun, 17 Jun 2012 17:56:45 +0000 Subject: [PATCH] -stream uses locks from lockmanager --- src/include/gnunet_stream_lib.h | 11 +- src/stream/Makefile.am | 1 + src/stream/stream_api.c | 148 +++++++++++++++++-- src/stream/test_stream_2peers.c | 33 +++-- src/stream/test_stream_2peers_halfclose.c | 33 +++-- src/stream/test_stream_big.c | 43 ++++-- src/stream/test_stream_local.c | 42 ++++-- src/stream/test_stream_local.conf | 9 +- src/stream/test_stream_sequence_wraparound.c | 48 ++++-- 9 files changed, 296 insertions(+), 72 deletions(-) diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index f348780e3..282372483 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -192,9 +192,9 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket); * Functions of this type are called upon new stream connection from other peers * * @param cls the closure from GNUNET_STREAM_listen - * @param socket the socket representing the stream + * @param socket the socket representing the stream; NULL on binding error * @param initiator the identity of the peer who wants to establish a stream - * with us + * with us; NULL on binding error * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the * stream (the socket will be invalid after the call) */ @@ -213,7 +213,12 @@ struct GNUNET_STREAM_ListenSocket; * Listens for stream connections for a specific application ports * * @param cfg the configuration to use - * @param app_port the application port for which new streams will be accepted + * + * @param app_port the application port for which new streams will be + * accepted. If another stream is listening on the same port the + * listen_cb will be called to signal binding error and the returned + * ListenSocket will be invalidated. + * * @param listen_cb this function will be called when a peer tries to establish * a stream with us * @param listen_cb_cls closure for listen_cb diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am index 253ad94fc..3a0e6835e 100644 --- a/src/stream/Makefile.am +++ b/src/stream/Makefile.am @@ -15,6 +15,7 @@ libgnunetstream_la_SOURCES = \ stream_api.c stream_protocol.h libgnunetstream_la_LIBADD = \ $(top_builddir)/src/mesh/libgnunetmesh.la \ + $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \ $(top_builddir)/src/util/libgnunetutil.la $(XLIB) libgnunetstream_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 214ac4e41..d46e589f9 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -38,12 +38,16 @@ #include "platform.h" #include "gnunet_common.h" #include "gnunet_crypto_lib.h" +#include "gnunet_lockmanager_service.h" #include "gnunet_stream_lib.h" #include "stream_protocol.h" #define LOG(kind,...) \ GNUNET_log_from (kind, "stream-api", __VA_ARGS__) +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + /** * The maximum packet size of a stream packet */ @@ -369,6 +373,21 @@ struct GNUNET_STREAM_ListenSocket */ struct GNUNET_MESH_Handle *mesh; + /** + * Our configuration + */ + struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Handle to the lock manager service + */ + struct GNUNET_LOCKMANAGER_Handle *lockmanager; + + /** + * The active LockingRequest from lockmanager + */ + struct GNUNET_LOCKMANAGER_LockingRequest *locking_request; + /** * The callback function which is called after successful opening socket */ @@ -381,15 +400,24 @@ struct GNUNET_STREAM_ListenSocket /** * The service port - * FIXME: Remove if not required! */ GNUNET_MESH_ApplicationType port; + + /** + * The id of the lockmanager timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task; /** * The retransmit timeout */ struct GNUNET_TIME_Relative retransmit_timeout; + /** + * Listen enabled? + */ + int listening; + /** * Whether testing mode is active or not */ @@ -492,7 +520,12 @@ struct GNUNET_STREAM_ShutdownHandle /** * Default value in seconds for various timeouts */ -static unsigned int default_timeout = 10; +static const unsigned int default_timeout = 10; + +/** + * The domain name for locks we use here + */ +static const char *locking_domain = "GNUNET_STREAM_APPLOCK"; /** @@ -2690,6 +2723,15 @@ new_tunnel_notify (void *cls, /* FIXME: If a tunnel is already created, we should not accept new tunnels from the same peer again until the socket is closed */ + if (GNUNET_NO == lsocket->listening) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Destroying tunnel from peer %s as we don't have the lock\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + GNUNET_MESH_tunnel_destroy (tunnel); + return NULL; + } socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); socket->other_peer = *initiator; socket->tunnel = tunnel; @@ -2771,6 +2813,71 @@ tunnel_cleaner (void *cls, } +/** + * Callback to signal timeout on lockmanager lock acquire + * + * @param cls the ListenSocket + * @param tc the scheduler task context + */ +static void +lockmanager_acquire_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_ListenSocket *lsocket = cls; + GNUNET_STREAM_ListenCallback listen_cb; + void *listen_cb_cls; + + lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK; + listen_cb = lsocket->listen_cb; + listen_cb_cls = lsocket->listen_cb_cls; + GNUNET_STREAM_listen_close (lsocket); + if (NULL != listen_cb) + listen_cb (listen_cb_cls, NULL, NULL); +} + + +/** + * Callback to notify us on the status changes on app_port lock + * + * @param cls the ListenSocket + * @param domain the domain name of the lock + * @param lock the app_port + * @param status the current status of the lock + */ +static void +lock_status_change_cb (void *cls, const char *domain_name, uint32_t lock, + enum GNUNET_LOCKMANAGER_Status status) +{ + struct GNUNET_STREAM_ListenSocket *lsocket = cls; + + GNUNET_assert (lock == (uint32_t) lsocket->port); + if (GNUNET_LOCKMANAGER_SUCCESS == status) + { + lsocket->listening = GNUNET_YES; + if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task) + { + GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task); + lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL == lsocket->mesh) + { + GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0}; + + lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg, + RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */ + lsocket, /* Closure */ + &new_tunnel_notify, + &tunnel_cleaner, + server_message_handlers, + ports); + GNUNET_assert (NULL != lsocket->mesh); + } + } + if (GNUNET_LOCKMANAGER_RELEASE == status) + lsocket->listening = GNUNET_NO; +} + + /*****************/ /* API functions */ /*****************/ @@ -3070,11 +3177,19 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { /* FIXME: Add variable args for passing configration options? */ struct GNUNET_STREAM_ListenSocket *lsocket; - GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; enum GNUNET_STREAM_Option option; va_list vargs; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); + lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg); + lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg); + if (NULL == lsocket->lockmanager) + { + GNUNET_CONFIGURATION_destroy (lsocket->cfg); + GNUNET_free (lsocket); + return NULL; + } + lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */ /* Set defaults */ lsocket->retransmit_timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); @@ -3101,14 +3216,13 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, lsocket->port = app_port; lsocket->listen_cb = listen_cb; lsocket->listen_cb_cls = listen_cb_cls; - lsocket->mesh = GNUNET_MESH_connect (cfg, - RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */ - lsocket, /* Closure */ - &new_tunnel_notify, - &tunnel_cleaner, - server_message_handlers, - ports); - GNUNET_assert (NULL != lsocket->mesh); + lsocket->locking_request = + GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain, + (uint32_t) lsocket->port, + &lock_status_change_cb, lsocket); + lsocket->lockmanager_acquire_timeout_task = + GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS (20), + &lockmanager_acquire_timeout, lsocket); return lsocket; } @@ -3122,9 +3236,15 @@ void GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) { /* Close MESH connection */ - GNUNET_assert (NULL != lsocket->mesh); - GNUNET_MESH_disconnect (lsocket->mesh); - + if (NULL != lsocket->mesh) + GNUNET_MESH_disconnect (lsocket->mesh); + GNUNET_CONFIGURATION_destroy (lsocket->cfg); + if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task) + GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task); + if (NULL != lsocket->locking_request) + GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request); + if (NULL != lsocket->lockmanager) + GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager); GNUNET_free (lsocket); } diff --git a/src/stream/test_stream_2peers.c b/src/stream/test_stream_2peers.c index 6a61022e0..ecb4b848c 100644 --- a/src/stream/test_stream_2peers.c +++ b/src/stream/test_stream_2peers.c @@ -39,6 +39,9 @@ */ #define NUM_PEERS 2 +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + /** * Structure for holding peer's sockets and IO Handles */ @@ -432,6 +435,26 @@ stream_listen_cb (void *cls, } +/** + * Task for connecting the peer to stream as client + * + * @param cls PeerData + * @param tc the TaskContext + */ +static void +stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + /* Connect to stream library */ + peer1.socket = GNUNET_STREAM_open (d1->cfg, + &d2->id, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer1.socket); +} + + /** * Callback to be called when testing peer group is ready * @@ -477,15 +500,7 @@ peergroup_ready (void *cls, const char *emsg) NULL, GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer2_listen_socket); - - /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (d1->cfg, - &d2->id, /* Null for local peer? */ - 10, /* App port */ - &stream_open_cb, - &peer1, - GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer1.socket); + GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, NULL); } diff --git a/src/stream/test_stream_2peers_halfclose.c b/src/stream/test_stream_2peers_halfclose.c index 7997c20e3..0e78dec20 100644 --- a/src/stream/test_stream_2peers_halfclose.c +++ b/src/stream/test_stream_2peers_halfclose.c @@ -40,6 +40,9 @@ */ #define NUM_PEERS 2 +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + /** * Structure for holding peer's sockets and IO Handles */ @@ -658,6 +661,26 @@ stream_listen_cb (void *cls, } +/** + * Task for connecting the peer to stream as client + * + * @param cls PeerData + * @param tc the TaskContext + */ +static void +stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + /* Connect to stream library */ + peer1.socket = GNUNET_STREAM_open (d1->cfg, + &d2->id, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer1.socket); +} + + /** * Callback to be called when testing peer group is ready * @@ -702,15 +725,7 @@ peergroup_ready (void *cls, const char *emsg) &stream_listen_cb, NULL); GNUNET_assert (NULL != peer2_listen_socket); - - /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (d1->cfg, - &d2->id, /* Null for local peer? */ - 10, /* App port */ - &stream_open_cb, - &peer1, - GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer1.socket); + GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, NULL); } diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c index 9ba9e6777..2ddfaff80 100644 --- a/src/stream/test_stream_big.c +++ b/src/stream/test_stream_big.c @@ -34,6 +34,10 @@ #define LOG(kind, ...) \ GNUNET_log (kind, __VA_ARGS__); +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + + /** * Structure for holding peer's sockets and IO Handles */ @@ -326,6 +330,32 @@ stream_listen_cb (void *cls, } +/** + * Task for connecting the peer to stream as client + * + * @param cls PeerData + * @param tc the TaskContext + */ +static void +stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + struct GNUNET_PeerIdentity self; + + GNUNET_assert (&peer1 == peer); + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config, + &self)); + /* Connect to stream */ + peer->socket = GNUNET_STREAM_open (config, + &self, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer->socket); +} + + /** * Testing function * @@ -348,15 +378,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) NULL, GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer2_listen_socket); - - /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (config, - &self, /* Null for local peer? */ - 10, /* App port */ - &stream_open_cb, - (void *) &peer1, - GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer1.socket); + GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1); } @@ -381,8 +403,7 @@ run (void *cls, char *const *args, const char *cfgfile, GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, NULL); + test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(3), &test, NULL); } /** diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index fc2a8f842..bf0584015 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -34,6 +34,10 @@ #define VERBOSE 1 +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + + /** * Structure for holding peer's sockets and IO Handles */ @@ -354,6 +358,32 @@ stream_listen_cb (void *cls, } +/** + * Task for connecting the peer to stream as client + * + * @param cls PeerData + * @param tc the TaskContext + */ +static void +stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + struct GNUNET_PeerIdentity self; + + GNUNET_assert (&peer1 == peer); + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, + &self)); + /* Connect to stream library */ + peer->socket = GNUNET_STREAM_open (config_peer1, + &self, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer1.socket); +} + + /** * Testing function * @@ -376,15 +406,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) &peer2, GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer2_listen_socket); - - /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (config_peer1, - &self, /* Null for local peer? */ - 10, /* App port */ - &stream_open_cb, - &peer1, - GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer1.socket); + GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1); } /** @@ -417,7 +439,7 @@ run (void *cls, char *const *args, const char *cfgfile, (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, NULL); - test_task = GNUNET_SCHEDULER_add_now (&test, NULL); + test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &test, NULL); } /** diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf index 1399a62ab..90ddf41c6 100644 --- a/src/stream/test_stream_local.conf +++ b/src/stream/test_stream_local.conf @@ -1,3 +1,8 @@ +[lockmanager] +AUTOSTART = NO +ACCEPT_FROM = 127.0.0.1; +HOSTNAME = localhost + [fs] AUTOSTART = NO @@ -44,7 +49,7 @@ WAN_QUOTA_IN = 3932160 PORT = 12092 [arm] -DEFAULTSERVICES = core +DEFAULTSERVICES = core lockmanager PORT = 12366 DEBUG = NO @@ -74,4 +79,4 @@ SERVICEHOME = /tmp/test-stream/ AUTOSTART = NO [nse] -AUTOSTART = NO +AUTOSTART = NO \ No newline at end of file diff --git a/src/stream/test_stream_sequence_wraparound.c b/src/stream/test_stream_sequence_wraparound.c index 1b10845a9..b71e96b27 100644 --- a/src/stream/test_stream_sequence_wraparound.c +++ b/src/stream/test_stream_sequence_wraparound.c @@ -34,6 +34,9 @@ #define LOG(kind, ...) \ GNUNET_log (kind, __VA_ARGS__); +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + /** * Structure for holding peer's sockets and IO Handles */ @@ -326,6 +329,35 @@ stream_listen_cb (void *cls, } +/** + * Task for connecting the peer to stream as client + * + * @param cls PeerData + * @param tc the TaskContext + */ +static void +stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + struct GNUNET_PeerIdentity self; + + GNUNET_assert (&peer1 == peer); + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config, + &self)); + /* Connect to stream */ + peer->socket = GNUNET_STREAM_open (config, + &self, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER, + UINT32_MAX - GNUNET_CRYPTO_random_u32 + (GNUNET_CRYPTO_QUALITY_WEAK, 64), + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer->socket); +} + + /** * Testing function * @@ -348,18 +380,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) NULL, GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer2_listen_socket); - - /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (config, - &self, /* Null for local peer? */ - 10, /* App port */ - &stream_open_cb, - (void *) &peer1, - GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER, - UINT32_MAX - GNUNET_CRYPTO_random_u32 - (GNUNET_CRYPTO_QUALITY_WEAK, 64), - GNUNET_STREAM_OPTION_END); - GNUNET_assert (NULL != peer1.socket); + GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1); } @@ -384,8 +405,7 @@ run (void *cls, char *const *args, const char *cfgfile, GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, NULL); + test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(3), &test, NULL); } /** -- 2.25.1