From: Christian Grothoff Date: Sun, 29 Apr 2018 21:16:05 +0000 (+0200) Subject: batch NAMESTORE operation also in zonemaster X-Git-Tag: v0.11.0pre66~93^2~12 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=c89e2dcd8976e45c85aef5cc77e2b96baffd4980;p=oweals%2Fgnunet.git batch NAMESTORE operation also in zonemaster --- diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index 25baf4396..5c3356784 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c @@ -36,6 +36,18 @@ #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename) +/** + * How often do we measure the delta between desired zone + * iteration speed and actual speed, and tell statistics + * service about it? + */ +#define DELTA_INTERVAL 100 + +/** + * How many records do we fetch + * in one shot from the namestore? + */ +#define NS_BLOCK_SIZE 100 /** * The initial interval in milliseconds btween puts in @@ -79,17 +91,17 @@ /** * Handle for DHT PUT activity triggered from the namestore monitor. */ -struct MonitorActivity +struct DhtPutActivity { /** * Kept in a DLL. */ - struct MonitorActivity *next; + struct DhtPutActivity *next; /** * Kept in a DLL. */ - struct MonitorActivity *prev; + struct DhtPutActivity *prev; /** * Handle for the DHT PUT operation. @@ -131,12 +143,22 @@ static struct GNUNET_NAMESTORE_ZoneMonitor *zmon; /** * Head of monitor activities; kept in a DLL. */ -static struct MonitorActivity *ma_head; +static struct DhtPutActivity *ma_head; /** * Tail of monitor activities; kept in a DLL. */ -static struct MonitorActivity *ma_tail; +static struct DhtPutActivity *ma_tail; + +/** + * Head of iteration put activities; kept in a DLL. + */ +static struct DhtPutActivity *it_head; + +/** + * Tail of iteration put activities; kept in a DLL. + */ +static struct DhtPutActivity *it_tail; /** * Useful for zone update for DHT put @@ -148,6 +170,21 @@ static unsigned long long num_public_records; */ static unsigned long long last_num_public_records; +/** + * Number of successful put operations performed in the current + * measurement cycle (as measured in #check_zone_dht_next()). + */ +static unsigned long long put_cnt; + +/** + * What is the frequency at which we currently would like + * to perform DHT puts (per record)? Calculated in + * update_velocity() from the #zone_publish_time_window() + * and the total number of record sets we have (so far) + * observed in the zone. + */ +static struct GNUNET_TIME_Relative next_put_interval; + /** * Minimum relative expiration time of records seem during the current * zone iteration. @@ -170,11 +207,31 @@ static struct GNUNET_TIME_Relative zone_publish_time_window_default; */ static struct GNUNET_TIME_Relative zone_publish_time_window; +/** + * When did we last start measuring the #DELTA_INTERVAL successful + * DHT puts? Used for velocity calculations. + */ +static struct GNUNET_TIME_Absolute last_put_100; + +/** + * By how much should we try to increase our per-record iteration speed + * (over the desired speed calculated directly from the #put_interval)? + * Basically this value corresponds to the per-record CPU time overhead + * we have. + */ +static struct GNUNET_TIME_Relative sub_delta; + /** * zone publish task */ static struct GNUNET_SCHEDULER_Task *zone_publish_task; +/** + * How many more values are left for the current query before we need + * to explicitly ask the namestore for more? + */ +static unsigned int ns_iteration_left; + /** * #GNUNET_YES if zone has never been published before */ @@ -196,7 +253,7 @@ static int cache_keys; static void shutdown_task (void *cls) { - struct MonitorActivity *ma; + struct DhtPutActivity *ma; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down!\n"); @@ -257,8 +314,10 @@ publish_zone_dht_next (void *cls) { zone_publish_task = NULL; GNUNET_assert (NULL != namestore_iter); + GNUNET_assert (0 == ns_iteration_left); + ns_iteration_left = NS_BLOCK_SIZE; GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, - 1); + NS_BLOCK_SIZE); } @@ -272,144 +331,188 @@ publish_zone_dht_start (void *cls); /** - * How often do we measure the delta between desired zone - * iteration speed and actual speed, and tell statistics - * service about it? + * Continuation called from DHT once the PUT operation triggered + * by a monitor is done. + * + * @param cls a `struct DhtPutActivity` + * @param success #GNUNET_OK on success */ -#define DELTA_INTERVAL 100 +static void +dht_put_monitor_continuation (void *cls, + int success) +{ + struct DhtPutActivity *ma = cls; + + num_public_records++; + GNUNET_CONTAINER_DLL_remove (ma_head, + ma_tail, + ma); + GNUNET_free (ma); +} /** - * Continuation called from DHT once the PUT operation is done. - * - * @param cls closure, NULL if called from regular iteration, - * `struct MonitorActivity` if called from #handle_monitor_event. - * @param success #GNUNET_OK on success + * Check if the current zone iteration needs to be continued + * by calling #publish_zone_dht_next(), and if so with what delay. */ static void -dht_put_continuation (void *cls, - int success) +check_zone_dht_next () { - struct MonitorActivity *ma = cls; - static unsigned long long put_cnt; - static struct GNUNET_TIME_Absolute last_put_100; - static struct GNUNET_TIME_Relative sub_delta; - struct GNUNET_TIME_Relative next_put_interval; struct GNUNET_TIME_Relative delay; - num_public_records++; - if (NULL == ma) + if (0 != ns_iteration_left) + return; /* current NAMESTORE iteration not yet done */ + if (NULL != it_head) + return; /* waiting on DHT */ + delay = GNUNET_TIME_relative_subtract (next_put_interval, + sub_delta); + /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the + per-record delay calculated so far with the #NS_BLOCK_SIZE */ + delay = GNUNET_TIME_relative_multiply (delay, + NS_BLOCK_SIZE); + GNUNET_assert (NULL == zone_publish_task); + zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay, + &publish_zone_dht_next, + NULL); +} + + +/** + * Re-calculate our velocity and the desired velocity. + * We have succeeded in making #DELTA_INTERVAL puts, so + * now calculate the new desired delay between puts. + */ +static void +update_velocity () +{ + struct GNUNET_TIME_Relative delta; + unsigned long long pct = 0; + + /* How fast were we really? */ + delta = GNUNET_TIME_absolute_get_duration (last_put_100); + delta.rel_value_us /= DELTA_INTERVAL; + last_put_100 = GNUNET_TIME_absolute_get (); + + /* calculate expected frequency */ + if ( (num_public_records > last_num_public_records) && + (GNUNET_NO == first_zone_iteration) ) { - active_put = NULL; - if ( (num_public_records > last_num_public_records) && - (GNUNET_NO == first_zone_iteration) ) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Last record count was lower than current record count. Reducing interval.\n"); + put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window, + num_public_records); + next_put_interval = GNUNET_TIME_relative_divide (put_interval, + LATE_ITERATION_SPEEDUP_FACTOR); + } + else + { + next_put_interval = put_interval; + } + + next_put_interval = GNUNET_TIME_relative_min (next_put_interval, + MAXIMUM_ZONE_ITERATION_INTERVAL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Desired global zone iteration interval is %s/record!\n", + GNUNET_STRINGS_relative_time_to_string (next_put_interval, + GNUNET_YES)); + + /* Tell statistics actual vs. desired speed */ + GNUNET_STATISTICS_set (statistics, + "Target zone iteration velocity (μs)", + next_put_interval.rel_value_us, + GNUNET_NO); + GNUNET_STATISTICS_set (statistics, + "Current zone iteration velocity (μs)", + delta.rel_value_us, + GNUNET_NO); + /* update "sub_delta" based on difference, taking + previous sub_delta into account! */ + if (next_put_interval.rel_value_us > delta.rel_value_us) + { + /* We were too fast, reduce sub_delta! */ + struct GNUNET_TIME_Relative corr; + + corr = GNUNET_TIME_relative_subtract (next_put_interval, + delta); + if (sub_delta.rel_value_us > delta.rel_value_us) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Last record count was lower than current record count. Reducing interval.\n"); - put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window, - num_public_records); - next_put_interval = GNUNET_TIME_relative_divide (put_interval, - LATE_ITERATION_SPEEDUP_FACTOR); + /* Reduce sub_delta by corr */ + sub_delta = GNUNET_TIME_relative_subtract (sub_delta, + corr); } else { - next_put_interval = put_interval; + /* We're doing fine with waiting the full time, this + should theoretically only happen if we run at + infinite speed. */ + sub_delta = GNUNET_TIME_UNIT_ZERO; } - next_put_interval = GNUNET_TIME_relative_min (next_put_interval, - MAXIMUM_ZONE_ITERATION_INTERVAL); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "PUT complete, next PUT in %s!\n", - GNUNET_STRINGS_relative_time_to_string (next_put_interval, - GNUNET_YES)); - /* compute velocities and delay corrections to apply */ - if (0 == put_cnt) - last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */ - put_cnt++; - if (0 == put_cnt % DELTA_INTERVAL) + } + else if (next_put_interval.rel_value_us < delta.rel_value_us) + { + /* We were too slow, increase sub_delta! */ + struct GNUNET_TIME_Relative corr; + + corr = GNUNET_TIME_relative_subtract (delta, + next_put_interval); + sub_delta = GNUNET_TIME_relative_add (sub_delta, + corr); + if (sub_delta.rel_value_us > next_put_interval.rel_value_us) { - struct GNUNET_TIME_Relative delta; - unsigned long long pct = 0; - - /* How fast were we really? */ - delta = GNUNET_TIME_absolute_get_duration (last_put_100); - delta.rel_value_us /= DELTA_INTERVAL; - last_put_100 = GNUNET_TIME_absolute_get (); - /* Tell statistics actual vs. desired speed */ - GNUNET_STATISTICS_set (statistics, - "Target zone iteration velocity (μs)", - next_put_interval.rel_value_us, - GNUNET_NO); - GNUNET_STATISTICS_set (statistics, - "Current zone iteration velocity (μs)", - delta.rel_value_us, - GNUNET_NO); - /* update "sub_delta" based on difference, taking - previous sub_delta into account! */ - if (next_put_interval.rel_value_us > delta.rel_value_us) - { - /* We were too fast, reduce sub_delta! */ - struct GNUNET_TIME_Relative corr; - - corr = GNUNET_TIME_relative_subtract (next_put_interval, - delta); - if (sub_delta.rel_value_us > delta.rel_value_us) - { - /* Reduce sub_delta by corr */ - sub_delta = GNUNET_TIME_relative_subtract (sub_delta, - corr); - } - else - { - /* We're doing fine with waiting the full time, this - should theoretically only happen if we run at - infinite speed. */ - sub_delta = GNUNET_TIME_UNIT_ZERO; - } - } - else if (next_put_interval.rel_value_us < delta.rel_value_us) - { - /* We were too slow, increase sub_delta! */ - struct GNUNET_TIME_Relative corr; - - corr = GNUNET_TIME_relative_subtract (delta, - next_put_interval); - sub_delta = GNUNET_TIME_relative_add (sub_delta, - corr); - if (sub_delta.rel_value_us > next_put_interval.rel_value_us) - { - /* CPU overload detected, we cannot go at desired speed, - as this would mean using a negative delay. */ - sub_delta = next_put_interval; - /* compute how much faster we would want to be for - the desired velocity */ - if (0 == next_put_interval.rel_value_us) - pct = UINT64_MAX; /* desired speed is infinity ... */ - else - pct = sub_delta.rel_value_us * 100 / next_put_interval.rel_value_us; - } - } - GNUNET_STATISTICS_set (statistics, - "% speed increase needed for target velocity", - pct, - GNUNET_NO); - } /* end of periodic velocity calculations */ - delay = GNUNET_TIME_relative_subtract (next_put_interval, - sub_delta); - GNUNET_assert (NULL == zone_publish_task); - zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay, - &publish_zone_dht_next, - NULL); + /* CPU overload detected, we cannot go at desired speed, + as this would mean using a negative delay. */ + /* compute how much faster we would want to be for + the desired velocity */ + if (0 == next_put_interval.rel_value_us) + pct = UINT64_MAX; /* desired speed is infinity ... */ + else + pct = (sub_delta.rel_value_us - next_put_interval.rel_value_us) * 100LLU + / next_put_interval.rel_value_us; + sub_delta = next_put_interval; + } } - else + GNUNET_STATISTICS_set (statistics, + "% speed increase needed for target velocity", + pct, + GNUNET_NO); + GNUNET_STATISTICS_set (statistics, + "# records processed in current iteration", + num_public_records, + GNUNET_NO); +} + + +/** + * Continuation called from DHT once the PUT operation is done. + * + * @param cls a `struct DhtPutActivity` + * @param success #GNUNET_OK on success + */ +static void +dht_put_continuation (void *cls, + int success) +{ + struct DhtPutActivity *ma = cls; + + num_public_records++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "PUT complete (%s)\n", + (GNUNET_OK == success) ? "success" : "failure"); + GNUNET_CONTAINER_DLL_remove (it_head, + it_tail, + ma); + GNUNET_free (ma); + if (GNUNET_OK == success) { - GNUNET_CONTAINER_DLL_remove (ma_head, - ma_tail, - ma); - GNUNET_free (ma); + put_cnt++; + if (0 == put_cnt % DELTA_INTERVAL) + update_velocity (); } + check_zone_dht_next (); } + /** * Convert namestore records from the internal format to that * suitable for publication (removes private records, converts @@ -460,7 +563,8 @@ convert_records_for_export (const struct GNUNET_GNSRECORD_Data *rd, * @param label label to store under * @param rd_public public record data * @param rd_public_count number of records in @a rd_public - * @param pc_arg closure argument to pass to the #dht_put_continuation + * @param cont function to call with PUT result + * @param cont_cls closure for @a cont * @return DHT PUT handle, NULL on error */ static struct GNUNET_DHT_PutHandle * @@ -468,7 +572,8 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, const char *label, const struct GNUNET_GNSRECORD_Data *rd_public, unsigned int rd_public_count, - void *pc_arg) + GNUNET_DHT_PutContinuation cont, + void *cont_cls) { struct GNUNET_GNSRECORD_Block *block; struct GNUNET_HashCode query; @@ -519,8 +624,8 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, block_size, block, expire, - &dht_put_continuation, - pc_arg); + cont, + cont_cls); GNUNET_free (block); return ret; } @@ -642,8 +747,10 @@ put_gns_record (void *cls, { struct GNUNET_GNSRECORD_Data rd_public[rd_count]; unsigned int rd_public_count; + struct DhtPutActivity *ma; (void) cls; + ns_iteration_left--; rd_public_count = convert_records_for_export (rd, rd_count, rd_public); @@ -652,25 +759,30 @@ put_gns_record (void *cls, GNUNET_assert (NULL == zone_publish_task); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Record set empty, moving to next record set\n"); - zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_next, - NULL); + check_zone_dht_next (); return; } /* We got a set of records to publish */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting DHT PUT\n"); - active_put = perform_dht_put (key, - label, - rd_public, - rd_public_count, - NULL); - if (NULL == active_put) + ma = GNUNET_new (struct DhtPutActivity); + ma->ph = perform_dht_put (key, + label, + rd_public, + rd_public_count, + &dht_put_continuation, + ma); + if (NULL == ma->ph) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Could not perform DHT PUT, is the DHT running?\n"); - dht_put_continuation (NULL, - GNUNET_NO); + GNUNET_free (ma); + check_zone_dht_next (); + return; } + GNUNET_CONTAINER_DLL_insert (it_head, + it_tail, + ma); } @@ -693,6 +805,7 @@ publish_zone_dht_start (void *cls) /* start counting again */ num_public_records = 0; GNUNET_assert (NULL == namestore_iter); + ns_iteration_left = 1; namestore_iter = GNUNET_NAMESTORE_zone_iteration_start (namestore_handle, NULL, /* All zones */ @@ -724,7 +837,7 @@ handle_monitor_event (void *cls, { struct GNUNET_GNSRECORD_Data rd_public[rd_count]; unsigned int rd_public_count; - struct MonitorActivity *ma; + struct DhtPutActivity *ma; GNUNET_STATISTICS_update (statistics, "Namestore monitor events received", @@ -741,11 +854,12 @@ handle_monitor_event (void *cls, rd_public); if (0 == rd_public_count) return; /* nothing to do */ - ma = GNUNET_new (struct MonitorActivity); + ma = GNUNET_new (struct DhtPutActivity); ma->ph = perform_dht_put (zone, label, rd, rd_count, + &dht_put_monitor_continuation, ma); if (NULL == ma->ph) { @@ -825,6 +939,7 @@ run (void *cls, unsigned long long max_parallel_bg_queries = 128; (void) cls; + last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */ min_relative_record_time = GNUNET_TIME_UNIT_FOREVER_REL; namestore_handle = GNUNET_NAMESTORE_connect (c); if (NULL == namestore_handle)