fix bad free
[oweals/gnunet.git] / src / zonemaster / gnunet-service-zonemaster.c
index bf11f7d20d98920d712c85e6464b829d8142c646..fcc6b42265e263e2eba36478186bcc35e54b24f5 100644 (file)
@@ -1,21 +1,19 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012, 2013, 2014, 2017 GNUnet e.V.
+     Copyright (C) 2012, 2013, 2014, 2017, 2018 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 /**
 #include "gnunet_dht_service.h"
 #include "gnunet_namestore_service.h"
 #include "gnunet_statistics_service.h"
-#include "gnunet_namestore_plugin.h"
-#include "gnunet_signatures.h"
 
 
 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
 
 
+/**
+ * How often should we (re)publish each record before
+ * it expires?
+ */
+#define PUBLISH_OPS_PER_EXPIRATION 4
 
 /**
- * The initial interval in milliseconds btween puts in
- * a zone iteration
+ * How often do we measure the delta between desired zone
+ * iteration speed and actual speed, and tell statistics
+ * service about it?
  */
-#define INITIAL_PUT_INTERVAL GNUNET_TIME_UNIT_MILLISECONDS
+#define DELTA_INTERVAL 100
 
 /**
- * The lower bound for the zone iteration interval
+ * How many records do we fetch in one shot from the namestore?
  */
-#define MINIMUM_ZONE_ITERATION_INTERVAL GNUNET_TIME_UNIT_SECONDS
+#define NS_BLOCK_SIZE 1000
 
 /**
- * The upper bound for the zone iteration interval
+ * How many pending DHT operations do we allow at most?
  */
-#define MAXIMUM_ZONE_ITERATION_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
+#define DHT_QUEUE_LIMIT 2000
 
 /**
- * The default put interval for the zone iteration. In case
- * no option is found
+ * How many events may the namestore give us before it has to wait
+ * for us to keep up?
  */
-#define DEFAULT_ZONE_PUBLISH_TIME_WINDOW GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+#define NAMESTORE_QUEUE_LIMIT 50
 
 /**
- * The factor the current zone iteration interval is divided by for each
- * additional new record
+ * The initial interval in milliseconds btween puts in
+ * a zone iteration
  */
-#define LATE_ITERATION_SPEEDUP_FACTOR 2
+#define INITIAL_ZONE_ITERATION_INTERVAL GNUNET_TIME_UNIT_MILLISECONDS
+
+/**
+ * The upper bound for the zone iteration interval
+ * (per record).
+ */
+#define MAXIMUM_ZONE_ITERATION_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
 
 /**
- * How long until a DHT PUT attempt should time out?
+ * The factor the current zone iteration interval is divided by for each
+ * additional new record
  */
-#define DHT_OPERATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+#define LATE_ITERATION_SPEEDUP_FACTOR 2
 
 /**
  * What replication level do we use for DHT PUT operations?
 /**
  * Handle for DHT PUT activity triggered from the namestore monitor.
  */
-struct MonitorActivity
+struct DhtPutActivity
 {
   /**
    * Kept in a DLL.
    */
-  struct MonitorActivity *next;
+  struct DhtPutActivity *next;
 
   /**
    * Kept in a DLL.
    */
-  struct MonitorActivity *prev;
+  struct DhtPutActivity *prev;
 
   /**
    * Handle for the DHT PUT operation.
    */
   struct GNUNET_DHT_PutHandle *ph;
+
+  /**
+   * When was this PUT initiated?
+   */
+  struct GNUNET_TIME_Absolute start_date;
 };
 
 
@@ -108,11 +122,6 @@ static struct GNUNET_STATISTICS_Handle *statistics;
  */
 static struct GNUNET_DHT_Handle *dht_handle;
 
-/**
- * Active DHT put operation (or NULL)
- */
-static struct GNUNET_DHT_PutHandle *active_put;
-
 /**
  * Our handle to the namestore service
  */
@@ -124,19 +133,19 @@ static struct GNUNET_NAMESTORE_Handle *namestore_handle;
 static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter;
 
 /**
- * Handle to monitor namestore changes to instant propagation.
+ * Head of iteration put activities; kept in a DLL.
  */
-static struct GNUNET_NAMESTORE_ZoneMonitor *zmon;
+static struct DhtPutActivity *it_head;
 
 /**
- * Head of monitor activities; kept in a DLL.
+ * Tail of iteration put activities; kept in a DLL.
  */
-static struct MonitorActivity *ma_head;
+static struct DhtPutActivity *it_tail;
 
 /**
- * Tail of monitor activities; kept in a DLL.
+ * Number of entries in the DHT queue #it_head.
  */
-static struct MonitorActivity *ma_tail;
+static unsigned int dht_queue_length;
 
 /**
  * Useful for zone update for DHT put
@@ -148,6 +157,21 @@ static unsigned long long num_public_records;
  */
 static unsigned long long last_num_public_records;
 
+/**
+ * Number of successful put operations performed in the current
+ * measurement cycle (as measured in #check_zone_namestore_next()).
+ */
+static unsigned long long put_cnt;
+
+/**
+ * What is the frequency at which we currently would like
+ * to perform DHT puts (per record)?  Calculated in
+ * update_velocity() from the #zone_publish_time_window()
+ * and the total number of record sets we have (so far)
+ * observed in the zone.
+ */
+static struct GNUNET_TIME_Relative target_iteration_velocity_per_record;
+
 /**
  * Minimum relative expiration time of records seem during the current
  * zone iteration.
@@ -155,9 +179,10 @@ static unsigned long long last_num_public_records;
 static struct GNUNET_TIME_Relative min_relative_record_time;
 
 /**
- * Zone iteration PUT interval.
+ * Minimum relative expiration time of records seem during the last
+ * zone iteration.
  */
-static struct GNUNET_TIME_Relative put_interval;
+static struct GNUNET_TIME_Relative last_min_relative_record_time;
 
 /**
  * Default time window for zone iteration
@@ -170,16 +195,43 @@ static struct GNUNET_TIME_Relative zone_publish_time_window_default;
  */
 static struct GNUNET_TIME_Relative zone_publish_time_window;
 
+/**
+ * When did we last start measuring the #DELTA_INTERVAL successful
+ * DHT puts? Used for velocity calculations.
+ */
+static struct GNUNET_TIME_Absolute last_put_100;
+
+/**
+ * By how much should we try to increase our per-record iteration speed
+ * (over the desired speed calculated directly from the #put_interval)?
+ * Basically this value corresponds to the per-record CPU time overhead
+ * we have.
+ */
+static struct GNUNET_TIME_Relative sub_delta;
+
 /**
  * zone publish task
  */
 static struct GNUNET_SCHEDULER_Task *zone_publish_task;
 
+/**
+ * How many more values are left for the current query before we need
+ * to explicitly ask the namestore for more?
+ */
+static unsigned int ns_iteration_left;
+
 /**
  * #GNUNET_YES if zone has never been published before
  */
 static int first_zone_iteration;
 
+/**
+ * Optimize block insertion by caching map of private keys to
+ * public keys in memory?
+ */
+static int cache_keys;
+
+
 /**
  * Task run during shutdown.
  *
@@ -189,16 +241,19 @@ static int first_zone_iteration;
 static void
 shutdown_task (void *cls)
 {
-  struct MonitorActivity *ma;
+  struct DhtPutActivity *ma;
 
+  (void) cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Shutting down!\n");
-  while (NULL != (ma = ma_head))
+  while (NULL != (ma = it_head))
   {
     GNUNET_DHT_put_cancel (ma->ph);
-    GNUNET_CONTAINER_DLL_remove (ma_head,
-                                 ma_tail,
+    dht_queue_length--;
+    GNUNET_CONTAINER_DLL_remove (it_head,
+                                 it_tail,
                                  ma);
+    dht_queue_length--;
     GNUNET_free (ma);
   }
   if (NULL != statistics)
@@ -217,21 +272,11 @@ shutdown_task (void *cls)
     GNUNET_NAMESTORE_zone_iteration_stop (namestore_iter);
     namestore_iter = NULL;
   }
-  if (NULL != zmon)
-  {
-    GNUNET_NAMESTORE_zone_monitor_stop (zmon);
-    zmon = NULL;
-  }
   if (NULL != namestore_handle)
   {
     GNUNET_NAMESTORE_disconnect (namestore_handle);
     namestore_handle = NULL;
   }
-  if (NULL != active_put)
-  {
-    GNUNET_DHT_put_cancel (active_put);
-    active_put = NULL;
-  }
   if (NULL != dht_handle)
   {
     GNUNET_DHT_disconnect (dht_handle);
@@ -243,14 +288,18 @@ shutdown_task (void *cls)
 /**
  * Method called periodically that triggers iteration over authoritative records
  *
- * @param cls closure
+ * @param cls NULL
  */
 static void
-publish_zone_dht_next (void *cls)
+publish_zone_namestore_next (void *cls)
 {
+  (void) cls;
   zone_publish_task = NULL;
   GNUNET_assert (NULL != namestore_iter);
-  GNUNET_NAMESTORE_zone_iterator_next (namestore_iter);
+  GNUNET_assert (0 == ns_iteration_left);
+  ns_iteration_left = NS_BLOCK_SIZE;
+  GNUNET_NAMESTORE_zone_iterator_next (namestore_iter,
+                                       NS_BLOCK_SIZE);
 }
 
 
@@ -264,58 +313,207 @@ publish_zone_dht_start (void *cls);
 
 
 /**
- * Continuation called from DHT once the PUT operation is done.
+ * Calculate #target_iteration_velocity_per_record.
+ */
+static void
+calculate_put_interval ()
+{
+  if (0 == num_public_records)
+  {
+    /**
+     * If no records are known (startup) or none present
+     * we can safely set the interval to the value for a single
+     * record
+     */
+    target_iteration_velocity_per_record = zone_publish_time_window;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+                "No records in namestore database.\n");
+  }
+  else
+  {
+    last_min_relative_record_time
+      = GNUNET_TIME_relative_min (last_min_relative_record_time,
+                                 min_relative_record_time);
+    zone_publish_time_window
+      = GNUNET_TIME_relative_min (GNUNET_TIME_relative_divide (last_min_relative_record_time,
+                                                              PUBLISH_OPS_PER_EXPIRATION),
+                                  zone_publish_time_window_default);
+    target_iteration_velocity_per_record
+      = GNUNET_TIME_relative_divide (zone_publish_time_window,
+                                    last_num_public_records);
+  }
+  target_iteration_velocity_per_record
+    = GNUNET_TIME_relative_min (target_iteration_velocity_per_record,
+                               MAXIMUM_ZONE_ITERATION_INTERVAL);
+  GNUNET_STATISTICS_set (statistics,
+                        "Minimum relative record expiration (in μs)",
+                        last_min_relative_record_time.rel_value_us,
+                        GNUNET_NO);
+  GNUNET_STATISTICS_set (statistics,
+                        "Zone publication time window (in μs)",
+                        zone_publish_time_window.rel_value_us,
+                        GNUNET_NO);
+  GNUNET_STATISTICS_set (statistics,
+                         "Target zone iteration velocity (μs)",
+                         target_iteration_velocity_per_record.rel_value_us,
+                         GNUNET_NO);
+}
+
+
+/**
+ * Re-calculate our velocity and the desired velocity.
+ * We have succeeded in making #DELTA_INTERVAL puts, so
+ * now calculate the new desired delay between puts.
  *
- * @param cls closure, NULL if called from regular iteration,
- *        `struct MonitorActivity` if called from #handle_monitor_event.
- * @param success #GNUNET_OK on success
+ * @param cnt how many records were processed since the last call?
  */
 static void
-dht_put_continuation (void *cls,
-                      int success)
+update_velocity (unsigned int cnt)
 {
-  struct MonitorActivity *ma = cls;
-  struct GNUNET_TIME_Relative next_put_interval;
+  struct GNUNET_TIME_Relative delta;
+  unsigned long long pct = 0;
 
-  num_public_records++;
-  if (NULL == ma)
+  if (0 == cnt)
+    return;
+  /* How fast were we really? */
+  delta = GNUNET_TIME_absolute_get_duration (last_put_100);
+  delta.rel_value_us /= cnt;
+  last_put_100 = GNUNET_TIME_absolute_get ();
+
+  /* calculate expected frequency */
+  if ( (num_public_records > last_num_public_records) &&
+       (GNUNET_NO == first_zone_iteration) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Last record count was lower than current record count.  Reducing interval.\n");
+    last_num_public_records = num_public_records * LATE_ITERATION_SPEEDUP_FACTOR;
+    calculate_put_interval ();
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Desired global zone iteration interval is %s/record!\n",
+              GNUNET_STRINGS_relative_time_to_string (target_iteration_velocity_per_record,
+                                                      GNUNET_YES));
+
+  /* Tell statistics actual vs. desired speed */
+  GNUNET_STATISTICS_set (statistics,
+                         "Current zone iteration velocity (μs/record)",
+                         delta.rel_value_us,
+                         GNUNET_NO);
+  /* update "sub_delta" based on difference, taking
+     previous sub_delta into account! */
+  if (target_iteration_velocity_per_record.rel_value_us > delta.rel_value_us)
   {
-    active_put = NULL;
-    if ( (num_public_records > last_num_public_records) &&
-         (GNUNET_NO == first_zone_iteration) )
+    /* We were too fast, reduce sub_delta! */
+    struct GNUNET_TIME_Relative corr;
+
+    corr = GNUNET_TIME_relative_subtract (target_iteration_velocity_per_record,
+                                          delta);
+    if (sub_delta.rel_value_us > delta.rel_value_us)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Last record count was lower than current record count.  Reducing interval.\n");
-      put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
-                                                  num_public_records);
-      next_put_interval = GNUNET_TIME_relative_divide (put_interval,
-                                                       LATE_ITERATION_SPEEDUP_FACTOR);
+      /* Reduce sub_delta by corr */
+      sub_delta = GNUNET_TIME_relative_subtract (sub_delta,
+                                                 corr);
     }
     else
-      next_put_interval = put_interval;
-    next_put_interval = GNUNET_TIME_relative_min (next_put_interval,
-                                                  MAXIMUM_ZONE_ITERATION_INTERVAL);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "PUT complete, next PUT in %s!\n",
-                GNUNET_STRINGS_relative_time_to_string (next_put_interval,
-                                                        GNUNET_YES));
-
-    GNUNET_STATISTICS_set (statistics,
-                           "Current zone iteration interval (ms)",
-                           next_put_interval.rel_value_us / 1000LL,
-                           GNUNET_NO);
-    GNUNET_assert (NULL == zone_publish_task);
-    zone_publish_task = GNUNET_SCHEDULER_add_delayed (next_put_interval,
-                                                      &publish_zone_dht_next,
-                                                      NULL);
+    {
+      /* We're doing fine with waiting the full time, this
+         should theoretically only happen if we run at
+         infinite speed. */
+      sub_delta = GNUNET_TIME_UNIT_ZERO;
+    }
   }
-  else
+  else if (target_iteration_velocity_per_record.rel_value_us < delta.rel_value_us)
   {
-    GNUNET_CONTAINER_DLL_remove (ma_head,
-                                 ma_tail,
-                                 ma);
-    GNUNET_free (ma);
+    /* We were too slow, increase sub_delta! */
+    struct GNUNET_TIME_Relative corr;
+
+    corr = GNUNET_TIME_relative_subtract (delta,
+                                          target_iteration_velocity_per_record);
+    sub_delta = GNUNET_TIME_relative_add (sub_delta,
+                                          corr);
+    if (sub_delta.rel_value_us > target_iteration_velocity_per_record.rel_value_us)
+    {
+      /* CPU overload detected, we cannot go at desired speed,
+         as this would mean using a negative delay. */
+      /* compute how much faster we would want to be for
+         the desired velocity */
+      if (0 == target_iteration_velocity_per_record.rel_value_us)
+        pct = UINT64_MAX; /* desired speed is infinity ... */
+      else
+        pct = (sub_delta.rel_value_us -
+              target_iteration_velocity_per_record.rel_value_us) * 100LLU
+          / target_iteration_velocity_per_record.rel_value_us;
+      sub_delta = target_iteration_velocity_per_record;
+    }
   }
+  GNUNET_STATISTICS_set (statistics,
+                         "# size of the DHT queue (it)",
+                         dht_queue_length,
+                         GNUNET_NO);
+  GNUNET_STATISTICS_set (statistics,
+                         "% speed increase needed for target velocity",
+                         pct,
+                         GNUNET_NO);
+  GNUNET_STATISTICS_set (statistics,
+                         "# records processed in current iteration",
+                         num_public_records,
+                         GNUNET_NO);
+}
+
+
+/**
+ * Check if the current zone iteration needs to be continued
+ * by calling #publish_zone_namestore_next(), and if so with what delay.
+ */
+static void
+check_zone_namestore_next ()
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if (0 != ns_iteration_left)
+    return; /* current NAMESTORE iteration not yet done */
+  update_velocity (put_cnt);
+  put_cnt = 0;
+  delay = GNUNET_TIME_relative_subtract (target_iteration_velocity_per_record,
+                                         sub_delta);
+  /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
+     per-record delay calculated so far with the #NS_BLOCK_SIZE */
+  GNUNET_STATISTICS_set (statistics,
+                         "Current artificial NAMESTORE delay (μs/record)",
+                         delay.rel_value_us,
+                         GNUNET_NO);
+  delay = GNUNET_TIME_relative_multiply (delay,
+                                         NS_BLOCK_SIZE);
+  /* make sure we do not overshoot because of the #NS_BLOCK_SIZE factor */
+  delay = GNUNET_TIME_relative_min (MAXIMUM_ZONE_ITERATION_INTERVAL,
+                                    delay);
+  /* no delays on first iteration */
+  if (GNUNET_YES == first_zone_iteration)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  GNUNET_assert (NULL == zone_publish_task);
+  zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                    &publish_zone_namestore_next,
+                                                    NULL);
+}
+
+
+/**
+ * Continuation called from DHT once the PUT operation is done.
+ *
+ * @param cls a `struct DhtPutActivity`
+ */
+static void
+dht_put_continuation (void *cls)
+{
+  struct DhtPutActivity *ma = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "PUT complete\n");
+  dht_queue_length--;
+  GNUNET_CONTAINER_DLL_remove (it_head,
+                               it_tail,
+                               ma);
+  GNUNET_free (ma);
 }
 
 
@@ -340,24 +538,22 @@ convert_records_for_export (const struct GNUNET_GNSRECORD_Data *rd,
   rd_public_count = 0;
   now = GNUNET_TIME_absolute_get ();
   for (unsigned int i=0;i<rd_count;i++)
-    if (0 == (rd[i].flags & GNUNET_GNSRECORD_RF_PRIVATE))
+  {
+    if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_PRIVATE))
+      continue;
+    if ( (0 == (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION)) &&
+         (rd[i].expiration_time < now.abs_value_us) )
+      continue;  /* record already expired, skip it */
+    if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
     {
-      rd_public[rd_public_count] = rd[i];
-      if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
-      {
-        /* GNUNET_GNSRECORD_block_create will convert to absolute time;
-           we just need to adjust our iteration frequency */
-        min_relative_record_time.rel_value_us =
-          GNUNET_MIN (rd_public[rd_public_count].expiration_time,
-                      min_relative_record_time.rel_value_us);
-      }
-      else if (rd_public[rd_public_count].expiration_time < now.abs_value_us)
-      {
-        /* record already expired, skip it */
-        continue;
-      }
-      rd_public_count++;
+      /* GNUNET_GNSRECORD_block_create will convert to absolute time;
+         we just need to adjust our iteration frequency */
+      min_relative_record_time.rel_value_us =
+        GNUNET_MIN (rd[i].expiration_time,
+                    min_relative_record_time.rel_value_us);
     }
+    rd_public[rd_public_count++] = rd[i];
+  }
   return rd_public_count;
 }
 
@@ -369,7 +565,7 @@ convert_records_for_export (const struct GNUNET_GNSRECORD_Data *rd,
  * @param label label to store under
  * @param rd_public public record data
  * @param rd_public_count number of records in @a rd_public
- * @param pc_arg closure argument to pass to the #dht_put_continuation
+ * @param ma handle for the put operation
  * @return DHT PUT handle, NULL on error
  */
 static struct GNUNET_DHT_PutHandle *
@@ -377,7 +573,7 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                  const char *label,
                  const struct GNUNET_GNSRECORD_Data *rd_public,
                  unsigned int rd_public_count,
-                 void *pc_arg)
+                 struct DhtPutActivity *ma)
 {
   struct GNUNET_GNSRECORD_Block *block;
   struct GNUNET_HashCode query;
@@ -387,13 +583,23 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
 
   expire = GNUNET_GNSRECORD_record_get_expiration_time (rd_public_count,
                                                         rd_public);
-  block = GNUNET_GNSRECORD_block_create (key,
-                                         expire,
-                                         label,
-                                         rd_public,
-                                         rd_public_count);
+  if (cache_keys)
+    block = GNUNET_GNSRECORD_block_create2 (key,
+                                            expire,
+                                            label,
+                                            rd_public,
+                                            rd_public_count);
+  else
+    block = GNUNET_GNSRECORD_block_create (key,
+                                           expire,
+                                           label,
+                                           rd_public,
+                                           rd_public_count);
   if (NULL == block)
+  {
+    GNUNET_break (0);
     return NULL; /* whoops */
+  }
   block_size = ntohl (block->purpose.size)
     + sizeof (struct GNUNET_CRYPTO_EcdsaSignature)
     + sizeof (struct GNUNET_CRYPTO_EcdsaPublicKey);
@@ -410,6 +616,7 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
               label,
               GNUNET_STRINGS_absolute_time_to_string (expire),
               GNUNET_h2s (&query));
+  num_public_records++;
   ret = GNUNET_DHT_put (dht_handle,
                         &query,
                         DHT_GNS_REPLICATION_LEVEL,
@@ -419,7 +626,7 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                         block,
                         expire,
                         &dht_put_continuation,
-                        pc_arg);
+                        ma);
   GNUNET_free (block);
   return ret;
 }
@@ -445,11 +652,6 @@ zone_iteration_error (void *cls)
     GNUNET_SCHEDULER_cancel (zone_publish_task);
     zone_publish_task = NULL;
   }
-  if (NULL != active_put)
-  {
-    GNUNET_DHT_put_cancel (active_put);
-    active_put = NULL;
-  }
   zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
                                                 NULL);
 }
@@ -468,58 +670,35 @@ zone_iteration_finished (void *cls)
   namestore_iter = NULL;
   last_num_public_records = num_public_records;
   first_zone_iteration = GNUNET_NO;
-  if (0 == num_public_records)
-  {
-    /**
-     * If no records are known (startup) or none present
-     * we can safely set the interval to the value for a single
-     * record
-     */
-    put_interval = zone_publish_time_window;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                "No records in namestore database.\n");
-  }
-  else
-  {
-    /* If records are present, next publication is based on the minimum
-     * relative expiration time of the records published divided by 4
-     */
-    zone_publish_time_window
-      = GNUNET_TIME_relative_min (GNUNET_TIME_relative_divide (min_relative_record_time, 4),
-                                  zone_publish_time_window_default);
-    put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
-                                                num_public_records);
-  }
+  last_min_relative_record_time = min_relative_record_time;
+  calculate_put_interval ();
   /* reset for next iteration */
-  min_relative_record_time = GNUNET_TIME_UNIT_FOREVER_REL;
-  put_interval = GNUNET_TIME_relative_max (MINIMUM_ZONE_ITERATION_INTERVAL,
-                                           put_interval);
-  put_interval = GNUNET_TIME_relative_min (put_interval,
-                                           MAXIMUM_ZONE_ITERATION_INTERVAL);
+  min_relative_record_time
+    = GNUNET_TIME_UNIT_FOREVER_REL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Zone iteration finished. Adjusted zone iteration interval to %s\n",
-              GNUNET_STRINGS_relative_time_to_string (put_interval,
+              GNUNET_STRINGS_relative_time_to_string (target_iteration_velocity_per_record,
                                                       GNUNET_YES));
   GNUNET_STATISTICS_set (statistics,
-                         "Current zone iteration interval (in ms)",
-                         put_interval.rel_value_us / 1000LL,
+                         "Target zone iteration velocity (μs)",
+                         target_iteration_velocity_per_record.rel_value_us,
                          GNUNET_NO);
-  GNUNET_STATISTICS_update (statistics,
-                            "Number of zone iterations",
-                            1,
-                            GNUNET_NO);
   GNUNET_STATISTICS_set (statistics,
                          "Number of public records in DHT",
                          last_num_public_records,
                          GNUNET_NO);
   GNUNET_assert (NULL == zone_publish_task);
-  if (0 == num_public_records)
-    zone_publish_task = GNUNET_SCHEDULER_add_delayed (put_interval,
+  if (0 == last_num_public_records)
+  {
+    zone_publish_task = GNUNET_SCHEDULER_add_delayed (target_iteration_velocity_per_record,
                                                       &publish_zone_dht_start,
                                                       NULL);
+  }
   else
+  {
     zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
                                                   NULL);
+  }
 }
 
 
@@ -541,32 +720,58 @@ put_gns_record (void *cls,
 {
   struct GNUNET_GNSRECORD_Data rd_public[rd_count];
   unsigned int rd_public_count;
+  struct DhtPutActivity *ma;
 
   (void) cls;
+  ns_iteration_left--;
   rd_public_count = convert_records_for_export (rd,
                                                 rd_count,
                                                 rd_public);
   if (0 == rd_public_count)
   {
-    GNUNET_assert (NULL == zone_publish_task);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Record set empty, moving to next record set\n");
-    zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_next,
-                                                  NULL);
+    check_zone_namestore_next ();
     return;
   }
   /* We got a set of records to publish */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting DHT PUT\n");
-  active_put = perform_dht_put (key,
-                                label,
-                                rd_public,
-                                rd_public_count,
-                                NULL);
-  if (NULL == active_put)
+  ma = GNUNET_new (struct DhtPutActivity);
+  ma->start_date = GNUNET_TIME_absolute_get ();
+  ma->ph = perform_dht_put (key,
+                            label,
+                            rd_public,
+                            rd_public_count,
+                            ma);
+  put_cnt++;
+  if (0 == put_cnt % DELTA_INTERVAL)
+    update_velocity (DELTA_INTERVAL);
+  check_zone_namestore_next ();
+  if (NULL == ma->ph)
   {
-    GNUNET_break (0);
-    dht_put_continuation (NULL, GNUNET_NO);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Could not perform DHT PUT, is the DHT running?\n");
+    GNUNET_free (ma);
+    return;
+  }
+  dht_queue_length++;
+  GNUNET_CONTAINER_DLL_insert_tail (it_head,
+                                    it_tail,
+                                    ma);
+  if (dht_queue_length > DHT_QUEUE_LIMIT)
+  {
+    ma = it_head;
+    GNUNET_CONTAINER_DLL_remove (it_head,
+                                 it_tail,
+                                 ma);
+    GNUNET_DHT_put_cancel (ma->ph);
+    dht_queue_length--;
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "DHT PUT unconfirmed after %s, aborting PUT\n",
+                GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (ma->start_date),
+                                                        GNUNET_YES));
+    GNUNET_free (ma);
   }
 }
 
@@ -579,6 +784,7 @@ put_gns_record (void *cls,
 static void
 publish_zone_dht_start (void *cls)
 {
+  (void) cls;
   zone_publish_task = NULL;
   GNUNET_STATISTICS_update (statistics,
                             "Full zone iterations launched",
@@ -589,6 +795,7 @@ publish_zone_dht_start (void *cls)
   /* start counting again */
   num_public_records = 0;
   GNUNET_assert (NULL == namestore_iter);
+  ns_iteration_left = 1;
   namestore_iter
     = GNUNET_NAMESTORE_zone_iteration_start (namestore_handle,
                                              NULL, /* All zones */
@@ -598,111 +805,7 @@ publish_zone_dht_start (void *cls)
                                              NULL,
                                              &zone_iteration_finished,
                                              NULL);
-}
-
-
-/**
- * Process a record that was stored in the namestore
- * (invoked by the monitor).
- *
- * @param cls closure, NULL
- * @param zone private key of the zone; NULL on disconnect
- * @param label label of the records; NULL on disconnect
- * @param rd_count number of entries in @a rd array, 0 if label was deleted
- * @param rd array of records with data to store
- */
-static void
-handle_monitor_event (void *cls,
-                      const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
-                      const char *label,
-                      unsigned int rd_count,
-                      const struct GNUNET_GNSRECORD_Data *rd)
-{
-  struct GNUNET_GNSRECORD_Data rd_public[rd_count];
-  unsigned int rd_public_count;
-  struct MonitorActivity *ma;
-
-  GNUNET_STATISTICS_update (statistics,
-                            "Namestore monitor events received",
-                            1,
-                            GNUNET_NO);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received %u records for label `%s' via namestore monitor\n",
-              rd_count,
-              label);
-  /* filter out records that are not public, and convert to
-     absolute expiration time. */
-  rd_public_count = convert_records_for_export (rd,
-                                                rd_count,
-                                                rd_public);
-  if (0 == rd_public_count)
-    return; /* nothing to do */
-  ma = GNUNET_new (struct MonitorActivity);
-  ma->ph = perform_dht_put (zone,
-                            label,
-                            rd,
-                            rd_count,
-                            ma);
-  if (NULL == ma->ph)
-  {
-    /* PUT failed, do not remember operation */
-    GNUNET_free (ma);
-    return;
-  }
-  GNUNET_CONTAINER_DLL_insert (ma_head,
-                               ma_tail,
-                               ma);
-}
-
-
-/**
- * The zone monitor is now in SYNC with the current state of the
- * name store.  Start to perform periodic iterations.
- *
- * @param cls NULL
- */
-static void
-monitor_sync_event (void *cls)
-{
-  (void) cls;
-  if ( (NULL == zone_publish_task) &&
-       (NULL == namestore_iter) )
-    zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
-                                                  NULL);
-}
-
-
-/**
- * The zone monitor encountered an IPC error trying to to get in
- * sync. Restart from the beginning.
- *
- * @param cls NULL
- */
-static void
-handle_monitor_error (void *cls)
-{
-  (void) cls;
-  GNUNET_STATISTICS_update (statistics,
-                            "Namestore monitor errors encountered",
-                            1,
-                            GNUNET_NO);
-  if (NULL != zone_publish_task)
-  {
-    GNUNET_SCHEDULER_cancel (zone_publish_task);
-    zone_publish_task = NULL;
-  }
-  if (NULL != namestore_iter)
-  {
-    GNUNET_NAMESTORE_zone_iteration_stop (namestore_iter);
-    namestore_iter = NULL;
-  }
-  if (NULL != active_put)
-  {
-    GNUNET_DHT_put_cancel (active_put);
-    active_put = NULL;
-  }
-  zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
-                                                NULL);
+  GNUNET_assert (NULL != namestore_iter);
 }
 
 
@@ -721,7 +824,11 @@ run (void *cls,
   unsigned long long max_parallel_bg_queries = 128;
 
   (void) cls;
-  min_relative_record_time = GNUNET_TIME_UNIT_FOREVER_REL;
+  (void) service;
+  last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */
+  min_relative_record_time
+    = GNUNET_TIME_UNIT_FOREVER_REL;
+  target_iteration_velocity_per_record = INITIAL_ZONE_ITERATION_INTERVAL;
   namestore_handle = GNUNET_NAMESTORE_connect (c);
   if (NULL == namestore_handle)
   {
@@ -730,9 +837,10 @@ run (void *cls,
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-
-  put_interval = INITIAL_PUT_INTERVAL;
-  zone_publish_time_window_default = DEFAULT_ZONE_PUBLISH_TIME_WINDOW;
+  cache_keys = GNUNET_CONFIGURATION_get_value_yesno (c,
+                                                     "namestore",
+                                                     "CACHE_KEYS");
+  zone_publish_time_window_default = GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY;
   if (GNUNET_OK ==
       GNUNET_CONFIGURATION_get_value_time (c,
                                           "zonemaster",
@@ -763,25 +871,23 @@ run (void *cls,
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("Could not connect to DHT!\n"));
-    GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+    GNUNET_SCHEDULER_add_now (&shutdown_task,
+                             NULL);
     return;
   }
 
   /* Schedule periodic put for our records. */
-  first_zone_iteration = GNUNET_YES;\
+  first_zone_iteration = GNUNET_YES;
   statistics = GNUNET_STATISTICS_create ("zonemaster",
                                          c);
-  zmon = GNUNET_NAMESTORE_zone_monitor_start (c,
-                                              NULL,
-                                              GNUNET_NO,
-                                              &handle_monitor_error,
-                                              NULL,
-                                              &handle_monitor_event,
-                                              NULL,
-                                              &monitor_sync_event,
-                                              NULL);
-  GNUNET_break (NULL != zmon);
-  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
+  GNUNET_STATISTICS_set (statistics,
+                         "Target zone iteration velocity (μs)",
+                         target_iteration_velocity_per_record.rel_value_us,
+                         GNUNET_NO);
+  zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
+                                                NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
 }