/*
This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2013, 2016, 2018 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
+ Affero General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
/**
* @file namestore/namestore_api_monitor.c
* @brief API to monitor changes in the NAMESTORE
/**
* Handle to namestore service.
*/
- struct GNUNET_CLIENT_Connection *h;
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Function to call on errors.
+ */
+ GNUNET_SCHEDULER_TaskCallback error_cb;
+
+ /**
+ * Closure for @e error_cb.
+ */
+ void *error_cb_cls;
/**
* Function to call on events.
GNUNET_NAMESTORE_RecordMonitor monitor;
/**
- * Function called when we've synchronized.
+ * Closure for @e monitor.
*/
- GNUNET_NAMESTORE_RecordsSynchronizedCallback sync_cb;
+ void *monitor_cls;
/**
- * Closure for 'monitor' and 'sync_cb'.
+ * Function called when we've synchronized.
*/
- void *cls;
+ GNUNET_SCHEDULER_TaskCallback sync_cb;
/**
- * Transmission handle to client.
+ * Closure for @e sync_cb.
*/
- struct GNUNET_CLIENT_TransmitHandle *th;
+ void *sync_cb_cls;
/**
* Monitored zone.
*/
- struct GNUNET_CRYPTO_EccPrivateKey zone;
+ struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
+ /**
+ * Do we first iterate over all existing records?
+ */
+ int iterate_first;
};
/**
- * Send our request to start monitoring to the service.
+ * Reconnect to the namestore service.
*
- * @param cls the monitor handle
- * @param size number of bytes available in buf
- * @param buf where to copy the message to the service
- * @return number of bytes copied to buf
+ * @param zm monitor to reconnect
*/
-static size_t
-transmit_monitor_message (void *cls,
- size_t size,
- void *buf);
+static void
+reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm);
/**
- * Reconnect to the namestore service.
+ * Handle SYNC message from the namestore service.
*
- * @param zm monitor to reconnect
+ * @param cls the monitor
+ * @param msg the sync message
*/
static void
-reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
+handle_sync (void *cls, const struct GNUNET_MessageHeader *msg)
{
- if (NULL != zm->h)
- GNUNET_CLIENT_disconnect (zm->h);
- zm->monitor (zm->cls,
- NULL,
- NULL, 0, NULL);
- GNUNET_assert (NULL != (zm->h = GNUNET_CLIENT_connect ("namestore", zm->cfg)));
- zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h,
- sizeof (struct ZoneMonitorStartMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_monitor_message,
- zm);
+ struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
+
+ (void) cls;
+ (void) msg;
+ if (NULL != zm->sync_cb)
+ zm->sync_cb (zm->sync_cb_cls);
}
/**
* We've received a notification about a change to our zone.
- * Forward to monitor callback.
+ * Check that it is well-formed.
*
* @param cls the zone monitor handle
- * @param msg the message from the service.
+ * @param lrm the message from the service.
*/
-static void
-handle_updates (void *cls,
- const struct GNUNET_MessageHeader *msg)
+static int
+check_result (void *cls, const struct RecordResultMessage *lrm)
{
struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
- const struct RecordResultMessage *lrm;
size_t lrm_len;
size_t exp_lrm_len;
size_t name_len;
const char *name_tmp;
const char *rd_ser_tmp;
- if (NULL == msg)
- {
- reconnect (zm);
- return;
- }
- if ( (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)) &&
- (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC == ntohs (msg->type) ) )
- {
- GNUNET_CLIENT_receive (zm->h,
- &handle_updates,
- zm,
- GNUNET_TIME_UNIT_FOREVER_REL);
- if (NULL != zm->sync_cb)
- zm->sync_cb (zm->cls);
- return;
- }
- if ( (ntohs (msg->size) < sizeof (struct RecordResultMessage)) ||
- (GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT != ntohs (msg->type) ) )
+ (void) cls;
+ if ((0 != GNUNET_memcmp (&lrm->private_key, &zm->zone)) &&
+ (0 != GNUNET_is_zero (&zm->zone)))
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
- lrm = (const struct RecordResultMessage *) msg;
lrm_len = ntohs (lrm->gns_header.header.size);
rd_len = ntohs (lrm->rd_len);
rd_count = ntohs (lrm->rd_count);
name_len = ntohs (lrm->name_len);
- exp_lrm_len = sizeof (struct RecordResultMessage) + name_len + rd_len;
+ if (name_len > MAX_NAME_LEN)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ exp_lrm_len = sizeof(struct RecordResultMessage) + name_len + rd_len;
if (lrm_len != exp_lrm_len)
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
if (0 == name_len)
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
name_tmp = (const char *) &lrm[1];
- if ((name_tmp[name_len -1] != '\0') || (name_len > MAX_NAME_LEN))
+ if (name_tmp[name_len - 1] != '\0')
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
rd_ser_tmp = (const char *) &name_tmp[name_len];
{
- struct GNUNET_NAMESTORE_RecordData rd[rd_count];
+ struct GNUNET_GNSRECORD_Data rd[rd_count];
- if (GNUNET_OK != GNUNET_NAMESTORE_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd))
+ if (GNUNET_OK !=
+ GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd))
{
GNUNET_break (0);
- reconnect (zm);
- return;
- }
- GNUNET_CLIENT_receive (zm->h,
- &handle_updates,
- zm,
- GNUNET_TIME_UNIT_FOREVER_REL);
- zm->monitor (zm->cls,
- &lrm->private_key,
- name_tmp,
- rd_count, rd);
+ return GNUNET_SYSERR;
+ }
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * We've received a notification about a change to our zone.
+ * Forward to monitor callback.
+ *
+ * @param cls the zone monitor handle
+ * @param lrm the message from the service.
+ */
+static void
+handle_result (void *cls, const struct RecordResultMessage *lrm)
+{
+ struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
+ size_t name_len;
+ size_t rd_len;
+ unsigned rd_count;
+ const char *name_tmp;
+ const char *rd_ser_tmp;
+
+ rd_len = ntohs (lrm->rd_len);
+ rd_count = ntohs (lrm->rd_count);
+ name_len = ntohs (lrm->name_len);
+ name_tmp = (const char *) &lrm[1];
+ rd_ser_tmp = (const char *) &name_tmp[name_len];
+ {
+ struct GNUNET_GNSRECORD_Data rd[rd_count];
+
+ GNUNET_assert (
+ GNUNET_OK ==
+ GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd));
+ zm->monitor (zm->monitor_cls, &lrm->private_key, name_tmp, rd_count, rd);
}
}
/**
- * Send our request to start monitoring to the service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param cls the monitor handle
- * @param size number of bytes available in buf
- * @param buf where to copy the message to the service
- * @return number of bytes copied to buf
+ * @param cls closure with the `struct GNUNET_NAMESTORE_ZoneMonitor *`
+ * @param error error code
*/
-static size_t
-transmit_monitor_message (void *cls,
- size_t size,
- void *buf)
+static void
+mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
{
struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
- struct ZoneMonitorStartMessage sm;
- zm->th = NULL;
- if (size < sizeof (struct ZoneMonitorStartMessage))
- {
- reconnect (zm);
- return 0;
+ (void) error;
+ reconnect (zm);
+}
+
+
+/**
+ * Reconnect to the namestore service.
+ *
+ * @param zm monitor to reconnect
+ */
+static void
+reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
+{
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ { GNUNET_MQ_hd_fixed_size (sync,
+ GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC,
+ struct GNUNET_MessageHeader,
+ zm),
+ GNUNET_MQ_hd_var_size (result,
+ GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT,
+ struct RecordResultMessage,
+ zm),
+ GNUNET_MQ_handler_end () };
+ struct GNUNET_MQ_Envelope *env;
+ struct ZoneMonitorStartMessage *sm;
+
+ if (NULL != zm->mq)
+ {
+ GNUNET_MQ_destroy (zm->mq);
+ zm->error_cb (zm->error_cb_cls);
}
- sm.gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START);
- sm.gns_header.header.size = htons (sizeof (struct ZoneMonitorStartMessage));
- sm.gns_header.r_id = htonl (0);
- sm.zone = zm->zone;
- memcpy (buf, &sm, sizeof (sm));
- GNUNET_CLIENT_receive (zm->h,
- &handle_updates,
- zm,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return sizeof (sm);
+ zm->mq = GNUNET_CLIENT_connect (zm->cfg,
+ "namestore",
+ handlers,
+ &mq_error_handler,
+ zm);
+ if (NULL == zm->mq)
+ return;
+ env = GNUNET_MQ_msg (sm, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START);
+ sm->iterate_first = htonl (zm->iterate_first);
+ sm->zone = zm->zone;
+ GNUNET_MQ_send (zm->mq, env);
}
/**
- * Begin monitoring a zone for changes. Will first call the 'monitor' function
- * on all existing records in the selected zone(s) and then call it whenever
- * a record changes.
+ * Begin monitoring a zone for changes. If @a iterate_first is set,
+ * we Will first call the @a monitor function on all existing records
+ * in the selected zone(s). In any case, we will call @a sync and
+ * afterwards call @a monitor whenever a record changes.
*
* @param cfg configuration to use to connect to namestore
* @param zone zone to monitor
+ * @param iterate_first #GNUNET_YES to first iterate over all existing records,
+ * #GNUNET_NO to only return changes that happen from now
+ * on
+ * @param error_cb function to call on error (i.e. disconnect); note that
+ * unlike the other error callbacks in this API, a call to this
+ * function does NOT destroy the monitor handle, it merely signals
+ * that monitoring is down. You need to still explicitly call
+ * #GNUNET_NAMESTORE_zone_monitor_stop().
+ * @param error_cb_cls closure for @a error_cb
* @param monitor function to call on zone changes
+ * @param monitor_cls closure for @a monitor
* @param sync_cb function called when we're in sync with the namestore
- * @param cls closure for 'monitor' and 'sync_cb'
+ * @param cls closure for @a sync_cb
* @return handle to stop monitoring
*/
struct GNUNET_NAMESTORE_ZoneMonitor *
-GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const struct GNUNET_CRYPTO_EccPrivateKey *zone,
- GNUNET_NAMESTORE_RecordMonitor monitor,
- GNUNET_NAMESTORE_RecordsSynchronizedCallback sync_cb,
- void *cls)
+GNUNET_NAMESTORE_zone_monitor_start (
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
+ int iterate_first,
+ GNUNET_SCHEDULER_TaskCallback error_cb,
+ void *error_cb_cls,
+ GNUNET_NAMESTORE_RecordMonitor monitor,
+ void *monitor_cls,
+ GNUNET_SCHEDULER_TaskCallback sync_cb,
+ void *sync_cb_cls)
{
struct GNUNET_NAMESTORE_ZoneMonitor *zm;
- struct GNUNET_CLIENT_Connection *client;
- if (NULL == (client = GNUNET_CLIENT_connect ("namestore", cfg)))
- return NULL;
zm = GNUNET_new (struct GNUNET_NAMESTORE_ZoneMonitor);
- zm->cfg = cfg;
- zm->h = client;
- zm->zone = *zone;
+ if (NULL != zone)
+ zm->zone = *zone;
+ zm->iterate_first = iterate_first;
+ zm->error_cb = error_cb;
+ zm->error_cb_cls = error_cb_cls;
zm->monitor = monitor;
+ zm->monitor_cls = monitor_cls;
zm->sync_cb = sync_cb;
- zm->cls = cls;
- zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h,
- sizeof (struct ZoneMonitorStartMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_monitor_message,
- zm);
+ zm->sync_cb_cls = sync_cb_cls;
+ zm->cfg = cfg;
+ reconnect (zm);
+ if (NULL == zm->mq)
+ {
+ GNUNET_free (zm);
+ return NULL;
+ }
return zm;
}
+/**
+ * Calls the monitor processor specified in #GNUNET_NAMESTORE_zone_monitor_start
+ * for the next record(s). This function is used to allow clients that merely
+ * monitor the NAMESTORE to still throttle namestore operations, so we can be
+ * sure that the monitors can keep up.
+ *
+ * Note that #GNUNET_NAMESTORE_records_store() only waits for this
+ * call if the previous limit set by the client was already reached.
+ * Thus, by using a @a limit greater than 1, monitors basically enable
+ * a queue of notifications to be processed asynchronously with some
+ * delay. Note that even with a limit of 1 the
+ * #GNUNET_NAMESTORE_records_store() function will run asynchronously
+ * and the continuation may be invoked before the monitors completed
+ * (or even started) processing the notification. Thus, monitors will
+ * only closely track the current state of the namestore, but not
+ * be involved in the transactions.
+ *
+ * @param zm the monitor
+ * @param limit number of records to return to the iterator in one shot
+ * (before #GNUNET_NAMESTORE_zone_monitor_next is to be called again)
+ */
+void
+GNUNET_NAMESTORE_zone_monitor_next (struct GNUNET_NAMESTORE_ZoneMonitor *zm,
+ uint64_t limit)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct ZoneMonitorNextMessage *nm;
+
+ env = GNUNET_MQ_msg (nm, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT);
+ nm->limit = GNUNET_htonll (limit);
+ GNUNET_MQ_send (zm->mq, env);
+}
+
+
/**
* Stop monitoring a zone for changes.
*
void
GNUNET_NAMESTORE_zone_monitor_stop (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
{
- if (NULL != zm->th)
+ if (NULL != zm->mq)
{
- GNUNET_CLIENT_notify_transmit_ready_cancel (zm->th);
- zm->th = NULL;
+ GNUNET_MQ_destroy (zm->mq);
+ zm->mq = NULL;
}
- GNUNET_CLIENT_disconnect (zm->h);
GNUNET_free (zm);
}
+
/* end of namestore_api_monitor.c */