X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fpeerstore%2Fgnunet-service-peerstore.c;h=92d020799b9800549f721eb5083078ae65e15c40;hb=79fb947eb8fba243ea65e19b40b65e04f8806865;hp=50af4342c6c81cb23a8cfd6a1266bd9cf4ca40d1;hpb=3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8;p=oweals%2Fgnunet.git diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index 50af4342c..92d020799 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) + Copyright (C) 2014, 2015, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -29,8 +29,11 @@ #include "gnunet_peerstore_plugin.h" #include "peerstore_common.h" -//TODO: GNUNET_SERVER_receive_done() ? -//TODO: implement value lifetime + +/** + * Interval for expired records cleanup (in seconds) + */ +#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */ /** * Our configuration. @@ -40,32 +43,173 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Database plugin library name */ -char *db_lib_name; +static char *db_lib_name; /** * Database handle */ static struct GNUNET_PEERSTORE_PluginFunctions *db; +/** + * Hashmap with all watch requests + */ +static struct GNUNET_CONTAINER_MultiHashMap *watchers; + +/** + * Task run to clean up expired records. + */ +static struct GNUNET_SCHEDULER_Task *expire_task; + +/** + * Are we in the process of shutting down the service? #GNUNET_YES / #GNUNET_NO + */ +static int in_shutdown; + +/** + * Number of connected clients. + */ +static unsigned int num_clients; + + +/** + * Perform the actual shutdown operations + */ +static void +do_shutdown () +{ + if (NULL != db_lib_name) + { + GNUNET_break (NULL == + GNUNET_PLUGIN_unload (db_lib_name, + db)); + GNUNET_free (db_lib_name); + db_lib_name = NULL; + } + if (NULL != watchers) + { + GNUNET_CONTAINER_multihashmap_destroy (watchers); + watchers = NULL; + } + if (NULL != expire_task) + { + GNUNET_SCHEDULER_cancel (expire_task); + expire_task = NULL; + } + GNUNET_SCHEDULER_shutdown (); +} + + /** * Task run during shutdown. * * @param cls unused - * @param tc unused */ static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls) { - if(NULL != db_lib_name) + in_shutdown = GNUNET_YES; + if (0 == num_clients) /* Only when no connected clients. */ + do_shutdown (); +} + + +/* Forward declaration */ +static void +expire_records_continuation (void *cls, + int success); + + +/** + * Deletes any expired records from storage + */ +static void +cleanup_expired_records (void *cls) +{ + int ret; + + expire_task = NULL; + GNUNET_assert (NULL != db); + ret = db->expire_records (db->cls, + GNUNET_TIME_absolute_get (), + &expire_records_continuation, + NULL); + if (GNUNET_OK != ret) { - GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db)); - GNUNET_free (db_lib_name); - db_lib_name = NULL; + GNUNET_assert (NULL == expire_task); + expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, + EXPIRED_RECORDS_CLEANUP_INTERVAL), + &cleanup_expired_records, + NULL); } } +/** + * Continuation to expire_records called by the peerstore plugin + * + * @param cls unused + * @param success count of records deleted or #GNUNET_SYSERR + */ +static void +expire_records_continuation (void *cls, + int success) +{ + if (success > 0) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%d records expired.\n", + success); + GNUNET_assert (NULL == expire_task); + expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, + EXPIRED_RECORDS_CLEANUP_INTERVAL), + &cleanup_expired_records, + NULL); +} + + +/** + * A client disconnected. Remove all of its data structure entries. + * + * @param cls closure, NULL + * @param client identification of the client + * @param mq the message queue + * @return + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) +{ + num_clients++; + return client; +} + + +/** + * Search for a disconnected client and remove it + * + * @param cls closuer, a `struct GNUNET_SERVICE_Client` + * @param key hash of record key + * @param value the watcher client, a `struct GNUNET_SERVICE_Client *` + * @return #GNUNET_OK to continue iterating + */ +static int +client_disconnect_it (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + if (value == cls) + { + GNUNET_CONTAINER_multihashmap_remove (watchers, + key, + value); + num_clients++; + } + return GNUNET_OK; +} + + /** * A client disconnected. Remove all of its data structure entries. * @@ -73,201 +217,425 @@ shutdown_task (void *cls, * @param client identification of the client */ static void -handle_client_disconnect (void *cls, - struct GNUNET_SERVER_Client - * client) +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_cls) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "A client disconnected, cleaning up.\n"); + if (NULL != watchers) + GNUNET_CONTAINER_multihashmap_iterate (watchers, + &client_disconnect_it, + client); + num_clients--; + if ( (0 == num_clients) && + in_shutdown) + do_shutdown (); } + /** * Function called by for each matching record. * * @param cls closure - * @param peer peer identity - * @param sub_system name of the GNUnet sub system responsible - * @param value stored value - * @param size size of stored value + * @param record peerstore record found + * @param emsg error message or NULL if no errors + * @return #GNUNET_YES to continue iteration + */ +static void +record_iterator (void *cls, + const struct GNUNET_PEERSTORE_Record *record, + const char *emsg) +{ + struct GNUNET_PEERSTORE_Record *cls_record = cls; + struct GNUNET_MQ_Envelope *env; + + if (NULL == record) + { + /* No more records */ + struct GNUNET_MessageHeader *endmsg; + + env = GNUNET_MQ_msg (endmsg, + GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), + env); + if (NULL == emsg) + { + GNUNET_SERVICE_client_continue (cls_record->client); + } + else + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to iterate: %s\n", + emsg); + GNUNET_SERVICE_client_drop (cls_record->client); + } + PEERSTORE_destroy_record (cls_record); + return; + } + + env = PEERSTORE_create_record_mq_envelope (record->sub_system, + &record->peer, + record->key, + record->value, + record->value_size, + record->expiry, + 0, + GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), + env); +} + + +/** + * Iterator over all watcher clients + * to notify them of a new record * -int record_iterator(void *cls, - const char *sub_system, - const struct GNUNET_PeerIdentity *peer, - const char *key, - const void *value, - size_t size, - struct GNUNET_TIME_Absolute expiry) + * @param cls closure, a `struct GNUNET_PEERSTORE_Record *` + * @param key hash of record key + * @param value the watcher client, a `struct GNUNET_SERVICE_Client *` + * @return #GNUNET_YES to continue iterating + */ +static int +watch_notifier_it (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct StoreRecordMessage *srm; - - srm = PEERSTORE_create_record_message(sub_system, - peer, - key, - value, - size, - expiry, - GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); - GNUNET_SERVER_transmit_context_append_message(tc, (const struct GNUNET_MessageHeader *)srm); + struct GNUNET_PEERSTORE_Record *record = cls; + struct GNUNET_SERVICE_Client *client = value; + struct GNUNET_MQ_Envelope *env; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found a watcher to update.\n"); + env = PEERSTORE_create_record_mq_envelope (record->sub_system, + &record->peer, + record->key, + record->value, + record->value_size, + record->expiry, + 0, + GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); return GNUNET_YES; -}*/ +} + /** - * Handle an iterate request from client + * Given a new record, notifies watchers * - * @param cls unused - * @param client identification of the client - * @param message the actual message + * @param record changed record to update watchers with + */ +static void +watch_notifier (struct GNUNET_PEERSTORE_Record *record) +{ + struct GNUNET_HashCode keyhash; + + PEERSTORE_hash_key (record->sub_system, + &record->peer, + record->key, + &keyhash); + GNUNET_CONTAINER_multihashmap_get_multiple (watchers, + &keyhash, + &watch_notifier_it, + record); +} + + +/** + * Handle a watch cancel request from client * -void handle_iterate (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + * @param cls identification of the client + * @param hm the actual message + */ +static void +handle_watch_cancel (void *cls, + const struct StoreKeyHashMessage *hm) { - struct GNUNET_PEERSTORE_Record *record; - struct GNUNET_SERVER_TransmitContext *tc; + struct GNUNET_SERVICE_Client *client = cls; - record = PEERSTORE_parse_record_message(message); - if(NULL == record) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received a watch cancel request.\n"); + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_remove (watchers, + &hm->keyhash, + client)) { - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Malformed iterate request from client\n"); - GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); return; } - if(NULL == record->sub_system) + num_clients++; + GNUNET_SERVICE_client_continue (client); +} + + +/** + * Handle a watch request from client + * + * @param cls identification of the client + * @param hm the actual message + */ +static void +handle_watch (void *cls, + const struct StoreKeyHashMessage *hm) +{ + struct GNUNET_SERVICE_Client *client = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received a watch request.\n"); + num_clients--; /* do not count watchers */ + GNUNET_SERVICE_client_mark_monitor (client); + GNUNET_CONTAINER_multihashmap_put (watchers, + &hm->keyhash, + client, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_SERVICE_client_continue (client); +} + + +/** + * Check an iterate request from client + * + * @param cls client identification of the client + * @param srm the actual message + * @return #GNUNET_OK if @a srm is well-formed + */ +static int +check_iterate (void *cls, + const struct StoreRecordMessage *srm) +{ + struct GNUNET_PEERSTORE_Record *record; + + record = PEERSTORE_parse_record_message (srm); + if (NULL == record) { - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Sub system not supplied in client iterate request\n"); - GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); - return; + GNUNET_break (0); + return GNUNET_SYSERR; } - tc = GNUNET_SERVER_transmit_context_create (client); - if(GNUNET_OK == db->iterate_records(db->cls, - record->sub_system, - record->peer, - record->key, - &record_iterator, - tc)) + if (NULL == record->sub_system) { - + GNUNET_break (0); + PEERSTORE_destroy_record (record); + return GNUNET_SYSERR; } -}*/ + PEERSTORE_destroy_record (record); + return GNUNET_OK; +} + /** - * Handle a store request from client + * Handle an iterate request from client * - * @param cls unused - * @param client identification of the client - * @param message the actual message + * @param cls identification of the client + * @param srm the actual message */ -void handle_store (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +static void +handle_iterate (void *cls, + const struct StoreRecordMessage *srm) { + struct GNUNET_SERVICE_Client *client = cls; struct GNUNET_PEERSTORE_Record *record; - uint16_t response_type; - struct GNUNET_SERVER_TransmitContext *tc; - record = PEERSTORE_parse_record_message(message); - if(NULL == record) + record = PEERSTORE_parse_record_message (srm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Iterate request: ss `%s', peer `%s', key `%s'\n", + record->sub_system, + GNUNET_i2s (&record->peer), + (NULL == record->key) ? "NULL" : record->key); + record->client = client; + if (GNUNET_OK != + db->iterate_records (db->cls, + record->sub_system, + (ntohs (srm->peer_set)) ? &record->peer : NULL, + record->key, + &record_iterator, + record)) { - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Malformed store request from client\n"); - GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); - return; + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + PEERSTORE_destroy_record (record); + } +} + + +/** + * Continuation of store_record called by the peerstore plugin + * + * @param cls closure + * @param success result + */ +static void +store_record_continuation (void *cls, + int success) +{ + struct GNUNET_PEERSTORE_Record *record = cls; + + if (GNUNET_OK == success) + { + watch_notifier (record); + GNUNET_SERVICE_client_continue (record->client); } - if(NULL == record->sub_system - || NULL == record->peer - || NULL == record->key) + else { - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n"); - GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); - return; + GNUNET_break (0); + GNUNET_SERVICE_client_drop (record->client); } - GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a store request (size: %lu) for sub system `%s', peer `%s', key `%s'\n", - record->value_size, - record->sub_system, - GNUNET_i2s (record->peer), - record->key); - if(GNUNET_OK == db->store_record(db->cls, - record->sub_system, - record->peer, - record->key, - record->value, - record->value_size, - *record->expiry)) + PEERSTORE_destroy_record (record); +} + + +/** + * Check a store request from client + * + * @param cls client identification of the client + * @param srm the actual message + * @return #GNUNET_OK if @a srm is well-formed + */ +static int +check_store (void *cls, + const struct StoreRecordMessage *srm) +{ + struct GNUNET_PEERSTORE_Record *record; + + record = PEERSTORE_parse_record_message (srm); + if (NULL == record) { - response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK; + GNUNET_break (0); + return GNUNET_SYSERR; } - else + if ( (NULL == record->sub_system) || + (NULL == record->key) ) { - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error."); - response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL; + GNUNET_break (0); + PEERSTORE_destroy_record (record); + return GNUNET_SYSERR; } + PEERSTORE_destroy_record (record); + return GNUNET_OK; +} - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); +/** + * Handle a store request from client + * + * @param cls client identification of the client + * @param srm the actual message + */ +static void +handle_store (void *cls, + const struct StoreRecordMessage *srm) +{ + struct GNUNET_SERVICE_Client *client = cls; + struct GNUNET_PEERSTORE_Record *record; + + record = PEERSTORE_parse_record_message (srm); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %u.\n", + record->sub_system, + GNUNET_i2s (&record->peer), + record->key, + (uint32_t) ntohl (srm->options)); + record->client = client; + if (GNUNET_OK != + db->store_record (db->cls, + record->sub_system, + &record->peer, + record->key, + record->value, + record->value_size, + record->expiry, + ntohl (srm->options), + &store_record_continuation, + record)) + { + GNUNET_break (0); + PEERSTORE_destroy_record (record); + GNUNET_SERVICE_client_drop (client); + return; + } } + /** * Peerstore service runner. * * @param cls closure - * @param server the initialized server * @param c configuration to use + * @param service the initialized service */ static void run (void *cls, - struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *service) { - static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0}, -// {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0}, - {NULL, NULL, 0, 0} - }; char *database; + in_shutdown = GNUNET_NO; cfg = c; if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cfg, "peerstore", "DATABASE", - &database)) - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No database backend configured\n"); - - else + GNUNET_CONFIGURATION_get_value_string (cfg, + "peerstore", + "DATABASE", + &database)) { - GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_peerstore_%s", database); - db = GNUNET_PLUGIN_load(db_lib_name, (void *) cfg); - GNUNET_free(database); + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "peerstore", + "DATABASE"); + GNUNET_SCHEDULER_shutdown (); + return; } - if(NULL == db) - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name); - else + GNUNET_asprintf (&db_lib_name, + "libgnunet_plugin_peerstore_%s", + database); + db = GNUNET_PLUGIN_load (db_lib_name, + (void *) cfg); + GNUNET_free (database); + if (NULL == db) { - GNUNET_SERVER_add_handlers (server, handlers); - GNUNET_SERVER_disconnect_notify (server, - &handle_client_disconnect, - NULL); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Could not load database backend `%s'\n"), + db_lib_name); + GNUNET_SCHEDULER_shutdown (); + return; } - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, - NULL); + watchers = GNUNET_CONTAINER_multihashmap_create (10, + GNUNET_NO); + expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records, + NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); } /** - * The main function for the peerstore service. - * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error + * Define "main" method using service macro. */ -int -main (int argc, char *const *argv) -{ - return (GNUNET_OK == - GNUNET_SERVICE_run (argc, - argv, - "peerstore", - GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; -} +GNUNET_SERVICE_MAIN +("peerstore", + GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_var_size (store, + GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, + struct StoreRecordMessage, + NULL), + GNUNET_MQ_hd_var_size (iterate, + GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, + struct StoreRecordMessage, + NULL), + GNUNET_MQ_hd_fixed_size (watch, + GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, + struct StoreKeyHashMessage, + NULL), + GNUNET_MQ_hd_fixed_size (watch_cancel, + GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, + struct StoreKeyHashMessage, + NULL), + GNUNET_MQ_handler_end ()); + /* end of gnunet-service-peerstore.c */