tighten formatting rules
[oweals/gnunet.git] / src / namestore / namestore_api_monitor.c
index 050d88ba535f7c7ed829f6554ae71329f9efbfc2..ab61403281ed04f3d0a363dec0eb49f9df1499b9 100644 (file)
@@ -1,23 +1,22 @@
 /*
      This file is part of GNUnet.
-     (C) 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2013, 2016, 2018 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
 /**
  * @file namestore/namestore_api_monitor.c
  * @brief API to monitor changes in the NAMESTORE
@@ -48,7 +47,17 @@ struct GNUNET_NAMESTORE_ZoneMonitor
   /**
    * Handle to namestore service.
    */
-  struct GNUNET_CLIENT_Connection *h;
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Function to call on errors.
+   */
+  GNUNET_SCHEDULER_TaskCallback error_cb;
+
+  /**
+   * Closure for @e error_cb.
+   */
+  void *error_cb_cls;
 
   /**
    * Function to call on events.
@@ -56,78 +65,70 @@ struct GNUNET_NAMESTORE_ZoneMonitor
   GNUNET_NAMESTORE_RecordMonitor monitor;
 
   /**
-   * Function called when we've synchronized.
+   * Closure for @e monitor.
    */
-  GNUNET_NAMESTORE_RecordsSynchronizedCallback sync_cb;
+  void *monitor_cls;
 
   /**
-   * Closure for 'monitor' and 'sync_cb'.
+   * Function called when we've synchronized.
    */
-  void *cls;
+  GNUNET_SCHEDULER_TaskCallback sync_cb;
 
   /**
-   * Transmission handle to client.
+   * Closure for @e sync_cb.
    */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  void *sync_cb_cls;
 
   /**
    * Monitored zone.
    */
-  struct GNUNET_CRYPTO_EccPrivateKey zone;
+  struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
 
+  /**
+   * Do we first iterate over all existing records?
+   */
+  int iterate_first;
 };
 
 
 /**
- * Send our request to start monitoring to the service.
+ * Reconnect to the namestore service.
  *
- * @param cls the monitor handle
- * @param size number of bytes available in buf
- * @param buf where to copy the message to the service
- * @return number of bytes copied to buf
+ * @param zm monitor to reconnect
  */
-static size_t
-transmit_monitor_message (void *cls,
-                         size_t size,
-                         void *buf);
+static void
+reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm);
 
 
 /**
- * Reconnect to the namestore service.
+ * Handle SYNC message from the namestore service.
  *
- * @param zm monitor to reconnect
+ * @param cls the monitor
+ * @param msg the sync message
  */
 static void
-reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
+handle_sync (void *cls, const struct GNUNET_MessageHeader *msg)
 {
-  if (NULL != zm->h)
-    GNUNET_CLIENT_disconnect (zm->h);
-  zm->monitor (zm->cls,
-              NULL,
-              NULL, 0, NULL);
-  GNUNET_assert (NULL != (zm->h = GNUNET_CLIENT_connect ("namestore", zm->cfg)));
-  zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h,
-                                               sizeof (struct ZoneMonitorStartMessage),
-                                               GNUNET_TIME_UNIT_FOREVER_REL,
-                                               GNUNET_YES,
-                                               &transmit_monitor_message,
-                                               zm);
+  struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
+
+  (void) cls;
+  (void) msg;
+  if (NULL != zm->sync_cb)
+    zm->sync_cb (zm->sync_cb_cls);
 }
 
 
 /**
  * We've received a notification about a change to our zone.
- * Forward to monitor callback.
+ * Check that it is well-formed.
  *
  * @param cls the zone monitor handle
- * @param msg the message from the service.
+ * @param lrm the message from the service.
  */
-static void
-handle_updates (void *cls,
-               const struct GNUNET_MessageHeader *msg)
+static int
+check_result (void *cls, const struct RecordResultMessage *lrm)
 {
   struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
-  const struct RecordResultMessage *lrm;
   size_t lrm_len;
   size_t exp_lrm_len;
   size_t name_len;
@@ -136,152 +137,237 @@ handle_updates (void *cls,
   const char *name_tmp;
   const char *rd_ser_tmp;
 
-  if (NULL == msg)
-  {
-    reconnect (zm);
-    return;
-  }
-  if ( (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)) &&
-       (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC == ntohs (msg->type) ) )
-  {
-    GNUNET_CLIENT_receive (zm->h,
-                          &handle_updates,
-                          zm,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    if (NULL != zm->sync_cb)
-      zm->sync_cb (zm->cls);
-    return;
-  }
-  if ( (ntohs (msg->size) < sizeof (struct RecordResultMessage)) ||
-       (GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT != ntohs (msg->type) ) )
+  (void) cls;
+  if ((0 != GNUNET_memcmp (&lrm->private_key, &zm->zone)) &&
+      (0 != GNUNET_is_zero (&zm->zone)))
   {
     GNUNET_break (0);
-    reconnect (zm);
-    return;
+    return GNUNET_SYSERR;
   }
-  lrm = (const struct RecordResultMessage *) msg;
   lrm_len = ntohs (lrm->gns_header.header.size);
   rd_len = ntohs (lrm->rd_len);
   rd_count = ntohs (lrm->rd_count);
   name_len = ntohs (lrm->name_len);
-  exp_lrm_len = sizeof (struct RecordResultMessage) + name_len + rd_len;
+  if (name_len > MAX_NAME_LEN)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  exp_lrm_len = sizeof(struct RecordResultMessage) + name_len + rd_len;
   if (lrm_len != exp_lrm_len)
   {
     GNUNET_break (0);
-    reconnect (zm);
-    return;
+    return GNUNET_SYSERR;
   }
   if (0 == name_len)
   {
     GNUNET_break (0);
-    reconnect (zm);
-    return;
+    return GNUNET_SYSERR;
   }
   name_tmp = (const char *) &lrm[1];
-  if ((name_tmp[name_len -1] != '\0') || (name_len > MAX_NAME_LEN))
+  if (name_tmp[name_len - 1] != '\0')
   {
     GNUNET_break (0);
-    reconnect (zm);
-    return;
+    return GNUNET_SYSERR;
   }
   rd_ser_tmp = (const char *) &name_tmp[name_len];
   {
-    struct GNUNET_NAMESTORE_RecordData rd[rd_count];
+    struct GNUNET_GNSRECORD_Data rd[rd_count];
 
-    if (GNUNET_OK != GNUNET_NAMESTORE_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd))
+    if (GNUNET_OK !=
+        GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd))
     {
       GNUNET_break (0);
-      reconnect (zm);
-      return;
-    }  
-    GNUNET_CLIENT_receive (zm->h,
-                          &handle_updates,
-                          zm,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    zm->monitor (zm->cls, 
-                &lrm->private_key,
-                name_tmp, 
-                rd_count, rd);
+      return GNUNET_SYSERR;
+    }
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * We've received a notification about a change to our zone.
+ * Forward to monitor callback.
+ *
+ * @param cls the zone monitor handle
+ * @param lrm the message from the service.
+ */
+static void
+handle_result (void *cls, const struct RecordResultMessage *lrm)
+{
+  struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
+  size_t name_len;
+  size_t rd_len;
+  unsigned rd_count;
+  const char *name_tmp;
+  const char *rd_ser_tmp;
+
+  rd_len = ntohs (lrm->rd_len);
+  rd_count = ntohs (lrm->rd_count);
+  name_len = ntohs (lrm->name_len);
+  name_tmp = (const char *) &lrm[1];
+  rd_ser_tmp = (const char *) &name_tmp[name_len];
+  {
+    struct GNUNET_GNSRECORD_Data rd[rd_count];
+
+    GNUNET_assert (
+      GNUNET_OK ==
+      GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd));
+    zm->monitor (zm->monitor_cls, &lrm->private_key, name_tmp, rd_count, rd);
   }
 }
 
 
 /**
- * Send our request to start monitoring to the service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
  *
- * @param cls the monitor handle
- * @param size number of bytes available in buf
- * @param buf where to copy the message to the service
- * @return number of bytes copied to buf
+ * @param cls closure with the `struct GNUNET_NAMESTORE_ZoneMonitor *`
+ * @param error error code
  */
-static size_t
-transmit_monitor_message (void *cls,
-                         size_t size,
-                         void *buf)
+static void
+mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
 {
   struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
-  struct ZoneMonitorStartMessage sm;
 
-  zm->th = NULL;
-  if (size < sizeof (struct ZoneMonitorStartMessage))
-  {    
-    reconnect (zm);
-    return 0;
+  (void) error;
+  reconnect (zm);
+}
+
+
+/**
+ * Reconnect to the namestore service.
+ *
+ * @param zm monitor to reconnect
+ */
+static void
+reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] =
+  { GNUNET_MQ_hd_fixed_size (sync,
+                             GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC,
+                             struct GNUNET_MessageHeader,
+                             zm),
+    GNUNET_MQ_hd_var_size (result,
+                           GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT,
+                           struct RecordResultMessage,
+                           zm),
+    GNUNET_MQ_handler_end () };
+  struct GNUNET_MQ_Envelope *env;
+  struct ZoneMonitorStartMessage *sm;
+
+  if (NULL != zm->mq)
+  {
+    GNUNET_MQ_destroy (zm->mq);
+    zm->error_cb (zm->error_cb_cls);
   }
-  sm.gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START);
-  sm.gns_header.header.size = htons (sizeof (struct ZoneMonitorStartMessage));
-  sm.gns_header.r_id = htonl (0);
-  sm.zone = zm->zone;
-  memcpy (buf, &sm, sizeof (sm));
-  GNUNET_CLIENT_receive (zm->h,
-                        &handle_updates,
-                        zm,
-                        GNUNET_TIME_UNIT_FOREVER_REL);
-  return sizeof (sm);
+  zm->mq = GNUNET_CLIENT_connect (zm->cfg,
+                                  "namestore",
+                                  handlers,
+                                  &mq_error_handler,
+                                  zm);
+  if (NULL == zm->mq)
+    return;
+  env = GNUNET_MQ_msg (sm, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START);
+  sm->iterate_first = htonl (zm->iterate_first);
+  sm->zone = zm->zone;
+  GNUNET_MQ_send (zm->mq, env);
 }
 
 
 /**
- * Begin monitoring a zone for changes.  Will first call the 'monitor' function
- * on all existing records in the selected zone(s) and then call it whenever
- * a record changes.
+ * Begin monitoring a zone for changes.  If @a iterate_first is set,
+ * we Will first call the @a monitor function on all existing records
+ * in the selected zone(s).  In any case, we will call @a sync and
+ * afterwards call @a monitor whenever a record changes.
  *
  * @param cfg configuration to use to connect to namestore
  * @param zone zone to monitor
+ * @param iterate_first #GNUNET_YES to first iterate over all existing records,
+ *                      #GNUNET_NO to only return changes that happen from now
+ * on
+ * @param error_cb function to call on error (i.e. disconnect); note that
+ *         unlike the other error callbacks in this API, a call to this
+ *         function does NOT destroy the monitor handle, it merely signals
+ *         that monitoring is down. You need to still explicitly call
+ *         #GNUNET_NAMESTORE_zone_monitor_stop().
+ * @param error_cb_cls closure for @a error_cb
  * @param monitor function to call on zone changes
+ * @param monitor_cls closure for @a monitor
  * @param sync_cb function called when we're in sync with the namestore
- * @param cls closure for 'monitor' and 'sync_cb'
+ * @param cls closure for @a sync_cb
  * @return handle to stop monitoring
  */
 struct GNUNET_NAMESTORE_ZoneMonitor *
-GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                                    const struct GNUNET_CRYPTO_EccPrivateKey *zone,
-                                    GNUNET_NAMESTORE_RecordMonitor monitor,
-                                    GNUNET_NAMESTORE_RecordsSynchronizedCallback sync_cb,
-                                    void *cls)
+GNUNET_NAMESTORE_zone_monitor_start (
+  const struct GNUNET_CONFIGURATION_Handle *cfg,
+  const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
+  int iterate_first,
+  GNUNET_SCHEDULER_TaskCallback error_cb,
+  void *error_cb_cls,
+  GNUNET_NAMESTORE_RecordMonitor monitor,
+  void *monitor_cls,
+  GNUNET_SCHEDULER_TaskCallback sync_cb,
+  void *sync_cb_cls)
 {
   struct GNUNET_NAMESTORE_ZoneMonitor *zm;
-  struct GNUNET_CLIENT_Connection *client;
 
-  if (NULL == (client = GNUNET_CLIENT_connect ("namestore", cfg)))
-    return NULL; 
   zm = GNUNET_new (struct GNUNET_NAMESTORE_ZoneMonitor);
-  zm->cfg = cfg;
-  zm->h = client;
-  zm->zone = *zone;
+  if (NULL != zone)
+    zm->zone = *zone;
+  zm->iterate_first = iterate_first;
+  zm->error_cb = error_cb;
+  zm->error_cb_cls = error_cb_cls;
   zm->monitor = monitor;
+  zm->monitor_cls = monitor_cls;
   zm->sync_cb = sync_cb;
-  zm->cls = cls;
-  zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h,
-                                               sizeof (struct ZoneMonitorStartMessage),
-                                               GNUNET_TIME_UNIT_FOREVER_REL,
-                                               GNUNET_YES,
-                                               &transmit_monitor_message,
-                                               zm);
+  zm->sync_cb_cls = sync_cb_cls;
+  zm->cfg = cfg;
+  reconnect (zm);
+  if (NULL == zm->mq)
+  {
+    GNUNET_free (zm);
+    return NULL;
+  }
   return zm;
 }
 
 
+/**
+ * Calls the monitor processor specified in #GNUNET_NAMESTORE_zone_monitor_start
+ * for the next record(s).  This function is used to allow clients that merely
+ * monitor the NAMESTORE to still throttle namestore operations, so we can be
+ * sure that the monitors can keep up.
+ *
+ * Note that #GNUNET_NAMESTORE_records_store() only waits for this
+ * call if the previous limit set by the client was already reached.
+ * Thus, by using a @a limit greater than 1, monitors basically enable
+ * a queue of notifications to be processed asynchronously with some
+ * delay.  Note that even with a limit of 1 the
+ * #GNUNET_NAMESTORE_records_store() function will run asynchronously
+ * and the continuation may be invoked before the monitors completed
+ * (or even started) processing the notification.  Thus, monitors will
+ * only closely track the current state of the namestore, but not
+ * be involved in the transactions.
+ *
+ * @param zm the monitor
+ * @param limit number of records to return to the iterator in one shot
+ *        (before #GNUNET_NAMESTORE_zone_monitor_next is to be called again)
+ */
+void
+GNUNET_NAMESTORE_zone_monitor_next (struct GNUNET_NAMESTORE_ZoneMonitor *zm,
+                                    uint64_t limit)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct ZoneMonitorNextMessage *nm;
+
+  env = GNUNET_MQ_msg (nm, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT);
+  nm->limit = GNUNET_htonll (limit);
+  GNUNET_MQ_send (zm->mq, env);
+}
+
+
 /**
  * Stop monitoring a zone for changes.
  *
@@ -290,13 +376,13 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c
 void
 GNUNET_NAMESTORE_zone_monitor_stop (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
 {
-  if (NULL != zm->th)
+  if (NULL != zm->mq)
   {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (zm->th);
-    zm->th = NULL;
+    GNUNET_MQ_destroy (zm->mq);
+    zm->mq = NULL;
   }
-  GNUNET_CLIENT_disconnect (zm->h);
   GNUNET_free (zm);
 }
 
+
 /* end of namestore_api_monitor.c */