From 6a131ab255bb3419eb0e59a24879556d5b1c75d3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 20 Sep 2016 02:28:01 +0000 Subject: [PATCH] convert statistics service to new service MQ API --- src/include/gnunet_protocols.h | 10 + src/statistics/gnunet-service-statistics.c | 645 ++++++++++++--------- src/statistics/gnunet-statistics.c | 181 +++--- src/statistics/statistics_api.c | 18 +- src/util/mq.c | 3 + src/util/service_new.c | 1 + 6 files changed, 476 insertions(+), 382 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index dc5b5deed..392a4d761 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -680,6 +680,16 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE 173 +/** + * Client is done sending service requests and will now disconnect. + */ +#define GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT 174 + +/** + * Service confirms disconnect and that it is done processing + * all requests from the client. + */ +#define GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM 175 /******************************************************************************* * VPN message types diff --git a/src/statistics/gnunet-service-statistics.c b/src/statistics/gnunet-service-statistics.c index c0be7c668..161327421 100644 --- a/src/statistics/gnunet-service-statistics.c +++ b/src/statistics/gnunet-service-statistics.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009, 2010, 2012, 2014 GNUnet e.V. + Copyright (C) 2009, 2010, 2012, 2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -54,7 +54,7 @@ struct WatchEntry /** * For which client is this watch entry? */ - struct GNUNET_SERVER_Client *client; + struct ClientEntry *ce; /** * Last value we communicated to the client for this watch entry. @@ -185,7 +185,12 @@ struct ClientEntry /** * Corresponding server handle. */ - struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVICE_Client *client; + + /** + * Corresponding message queue. + */ + struct GNUNET_MQ_Handle *mq; /** * Which subsystem is this client writing to (SET/UPDATE)? @@ -220,15 +225,10 @@ static struct SubsystemEntry *sub_tail; */ static unsigned int client_count; -/** - * Handle to our server. - */ -static struct GNUNET_SERVER_Handle *srv; - /** * Our notification context. */ -static struct GNUNET_SERVER_NotificationContext *nc; +static struct GNUNET_NotificationContext *nc; /** * Counter used to generate unique values. @@ -241,103 +241,6 @@ static uint32_t uidgen; static int in_shutdown; -/** - * Inject a message to our server with a client of 'NULL'. - * - * @param cls the `struct GNUNET_SERVER_Handle` - * @param client unused - * @param msg message to inject - */ -static int -inject_message (void *cls, - void *client, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_SERVER_Handle *server = cls; - - GNUNET_break (GNUNET_OK == GNUNET_SERVER_inject (server, NULL, msg)); - return GNUNET_OK; -} - - -/** - * Load persistent values from disk. Disk format is exactly the same - * format that we also use for setting the values over the network. - * - * @param server handle to the server context - */ -static void -load (struct GNUNET_SERVER_Handle *server) -{ - char *fn; - struct GNUNET_BIO_ReadHandle *rh; - uint64_t fsize; - char *buf; - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_filename (cfg, - "STATISTICS", - "DATABASE", - &fn)) - { - GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - "STATISTICS", - "DATABASE"); - return; - } - if ( (GNUNET_OK != - GNUNET_DISK_file_size (fn, - &fsize, - GNUNET_NO, - GNUNET_YES)) || - (0 == fsize) ) - { - GNUNET_free (fn); - return; - } - buf = GNUNET_malloc (fsize); - rh = GNUNET_BIO_read_open (fn); - if (!rh) - { - GNUNET_free (buf); - GNUNET_free (fn); - return; - } - if (GNUNET_OK != - GNUNET_BIO_read (rh, - fn, - buf, - fsize)) - { - GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, - "read", - fn); - GNUNET_break (GNUNET_OK == - GNUNET_BIO_read_close (rh, NULL)); - GNUNET_free (buf); - GNUNET_free (fn); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Loading %llu bytes of statistics from `%s'\n"), - (unsigned long long) fsize, fn); - mst = GNUNET_SERVER_mst_create (&inject_message, - server); - GNUNET_break (GNUNET_OK == - GNUNET_SERVER_mst_receive (mst, NULL, - buf, fsize, - GNUNET_YES, - GNUNET_NO)); - GNUNET_SERVER_mst_destroy (mst); - GNUNET_free (buf); - GNUNET_break (GNUNET_OK == - GNUNET_BIO_read_close (rh, - NULL)); - GNUNET_free (fn); -} - - /** * Write persistent statistics to disk. */ @@ -379,7 +282,8 @@ save () GNUNET_CONTAINER_DLL_remove (se->stat_head, se->stat_tail, pos); - if ((pos->persistent) && (NULL != wh)) + if ( (pos->persistent) && + (NULL != wh) ) { nlen = strlen (pos->name) + 1; size = sizeof (struct GNUNET_STATISTICS_SetMessage) + nlen + slen; @@ -446,24 +350,23 @@ save () * @param e value to transmit */ static void -transmit (struct GNUNET_SERVER_Client *client, +transmit (struct ClientEntry *ce, const struct StatsEntry *e) { + struct GNUNET_MQ_Envelope *env; struct GNUNET_STATISTICS_ReplyMessage *m; size_t size; - size = sizeof (struct GNUNET_STATISTICS_ReplyMessage) + - strlen (e->subsystem->service) + 1 + + size = strlen (e->subsystem->service) + 1 + strlen (e->name) + 1; GNUNET_assert (size < GNUNET_SERVER_MAX_MESSAGE_SIZE); - m = GNUNET_malloc (size); - m->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_VALUE); - m->header.size = htons (size); + env = GNUNET_MQ_msg_extra (m, + size, + GNUNET_MESSAGE_TYPE_STATISTICS_VALUE); m->uid = htonl (e->uid); if (e->persistent) m->uid |= htonl (GNUNET_STATISTICS_PERSIST_BIT); m->value = GNUNET_htonll (e->value); - size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); GNUNET_assert (size == GNUNET_STRINGS_buffer_fill ((char *) &m[1], size, @@ -476,66 +379,51 @@ transmit (struct GNUNET_SERVER_Client *client, e->name, e->persistent, (unsigned long long) e->value); - GNUNET_SERVER_notification_context_unicast (nc, client, &m->header, - GNUNET_NO); - GNUNET_free (m); + GNUNET_MQ_send (ce->mq, + env); } /** - * Find a client entry for the given client handle, or create one. + * Callback called when a client connects to the service. * - * @param client handle to match - * @return corresponding client entry struct + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a c */ -static struct ClientEntry * -make_client_entry (struct GNUNET_SERVER_Client *client) +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) { struct ClientEntry *ce; - ce = GNUNET_SERVER_client_get_user_context (client, - struct ClientEntry); - if (NULL != ce) - return ce; - if (NULL == nc) - { - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return NULL; - } ce = GNUNET_new (struct ClientEntry); - ce->client = client; - GNUNET_SERVER_client_set_user_context (client, ce); + ce->client = c; + ce->mq = mq; client_count++; - GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_notification_context_add (nc, + mq); return ce; } /** - * Handle GET-message. + * Check integrity of GET-message. * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @return #GNUNET_OK if @a message is well-formed */ -static void -handle_get (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +static int +check_get (void *cls, + const struct GNUNET_MessageHeader *message) { - struct GNUNET_MessageHeader end; const char *service; const char *name; - size_t slen; - size_t nlen; - struct SubsystemEntry *se; - struct StatsEntry *pos; size_t size; - if (NULL == make_client_entry (client)) - return; /* new client during shutdown */ size = ntohs (message->size) - sizeof (struct GNUNET_MessageHeader); if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1], @@ -545,10 +433,40 @@ handle_get (void *cls, &name)) { GNUNET_break (0); - GNUNET_SERVER_receive_done (client, - GNUNET_SYSERR); - return; + return GNUNET_SYSERR; } + return GNUNET_OK; +} + + +/** + * Handle GET-message. + * + * @param cls identification of the client + * @param message the actual message + */ +static void +handle_get (void *cls, + const struct GNUNET_MessageHeader *message) +{ + struct ClientEntry *ce = cls; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *end; + const char *service; + const char *name; + size_t slen; + size_t nlen; + struct SubsystemEntry *se; + struct StatsEntry *pos; + size_t size; + + size = ntohs (message->size) - sizeof (struct GNUNET_MessageHeader); + GNUNET_assert (size == + GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1], + size, + 2, + &service, + &name)); slen = strlen (service); nlen = strlen (name); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -563,18 +481,18 @@ handle_get (void *cls, for (pos = se->stat_head; NULL != pos; pos = pos->next) { if (! ( (0 == nlen) || - (0 == strcmp (name, pos->name))) ) + (0 == strcmp (name, + pos->name))) ) continue; - transmit (client, pos); + transmit (ce, + pos); } } - end.size = htons (sizeof (struct GNUNET_MessageHeader)); - end.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_END); - GNUNET_SERVER_notification_context_unicast (nc, - client, - &end, - GNUNET_NO); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + env = GNUNET_MQ_msg (end, + GNUNET_MESSAGE_TYPE_STATISTICS_END); + GNUNET_MQ_send (ce->mq, + env); + GNUNET_SERVICE_client_continue (ce->client); } @@ -586,7 +504,8 @@ handle_get (void *cls, static void notify_change (struct StatsEntry *se) { - struct GNUNET_STATISTICS_WatchValueMessage wvm; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_STATISTICS_WatchValueMessage *wvm; struct WatchEntry *pos; for (pos = se->we_head; NULL != pos; pos = pos->next) @@ -600,17 +519,14 @@ notify_change (struct StatsEntry *se) { pos->last_value_set = GNUNET_YES; } - wvm.header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE); - wvm.header.size = - htons (sizeof (struct GNUNET_STATISTICS_WatchValueMessage)); - wvm.flags = htonl (se->persistent ? GNUNET_STATISTICS_SETFLAG_PERSISTENT : 0); - wvm.wid = htonl (pos->wid); - wvm.reserved = htonl (0); - wvm.value = GNUNET_htonll (se->value); - GNUNET_SERVER_notification_context_unicast (nc, - pos->client, - &wvm.header, - GNUNET_NO); + env = GNUNET_MQ_msg (wvm, + GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE); + wvm->flags = htonl (se->persistent ? GNUNET_STATISTICS_SETFLAG_PERSISTENT : 0); + wvm->wid = htonl (pos->wid); + wvm->reserved = htonl (0); + wvm->value = GNUNET_htonll (se->value); + GNUNET_MQ_send (pos->ce->mq, + env); pos->last_value = se->value; } } @@ -687,26 +603,53 @@ find_stat_entry (struct SubsystemEntry *se, } +/** + * Check format of SET-message. + * + * @param cls the `struct ClientEntry` + * @param message the actual message + * @return #GNUNET_OK if message is well-formed + */ +static int +check_set (void *cls, + const struct GNUNET_STATISTICS_SetMessage *msg) +{ + const char *service; + const char *name; + size_t msize; + + msize = ntohs (msg->header.size) - sizeof (*msg); + if (msize != + GNUNET_STRINGS_buffer_tokenize ((const char *) &msg[1], + msize, + 2, + &service, + &name)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Handle SET-message. * - * @param cls closure - * @param client identification of the client + * @param cls the `struct ClientEntry` * @param message the actual message */ static void handle_set (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_STATISTICS_SetMessage *msg) { + struct ClientEntry *ce = cls; const char *service; const char *name; size_t nlen; uint16_t msize; uint16_t size; - const struct GNUNET_STATISTICS_SetMessage *msg; struct SubsystemEntry *se; - struct ClientEntry *ce; struct StatsEntry *pos; uint32_t flags; uint64_t value; @@ -714,32 +657,16 @@ handle_set (void *cls, int changed; int initial_set; - ce = NULL; - if ( (NULL != client) && - (NULL == (ce = make_client_entry (client))) ) - return; /* new client during shutdown */ - msize = ntohs (message->size); - if (msize < sizeof (struct GNUNET_STATISTICS_SetMessage)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } + msize = ntohs (msg->header.size); size = msize - sizeof (struct GNUNET_STATISTICS_SetMessage); - msg = (const struct GNUNET_STATISTICS_SetMessage *) message; - if (size != - GNUNET_STRINGS_buffer_tokenize ((const char *) &msg[1], - size, - 2, - &service, - &name)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, - GNUNET_SYSERR); - return; - } - se = find_subsystem_entry (ce, service); + GNUNET_assert (size == + GNUNET_STRINGS_buffer_tokenize ((const char *) &msg[1], + size, + 2, + &service, + &name)); + se = find_subsystem_entry (ce, + service); flags = ntohl (msg->flags); value = GNUNET_ntohll (msg->value); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -748,7 +675,8 @@ handle_set (void *cls, name, (unsigned int) flags, (unsigned long long) value); - pos = find_stat_entry (se, name); + pos = find_stat_entry (se, + name); if (NULL != pos) { initial_set = 0; @@ -798,16 +726,15 @@ handle_set (void *cls, if ( (changed) || (1 == initial_set) ) notify_change (pos); - GNUNET_SERVER_receive_done (client, - GNUNET_OK); + GNUNET_SERVICE_client_continue (ce->client); return; } /* not found, create a new entry */ nlen = strlen (name) + 1; pos = GNUNET_malloc (sizeof (struct StatsEntry) + nlen); GNUNET_memcpy (&pos[1], - name, - nlen); + name, + nlen); pos->name = (const char *) &pos[1]; pos->subsystem = se; if ( (0 == (flags & GNUNET_STATISTICS_SETFLAG_RELATIVE)) || @@ -830,72 +757,91 @@ handle_set (void *cls, service, name, (unsigned long long) pos->value); - GNUNET_SERVER_receive_done (client, - GNUNET_OK); + if (NULL != ce) + GNUNET_SERVICE_client_continue (ce->client); +} + + +/** + * Check integrity of WATCH-message. + * + * @param cls the `struct ClientEntry *` + * @param message the actual message + * @return #GNUNET_OK if message is well-formed + */ +static int +check_watch (void *cls, + const struct GNUNET_MessageHeader *message) +{ + size_t size; + const char *service; + const char *name; + + size = ntohs (message->size) - sizeof (struct GNUNET_MessageHeader); + if (size != + GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1], + size, + 2, + &service, + &name)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** * Handle WATCH-message. * - * @param cls closure - * @param client identification of the client + * @param cls the `struct ClientEntry *` * @param message the actual message */ static void handle_watch (void *cls, - struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + struct ClientEntry *ce = cls; const char *service; const char *name; uint16_t msize; uint16_t size; struct SubsystemEntry *se; struct StatsEntry *pos; - struct ClientEntry *ce; struct WatchEntry *we; size_t nlen; if (NULL == nc) { - GNUNET_SERVER_receive_done (client, - GNUNET_SYSERR); + GNUNET_SERVICE_client_drop (ce->client); return; } - GNUNET_SERVER_client_mark_monitor (client); - ce = make_client_entry (client); + GNUNET_SERVICE_client_mark_monitor (ce->client); msize = ntohs (message->size); - if (msize < sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } size = msize - sizeof (struct GNUNET_MessageHeader); - if (size != - GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1], - size, - 2, - &service, - &name)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } + GNUNET_assert (size == + GNUNET_STRINGS_buffer_tokenize ((const char *) &message[1], + size, + 2, + &service, + &name)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received request to watch statistic on `%s:%s'\n", service, name); - se = find_subsystem_entry (ce, service); - pos = find_stat_entry (se, name); + se = find_subsystem_entry (ce, + service); + pos = find_stat_entry (se, + name); if (NULL == pos) { nlen = strlen (name) + 1; pos = GNUNET_malloc (sizeof (struct StatsEntry) + nlen); - GNUNET_memcpy (&pos[1], name, nlen); + GNUNET_memcpy (&pos[1], + name, + nlen); pos->name = (const char *) &pos[1]; pos->subsystem = se; GNUNET_CONTAINER_DLL_insert (se->stat_head, @@ -910,7 +856,7 @@ handle_watch (void *cls, (unsigned long long) pos->value); } we = GNUNET_new (struct WatchEntry); - we->client = client; + we->ce = ce; we->last_value_set = GNUNET_NO; we->wid = ce->max_wid++; GNUNET_CONTAINER_DLL_insert (pos->we_head, @@ -918,7 +864,31 @@ handle_watch (void *cls, we); if (0 != pos->value) notify_change (pos); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (ce->client); +} + + +/** + * Handle DISCONNECT-message. Sync to disk and send + * back a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM + * message. + * + * @param cls the `struct ClientEntry *` + * @param message the actual message + */ +static void +handle_disconnect (void *cls, + const struct GNUNET_MessageHeader *message) +{ + struct ClientEntry *ce = cls; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *msg; + + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM); + GNUNET_MQ_send (ce->mq, + env); + GNUNET_SERVICE_client_continue (ce->client); } @@ -935,7 +905,7 @@ do_shutdown () if (NULL == nc) return; save (); - GNUNET_SERVER_notification_context_destroy (nc); + GNUNET_notification_context_destroy (nc); nc = NULL; GNUNET_assert (0 == client_count); while (NULL != (se = sub_head)) @@ -983,25 +953,19 @@ shutdown_task (void *cls) * * @param cls closure, NULL * @param client identification of the client + * @param app_cls the `struct ClientEntry *` */ static void -handle_client_disconnect (void *cls, - struct GNUNET_SERVER_Client *client) +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_cls) { - struct ClientEntry *ce; + struct ClientEntry *ce = app_cls; struct WatchEntry *we; struct WatchEntry *wen; struct StatsEntry *pos; struct SubsystemEntry *se; - if (NULL == client) - return; - ce = GNUNET_SERVER_client_get_user_context (client, - struct ClientEntry); - if (NULL == ce) - return; - GNUNET_SERVER_client_set_user_context (client, - NULL); client_count--; for (se = sub_head; NULL != se; se = se->next) { @@ -1011,7 +975,7 @@ handle_client_disconnect (void *cls, while (NULL != (we = wen)) { wen = we->next; - if (we->client != client) + if (we->ce != ce) continue; GNUNET_CONTAINER_DLL_remove (pos->we_head, pos->we_tail, @@ -1026,53 +990,164 @@ handle_client_disconnect (void *cls, } +/** + * We've read a `struct GNUNET_STATISTICS_SetMessage *` from + * disk. Check that it is well-formed, and if so pass it to + * the handler for set messages. + * + * @param cls NULL + * @param message the message found on disk + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + */ +static int +inject_message (void *cls, + const struct GNUNET_MessageHeader *message) +{ + uint16_t msize = ntohs (message->size); + const struct GNUNET_STATISTICS_SetMessage *sm; + + sm = (const struct GNUNET_STATISTICS_SetMessage *) message; + if ( (sizeof (struct GNUNET_STATISTICS_SetMessage) > msize) || + (GNUNET_OK != + check_set (NULL, + sm)) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + handle_set (NULL, + sm); + return GNUNET_OK; +} + + +/** + * Load persistent values from disk. Disk format is exactly the same + * format that we also use for setting the values over the network. + */ +static void +load () +{ + char *fn; + struct GNUNET_BIO_ReadHandle *rh; + uint64_t fsize; + char *buf; + struct GNUNET_MessageStreamTokenizer *mst; + + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_filename (cfg, + "STATISTICS", + "DATABASE", + &fn)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "STATISTICS", + "DATABASE"); + return; + } + if ( (GNUNET_OK != + GNUNET_DISK_file_size (fn, + &fsize, + GNUNET_NO, + GNUNET_YES)) || + (0 == fsize) ) + { + GNUNET_free (fn); + return; + } + buf = GNUNET_malloc (fsize); + rh = GNUNET_BIO_read_open (fn); + if (! rh) + { + GNUNET_free (buf); + GNUNET_free (fn); + return; + } + if (GNUNET_OK != + GNUNET_BIO_read (rh, + fn, + buf, + fsize)) + { + GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, + "read", + fn); + GNUNET_break (GNUNET_OK == + GNUNET_BIO_read_close (rh, + NULL)); + GNUNET_free (buf); + GNUNET_free (fn); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Loading %llu bytes of statistics from `%s'\n"), + (unsigned long long) fsize, + fn); + mst = GNUNET_MST_create (&inject_message, + NULL); + GNUNET_break (GNUNET_OK == + GNUNET_MST_from_buffer (mst, + buf, + (size_t) fsize, + GNUNET_YES, + GNUNET_NO)); + GNUNET_MST_destroy (mst); + GNUNET_free (buf); + GNUNET_break (GNUNET_OK == + GNUNET_BIO_read_close (rh, + NULL)); + GNUNET_free (fn); +} + + /** * Process statistics requests. * * @param cls closure - * @param server the initialized server * @param c configuration to use + * @param service the initialized service */ static void run (void *cls, - struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *service) { - static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&handle_set, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_SET, 0}, - {&handle_get, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_GET, 0}, - {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_STATISTICS_WATCH, 0}, - {NULL, NULL, 0, 0} - }; cfg = c; - srv = server; - GNUNET_SERVER_add_handlers (server, - handlers); - nc = GNUNET_SERVER_notification_context_create (server, 16); - GNUNET_SERVER_disconnect_notify (server, - &handle_client_disconnect, - NULL); - load (server); + nc = GNUNET_notification_context_create (16); + load (); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } /** - * The main function for the statistics service. - * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error + * Define "main" method using service macro. */ -int -main (int argc, char *const *argv) -{ - return (GNUNET_OK == - GNUNET_SERVICE_run (argc, argv, "statistics", - GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, - &run, NULL)) ? 0 : 1; -} +GNUNET_SERVICE_MAIN +("statistics", + GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_var_size (set, + GNUNET_MESSAGE_TYPE_STATISTICS_SET, + struct GNUNET_STATISTICS_SetMessage, + NULL), + GNUNET_MQ_hd_var_size (get, + GNUNET_MESSAGE_TYPE_STATISTICS_GET, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (watch, + GNUNET_MESSAGE_TYPE_STATISTICS_WATCH, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end ()); + #if defined(LINUX) && defined(__GLIBC__) #include diff --git a/src/statistics/gnunet-statistics.c b/src/statistics/gnunet-statistics.c index 192a450ac..ed0c3f27d 100644 --- a/src/statistics/gnunet-statistics.c +++ b/src/statistics/gnunet-statistics.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2001, 2002, 2004, 2005, 2006, 2007, 2009 GNUnet e.V. + Copyright (C) 2001, 2002, 2004-2007, 2009, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -104,27 +104,35 @@ printer (void *cls, int is_persistent) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get(); - const char * now_str; + const char *now_str; if (quiet == GNUNET_NO) { if (GNUNET_YES == watch) { - now_str = GNUNET_STRINGS_absolute_time_to_string(now); - FPRINTF (stdout, "%24s %s%12s %50s: %16llu \n", + now_str = GNUNET_STRINGS_absolute_time_to_string (now); + FPRINTF (stdout, + "%24s %s%12s %50s: %16llu\n", now_str, is_persistent ? "!" : " ", - subsystem, _(name), (unsigned long long) value); + subsystem, + _(name), + (unsigned long long) value); } else { - FPRINTF (stdout, "%s%12s %50s: %16llu \n", + FPRINTF (stdout, + "%s%12s %50s: %16llu\n", is_persistent ? "!" : " ", - subsystem, _(name), (unsigned long long) value); + subsystem, + _(name), + (unsigned long long) value); } } else - FPRINTF (stdout, "%llu\n", (unsigned long long) value); + FPRINTF (stdout, + "%llu\n", + (unsigned long long) value); return GNUNET_OK; } @@ -183,7 +191,8 @@ shutdown_task (void *cls) GNUNET_STATISTICS_watch_cancel (h, subsystem, name, - &printer, h)); + &printer, + h)); GNUNET_STATISTICS_destroy (h, GNUNET_NO); h = NULL; @@ -205,17 +214,22 @@ main_task (void *cls) { if (NULL == subsystem) { - FPRINTF (stderr, "%s", _("Missing argument: subsystem \n")); + FPRINTF (stderr, + "%s", + _("Missing argument: subsystem \n")); ret = 1; return; } if (NULL == name) { - FPRINTF (stderr, "%s", _("Missing argument: name\n")); + FPRINTF (stderr, + "%s", + _("Missing argument: name\n")); ret = 1; return; } - h = GNUNET_STATISTICS_create (subsystem, cfg); + h = GNUNET_STATISTICS_create (subsystem, + cfg); if (NULL == h) { ret = 1; @@ -243,15 +257,19 @@ main_task (void *cls) subsystem, name, &cleanup, - &printer, h)) ) - cleanup (h, GNUNET_SYSERR); + &printer, + h)) ) + cleanup (h, + GNUNET_SYSERR); } else { - if ((NULL == subsystem) || (NULL == name)) + if ( (NULL == subsystem) || + (NULL == name) ) { printf (_("No subsystem or name given\n")); - GNUNET_STATISTICS_destroy (h, GNUNET_NO); + GNUNET_STATISTICS_destroy (h, + GNUNET_NO); h = NULL; ret = 1; return; @@ -260,7 +278,8 @@ main_task (void *cls) GNUNET_STATISTICS_watch (h, subsystem, name, - &printer, h)) + &printer, + h)) { fprintf (stderr, _("Failed to initialize watch routine\n")); @@ -274,65 +293,6 @@ main_task (void *cls) } -/** - * Function called with th test result to see if the resolver is - * running. - * - * @param cls closure with our configuration - * @param result #GNUNET_YES if the resolver is running - */ -static void -resolver_test_task (void *cls, - int result) -{ - struct GNUNET_CONFIGURATION_Handle *cfg = cls; - - if (GNUNET_YES != result) - { - FPRINTF (stderr, - _("Trying to connect to remote host, but service `%s' is not running\n"), - "resolver"); - return; - } - /* connect to a remote host */ - if (0 == remote_port) - { - if (GNUNET_SYSERR == - GNUNET_CONFIGURATION_get_value_number (cfg, "statistics", - "PORT", - &remote_port)) - { - FPRINTF (stderr, - _("A port is required to connect to host `%s'\n"), - remote_host); - return; - } - } - else if (65535 <= remote_port) - { - FPRINTF (stderr, - _("A port has to be between 1 and 65535 to connect to host `%s'\n"), - remote_host); - return; - } - - /* Manipulate configuration */ - GNUNET_CONFIGURATION_set_value_string (cfg, - "statistics", - "UNIXPATH", - ""); - GNUNET_CONFIGURATION_set_value_string (cfg, - "statistics", - "HOSTNAME", - remote_host); - GNUNET_CONFIGURATION_set_value_number (cfg, - "statistics", - "PORT", - remote_port); - GNUNET_SCHEDULER_add_now (&main_task, cfg); -} - - /** * Main function that will be run by the scheduler. * @@ -347,24 +307,64 @@ run (void *cls, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *cfg) { + struct GNUNET_CONFIGURATION_Handle *c; + + c = (struct GNUNET_CONFIGURATION_Handle *) cfg; set_value = GNUNET_NO; if (NULL != args[0]) { - if (1 != SSCANF (args[0], "%llu", &set_val)) + if (1 != SSCANF (args[0], + "%llu", + &set_val)) { - FPRINTF (stderr, _("Invalid argument `%s'\n"), args[0]); + FPRINTF (stderr, + _("Invalid argument `%s'\n"), + args[0]); ret = 1; return; } set_value = GNUNET_YES; } if (NULL != remote_host) - GNUNET_CLIENT_service_test ("resolver", - cfg, - GNUNET_TIME_UNIT_SECONDS, - &resolver_test_task, (void *) cfg); - else - GNUNET_SCHEDULER_add_now (&main_task, (void *) cfg); + { + if (0 == remote_port) + { + if (GNUNET_SYSERR == + GNUNET_CONFIGURATION_get_value_number (cfg, + "statistics", + "PORT", + &remote_port)) + { + FPRINTF (stderr, + _("A port is required to connect to host `%s'\n"), + remote_host); + return; + } + } + else if (65535 <= remote_port) + { + FPRINTF (stderr, + _("A port has to be between 1 and 65535 to connect to host `%s'\n"), + remote_host); + return; + } + + /* Manipulate configuration */ + GNUNET_CONFIGURATION_set_value_string (c, + "statistics", + "UNIXPATH", + ""); + GNUNET_CONFIGURATION_set_value_string (c, + "statistics", + "HOSTNAME", + remote_host); + GNUNET_CONFIGURATION_set_value_number (c, + "statistics", + "PORT", + remote_port); + } + GNUNET_SCHEDULER_add_now (&main_task, + c); } @@ -404,15 +404,20 @@ main (int argc, char *const *argv) }; remote_port = 0; remote_host = NULL; - if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, - &argc, &argv)) + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) return 2; ret = (GNUNET_OK == - GNUNET_PROGRAM_run (argc, argv, "gnunet-statistics [options [value]]", + GNUNET_PROGRAM_run (argc, + argv, + "gnunet-statistics [options [value]]", gettext_noop ("Print statistics about GNUnet operations."), - options, &run, NULL)) ? ret : 1; + options, + &run, + NULL)) ? ret : 1; GNUNET_free_non_null (remote_host); GNUNET_free ((void*) argv); return ret; diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c index 856873d00..e8aa9cf9c 100644 --- a/src/statistics/statistics_api.c +++ b/src/statistics/statistics_api.c @@ -576,16 +576,16 @@ do_destroy (void *cls) /** - * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this - * message at the end of the shutdown when the service confirms that - * all data has been written to disk. + * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM + * message. We receive this message at the end of the shutdown when + * the service confirms that all data has been written to disk. * * @param cls our `struct GNUNET_STATISTICS_Handle *` * @param msg the message */ static void -handle_test (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_disconnect_confirm (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_STATISTICS_Handle *h = cls; @@ -598,7 +598,7 @@ handle_test (void *cls, return; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received TEST message from statistics, can complete disconnect\n"); + "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n"); if (NULL != h->destroy_task) GNUNET_SCHEDULER_cancel (h->destroy_task); h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy, @@ -653,8 +653,8 @@ static int try_connect (struct GNUNET_STATISTICS_Handle *h) { struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_fixed_size (test, - GNUNET_MESSAGE_TYPE_TEST, + GNUNET_MQ_hd_fixed_size (disconnect_confirm, + GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM, struct GNUNET_MessageHeader, h), GNUNET_MQ_hd_fixed_size (statistics_end, @@ -1032,7 +1032,7 @@ schedule_action (void *cls) "Notifying service that we are done\n"); h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */ env = GNUNET_MQ_msg (hdr, - GNUNET_MESSAGE_TYPE_TEST); + GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT); GNUNET_MQ_notify_sent (env, &schedule_action, h); diff --git a/src/util/mq.c b/src/util/mq.c index e69d1c483..61b5471e1 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -1046,7 +1046,10 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) } GNUNET_assert (0 == mq->queue_length); while (NULL != (dnh = mq->dnh_head)) + { dnh->cb (dnh->cb_cls); + GNUNET_MQ_destroy_notify_cancel (dnh); + } if (NULL != mq->assoc_map) { GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); diff --git a/src/util/service_new.c b/src/util/service_new.c index fe8e79f17..123b40d5b 100644 --- a/src/util/service_new.c +++ b/src/util/service_new.c @@ -1954,6 +1954,7 @@ service_client_mst_cb (void *cls, client->needs_continue = GNUNET_YES; client->warn_type = ntohs (message->type); client->warn_start = GNUNET_TIME_absolute_get (); + GNUNET_assert (NULL == client->warn_task); client->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_no_client_continue, -- 2.25.1