From 4bcb854976f9f86f3b05e519542015bc2f14d484 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 21 Oct 2016 05:23:21 +0000 Subject: [PATCH] misc fixes to new client impl --- src/util/Makefile.am | 5 +- src/util/client_new.c | 221 +++++++++++++++++++++++------------------- 2 files changed, 126 insertions(+), 100 deletions(-) diff --git a/src/util/Makefile.am b/src/util/Makefile.am index 2325874b2..776927219 100644 --- a/src/util/Makefile.am +++ b/src/util/Makefile.am @@ -60,6 +60,7 @@ libgnunetutil_la_SOURCES = \ bandwidth.c \ bio.c \ client.c \ + client_new.c \ common_allocation.c \ common_endian.c \ common_logging.c \ @@ -325,13 +326,13 @@ check_PROGRAMS = \ # Declare .nc (NO-CONCURRENCY) as a test extension so that we can impart # sequential execution order for them TEST_EXTENSIONS = .nc -test_connection.log: test_client.log +test_connection.log: test_client.log test_connection_addressing.log: test_connection.log test_connection_timeout_no_connect.log: test_connection_addressing.log test_connection_transmit_cancel.log: test_connection_timeout_no_connect.log test_connection_receive_cancel.log: test_connection_transmit_cancel.log test_connection_timeout.log: test_connection_receive_cancel.log -test_resolver_api.log: test_connection_timeout.log +test_resolver_api.log: test_connection_timeout.log test_server.log: test_resolver_api.log test_server_disconnect.log: test_server.log test_server_with_client.log: test_server_disconnect.log diff --git a/src/util/client_new.c b/src/util/client_new.c index b5c0147b3..d2b70388c 100644 --- a/src/util/client_new.c +++ b/src/util/client_new.c @@ -29,16 +29,10 @@ #include "platform.h" #include "gnunet_protocols.h" #include "gnunet_util_lib.h" +#include "gnunet_resolver_service.h" #include "gnunet_socks.h" -/** - * How often do we re-try tranmsitting requests before giving up? - * Note that if we succeeded transmitting a request but failed to read - * a response, we do NOT re-try. - */ -#define MAX_ATTEMPTS 50 - #define LOG(kind,...) GNUNET_log_from (kind, "util",__VA_ARGS__) @@ -160,6 +154,11 @@ struct ClientState */ struct GNUNET_MessageStreamTokenizer *mst; + /** + * Message queue under our control. + */ + struct GNUNET_MQ_Handle *mq; + /** * Timeout for receiving a response (absolute time). */ @@ -182,14 +181,16 @@ struct ClientState size_t msg_off; /** - * Is this the first message we are sending to the service? + * How often have we tried to connect? */ - int first_message; + unsigned int attempts; /** - * How often have we tried to connect? + * Are we supposed to die? #GNUNET_SYSERR if destruction must be + * deferred, #GNUNET_NO by default, #GNUNET_YES if destruction was + * deferred. */ - unsigned int attempts; + int in_destroy; }; @@ -220,11 +221,13 @@ connect_fail_continuation (struct ClientState *cstate) GNUNET_break (NULL == cstate->ap_tail); GNUNET_break (NULL == cstate->dns_active); GNUNET_break (NULL == cstate->sock); - GNUNET_assert (NULL == cstate->write_task); + GNUNET_assert (NULL == cstate->send_task); + GNUNET_assert (NULL == cstate->recv_task); // GNUNET_assert (NULL == cstate->proxy_handshake); + cstate->back_off = GNUNET_TIME_STD_BACKOFF (cstate->back_off); cstate->retry_task - = GNUNET_SCHEDULER_add_delayed (cstate->retry_delay, + = GNUNET_SCHEDULER_add_delayed (cstate->back_off, &start_connect, cstate); } @@ -263,7 +266,7 @@ transmit_ready (void *cls) { GNUNET_MQ_impl_send_in_flight (cstate->mq); } - cstate->msg_off += pos; + cstate->msg_off += ret; if (cstate->msg_off < len) { cstate->send_task @@ -284,15 +287,81 @@ transmit_ready (void *cls) * * @param cls the `struct ClientState` * @param msg message we received. + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing */ -static void +static int recv_message (void *cls, const struct GNUNET_MessageHeader *msg) { struct ClientState *cstate = cls; + if (GNUNET_YES == cstate->in_destroy) + return GNUNET_SYSERR; GNUNET_MQ_inject_message (cstate->mq, msg); + if (GNUNET_YES == cstate->in_destroy) + return GNUNET_SYSERR; + return GNUNET_OK; +} + + +/** + * Cancel all remaining connect attempts + * + * @param cstate handle of the client state to process + */ +static void +cancel_aps (struct ClientState *cstate) +{ + struct AddressProbe *pos; + + while (NULL != (pos = cstate->ap_head)) + { + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (pos->sock)); + GNUNET_SCHEDULER_cancel (pos->task); + GNUNET_CONTAINER_DLL_remove (cstate->ap_head, + cstate->ap_tail, + pos); + GNUNET_free (pos); + } +} + + +/** + * Implement the destruction of a message queue. Implementations must + * not free @a mq, but should take care of @a impl_state. + * + * @param mq the message queue to destroy + * @param impl_state our `struct ClientState` + */ +static void +connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct ClientState *cstate = impl_state; + + if (GNUNET_SYSERR == cstate->in_destroy) + { + /* defer destruction */ + cstate->in_destroy = GNUNET_YES; + return; + } + if (NULL != cstate->dns_active) + GNUNET_RESOLVER_request_cancel (cstate->dns_active); + if (NULL != cstate->send_task) + GNUNET_SCHEDULER_cancel (cstate->send_task); + if (NULL != cstate->recv_task) + GNUNET_SCHEDULER_cancel (cstate->recv_task); + if (NULL != cstate->retry_task) + GNUNET_SCHEDULER_cancel (cstate->retry_task); + if (NULL != cstate->sock) + GNUNET_NETWORK_socket_close (cstate->sock); + cancel_aps (cstate); + GNUNET_free (cstate->service_name); + GNUNET_free_non_null (cstate->hostname); + GNUNET_MST_destroy (cstate->mst); + GNUNET_free (cstate); } @@ -305,11 +374,11 @@ static void receive_ready (void *cls) { struct ClientState *cstate = cls; - const struct GNUNET_SCHEDULER_TaskContext *tc; int ret; - connection->recv_task = NULL; - ret = GNUNET_MST_read (cstate->msg, + cstate->recv_task = NULL; + cstate->in_destroy = GNUNET_SYSERR; + ret = GNUNET_MST_read (cstate->mst, cstate->sock, GNUNET_NO, GNUNET_NO); @@ -319,6 +388,13 @@ receive_ready (void *cls) GNUNET_MQ_ERROR_READ); return; } + if (GNUNET_YES == cstate->in_destroy) + { + connection_client_destroy_impl (cstate->mq, + cstate); + return; + } + cstate->in_destroy = GNUNET_NO; cstate->recv_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, cstate->sock, @@ -335,7 +411,7 @@ receive_ready (void *cls) static void connect_success_continuation (struct ClientState *cstate) { - GNUNET_assert (NULL == connection->read_task); + GNUNET_assert (NULL == cstate->recv_task); cstate->recv_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, cstate->sock, @@ -391,13 +467,13 @@ try_unixpath (const char *service_name, if (NULL == unixpath) return NULL; } - memset (&un, + memset (&s_un, 0, - sizeof (un)); - un.sun_family = AF_UNIX; - strncpy (un.sun_path, + sizeof (s_un)); + s_un.sun_family = AF_UNIX; + strncpy (s_un.sun_path, unixpath, - sizeof (un->sun_path) - 1); + sizeof (s_un.sun_path) - 1); #ifdef LINUX { int abstract; @@ -406,7 +482,7 @@ try_unixpath (const char *service_name, "TESTING", "USE_ABSTRACT_SOCKETS"); if (GNUNET_YES == abstract) - un.sun_path[0] = '\0'; + s_un.sun_path[0] = '\0'; } #endif #if HAVE_SOCKADDR_IN_SIN_LEN @@ -417,8 +493,8 @@ try_unixpath (const char *service_name, 0); if ( (GNUNET_OK == GNUNET_NETWORK_socket_connect (sock, - (struct sockaddr *) &un, - sizeof (un))) || + (struct sockaddr *) &s_un, + sizeof (s_un))) || (EINPROGRESS == errno) ) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -434,29 +510,6 @@ try_unixpath (const char *service_name, } -/** - * Cancel all remaining connect attempts - * - * @param cstate handle of the client state to process - */ -static void -cancel_aps (struct ClientState *cstate) -{ - struct AddressProbe *pos; - - while (NULL != (pos = cstate->ap_head)) - { - GNUNET_break (GNUNET_OK == - GNUNET_NETWORK_socket_close (pos->sock)); - GNUNET_SCHEDULER_cancel (pos->task); - GNUNET_CONTAINER_DLL_remove (cstate->ap_head, - cstate->ap_tail, - pos); - GNUNET_free (pos); - } -} - - /** * Scheduler let us know that we're either ready to write on the * socket OR connect timed out. Do the right thing. @@ -467,15 +520,15 @@ static void connect_probe_continuation (void *cls) { struct AddressProbe *ap = cls; - struct ClientState *cstate *connection = ap->cstate; + struct ClientState *cstate = ap->cstate; const struct GNUNET_SCHEDULER_TaskContext *tc; int error; socklen_t len; ap->task = NULL; GNUNET_assert (NULL != ap->sock); - GNUNET_CONTAINER_DLL_remove (connection->ap_head, - connection->ap_tail, + GNUNET_CONTAINER_DLL_remove (cstate->ap_head, + cstate->ap_tail, ap); len = sizeof (error); error = 0; @@ -500,8 +553,7 @@ connect_probe_continuation (void *cls) } LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection to `%s' succeeded!\n", - GNUNET_a2s (cstate->addr, - cstate->addrlen)); + cstate->service_name); /* trigger jobs that waited for the connection */ GNUNET_assert (NULL == cstate->sock); cstate->sock = ap->sock; @@ -515,7 +567,7 @@ connect_probe_continuation (void *cls) * Try to establish a connection given the specified address. * This function is called by the resolver once we have a DNS reply. * - * @param cls our `struct GNUNET_CONNECTION_Handle *` + * @param cls our `struct ClientState *` * @param addr address to try, NULL for "last call" * @param addrlen length of @a addr */ @@ -526,7 +578,6 @@ try_connect_using_address (void *cls, { struct ClientState *cstate = cls; struct AddressProbe *ap; - struct GNUNET_TIME_Relative delay; if (NULL == addr) { @@ -544,22 +595,22 @@ try_connect_using_address (void *cls, "Trying to connect using address `%s:%u'\n", GNUNET_a2s (addr, addrlen), - connection->port); + cstate->port); ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen); ap->addr = (const struct sockaddr *) &ap[1]; GNUNET_memcpy (&ap[1], addr, addrlen); ap->addrlen = addrlen; - ap->connection = connection; + ap->cstate = cstate; switch (ap->addr->sa_family) { case AF_INET: - ((struct sockaddr_in *) ap->addr)->sin_port = htons (connection->port); + ((struct sockaddr_in *) ap->addr)->sin_port = htons (cstate->port); break; case AF_INET6: - ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (connection->port); + ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (cstate->port); break; default: GNUNET_break (0); @@ -581,15 +632,15 @@ try_connect_using_address (void *cls, (EINPROGRESS != errno) ) { /* maybe refused / unsupported address, try next */ - LOG_STRERROR (GNUNET_ERROR_TYPE_INFO, - "connect"); + GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO, + "connect"); GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (ap->sock)); GNUNET_free (ap); return; } - GNUNET_CONTAINER_DLL_insert (connection->ap_head, - connection->ap_tail, + GNUNET_CONTAINER_DLL_insert (cstate->ap_head, + cstate->ap_tail, ap); ap->task = GNUNET_SCHEDULER_add_write_net (GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT, ap->sock, @@ -663,8 +714,8 @@ start_connect (void *cls) #if 0 /* Never use a local source if a proxy is configured */ if (GNUNET_YES == - GNUNET_SOCKS_check_service (service_name, - cfg)) + GNUNET_SOCKS_check_service (cstate->service_name, + cstate->cfg)) { socks_connect (cstate); return; @@ -675,8 +726,8 @@ start_connect (void *cls) (0 == cstate->port) ) { /* on even rounds, try UNIX first */ - cstate->sock = try_unixpath (service_name, - cfg); + cstate->sock = try_unixpath (cstate->service_name, + cstate->cfg); if (NULL != cstate->sock) { connect_success_continuation (cstate); @@ -689,33 +740,6 @@ start_connect (void *cls) GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT, &try_connect_using_address, cstate); - return connection; -} - - - -/** - * Implement the destruction of a message queue. Implementations must - * not free @a mq, but should take care of @a impl_state. - * - * @param mq the message queue to destroy - * @param impl_state our `struct ClientState` - */ -static void -connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct ClientState *cstate = impl_state; - - if (NULL != cstate->dns_active) - GNUNET_RESOLVER_ip_get_cancel (cstate->dns_active); - if (NULL != cstate->sock) - GNUNET_NETWORK_socket_close (cstate->sock); - cancel_aps (cstate); - GNUNET_free (cstate->service_name); - GNUNET_free_non_null (cstate->hostname); - GNUNET_MST_destroy (cstate->mst); - GNUNET_free (cstate); } @@ -770,7 +794,8 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, * Create a message queue to connect to a GNUnet service. * If handlers are specfied, receive messages from the connection. * - * @param connection the client connection + * @param cfg our configuration + * @param service_name name of the service to connect to * @param handlers handlers for receiving messages, can be NULL * @param error_handler error handler * @param error_handler_cls closure for the @a error_handler @@ -794,7 +819,7 @@ GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg, cstate->cfg = cfg; cstate->retry_task = GNUNET_SCHEDULER_add_now (&start_connect, cstate); - cstate->msg = GNUNET_MST_create (&recv_message, + cstate->mst = GNUNET_MST_create (&recv_message, cstate); if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (cfg, -- 2.25.1