start preparations for flow control by namestore monitors
authorChristian Grothoff <christian@grothoff.org>
Wed, 2 May 2018 15:27:36 +0000 (17:27 +0200)
committerChristian Grothoff <christian@grothoff.org>
Wed, 2 May 2018 15:27:42 +0000 (17:27 +0200)
src/include/gnunet_namestore_service.h
src/include/gnunet_protocols.h
src/namestore/gnunet-service-namestore.c
src/namestore/namestore.h
src/namestore/namestore_api_monitor.c

index 4828f72ade1428c7eff949759f5d807ddad883cc..b8c7ea810fb59e33bd2fc23e45b4e929b87cce7a 100644 (file)
@@ -109,6 +109,12 @@ typedef void
  * it is replaced with the new record.  Use an empty array to
  * remove all records under the given name.
  *
+ * The continuation is called after the value has been stored in the
+ * database. Monitors may be notified asynchronously (basically with
+ * a buffer). However, if any monitor is consistently too slow to
+ * keep up with the changes, calling @a cont will be delayed until the
+ * monitors do keep up.
+ *
  * @param h handle to the namestore
  * @param pkey private key of the zone
  * @param label name that is being mapped
@@ -313,7 +319,7 @@ struct GNUNET_NAMESTORE_ZoneMonitor;
  *         that monitoring is down. You need to still explicitly call
  *         #GNUNET_NAMESTORE_zone_monitor_stop().
  * @param error_cb_cls closure for @a error_cb
- * @param monitor function to call on zone changes
+ * @param monitor function to call on zone changes, with an initial limit of 1
  * @param monitor_cls closure for @a monitor
  * @param sync_cb function called when we're in sync with the namestore
  * @param sync_cb_cls closure for @a sync_cb
@@ -331,6 +337,32 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c
                                     void *sync_cb_cls);
 
 
+/**
+ * Calls the monitor processor specified in #GNUNET_NAMESTORE_zone_monitor_start
+ * for the next record(s).  This function is used to allow clients that merely
+ * monitor the NAMESTORE to still throttle namestore operations, so we can be
+ * sure that the monitors can keep up.
+ *
+ * Note that #GNUNET_NAMESTORE_records_store() only waits for this
+ * call if the previous limit set by the client was already reached.
+ * Thus, by using a @a limit greater than 1, monitors basically enable
+ * a queue of notifications to be processed asynchronously with some
+ * delay.  Note that even with a limit of 1 the
+ * #GNUNET_NAMESTORE_records_store() function will run asynchronously
+ * and the continuation may be invoked before the monitors completed
+ * (or even started) processing the notification.  Thus, monitors will
+ * only closely track the current state of the namestore, but not
+ * be involved in the transactions.
+ *
+ * @param zm the monitor
+ * @param limit number of records to return to the iterator in one shot
+ *        (before #GNUNET_NAMESTORE_zone_monitor_next is to be called again)
+ */
+void
+GNUNET_NAMESTORE_zone_monitor_next (struct GNUNET_NAMESTORE_ZoneMonitor *zm,
+                                    uint64_t limit);
+
+
 /**
  * Stop monitoring a zone for changes.
  *
index bf1b48679c00a04813eafd39c48f1741c33a2fcd..1ef00cfc8eeee4cb4724db197c335be48c62559b 100644 (file)
@@ -1305,6 +1305,13 @@ extern "C"
  */
 #define GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT 443
 
+/**
+ * Client to service: I am now ready for the next (set of) monitor
+ * events. Monitoring equivlaent of
+ * #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT.
+ */
+#define GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT 444
+
 /**
  * Client to service: please start iteration; receives
  * "GNUNET_MESSAGE_TYPE_NAMESTORE_LOOKUP_NAME_RESPONSE" messages in return.
@@ -1312,7 +1319,7 @@ extern "C"
 #define GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START 445
 
 /**
- * Client to service: next record in iteration please.
+ * Client to service: next record(s) in iteration please.
  */
 #define GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT 447
 
index fa189dbc39000e857f92a24c86d05d0e6f965914..f47c8776bb990920d854343fce24b7483a85114a 100644 (file)
@@ -169,6 +169,12 @@ struct ZoneMonitor
    */
   uint64_t seq;
 
+  /**
+   * Current limit of how many more messages we are allowed
+   * to queue to this monitor.
+   */
+  uint64_t limit;
+
 };
 
 
@@ -1667,7 +1673,7 @@ monitor_iterate_cb (void *cls,
  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
  *
  * @param cls the client sending the message
- * @param message message from the client
+ * @param zis_msg message from the client
  */
 static void
 handle_monitor_start (void *cls,
@@ -1685,7 +1691,7 @@ handle_monitor_start (void *cls,
                               monitor_tail,
                               zm);
   GNUNET_SERVICE_client_mark_monitor (nc->client);
-  GNUNET_SERVICE_client_disable_continue_warning (nc->client);
+  GNUNET_SERVICE_client_continue (nc->client);
   GNUNET_notification_context_add (monitor_nc,
                                   nc->mq);
   if (GNUNET_YES == ntohl (zis_msg->iterate_first))
@@ -1732,6 +1738,51 @@ monitor_next (void *cls)
 }
 
 
+/**
+ * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
+ *
+ * @param cls the client sending the message
+ * @param nm message from the client
+ */
+static void
+handle_monitor_next (void *cls,
+                     const struct ZoneMonitorNextMessage *nm)
+{
+  struct NamestoreClient *nc = cls;
+  struct ZoneMonitor *zm;
+  uint64_t inc;
+
+  inc = GNUNET_ntohll (nm->limit);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received ZONE_MONITOR_NEXT message with limit %llu\n",
+              (unsigned long long) inc);
+  for (zm = monitor_head; NULL != zm; zm = zm->next)
+    if (zm->nc == nc)
+      break;
+  if (NULL == zm)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (nc->client);
+    return;
+  }
+  GNUNET_SERVICE_client_continue (nc->client);
+  if (zm->limit + inc < zm->limit)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (nc->client);
+    return;
+  }
+  zm->limit += inc;
+#if 0
+  if (GNUNET_YES == ntohl (zis_msg->iterate_first))
+    zm->task = GNUNET_SCHEDULER_add_now (&monitor_next,
+                                        zm);
+  else
+    monitor_sync (zm);
+#endif
+}
+
+
 /**
  * Process namestore requests.
  *
@@ -1831,6 +1882,10 @@ GNUNET_SERVICE_MAIN
                          GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
                          struct ZoneMonitorStartMessage,
                          NULL),
+ GNUNET_MQ_hd_fixed_size (monitor_next,
+                         GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
+                         struct ZoneMonitorNextMessage,
+                         NULL),
  GNUNET_MQ_handler_end ());
 
 
index 207b35662fc1fefa232c250c30a1eeecc0a26b78..679ca3d3de0efb3a51c217af0afff20f698bf6d2 100644 (file)
@@ -161,7 +161,7 @@ struct LabelLookupResponseMessage
    * Length of serialized record data
    */
   uint16_t rd_len GNUNET_PACKED;
-  
+
   /**
    * Number of records contained
    */
@@ -318,6 +318,32 @@ struct ZoneMonitorStartMessage
 };
 
 
+/**
+ * Ask for next result of zone iteration for the given operation
+ */
+struct ZoneMonitorNextMessage
+{
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Always zero.
+   */
+  uint32_t reserved;
+
+  /**
+   * Number of records to return to the iterator in one shot
+   * (before #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_MONITOR_NEXT
+   * should be send again). In NBO.
+   */
+  uint64_t limit;
+
+};
+
+
+
 /**
  * Start a zone iteration for the given zone
  */
index 8e6d39ad7e2612bd242b6e31af6331f56fa3d029..9ba90833b355846f36835f01d3a164a1caf71fcc 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2013, 2016 GNUnet e.V.
+     Copyright (C) 2013, 2016, 2018 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -17,7 +17,6 @@
      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
      Boston, MA 02110-1301, USA.
 */
-
 /**
  * @file namestore/namestore_api_monitor.c
  * @brief API to monitor changes in the NAMESTORE
@@ -339,6 +338,42 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c
 }
 
 
+/**
+ * Calls the monitor processor specified in #GNUNET_NAMESTORE_zone_monitor_start
+ * for the next record(s).  This function is used to allow clients that merely
+ * monitor the NAMESTORE to still throttle namestore operations, so we can be
+ * sure that the monitors can keep up.
+ *
+ * Note that #GNUNET_NAMESTORE_records_store() only waits for this
+ * call if the previous limit set by the client was already reached.
+ * Thus, by using a @a limit greater than 1, monitors basically enable
+ * a queue of notifications to be processed asynchronously with some
+ * delay.  Note that even with a limit of 1 the
+ * #GNUNET_NAMESTORE_records_store() function will run asynchronously
+ * and the continuation may be invoked before the monitors completed
+ * (or even started) processing the notification.  Thus, monitors will
+ * only closely track the current state of the namestore, but not
+ * be involved in the transactions.
+ *
+ * @param zm the monitor
+ * @param limit number of records to return to the iterator in one shot
+ *        (before #GNUNET_NAMESTORE_zone_monitor_next is to be called again)
+ */
+void
+GNUNET_NAMESTORE_zone_monitor_next (struct GNUNET_NAMESTORE_ZoneMonitor *zm,
+                                    uint64_t limit)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct ZoneMonitorNextMessage *nm;
+
+  env = GNUNET_MQ_msg (nm,
+                       GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT);
+  nm->limit = GNUNET_htonll (limit);
+  GNUNET_MQ_send (zm->mq,
+                  env);
+}
+
+
 /**
  * Stop monitoring a zone for changes.
  *