#define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
+/**
+ * How often do we measure the delta between desired zone
+ * iteration speed and actual speed, and tell statistics
+ * service about it?
+ */
+#define DELTA_INTERVAL 100
+
+/**
+ * How many records do we fetch
+ * in one shot from the namestore?
+ */
+#define NS_BLOCK_SIZE 100
/**
* The initial interval in milliseconds btween puts in
/**
* Handle for DHT PUT activity triggered from the namestore monitor.
*/
-struct MonitorActivity
+struct DhtPutActivity
{
/**
* Kept in a DLL.
*/
- struct MonitorActivity *next;
+ struct DhtPutActivity *next;
/**
* Kept in a DLL.
*/
- struct MonitorActivity *prev;
+ struct DhtPutActivity *prev;
/**
* Handle for the DHT PUT operation.
/**
* Head of monitor activities; kept in a DLL.
*/
-static struct MonitorActivity *ma_head;
+static struct DhtPutActivity *ma_head;
/**
* Tail of monitor activities; kept in a DLL.
*/
-static struct MonitorActivity *ma_tail;
+static struct DhtPutActivity *ma_tail;
+
+/**
+ * Head of iteration put activities; kept in a DLL.
+ */
+static struct DhtPutActivity *it_head;
+
+/**
+ * Tail of iteration put activities; kept in a DLL.
+ */
+static struct DhtPutActivity *it_tail;
/**
* Useful for zone update for DHT put
*/
static unsigned long long last_num_public_records;
+/**
+ * Number of successful put operations performed in the current
+ * measurement cycle (as measured in #check_zone_dht_next()).
+ */
+static unsigned long long put_cnt;
+
+/**
+ * What is the frequency at which we currently would like
+ * to perform DHT puts (per record)? Calculated in
+ * update_velocity() from the #zone_publish_time_window()
+ * and the total number of record sets we have (so far)
+ * observed in the zone.
+ */
+static struct GNUNET_TIME_Relative next_put_interval;
+
/**
* Minimum relative expiration time of records seem during the current
* zone iteration.
*/
static struct GNUNET_TIME_Relative zone_publish_time_window;
+/**
+ * When did we last start measuring the #DELTA_INTERVAL successful
+ * DHT puts? Used for velocity calculations.
+ */
+static struct GNUNET_TIME_Absolute last_put_100;
+
+/**
+ * By how much should we try to increase our per-record iteration speed
+ * (over the desired speed calculated directly from the #put_interval)?
+ * Basically this value corresponds to the per-record CPU time overhead
+ * we have.
+ */
+static struct GNUNET_TIME_Relative sub_delta;
+
/**
* zone publish task
*/
static struct GNUNET_SCHEDULER_Task *zone_publish_task;
+/**
+ * How many more values are left for the current query before we need
+ * to explicitly ask the namestore for more?
+ */
+static unsigned int ns_iteration_left;
+
/**
* #GNUNET_YES if zone has never been published before
*/
static void
shutdown_task (void *cls)
{
- struct MonitorActivity *ma;
+ struct DhtPutActivity *ma;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down!\n");
{
zone_publish_task = NULL;
GNUNET_assert (NULL != namestore_iter);
+ GNUNET_assert (0 == ns_iteration_left);
+ ns_iteration_left = NS_BLOCK_SIZE;
GNUNET_NAMESTORE_zone_iterator_next (namestore_iter,
- 1);
+ NS_BLOCK_SIZE);
}
/**
- * How often do we measure the delta between desired zone
- * iteration speed and actual speed, and tell statistics
- * service about it?
+ * Continuation called from DHT once the PUT operation triggered
+ * by a monitor is done.
+ *
+ * @param cls a `struct DhtPutActivity`
+ * @param success #GNUNET_OK on success
*/
-#define DELTA_INTERVAL 100
+static void
+dht_put_monitor_continuation (void *cls,
+ int success)
+{
+ struct DhtPutActivity *ma = cls;
+
+ num_public_records++;
+ GNUNET_CONTAINER_DLL_remove (ma_head,
+ ma_tail,
+ ma);
+ GNUNET_free (ma);
+}
/**
- * Continuation called from DHT once the PUT operation is done.
- *
- * @param cls closure, NULL if called from regular iteration,
- * `struct MonitorActivity` if called from #handle_monitor_event.
- * @param success #GNUNET_OK on success
+ * Check if the current zone iteration needs to be continued
+ * by calling #publish_zone_dht_next(), and if so with what delay.
*/
static void
-dht_put_continuation (void *cls,
- int success)
+check_zone_dht_next ()
{
- struct MonitorActivity *ma = cls;
- static unsigned long long put_cnt;
- static struct GNUNET_TIME_Absolute last_put_100;
- static struct GNUNET_TIME_Relative sub_delta;
- struct GNUNET_TIME_Relative next_put_interval;
struct GNUNET_TIME_Relative delay;
- num_public_records++;
- if (NULL == ma)
+ if (0 != ns_iteration_left)
+ return; /* current NAMESTORE iteration not yet done */
+ if (NULL != it_head)
+ return; /* waiting on DHT */
+ delay = GNUNET_TIME_relative_subtract (next_put_interval,
+ sub_delta);
+ /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
+ per-record delay calculated so far with the #NS_BLOCK_SIZE */
+ delay = GNUNET_TIME_relative_multiply (delay,
+ NS_BLOCK_SIZE);
+ GNUNET_assert (NULL == zone_publish_task);
+ zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &publish_zone_dht_next,
+ NULL);
+}
+
+
+/**
+ * Re-calculate our velocity and the desired velocity.
+ * We have succeeded in making #DELTA_INTERVAL puts, so
+ * now calculate the new desired delay between puts.
+ */
+static void
+update_velocity ()
+{
+ struct GNUNET_TIME_Relative delta;
+ unsigned long long pct = 0;
+
+ /* How fast were we really? */
+ delta = GNUNET_TIME_absolute_get_duration (last_put_100);
+ delta.rel_value_us /= DELTA_INTERVAL;
+ last_put_100 = GNUNET_TIME_absolute_get ();
+
+ /* calculate expected frequency */
+ if ( (num_public_records > last_num_public_records) &&
+ (GNUNET_NO == first_zone_iteration) )
{
- active_put = NULL;
- if ( (num_public_records > last_num_public_records) &&
- (GNUNET_NO == first_zone_iteration) )
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Last record count was lower than current record count. Reducing interval.\n");
+ put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
+ num_public_records);
+ next_put_interval = GNUNET_TIME_relative_divide (put_interval,
+ LATE_ITERATION_SPEEDUP_FACTOR);
+ }
+ else
+ {
+ next_put_interval = put_interval;
+ }
+
+ next_put_interval = GNUNET_TIME_relative_min (next_put_interval,
+ MAXIMUM_ZONE_ITERATION_INTERVAL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Desired global zone iteration interval is %s/record!\n",
+ GNUNET_STRINGS_relative_time_to_string (next_put_interval,
+ GNUNET_YES));
+
+ /* Tell statistics actual vs. desired speed */
+ GNUNET_STATISTICS_set (statistics,
+ "Target zone iteration velocity (μs)",
+ next_put_interval.rel_value_us,
+ GNUNET_NO);
+ GNUNET_STATISTICS_set (statistics,
+ "Current zone iteration velocity (μs)",
+ delta.rel_value_us,
+ GNUNET_NO);
+ /* update "sub_delta" based on difference, taking
+ previous sub_delta into account! */
+ if (next_put_interval.rel_value_us > delta.rel_value_us)
+ {
+ /* We were too fast, reduce sub_delta! */
+ struct GNUNET_TIME_Relative corr;
+
+ corr = GNUNET_TIME_relative_subtract (next_put_interval,
+ delta);
+ if (sub_delta.rel_value_us > delta.rel_value_us)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Last record count was lower than current record count. Reducing interval.\n");
- put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
- num_public_records);
- next_put_interval = GNUNET_TIME_relative_divide (put_interval,
- LATE_ITERATION_SPEEDUP_FACTOR);
+ /* Reduce sub_delta by corr */
+ sub_delta = GNUNET_TIME_relative_subtract (sub_delta,
+ corr);
}
else
{
- next_put_interval = put_interval;
+ /* We're doing fine with waiting the full time, this
+ should theoretically only happen if we run at
+ infinite speed. */
+ sub_delta = GNUNET_TIME_UNIT_ZERO;
}
- next_put_interval = GNUNET_TIME_relative_min (next_put_interval,
- MAXIMUM_ZONE_ITERATION_INTERVAL);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "PUT complete, next PUT in %s!\n",
- GNUNET_STRINGS_relative_time_to_string (next_put_interval,
- GNUNET_YES));
- /* compute velocities and delay corrections to apply */
- if (0 == put_cnt)
- last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */
- put_cnt++;
- if (0 == put_cnt % DELTA_INTERVAL)
+ }
+ else if (next_put_interval.rel_value_us < delta.rel_value_us)
+ {
+ /* We were too slow, increase sub_delta! */
+ struct GNUNET_TIME_Relative corr;
+
+ corr = GNUNET_TIME_relative_subtract (delta,
+ next_put_interval);
+ sub_delta = GNUNET_TIME_relative_add (sub_delta,
+ corr);
+ if (sub_delta.rel_value_us > next_put_interval.rel_value_us)
{
- struct GNUNET_TIME_Relative delta;
- unsigned long long pct = 0;
-
- /* How fast were we really? */
- delta = GNUNET_TIME_absolute_get_duration (last_put_100);
- delta.rel_value_us /= DELTA_INTERVAL;
- last_put_100 = GNUNET_TIME_absolute_get ();
- /* Tell statistics actual vs. desired speed */
- GNUNET_STATISTICS_set (statistics,
- "Target zone iteration velocity (μs)",
- next_put_interval.rel_value_us,
- GNUNET_NO);
- GNUNET_STATISTICS_set (statistics,
- "Current zone iteration velocity (μs)",
- delta.rel_value_us,
- GNUNET_NO);
- /* update "sub_delta" based on difference, taking
- previous sub_delta into account! */
- if (next_put_interval.rel_value_us > delta.rel_value_us)
- {
- /* We were too fast, reduce sub_delta! */
- struct GNUNET_TIME_Relative corr;
-
- corr = GNUNET_TIME_relative_subtract (next_put_interval,
- delta);
- if (sub_delta.rel_value_us > delta.rel_value_us)
- {
- /* Reduce sub_delta by corr */
- sub_delta = GNUNET_TIME_relative_subtract (sub_delta,
- corr);
- }
- else
- {
- /* We're doing fine with waiting the full time, this
- should theoretically only happen if we run at
- infinite speed. */
- sub_delta = GNUNET_TIME_UNIT_ZERO;
- }
- }
- else if (next_put_interval.rel_value_us < delta.rel_value_us)
- {
- /* We were too slow, increase sub_delta! */
- struct GNUNET_TIME_Relative corr;
-
- corr = GNUNET_TIME_relative_subtract (delta,
- next_put_interval);
- sub_delta = GNUNET_TIME_relative_add (sub_delta,
- corr);
- if (sub_delta.rel_value_us > next_put_interval.rel_value_us)
- {
- /* CPU overload detected, we cannot go at desired speed,
- as this would mean using a negative delay. */
- sub_delta = next_put_interval;
- /* compute how much faster we would want to be for
- the desired velocity */
- if (0 == next_put_interval.rel_value_us)
- pct = UINT64_MAX; /* desired speed is infinity ... */
- else
- pct = sub_delta.rel_value_us * 100 / next_put_interval.rel_value_us;
- }
- }
- GNUNET_STATISTICS_set (statistics,
- "% speed increase needed for target velocity",
- pct,
- GNUNET_NO);
- } /* end of periodic velocity calculations */
- delay = GNUNET_TIME_relative_subtract (next_put_interval,
- sub_delta);
- GNUNET_assert (NULL == zone_publish_task);
- zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay,
- &publish_zone_dht_next,
- NULL);
+ /* CPU overload detected, we cannot go at desired speed,
+ as this would mean using a negative delay. */
+ /* compute how much faster we would want to be for
+ the desired velocity */
+ if (0 == next_put_interval.rel_value_us)
+ pct = UINT64_MAX; /* desired speed is infinity ... */
+ else
+ pct = (sub_delta.rel_value_us - next_put_interval.rel_value_us) * 100LLU
+ / next_put_interval.rel_value_us;
+ sub_delta = next_put_interval;
+ }
}
- else
+ GNUNET_STATISTICS_set (statistics,
+ "% speed increase needed for target velocity",
+ pct,
+ GNUNET_NO);
+ GNUNET_STATISTICS_set (statistics,
+ "# records processed in current iteration",
+ num_public_records,
+ GNUNET_NO);
+}
+
+
+/**
+ * Continuation called from DHT once the PUT operation is done.
+ *
+ * @param cls a `struct DhtPutActivity`
+ * @param success #GNUNET_OK on success
+ */
+static void
+dht_put_continuation (void *cls,
+ int success)
+{
+ struct DhtPutActivity *ma = cls;
+
+ num_public_records++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "PUT complete (%s)\n",
+ (GNUNET_OK == success) ? "success" : "failure");
+ GNUNET_CONTAINER_DLL_remove (it_head,
+ it_tail,
+ ma);
+ GNUNET_free (ma);
+ if (GNUNET_OK == success)
{
- GNUNET_CONTAINER_DLL_remove (ma_head,
- ma_tail,
- ma);
- GNUNET_free (ma);
+ put_cnt++;
+ if (0 == put_cnt % DELTA_INTERVAL)
+ update_velocity ();
}
+ check_zone_dht_next ();
}
+
/**
* Convert namestore records from the internal format to that
* suitable for publication (removes private records, converts
* @param label label to store under
* @param rd_public public record data
* @param rd_public_count number of records in @a rd_public
- * @param pc_arg closure argument to pass to the #dht_put_continuation
+ * @param cont function to call with PUT result
+ * @param cont_cls closure for @a cont
* @return DHT PUT handle, NULL on error
*/
static struct GNUNET_DHT_PutHandle *
const char *label,
const struct GNUNET_GNSRECORD_Data *rd_public,
unsigned int rd_public_count,
- void *pc_arg)
+ GNUNET_DHT_PutContinuation cont,
+ void *cont_cls)
{
struct GNUNET_GNSRECORD_Block *block;
struct GNUNET_HashCode query;
block_size,
block,
expire,
- &dht_put_continuation,
- pc_arg);
+ cont,
+ cont_cls);
GNUNET_free (block);
return ret;
}
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
unsigned int rd_public_count;
+ struct DhtPutActivity *ma;
(void) cls;
+ ns_iteration_left--;
rd_public_count = convert_records_for_export (rd,
rd_count,
rd_public);
GNUNET_assert (NULL == zone_publish_task);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Record set empty, moving to next record set\n");
- zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_next,
- NULL);
+ check_zone_dht_next ();
return;
}
/* We got a set of records to publish */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting DHT PUT\n");
- active_put = perform_dht_put (key,
- label,
- rd_public,
- rd_public_count,
- NULL);
- if (NULL == active_put)
+ ma = GNUNET_new (struct DhtPutActivity);
+ ma->ph = perform_dht_put (key,
+ label,
+ rd_public,
+ rd_public_count,
+ &dht_put_continuation,
+ ma);
+ if (NULL == ma->ph)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Could not perform DHT PUT, is the DHT running?\n");
- dht_put_continuation (NULL,
- GNUNET_NO);
+ GNUNET_free (ma);
+ check_zone_dht_next ();
+ return;
}
+ GNUNET_CONTAINER_DLL_insert (it_head,
+ it_tail,
+ ma);
}
/* start counting again */
num_public_records = 0;
GNUNET_assert (NULL == namestore_iter);
+ ns_iteration_left = 1;
namestore_iter
= GNUNET_NAMESTORE_zone_iteration_start (namestore_handle,
NULL, /* All zones */
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
unsigned int rd_public_count;
- struct MonitorActivity *ma;
+ struct DhtPutActivity *ma;
GNUNET_STATISTICS_update (statistics,
"Namestore monitor events received",
rd_public);
if (0 == rd_public_count)
return; /* nothing to do */
- ma = GNUNET_new (struct MonitorActivity);
+ ma = GNUNET_new (struct DhtPutActivity);
ma->ph = perform_dht_put (zone,
label,
rd,
rd_count,
+ &dht_put_monitor_continuation,
ma);
if (NULL == ma->ph)
{
unsigned long long max_parallel_bg_queries = 128;
(void) cls;
+ last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */
min_relative_record_time = GNUNET_TIME_UNIT_FOREVER_REL;
namestore_handle = GNUNET_NAMESTORE_connect (c);
if (NULL == namestore_handle)