batch NAMESTORE operation also in zonemaster
authorChristian Grothoff <christian@grothoff.org>
Sun, 29 Apr 2018 21:16:05 +0000 (23:16 +0200)
committerChristian Grothoff <christian@grothoff.org>
Sun, 29 Apr 2018 21:16:05 +0000 (23:16 +0200)
src/zonemaster/gnunet-service-zonemaster.c

index 25baf43964f68d0e619c0d64d3953aecf285d314..5c3356784333d8c79e146d67cadf7499d3e38155 100644 (file)
 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
 
 
+/**
+ * How often do we measure the delta between desired zone
+ * iteration speed and actual speed, and tell statistics
+ * service about it?
+ */
+#define DELTA_INTERVAL 100
+
+/**
+ * How many records do we fetch
+ * in one shot from the namestore?
+ */
+#define NS_BLOCK_SIZE 100
 
 /**
  * The initial interval in milliseconds btween puts in
 /**
  * Handle for DHT PUT activity triggered from the namestore monitor.
  */
-struct MonitorActivity
+struct DhtPutActivity
 {
   /**
    * Kept in a DLL.
    */
-  struct MonitorActivity *next;
+  struct DhtPutActivity *next;
 
   /**
    * Kept in a DLL.
    */
-  struct MonitorActivity *prev;
+  struct DhtPutActivity *prev;
 
   /**
    * Handle for the DHT PUT operation.
@@ -131,12 +143,22 @@ static struct GNUNET_NAMESTORE_ZoneMonitor *zmon;
 /**
  * Head of monitor activities; kept in a DLL.
  */
-static struct MonitorActivity *ma_head;
+static struct DhtPutActivity *ma_head;
 
 /**
  * Tail of monitor activities; kept in a DLL.
  */
-static struct MonitorActivity *ma_tail;
+static struct DhtPutActivity *ma_tail;
+
+/**
+ * Head of iteration put activities; kept in a DLL.
+ */
+static struct DhtPutActivity *it_head;
+
+/**
+ * Tail of iteration put activities; kept in a DLL.
+ */
+static struct DhtPutActivity *it_tail;
 
 /**
  * Useful for zone update for DHT put
@@ -148,6 +170,21 @@ static unsigned long long num_public_records;
  */
 static unsigned long long last_num_public_records;
 
+/**
+ * Number of successful put operations performed in the current
+ * measurement cycle (as measured in #check_zone_dht_next()).
+ */
+static unsigned long long put_cnt;
+
+/**
+ * What is the frequency at which we currently would like
+ * to perform DHT puts (per record)?  Calculated in
+ * update_velocity() from the #zone_publish_time_window()
+ * and the total number of record sets we have (so far)
+ * observed in the zone.
+ */
+static struct GNUNET_TIME_Relative next_put_interval;
+
 /**
  * Minimum relative expiration time of records seem during the current
  * zone iteration.
@@ -170,11 +207,31 @@ static struct GNUNET_TIME_Relative zone_publish_time_window_default;
  */
 static struct GNUNET_TIME_Relative zone_publish_time_window;
 
+/**
+ * When did we last start measuring the #DELTA_INTERVAL successful
+ * DHT puts? Used for velocity calculations.
+ */
+static struct GNUNET_TIME_Absolute last_put_100;
+
+/**
+ * By how much should we try to increase our per-record iteration speed
+ * (over the desired speed calculated directly from the #put_interval)?
+ * Basically this value corresponds to the per-record CPU time overhead
+ * we have.
+ */
+static struct GNUNET_TIME_Relative sub_delta;
+
 /**
  * zone publish task
  */
 static struct GNUNET_SCHEDULER_Task *zone_publish_task;
 
+/**
+ * How many more values are left for the current query before we need
+ * to explicitly ask the namestore for more?
+ */
+static unsigned int ns_iteration_left;
+
 /**
  * #GNUNET_YES if zone has never been published before
  */
@@ -196,7 +253,7 @@ static int cache_keys;
 static void
 shutdown_task (void *cls)
 {
-  struct MonitorActivity *ma;
+  struct DhtPutActivity *ma;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Shutting down!\n");
@@ -257,8 +314,10 @@ publish_zone_dht_next (void *cls)
 {
   zone_publish_task = NULL;
   GNUNET_assert (NULL != namestore_iter);
+  GNUNET_assert (0 == ns_iteration_left);
+  ns_iteration_left = NS_BLOCK_SIZE;
   GNUNET_NAMESTORE_zone_iterator_next (namestore_iter,
-                                       1);
+                                       NS_BLOCK_SIZE);
 }
 
 
@@ -272,144 +331,188 @@ publish_zone_dht_start (void *cls);
 
 
 /**
- * How often do we measure the delta between desired zone
- * iteration speed and actual speed, and tell statistics
- * service about it?
+ * Continuation called from DHT once the PUT operation triggered
+ * by a monitor is done.
+ *
+ * @param cls a `struct DhtPutActivity`
+ * @param success #GNUNET_OK on success
  */
-#define DELTA_INTERVAL 100
+static void
+dht_put_monitor_continuation (void *cls,
+                              int success)
+{
+  struct DhtPutActivity *ma = cls;
+
+  num_public_records++;
+  GNUNET_CONTAINER_DLL_remove (ma_head,
+                               ma_tail,
+                               ma);
+  GNUNET_free (ma);
+}
 
 
 /**
- * Continuation called from DHT once the PUT operation is done.
- *
- * @param cls closure, NULL if called from regular iteration,
- *        `struct MonitorActivity` if called from #handle_monitor_event.
- * @param success #GNUNET_OK on success
+ * Check if the current zone iteration needs to be continued
+ * by calling #publish_zone_dht_next(), and if so with what delay.
  */
 static void
-dht_put_continuation (void *cls,
-                      int success)
+check_zone_dht_next ()
 {
-  struct MonitorActivity *ma = cls;
-  static unsigned long long put_cnt;
-  static struct GNUNET_TIME_Absolute last_put_100;
-  static struct GNUNET_TIME_Relative sub_delta;
-  struct GNUNET_TIME_Relative next_put_interval;
   struct GNUNET_TIME_Relative delay;
 
-  num_public_records++;
-  if (NULL == ma)
+  if (0 != ns_iteration_left)
+    return; /* current NAMESTORE iteration not yet done */
+  if (NULL != it_head)
+    return; /* waiting on DHT */
+  delay = GNUNET_TIME_relative_subtract (next_put_interval,
+                                         sub_delta);
+  /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
+     per-record delay calculated so far with the #NS_BLOCK_SIZE */
+  delay = GNUNET_TIME_relative_multiply (delay,
+                                         NS_BLOCK_SIZE);
+  GNUNET_assert (NULL == zone_publish_task);
+  zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                    &publish_zone_dht_next,
+                                                    NULL);
+}
+
+
+/**
+ * Re-calculate our velocity and the desired velocity.
+ * We have succeeded in making #DELTA_INTERVAL puts, so
+ * now calculate the new desired delay between puts.
+ */
+static void
+update_velocity ()
+{
+  struct GNUNET_TIME_Relative delta;
+  unsigned long long pct = 0;
+
+  /* How fast were we really? */
+  delta = GNUNET_TIME_absolute_get_duration (last_put_100);
+  delta.rel_value_us /= DELTA_INTERVAL;
+  last_put_100 = GNUNET_TIME_absolute_get ();
+
+  /* calculate expected frequency */
+  if ( (num_public_records > last_num_public_records) &&
+       (GNUNET_NO == first_zone_iteration) )
   {
-    active_put = NULL;
-    if ( (num_public_records > last_num_public_records) &&
-         (GNUNET_NO == first_zone_iteration) )
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Last record count was lower than current record count.  Reducing interval.\n");
+    put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
+                                                num_public_records);
+    next_put_interval = GNUNET_TIME_relative_divide (put_interval,
+                                                     LATE_ITERATION_SPEEDUP_FACTOR);
+  }
+  else
+  {
+    next_put_interval = put_interval;
+  }
+
+  next_put_interval = GNUNET_TIME_relative_min (next_put_interval,
+                                                MAXIMUM_ZONE_ITERATION_INTERVAL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Desired global zone iteration interval is %s/record!\n",
+              GNUNET_STRINGS_relative_time_to_string (next_put_interval,
+                                                      GNUNET_YES));
+
+  /* Tell statistics actual vs. desired speed */
+  GNUNET_STATISTICS_set (statistics,
+                         "Target zone iteration velocity (μs)",
+                         next_put_interval.rel_value_us,
+                         GNUNET_NO);
+  GNUNET_STATISTICS_set (statistics,
+                         "Current zone iteration velocity (μs)",
+                         delta.rel_value_us,
+                         GNUNET_NO);
+  /* update "sub_delta" based on difference, taking
+     previous sub_delta into account! */
+  if (next_put_interval.rel_value_us > delta.rel_value_us)
+  {
+    /* We were too fast, reduce sub_delta! */
+    struct GNUNET_TIME_Relative corr;
+
+    corr = GNUNET_TIME_relative_subtract (next_put_interval,
+                                          delta);
+    if (sub_delta.rel_value_us > delta.rel_value_us)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Last record count was lower than current record count.  Reducing interval.\n");
-      put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
-                                                  num_public_records);
-      next_put_interval = GNUNET_TIME_relative_divide (put_interval,
-                                                       LATE_ITERATION_SPEEDUP_FACTOR);
+      /* Reduce sub_delta by corr */
+      sub_delta = GNUNET_TIME_relative_subtract (sub_delta,
+                                                 corr);
     }
     else
     {
-      next_put_interval = put_interval;
+      /* We're doing fine with waiting the full time, this
+         should theoretically only happen if we run at
+         infinite speed. */
+      sub_delta = GNUNET_TIME_UNIT_ZERO;
     }
-    next_put_interval = GNUNET_TIME_relative_min (next_put_interval,
-                                                  MAXIMUM_ZONE_ITERATION_INTERVAL);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "PUT complete, next PUT in %s!\n",
-                GNUNET_STRINGS_relative_time_to_string (next_put_interval,
-                                                        GNUNET_YES));
-    /* compute velocities and delay corrections to apply */
-    if (0 == put_cnt)
-      last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */
-    put_cnt++;
-    if (0 == put_cnt % DELTA_INTERVAL)
+  }
+  else if (next_put_interval.rel_value_us < delta.rel_value_us)
+  {
+    /* We were too slow, increase sub_delta! */
+    struct GNUNET_TIME_Relative corr;
+
+    corr = GNUNET_TIME_relative_subtract (delta,
+                                          next_put_interval);
+    sub_delta = GNUNET_TIME_relative_add (sub_delta,
+                                          corr);
+    if (sub_delta.rel_value_us > next_put_interval.rel_value_us)
     {
-      struct GNUNET_TIME_Relative delta;
-      unsigned long long pct = 0;
-         
-      /* How fast were we really? */
-      delta = GNUNET_TIME_absolute_get_duration (last_put_100);
-      delta.rel_value_us /= DELTA_INTERVAL;
-      last_put_100 = GNUNET_TIME_absolute_get ();
-      /* Tell statistics actual vs. desired speed */
-      GNUNET_STATISTICS_set (statistics,
-                            "Target zone iteration velocity (μs)",
-                            next_put_interval.rel_value_us,
-                            GNUNET_NO);
-      GNUNET_STATISTICS_set (statistics,
-                            "Current zone iteration velocity (μs)",
-                            delta.rel_value_us,
-                            GNUNET_NO);
-      /* update "sub_delta" based on difference, taking
-        previous sub_delta into account! */
-      if (next_put_interval.rel_value_us > delta.rel_value_us)
-      {
-       /* We were too fast, reduce sub_delta! */
-       struct GNUNET_TIME_Relative corr;
-
-       corr = GNUNET_TIME_relative_subtract (next_put_interval,
-                                             delta);
-       if (sub_delta.rel_value_us > delta.rel_value_us)
-       {
-         /* Reduce sub_delta by corr */
-         sub_delta = GNUNET_TIME_relative_subtract (sub_delta,
-                                                    corr);
-       }
-       else
-       {
-         /* We're doing fine with waiting the full time, this
-            should theoretically only happen if we run at
-            infinite speed. */
-         sub_delta = GNUNET_TIME_UNIT_ZERO;
-       }
-      }
-      else if (next_put_interval.rel_value_us < delta.rel_value_us)
-      {
-       /* We were too slow, increase sub_delta! */
-       struct GNUNET_TIME_Relative corr;
-
-       corr = GNUNET_TIME_relative_subtract (delta,
-                                             next_put_interval);
-       sub_delta = GNUNET_TIME_relative_add (sub_delta,
-                                             corr);
-       if (sub_delta.rel_value_us > next_put_interval.rel_value_us)
-       {
-         /* CPU overload detected, we cannot go at desired speed,
-            as this would mean using a negative delay. */
-         sub_delta = next_put_interval;
-         /* compute how much faster we would want to be for
-            the desired velocity */
-         if (0 == next_put_interval.rel_value_us)
-           pct = UINT64_MAX; /* desired speed is infinity ... */
-         else
-           pct = sub_delta.rel_value_us * 100 / next_put_interval.rel_value_us;
-       }
-      }
-      GNUNET_STATISTICS_set (statistics,
-                            "% speed increase needed for target velocity",
-                            pct,
-                            GNUNET_NO);
-    } /* end of periodic velocity calculations */
-    delay = GNUNET_TIME_relative_subtract (next_put_interval,
-                                          sub_delta);
-    GNUNET_assert (NULL == zone_publish_task);
-    zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                                      &publish_zone_dht_next,
-                                                      NULL);
+      /* CPU overload detected, we cannot go at desired speed,
+         as this would mean using a negative delay. */
+      /* compute how much faster we would want to be for
+         the desired velocity */
+      if (0 == next_put_interval.rel_value_us)
+        pct = UINT64_MAX; /* desired speed is infinity ... */
+      else
+        pct = (sub_delta.rel_value_us - next_put_interval.rel_value_us) * 100LLU
+          / next_put_interval.rel_value_us;
+      sub_delta = next_put_interval;
+    }
   }
-  else
+  GNUNET_STATISTICS_set (statistics,
+                         "% speed increase needed for target velocity",
+                         pct,
+                         GNUNET_NO);
+  GNUNET_STATISTICS_set (statistics,
+                         "# records processed in current iteration",
+                         num_public_records,
+                         GNUNET_NO);
+}
+
+
+/**
+ * Continuation called from DHT once the PUT operation is done.
+ *
+ * @param cls a `struct DhtPutActivity`
+ * @param success #GNUNET_OK on success
+ */
+static void
+dht_put_continuation (void *cls,
+                      int success)
+{
+  struct DhtPutActivity *ma = cls;
+
+  num_public_records++;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "PUT complete (%s)\n",
+              (GNUNET_OK == success) ? "success" : "failure");
+  GNUNET_CONTAINER_DLL_remove (it_head,
+                               it_tail,
+                               ma);
+  GNUNET_free (ma);
+  if (GNUNET_OK == success)
   {
-    GNUNET_CONTAINER_DLL_remove (ma_head,
-                                 ma_tail,
-                                 ma);
-    GNUNET_free (ma);
+    put_cnt++;
+    if (0 == put_cnt % DELTA_INTERVAL)
+      update_velocity ();
   }
+  check_zone_dht_next ();
 }
 
 
+
 /**
  * Convert namestore records from the internal format to that
  * suitable for publication (removes private records, converts
@@ -460,7 +563,8 @@ convert_records_for_export (const struct GNUNET_GNSRECORD_Data *rd,
  * @param label label to store under
  * @param rd_public public record data
  * @param rd_public_count number of records in @a rd_public
- * @param pc_arg closure argument to pass to the #dht_put_continuation
+ * @param cont function to call with PUT result
+ * @param cont_cls closure for @a cont
  * @return DHT PUT handle, NULL on error
  */
 static struct GNUNET_DHT_PutHandle *
@@ -468,7 +572,8 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                  const char *label,
                  const struct GNUNET_GNSRECORD_Data *rd_public,
                  unsigned int rd_public_count,
-                 void *pc_arg)
+                 GNUNET_DHT_PutContinuation cont,
+                 void *cont_cls)
 {
   struct GNUNET_GNSRECORD_Block *block;
   struct GNUNET_HashCode query;
@@ -519,8 +624,8 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                         block_size,
                         block,
                         expire,
-                        &dht_put_continuation,
-                        pc_arg);
+                        cont,
+                        cont_cls);
   GNUNET_free (block);
   return ret;
 }
@@ -642,8 +747,10 @@ put_gns_record (void *cls,
 {
   struct GNUNET_GNSRECORD_Data rd_public[rd_count];
   unsigned int rd_public_count;
+  struct DhtPutActivity *ma;
 
   (void) cls;
+  ns_iteration_left--;
   rd_public_count = convert_records_for_export (rd,
                                                 rd_count,
                                                 rd_public);
@@ -652,25 +759,30 @@ put_gns_record (void *cls,
     GNUNET_assert (NULL == zone_publish_task);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Record set empty, moving to next record set\n");
-    zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_next,
-                                                  NULL);
+    check_zone_dht_next ();
     return;
   }
   /* We got a set of records to publish */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting DHT PUT\n");
-  active_put = perform_dht_put (key,
-                                label,
-                                rd_public,
-                                rd_public_count,
-                                NULL);
-  if (NULL == active_put)
+  ma = GNUNET_new (struct DhtPutActivity);
+  ma->ph = perform_dht_put (key,
+                            label,
+                            rd_public,
+                            rd_public_count,
+                            &dht_put_continuation,
+                            ma);
+  if (NULL == ma->ph)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Could not perform DHT PUT, is the DHT running?\n");
-    dht_put_continuation (NULL,
-                          GNUNET_NO);
+    GNUNET_free (ma);
+    check_zone_dht_next ();
+    return;
   }
+  GNUNET_CONTAINER_DLL_insert (it_head,
+                               it_tail,
+                               ma);
 }
 
 
@@ -693,6 +805,7 @@ publish_zone_dht_start (void *cls)
   /* start counting again */
   num_public_records = 0;
   GNUNET_assert (NULL == namestore_iter);
+  ns_iteration_left = 1;
   namestore_iter
     = GNUNET_NAMESTORE_zone_iteration_start (namestore_handle,
                                              NULL, /* All zones */
@@ -724,7 +837,7 @@ handle_monitor_event (void *cls,
 {
   struct GNUNET_GNSRECORD_Data rd_public[rd_count];
   unsigned int rd_public_count;
-  struct MonitorActivity *ma;
+  struct DhtPutActivity *ma;
 
   GNUNET_STATISTICS_update (statistics,
                             "Namestore monitor events received",
@@ -741,11 +854,12 @@ handle_monitor_event (void *cls,
                                                 rd_public);
   if (0 == rd_public_count)
     return; /* nothing to do */
-  ma = GNUNET_new (struct MonitorActivity);
+  ma = GNUNET_new (struct DhtPutActivity);
   ma->ph = perform_dht_put (zone,
                             label,
                             rd,
                             rd_count,
+                            &dht_put_monitor_continuation,
                             ma);
   if (NULL == ma->ph)
   {
@@ -825,6 +939,7 @@ run (void *cls,
   unsigned long long max_parallel_bg_queries = 128;
 
   (void) cls;
+  last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */
   min_relative_record_time = GNUNET_TIME_UNIT_FOREVER_REL;
   namestore_handle = GNUNET_NAMESTORE_connect (c);
   if (NULL == namestore_handle)