From: Christian Grothoff Date: Sun, 18 Sep 2016 20:43:47 +0000 (+0000) Subject: more work on new MST and service logic X-Git-Tag: initial-import-from-subversion-38251~259 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=af6f9ae173e641a15639b59238bd5e86113a9113;p=oweals%2Fgnunet.git more work on new MST and service logic --- diff --git a/src/util/mst.c b/src/util/mst.c index 578ba8e04..82a21b880 100644 --- a/src/util/mst.c +++ b/src/util/mst.c @@ -144,15 +144,20 @@ do_align: { /* need to align or need more space */ mst->pos -= mst->off; - memmove (ibuf, &ibuf[mst->off], mst->pos); + memmove (ibuf, + &ibuf[mst->off], + mst->pos); mst->off = 0; } if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) { - delta = - GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - - (mst->pos - mst->off), size); - GNUNET_memcpy (&ibuf[mst->pos], buf, delta); + delta + = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) + - (mst->pos - mst->off), + size); + GNUNET_memcpy (&ibuf[mst->pos], + buf, + delta); mst->pos += delta; buf += delta; size -= delta; @@ -178,23 +183,29 @@ do_align: { /* can get more space by moving */ mst->pos -= mst->off; - memmove (ibuf, &ibuf[mst->off], mst->pos); + memmove (ibuf, + &ibuf[mst->off], + mst->pos); mst->off = 0; } if (mst->curr_buf < want) { /* need to get more space by growing buffer */ GNUNET_assert (0 == mst->off); - mst->hdr = GNUNET_realloc (mst->hdr, want); + mst->hdr = GNUNET_realloc (mst->hdr, + want); ibuf = (char *) mst->hdr; mst->curr_buf = want; } hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; if (mst->pos - mst->off < want) { - delta = GNUNET_MIN (want - (mst->pos - mst->off), size); + delta = GNUNET_MIN (want - (mst->pos - mst->off), + size); GNUNET_assert (mst->pos + delta <= mst->curr_buf); - GNUNET_memcpy (&ibuf[mst->pos], buf, delta); + GNUNET_memcpy (&ibuf[mst->pos], + buf, + delta); mst->pos += delta; buf += delta; size -= delta; @@ -278,12 +289,15 @@ copy: { if (size + mst->pos > mst->curr_buf) { - mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); + mst->hdr = GNUNET_realloc (mst->hdr, + size + mst->pos); ibuf = (char *) mst->hdr; mst->curr_buf = size + mst->pos; } GNUNET_assert (size + mst->pos <= mst->curr_buf); - GNUNET_memcpy (&ibuf[mst->pos], buf, size); + GNUNET_memcpy (&ibuf[mst->pos], + buf, + size); mst->pos += size; } if (purge) @@ -318,8 +332,35 @@ GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst, int purge, int one_shot) { - GNUNET_assert (0); // not implemented - return GNUNET_SYSERR; + ssize_t ret; + size_t left; + char *buf; + + left = mst->curr_buf - mst->pos; + buf = (char *) mst->hdr; + ret = GNUNET_NETWORK_socket_recv (sock, + &buf[mst->pos], + left); + if (-1 == ret) + { + if ( (EAGAIN == errno) || + (EINTR == errno) ) + return GNUNET_OK; + GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO, + "recv"); + return GNUNET_SYSERR; + } + if (0 == ret) + { + /* other side closed connection, treat as error */ + return GNUNET_SYSERR; + } + mst->pos += ret; + return GNUNET_MST_from_buffer (mst, + NULL, + 0, + purge, + one_shot); } diff --git a/src/util/service_new.c b/src/util/service_new.c index 6d17720fd..30fb88f7d 100644 --- a/src/util/service_new.c +++ b/src/util/service_new.c @@ -205,9 +205,8 @@ struct GNUNET_SERVICE_Handle int ret; /** - * If GNUNET_YES, consider unknown message types an error where the + * If #GNUNET_YES, consider unknown message types an error where the * client is disconnected. - * FIXME: remove? */ int require_found; }; @@ -247,7 +246,7 @@ struct GNUNET_SERVICE_Client /** * Tokenizer we use for processing incoming data. */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct GNUNET_MessageStreamTokenizer *mst; /** * Task that warns about missing calls to @@ -272,6 +271,12 @@ struct GNUNET_SERVICE_Client */ void *user_context; + /** + * Time when we last gave a message from this client + * to the application. + */ + struct GNUNET_TIME_Absolute warn_start; + /** * Persist the file handle for this client no matter what happens, * force the OS to close once the process actually dies. Should only @@ -286,6 +291,11 @@ struct GNUNET_SERVICE_Client */ int is_monitor; + /** + * Are we waiting for the application to call #GNUNET_SERVICE_client_continue()? + */ + int needs_continue; + /** * Type of last message processed (for warn_no_receive_done). */ @@ -386,7 +396,9 @@ process_acl4 (struct GNUNET_STRINGS_IPv4NetworkPolicy **ret, { char *opt; - if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option)) + if (! GNUNET_CONFIGURATION_have_value (sh->cfg, + sh->service_name, + option)) { *ret = NULL; return GNUNET_OK; @@ -394,7 +406,8 @@ process_acl4 (struct GNUNET_STRINGS_IPv4NetworkPolicy **ret, GNUNET_break (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (sh->cfg, sh->service_name, - option, &opt)); + option, + &opt)); if (NULL == (*ret = GNUNET_STRINGS_parse_ipv4_policy (opt))) { LOG (GNUNET_ERROR_TYPE_WARNING, @@ -426,7 +439,9 @@ process_acl6 (struct GNUNET_STRINGS_IPv6NetworkPolicy **ret, { char *opt; - if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option)) + if (! GNUNET_CONFIGURATION_have_value (sh->cfg, + sh->service_name, + option)) { *ret = NULL; return GNUNET_OK; @@ -434,12 +449,15 @@ process_acl6 (struct GNUNET_STRINGS_IPv6NetworkPolicy **ret, GNUNET_break (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (sh->cfg, sh->service_name, - option, &opt)); + option, + &opt)); if (NULL == (*ret = GNUNET_STRINGS_parse_ipv6_policy (opt))) { LOG (GNUNET_ERROR_TYPE_WARNING, _("Could not parse IPv6 network specification `%s' for `%s:%s'\n"), - opt, sh->service_name, option); + opt, + sh->service_name, + option); GNUNET_free (opt); return GNUNET_SYSERR; } @@ -469,7 +487,9 @@ add_unixpath (struct sockaddr **saddrs, un = GNUNET_new (struct sockaddr_un); un->sun_family = AF_UNIX; - strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1); + strncpy (un->sun_path, + unixpath, + sizeof (un->sun_path) - 1); #ifdef LINUX if (GNUNET_YES == abstract) un->sun_path[0] = '\0'; @@ -554,10 +574,10 @@ get_server_addresses (const char *service_name, 0); if (NULL == desc) { - if ((ENOBUFS == errno) || - (ENOMEM == errno) || - (ENFILE == errno) || - (EACCES == errno)) + if ( (ENOBUFS == errno) || + (ENOMEM == errno) || + (ENFILE == errno) || + (EACCES == errno) ) { LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "socket"); @@ -571,7 +591,8 @@ get_server_addresses (const char *service_name, } else { - GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc)); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (desc)); desc = NULL; } } @@ -648,9 +669,9 @@ get_server_addresses (const char *service_name, if (GNUNET_SYSERR == abstract) abstract = GNUNET_NO; #endif - if ((GNUNET_YES != abstract) - && (GNUNET_OK != - GNUNET_DISK_directory_create_for_file (unixpath))) + if ( (GNUNET_YES != abstract) && + (GNUNET_OK != + GNUNET_DISK_directory_create_for_file (unixpath)) ) GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "mkdir", unixpath); @@ -682,7 +703,8 @@ get_server_addresses (const char *service_name, } else { - GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc)); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (desc)); desc = NULL; } } @@ -994,7 +1016,8 @@ receive_sockets_from_parent (struct GNUNET_SERVICE_Handle *sh) LOG (GNUNET_ERROR_TYPE_ERROR, _("Could not access a pre-bound socket, will try to bind myself\n")); for (i = 0; (i < count) && (NULL != lsocks[i]); i++) - GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[i])); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (lsocks[i])); GNUNET_free (lsocks); return NULL; } @@ -1081,7 +1104,8 @@ open_listen_socket (const struct sockaddr *server_addr, GNUNET_a2s (server_addr, socklen)); } } - GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock)); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (sock)); errno = eno; return NULL; } @@ -1090,7 +1114,8 @@ open_listen_socket (const struct sockaddr *server_addr, { LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "listen"); - GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock)); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (sock)); errno = 0; return NULL; } @@ -1177,7 +1202,8 @@ setup_service (struct GNUNET_SERVICE_Handle *sh) (unsigned int) 3 + cnt); cnt++; while (NULL != lsocks[cnt]) - GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[cnt++])); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (lsocks[cnt++])); GNUNET_free (lsocks); lsocks = NULL; break; @@ -1694,7 +1720,10 @@ shutdown: struct ServiceListenContext *slc = sh.slc_head; sh.slc_head = slc->next; - // FIXME: destroy slc + if (NULL != slc->listen_task) + GNUNET_SCHEDULER_cancel (slc->listen_task); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (slc->listen_socket)); GNUNET_free (slc); } @@ -1778,7 +1807,7 @@ service_mq_cancel (struct GNUNET_MQ_Handle *mq, * the message queue. * Not every message queue implementation supports an error handler. * - * @param cls closure + * @param cls closure with our `struct GNUNET_SERVICE_Client` * @param error error code */ static void @@ -1786,8 +1815,35 @@ service_mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_SERVICE_Client *client = cls; + struct GNUNET_SERVICE_Handle *sh = client->sh; - // FIXME! + if ( (GNUNET_MQ_ERROR_NO_MATCH == error) && + (GNUNET_NO == sh->require_found) ) + return; /* ignore error */ + GNUNET_SERVICE_client_drop (client); +} + + +/** + * Task run to warn about missing calls to #GNUNET_SERVICE_client_continue(). + * + * @param cls our `struct GNUNET_SERVICE_Client *` to process more requests from + */ +static void +warn_no_client_continue (void *cls) +{ + struct GNUNET_SERVICE_Client *client = cls; + + GNUNET_break (0 != client->warn_type); /* type should never be 0 here, as we don't use 0 */ + client->warn_task + = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &warn_no_client_continue, + client); + LOG (GNUNET_ERROR_TYPE_WARNING, + _("Processing code for message of type %u did not call `GNUNET_SERVICE_client_continue' after %s\n"), + (unsigned int) client->warn_type, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (client->warn_start), + GNUNET_YES)); } @@ -1795,21 +1851,27 @@ service_mq_error_handler (void *cls, * Functions with this signature are called whenever a * complete message is received by the tokenizer for a client. * - * Do not call #GNUNET_SERVER_mst_destroy() from within + * Do not call #GNUNET_MST_destroy() from within * the scope of this callback. * * @param cls closure with the `struct GNUNET_SERVICE_Client *` - * @param client_cls closure with the `struct GNUNET_SERVICE_Client *` * @param message the actual message * @return #GNUNET_OK on success (always) */ static int service_client_mst_cb (void *cls, - void *client_cls, const struct GNUNET_MessageHeader *message) { struct GNUNET_SERVICE_Client *client = cls; + GNUNET_assert (GNUNET_NO == client->needs_continue); + client->needs_continue = GNUNET_YES; + client->warn_type = ntohs (message->type); + client->warn_start = GNUNET_TIME_absolute_get (); + client->warn_task + = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &warn_no_client_continue, + client); GNUNET_MQ_inject_message (client->mq, message); return GNUNET_OK; @@ -1826,10 +1888,31 @@ static void service_client_recv (void *cls) { struct GNUNET_SERVICE_Client *client = cls; + int ret; - // FIXME: read into buffer, pass to MST, then client->mq inject! - // FIXME: revise MST API to avoid the memcpy! - // i.e.: GNUNET_MST_read (client->sock); + client->recv_task = NULL; + ret = GNUNET_MST_read (client->mst, + client->sock, + GNUNET_NO, + GNUNET_YES); + if (GNUNET_SYSERR == ret) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + if (GNUNET_NO == ret) + return; /* more messages in buffer, wait for application + to be done processing */ + GNUNET_assert (GNUNET_OK == ret); + if (GNUNET_YES == client->needs_continue) + return; + /* MST needs more data, re-schedule read job */ + client->recv_task + = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + client->sock, + &service_client_recv, + client); } @@ -1859,8 +1942,8 @@ start_client (struct GNUNET_SERVICE_Handle *sh, sh->handlers, &service_mq_error_handler, client); - client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb, - client); + client->mst = GNUNET_MST_create (&service_client_mst_cb, + client); client->user_context = sh->connect_cb (sh->cb_cls, client, client->mq); @@ -1955,69 +2038,70 @@ accept_client (void *cls) slc->listen_task = NULL; while (1) + { + struct GNUNET_NETWORK_Handle *sock; + const struct sockaddr_in *v4; + const struct sockaddr_in6 *v6; + struct sockaddr_storage sa; + socklen_t addrlen; + int ok; + + addrlen = sizeof (sa); + sock = GNUNET_NETWORK_socket_accept (slc->listen_socket, + (struct sockaddr *) &sa, + &addrlen); + if (NULL == sock) + break; + switch (sa.ss_family) { - struct GNUNET_NETWORK_Handle *sock; - const struct sockaddr_in *v4; - const struct sockaddr_in6 *v6; - struct sockaddr_storage sa; - socklen_t addrlen; - int ok; - - addrlen = sizeof (sa); - sock = GNUNET_NETWORK_socket_accept (slc->listen_socket, - (struct sockaddr *) &sa, - &addrlen); - if (NULL == sock) - break; - switch (sa.ss_family) - { - case AF_INET: - GNUNET_assert (addrlen == sizeof (struct sockaddr_in)); - v4 = (const struct sockaddr_in *) &sa; - ok = ( ( (NULL == sh->v4_allowed) || - (check_ipv4_listed (sh->v4_allowed, - &v4->sin_addr))) && - ( (NULL == sh->v4_denied) || - (! check_ipv4_listed (sh->v4_denied, - &v4->sin_addr)) ) ); - break; - case AF_INET6: - GNUNET_assert (addrlen == sizeof (struct sockaddr_in6)); - v6 = (const struct sockaddr_in6 *) &sa; - ok = ( ( (NULL == sh->v6_allowed) || - (check_ipv6_listed (sh->v6_allowed, - &v6->sin6_addr))) && - ( (NULL == sh->v6_denied) || - (! check_ipv6_listed (sh->v6_denied, - &v6->sin6_addr)) ) ); - break; + case AF_INET: + GNUNET_assert (addrlen == sizeof (struct sockaddr_in)); + v4 = (const struct sockaddr_in *) &sa; + ok = ( ( (NULL == sh->v4_allowed) || + (check_ipv4_listed (sh->v4_allowed, + &v4->sin_addr))) && + ( (NULL == sh->v4_denied) || + (! check_ipv4_listed (sh->v4_denied, + &v4->sin_addr)) ) ); + break; + case AF_INET6: + GNUNET_assert (addrlen == sizeof (struct sockaddr_in6)); + v6 = (const struct sockaddr_in6 *) &sa; + ok = ( ( (NULL == sh->v6_allowed) || + (check_ipv6_listed (sh->v6_allowed, + &v6->sin6_addr))) && + ( (NULL == sh->v6_denied) || + (! check_ipv6_listed (sh->v6_denied, + &v6->sin6_addr)) ) ); + break; #ifndef WINDOWS - case AF_UNIX: - ok = GNUNET_OK; /* controlled using file-system ACL now */ - break; + case AF_UNIX: + ok = GNUNET_OK; /* controlled using file-system ACL now */ + break; #endif - default: - LOG (GNUNET_ERROR_TYPE_WARNING, - _("Unknown address family %d\n"), - sa.ss_family); - return; - } - if (! ok) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Service rejected incoming connection from %s due to policy.\n", - GNUNET_a2s ((const struct sockaddr *) &sa, - addrlen)); - GNUNET_NETWORK_socket_close (sock); - continue; - } + default: + LOG (GNUNET_ERROR_TYPE_WARNING, + _("Unknown address family %d\n"), + sa.ss_family); + return; + } + if (! ok) + { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Service accepted incoming connection from %s.\n", - GNUNET_a2s ((const struct sockaddr *) &sa, - addrlen)); - start_client (slc->sh, - sock); + "Service rejected incoming connection from %s due to policy.\n", + GNUNET_a2s ((const struct sockaddr *) &sa, + addrlen)); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (sock)); + continue; } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Service accepted incoming connection from %s.\n", + GNUNET_a2s ((const struct sockaddr *) &sa, + addrlen)); + start_client (slc->sh, + sock); + } slc->listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, slc->listen_socket, @@ -2048,6 +2132,42 @@ GNUNET_SERVICE_resume (struct GNUNET_SERVICE_Handle *sh) } +/** + * Task run to resume receiving data from the client after + * the client called #GNUNET_SERVICE_client_continue(). + * + * @param cls our `struct GNUNET_SERVICE_Client` + */ +static void +resume_client_receive (void *cls) +{ + struct GNUNET_SERVICE_Client *c = cls; + int ret; + + c->recv_task = NULL; + /* first, check if there is still something in the buffer */ + ret = GNUNET_MST_next (c->mst, + GNUNET_YES); + if (GNUNET_SYSERR == ret) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (c); + return; + } + if (GNUNET_NO == ret) + return; /* done processing, wait for more later */ + GNUNET_assert (GNUNET_OK == ret); + if (GNUNET_YES == c->needs_continue) + return; /* #GNUNET_MST_next() did give a message to the client */ + /* need to receive more data from the network first */ + c->recv_task + = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + c->sock, + &service_client_recv, + c); +} + + /** * Continue receiving further messages from the given client. * Must be called after each message received. @@ -2057,7 +2177,16 @@ GNUNET_SERVICE_resume (struct GNUNET_SERVICE_Handle *sh) void GNUNET_SERVICE_client_continue (struct GNUNET_SERVICE_Client *c) { - GNUNET_break (0); // not implemented + GNUNET_assert (GNUNET_YES == c->needs_continue); + GNUNET_assert (NULL == c->recv_task); + c->needs_continue = GNUNET_NO; + if (NULL != c->warn_task) + { + GNUNET_SCHEDULER_cancel (c->warn_task); + c->warn_task = NULL; + } + c->recv_task = GNUNET_SCHEDULER_add_now (&resume_client_receive, + c); } @@ -2117,11 +2246,12 @@ GNUNET_SERVICE_client_drop (struct GNUNET_SERVICE_Client *c) GNUNET_SCHEDULER_cancel (c->send_task); c->send_task = NULL; } - GNUNET_SERVER_mst_destroy (c->mst); + GNUNET_MST_destroy (c->mst); GNUNET_MQ_destroy (c->mq); if (GNUNET_NO == c->persist) { - GNUNET_NETWORK_socket_close (c->sock); + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (c->sock)); } else {