X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fnamestore%2Fnamestore_api_monitor.c;h=6c441d786e3d2ddd73995b8a06fe652cf720bbf8;hb=4e2504a967ba09643c6dd7e3b9ce400e30adcb3d;hp=e2caea8a575428e732d09f24abe2f94dd76c2bb2;hpb=8f31d00ecb420926db5363a2882b5302d8635246;p=oweals%2Fgnunet.git diff --git a/src/namestore/namestore_api_monitor.c b/src/namestore/namestore_api_monitor.c index e2caea8a5..6c441d786 100644 --- a/src/namestore/namestore_api_monitor.c +++ b/src/namestore/namestore_api_monitor.c @@ -1,23 +1,20 @@ /* This file is part of GNUnet. - (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013, 2016, 2018 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ - /** * @file namestore/namestore_api_monitor.c * @brief API to monitor changes in the NAMESTORE @@ -48,7 +45,17 @@ struct GNUNET_NAMESTORE_ZoneMonitor /** * Handle to namestore service. */ - struct GNUNET_CLIENT_Connection *h; + struct GNUNET_MQ_Handle *mq; + + /** + * Function to call on errors. + */ + GNUNET_SCHEDULER_TaskCallback error_cb; + + /** + * Closure for @e error_cb. + */ + void *error_cb_cls; /** * Function to call on events. @@ -56,62 +63,122 @@ struct GNUNET_NAMESTORE_ZoneMonitor GNUNET_NAMESTORE_RecordMonitor monitor; /** - * Closure for 'monitor'. + * Closure for @e monitor. */ void *monitor_cls; /** - * Transmission handle to client. + * Function called when we've synchronized. */ - struct GNUNET_CLIENT_TransmitHandle *th; + GNUNET_SCHEDULER_TaskCallback sync_cb; + + /** + * Closure for @e sync_cb. + */ + void *sync_cb_cls; /** * Monitored zone. */ - struct GNUNET_CRYPTO_ShortHashCode zone; + struct GNUNET_CRYPTO_EcdsaPrivateKey zone; /** - * GNUNET_YES if we monitor all zones, GNUNET_NO if we only monitor 'zone'. + * Do we first iterate over all existing records? */ - int all_zones; + int iterate_first; + }; /** - * Send our request to start monitoring to the service. + * Reconnect to the namestore service. * - * @param cls the monitor handle - * @param size number of bytes available in buf - * @param buf where to copy the message to the service - * @return number of bytes copied to buf + * @param zm monitor to reconnect */ -static size_t -transmit_monitor_message (void *cls, - size_t size, - void *buf); +static void +reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm); /** - * Reconnect to the namestore service. + * Handle SYNC message from the namestore service. * - * @param zm monitor to reconnect + * @param cls the monitor + * @param msg the sync message */ static void -reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) +handle_sync (void *cls, + const struct GNUNET_MessageHeader *msg) { - if (NULL != zm->h) - GNUNET_CLIENT_disconnect (zm->h); - zm->monitor (zm->monitor_cls, - NULL, - GNUNET_TIME_UNIT_ZERO_ABS, - NULL, 0, NULL, NULL); - GNUNET_assert (NULL != (zm->h = GNUNET_CLIENT_connect ("namestore", zm->cfg))); - zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h, - sizeof (struct ZoneMonitorStartMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_monitor_message, - zm); + struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; + + (void) cls; + (void) msg; + if (NULL != zm->sync_cb) + zm->sync_cb (zm->sync_cb_cls); +} + + +/** + * We've received a notification about a change to our zone. + * Check that it is well-formed. + * + * @param cls the zone monitor handle + * @param lrm the message from the service. + */ +static int +check_result (void *cls, + const struct RecordResultMessage *lrm) +{ + size_t lrm_len; + size_t exp_lrm_len; + size_t name_len; + size_t rd_len; + unsigned rd_count; + const char *name_tmp; + const char *rd_ser_tmp; + + (void) cls; + lrm_len = ntohs (lrm->gns_header.header.size); + rd_len = ntohs (lrm->rd_len); + rd_count = ntohs (lrm->rd_count); + name_len = ntohs (lrm->name_len); + if (name_len > MAX_NAME_LEN) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + exp_lrm_len = sizeof (struct RecordResultMessage) + name_len + rd_len; + if (lrm_len != exp_lrm_len) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (0 == name_len) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + name_tmp = (const char *) &lrm[1]; + if (name_tmp[name_len -1] != '\0') + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + rd_ser_tmp = (const char *) &name_tmp[name_len]; + { + struct GNUNET_GNSRECORD_Data rd[rd_count]; + + if (GNUNET_OK != + GNUNET_GNSRECORD_records_deserialize (rd_len, + rd_ser_tmp, + rd_count, + rd)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + } + return GNUNET_OK; } @@ -120,104 +187,196 @@ reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) * Forward to monitor callback. * * @param cls the zone monitor handle - * @param msg the message from the service. + * @param lrm the message from the service. */ static void -handle_updates (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_result (void *cls, + const struct RecordResultMessage *lrm) { struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; + size_t name_len; + size_t rd_len; + unsigned rd_count; + const char *name_tmp; + const char *rd_ser_tmp; - if (NULL == msg) + rd_len = ntohs (lrm->rd_len); + rd_count = ntohs (lrm->rd_count); + name_len = ntohs (lrm->name_len); + name_tmp = (const char *) &lrm[1]; + rd_ser_tmp = (const char *) &name_tmp[name_len]; { - reconnect (zm); - return; + struct GNUNET_GNSRECORD_Data rd[rd_count]; + + GNUNET_assert (GNUNET_OK == + GNUNET_GNSRECORD_records_deserialize (rd_len, + rd_ser_tmp, + rd_count, + rd)); + zm->monitor (zm->monitor_cls, + &lrm->private_key, + name_tmp, + rd_count, + rd); } - // FIXME: parse, validate - - GNUNET_CLIENT_receive (zm->h, - &handle_updates, - zm, - GNUNET_TIME_UNIT_FOREVER_REL); - // FIXME: call 'monitor'. - // zm->monitor (zm->monitor_cls, ...); } /** - * Send our request to start monitoring to the service. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls the monitor handle - * @param size number of bytes available in buf - * @param buf where to copy the message to the service - * @return number of bytes copied to buf + * @param cls closure with the `struct GNUNET_NAMESTORE_ZoneMonitor *` + * @param error error code */ -static size_t -transmit_monitor_message (void *cls, - size_t size, - void *buf) +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; - struct ZoneMonitorStartMessage sm; - if (size < sizeof (struct ZoneMonitorStartMessage)) - { - reconnect (zm); - return 0; + (void) error; + reconnect (zm); +} + + +/** + * Reconnect to the namestore service. + * + * @param zm monitor to reconnect + */ +static void +reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) +{ + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_fixed_size (sync, + GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC, + struct GNUNET_MessageHeader, + zm), + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT, + struct RecordResultMessage, + zm), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct ZoneMonitorStartMessage *sm; + + if (NULL != zm->mq) + { + GNUNET_MQ_destroy (zm->mq); + zm->error_cb (zm->error_cb_cls); } - - sm.zone = zm->zone; - sm.all_zones = htonl (zm->all_zones); - memcpy (buf, &sm, sizeof (sm)); - GNUNET_CLIENT_receive (zm->h, - &handle_updates, - zm, - GNUNET_TIME_UNIT_FOREVER_REL); - return sizeof (sm); + zm->mq = GNUNET_CLIENT_connect (zm->cfg, + "namestore", + handlers, + &mq_error_handler, + zm); + if (NULL == zm->mq) + return; + env = GNUNET_MQ_msg (sm, + GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START); + sm->iterate_first = htonl (zm->iterate_first); + sm->zone = zm->zone; + GNUNET_MQ_send (zm->mq, + env); } + /** - * Begin monitoring a zone for changes. Will first call the 'monitor' function - * on all existing records in the selected zone(s) and then call it whenever - * a record changes. + * Begin monitoring a zone for changes. If @a iterate_first is set, + * we Will first call the @a monitor function on all existing records + * in the selected zone(s). In any case, we will call @a sync and + * afterwards call @a monitor whenever a record changes. * * @param cfg configuration to use to connect to namestore - * @param zone zone to monitor, NULL for all zones + * @param zone zone to monitor + * @param iterate_first #GNUNET_YES to first iterate over all existing records, + * #GNUNET_NO to only return changes that happen from now on + * @param error_cb function to call on error (i.e. disconnect); note that + * unlike the other error callbacks in this API, a call to this + * function does NOT destroy the monitor handle, it merely signals + * that monitoring is down. You need to still explicitly call + * #GNUNET_NAMESTORE_zone_monitor_stop(). + * @param error_cb_cls closure for @a error_cb * @param monitor function to call on zone changes - * @param monitor_cls closure for 'monitor' + * @param monitor_cls closure for @a monitor + * @param sync_cb function called when we're in sync with the namestore + * @param cls closure for @a sync_cb * @return handle to stop monitoring */ struct GNUNET_NAMESTORE_ZoneMonitor * GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_CRYPTO_ShortHashCode *zone, + const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone, + int iterate_first, + GNUNET_SCHEDULER_TaskCallback error_cb, + void *error_cb_cls, GNUNET_NAMESTORE_RecordMonitor monitor, - void *monitor_cls) + void *monitor_cls, + GNUNET_SCHEDULER_TaskCallback sync_cb, + void *sync_cb_cls) { struct GNUNET_NAMESTORE_ZoneMonitor *zm; - struct GNUNET_CLIENT_Connection *client; - if (NULL == (client = GNUNET_CLIENT_connect ("namestore", cfg))) - return NULL; zm = GNUNET_new (struct GNUNET_NAMESTORE_ZoneMonitor); - zm->cfg = cfg; - zm->h = client; - if (NULL == zone) - zm->all_zones = GNUNET_YES; - else + if (NULL != zone) zm->zone = *zone; + zm->iterate_first = iterate_first; + zm->error_cb = error_cb; + zm->error_cb_cls = error_cb_cls; zm->monitor = monitor; zm->monitor_cls = monitor_cls; - zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h, - sizeof (struct ZoneMonitorStartMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_monitor_message, - zm); + zm->sync_cb = sync_cb; + zm->sync_cb_cls = sync_cb_cls; + zm->cfg = cfg; + reconnect (zm); + if (NULL == zm->mq) + { + GNUNET_free (zm); + return NULL; + } return zm; } +/** + * Calls the monitor processor specified in #GNUNET_NAMESTORE_zone_monitor_start + * for the next record(s). This function is used to allow clients that merely + * monitor the NAMESTORE to still throttle namestore operations, so we can be + * sure that the monitors can keep up. + * + * Note that #GNUNET_NAMESTORE_records_store() only waits for this + * call if the previous limit set by the client was already reached. + * Thus, by using a @a limit greater than 1, monitors basically enable + * a queue of notifications to be processed asynchronously with some + * delay. Note that even with a limit of 1 the + * #GNUNET_NAMESTORE_records_store() function will run asynchronously + * and the continuation may be invoked before the monitors completed + * (or even started) processing the notification. Thus, monitors will + * only closely track the current state of the namestore, but not + * be involved in the transactions. + * + * @param zm the monitor + * @param limit number of records to return to the iterator in one shot + * (before #GNUNET_NAMESTORE_zone_monitor_next is to be called again) + */ +void +GNUNET_NAMESTORE_zone_monitor_next (struct GNUNET_NAMESTORE_ZoneMonitor *zm, + uint64_t limit) +{ + struct GNUNET_MQ_Envelope *env; + struct ZoneMonitorNextMessage *nm; + + env = GNUNET_MQ_msg (nm, + GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT); + nm->limit = GNUNET_htonll (limit); + GNUNET_MQ_send (zm->mq, + env); +} + + /** * Stop monitoring a zone for changes. * @@ -226,12 +385,11 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c void GNUNET_NAMESTORE_zone_monitor_stop (struct GNUNET_NAMESTORE_ZoneMonitor *zm) { - if (NULL != zm->th) + if (NULL != zm->mq) { - GNUNET_CLIENT_notify_transmit_ready_cancel (zm->th); - zm->th = NULL; + GNUNET_MQ_destroy (zm->mq); + zm->mq = NULL; } - GNUNET_CLIENT_disconnect (zm->h); GNUNET_free (zm); }