From 7fef1456bd44bacaf5aa927c89282a31e89bdcf7 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 30 Apr 2018 10:55:26 +0200 Subject: [PATCH] enable more parallelism with DHT queue, but limit to 1000 entries, then kill hard --- src/dht/dht_api.c | 4 +- src/util/mq.c | 4 +- src/zonemaster/gnunet-service-zonemaster.c | 56 +++++++++++++++++++--- 3 files changed, 53 insertions(+), 11 deletions(-) diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 42ddc7b60..7a0771de0 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -1028,8 +1028,8 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, put_msg->expiration = GNUNET_TIME_absolute_hton (exp); put_msg->key = *key; GNUNET_memcpy (&put_msg[1], - data, - size); + data, + size); GNUNET_MQ_send (handle->mq, env); return ph; diff --git a/src/util/mq.c b/src/util/mq.c index af700836c..0f9ad9a12 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -578,11 +578,9 @@ void GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls) { - unsigned int i; - if (NULL == mq->handlers) return; - for (i=0;NULL != mq->handlers[i].cb; i++) + for (unsigned int i=0;NULL != mq->handlers[i].cb; i++) mq->handlers[i].cls = handlers_cls; } diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index 5c3356784..b45ed576c 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c @@ -49,6 +49,11 @@ */ #define NS_BLOCK_SIZE 100 +/** + * How many pending DHT operations do we allow at most? + */ +#define DHT_QUEUE_LIMIT 1000 + /** * The initial interval in milliseconds btween puts in * a zone iteration @@ -107,6 +112,11 @@ struct DhtPutActivity * Handle for the DHT PUT operation. */ struct GNUNET_DHT_PutHandle *ph; + + /** + * When was this PUT initiated? + */ + struct GNUNET_TIME_Absolute start_date; }; @@ -160,6 +170,11 @@ static struct DhtPutActivity *it_head; */ static struct DhtPutActivity *it_tail; +/** + * Number of entries in the DHT queue. + */ +static unsigned int dht_queue_length; + /** * Useful for zone update for DHT put */ @@ -265,6 +280,15 @@ shutdown_task (void *cls) ma); GNUNET_free (ma); } + while (NULL != (ma = it_head)) + { + GNUNET_DHT_put_cancel (ma->ph); + GNUNET_CONTAINER_DLL_remove (it_head, + it_tail, + ma); + dht_queue_length--; + GNUNET_free (ma); + } if (NULL != statistics) { GNUNET_STATISTICS_destroy (statistics, @@ -362,8 +386,6 @@ check_zone_dht_next () if (0 != ns_iteration_left) return; /* current NAMESTORE iteration not yet done */ - if (NULL != it_head) - return; /* waiting on DHT */ delay = GNUNET_TIME_relative_subtract (next_put_interval, sub_delta); /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the @@ -471,6 +493,10 @@ update_velocity () sub_delta = next_put_interval; } } + GNUNET_STATISTICS_set (statistics, + "# size of the DHT queue", + dht_queue_length, + GNUNET_NO); GNUNET_STATISTICS_set (statistics, "% speed increase needed for target velocity", pct, @@ -498,6 +524,7 @@ dht_put_continuation (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT complete (%s)\n", (GNUNET_OK == success) ? "success" : "failure"); + dht_queue_length--; GNUNET_CONTAINER_DLL_remove (it_head, it_tail, ma); @@ -508,7 +535,6 @@ dht_put_continuation (void *cls, if (0 == put_cnt % DELTA_INTERVAL) update_velocity (); } - check_zone_dht_next (); } @@ -766,6 +792,7 @@ put_gns_record (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting DHT PUT\n"); ma = GNUNET_new (struct DhtPutActivity); + ma->start_date = GNUNET_TIME_absolute_get (); ma->ph = perform_dht_put (key, label, rd_public, @@ -780,9 +807,24 @@ put_gns_record (void *cls, check_zone_dht_next (); return; } - GNUNET_CONTAINER_DLL_insert (it_head, - it_tail, - ma); + dht_queue_length++; + GNUNET_CONTAINER_DLL_insert_tail (it_head, + it_tail, + ma); + if (dht_queue_length > DHT_QUEUE_LIMIT) + { + ma = it_head; + GNUNET_CONTAINER_DLL_remove (it_head, + it_tail, + ma); + GNUNET_DHT_put_cancel (ma->ph); + dht_queue_length--; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "DHT PUT unconfirmed after %s, aborting PUT\n", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (ma->start_date), + GNUNET_YES)); + GNUNET_free (ma); + } } @@ -815,6 +857,7 @@ publish_zone_dht_start (void *cls) NULL, &zone_iteration_finished, NULL); + GNUNET_assert (NULL != namestore_iter); } @@ -855,6 +898,7 @@ handle_monitor_event (void *cls, if (0 == rd_public_count) return; /* nothing to do */ ma = GNUNET_new (struct DhtPutActivity); + ma->start_date = GNUNET_TIME_absolute_get (); ma->ph = perform_dht_put (zone, label, rd, -- 2.25.1