From d6e1cea9a8b3a1eee91e52eb46adc82e2005b975 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 23 Apr 2018 15:01:43 +0200 Subject: [PATCH] more incremental processing of zoneimporter --- src/namecache/plugin_namecache_sqlite.c | 8 +- src/namestore/gnunet-zoneimport.c | 95 +++++++++++++++++++---- src/namestore/plugin_namestore_postgres.c | 2 + 3 files changed, 84 insertions(+), 21 deletions(-) diff --git a/src/namecache/plugin_namecache_sqlite.c b/src/namecache/plugin_namecache_sqlite.c index 37d6d3b62..6f5f2d952 100644 --- a/src/namecache/plugin_namecache_sqlite.c +++ b/src/namecache/plugin_namecache_sqlite.c @@ -411,10 +411,10 @@ namecache_sqlite_cache_block (void *cls, GNUNET_CRYPTO_hash (&block->derived_key, sizeof (struct GNUNET_CRYPTO_EcdsaPublicKey), &query); - fprintf (stderr, - "Caching new version of block %s (expires %llu)\n", - GNUNET_h2s (&query), - (unsigned long long) expiration.abs_value_us); + fprintf (stderr, // GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Caching new version of block %s (expires %s)\n", + GNUNET_h2s (&query), + GNUNET_STRINGS_absolute_time_to_string (expiration)); expiration = GNUNET_TIME_absolute_ntoh (block->expiration_time); if (block_size > 64 * 65536) { diff --git a/src/namestore/gnunet-zoneimport.c b/src/namestore/gnunet-zoneimport.c index 0148f42a7..a01772e67 100644 --- a/src/namestore/gnunet-zoneimport.c +++ b/src/namestore/gnunet-zoneimport.c @@ -37,7 +37,7 @@ /** * Maximum number of queries pending at the same time. */ -#define THRESH 20 +#define THRESH 100 /** * TIME_THRESH is in usecs. How quickly do we submit fresh queries. @@ -249,6 +249,11 @@ static unsigned int failures; */ static unsigned int records; +/** + * #GNUNET_YES if we have more work to be read from `stdin`. + */ +static int stdin_waiting; + /** * Heap of all requests to perform, sorted by * the time we should next do the request (i.e. by expires). @@ -794,6 +799,7 @@ store_completed_cb (void *cls, int32_t success, const char *emsg) { + static unsigned int pdot; struct Request *req = cls; req->qe = NULL; @@ -810,6 +816,9 @@ store_completed_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Stored records under `%s'\n", req->label); + pdot++; + if (0 == pdot % 1000) + fprintf (stderr, "."); } } @@ -1061,28 +1070,32 @@ do_shutdown (void *cls) GNUNET_SCHEDULER_cancel (t); t = NULL; } - if (NULL != ns) - { - GNUNET_NAMESTORE_disconnect (ns); - ns = NULL; - } - if (NULL != ctx) - { - GNUNET_DNSSTUB_stop (ctx); - ctx = NULL; - } while (NULL != (req = req_head)) { GNUNET_CONTAINER_DLL_remove (req_head, req_tail, req); + if (NULL != req->qe) + GNUNET_NAMESTORE_cancel (req->qe); free_request (req); } while (NULL != (req = GNUNET_CONTAINER_heap_remove_root (req_heap))) { req->hn = NULL; + if (NULL != req->qe) + GNUNET_NAMESTORE_cancel (req->qe); free_request (req); } + if (NULL != ns) + { + GNUNET_NAMESTORE_disconnect (ns); + ns = NULL; + } + if (NULL != ctx) + { + GNUNET_DNSSTUB_stop (ctx); + ctx = NULL; + } if (NULL != req_heap) { GNUNET_CONTAINER_heap_destroy (req_heap); @@ -1099,6 +1112,32 @@ do_shutdown (void *cls) } +/** + * Begin processing hostnames from stdin. + * + * @param cls NULL + */ +static void +process_stdin (void *cls); + + +/** + * If applicable, continue processing from stdin. + */ +static void +continue_stdin () +{ + if ( (pending < THRESH) && + (stdin_waiting) ) + { + if (NULL != t) + GNUNET_SCHEDULER_cancel (t); + t = GNUNET_SCHEDULER_add_now (&process_stdin, + NULL); + } +} + + /** * Function called if #GNUNET_NAMESTORE_records_lookup() failed. * Continues resolution based on assumption namestore has no data. @@ -1115,6 +1154,8 @@ ns_lookup_error_cb (void *cls) "Failed to load data from namestore for `%s'\n", req->label); insert_sorted (req); + pending--; + continue_stdin (); } @@ -1137,6 +1178,7 @@ ns_lookup_result_cb (void *cls, struct Request *req = cls; req->qe = NULL; + pending--; GNUNET_break (0 == memcmp (zone, &req->zone->key, sizeof (*zone))); @@ -1187,6 +1229,7 @@ ns_lookup_result_cb (void *cls, req->hostname, GNUNET_STRINGS_absolute_time_to_string (req->expires)); insert_sorted (req); + continue_stdin (); } @@ -1213,6 +1256,7 @@ queue (const char *hostname) "Refusing invalid hostname `%s'\n", hostname); rejects++; + continue_stdin (); return; } dot = strchr (hostname, @@ -1223,6 +1267,7 @@ queue (const char *hostname) "Refusing invalid hostname `%s' (lacks '.')\n", hostname); rejects++; + continue_stdin (); return; } for (zone = zone_head; NULL != zone; zone = zone->next) @@ -1235,6 +1280,7 @@ queue (const char *hostname) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Domain name `%s' not in ego list!\n", dot + 1); + continue_stdin (); return; } q.name = (char *) hostname; @@ -1259,9 +1305,11 @@ queue (const char *hostname) "Failed to pack query for hostname `%s'\n", hostname); rejects++; + continue_stdin (); return; } + pending++; req = GNUNET_new (struct Request); req->zone = zone; req->hostname = GNUNET_strdup (hostname); @@ -1288,21 +1336,33 @@ queue (const char *hostname) static void process_stdin (void *cls) { + static unsigned int pdot; char hn[256]; (void) cls; t = NULL; - GNUNET_IDENTITY_disconnect (id); - id = NULL; - while (NULL != - fgets (hn, - sizeof (hn), - stdin)) + if (NULL != id) + { + GNUNET_IDENTITY_disconnect (id); + id = NULL; + } + if (NULL != + fgets (hn, + sizeof (hn), + stdin)) { if (strlen(hn) > 0) hn[strlen(hn)-1] = '\0'; /* eat newline */ + pdot++; + if (0 == pdot % 1000) + fprintf (stderr, "."); queue (hn); + return; } + stdin_waiting = GNUNET_NO; + fprintf (stderr, "\n"); + t = GNUNET_SCHEDULER_add_now (&process_queue, + NULL); } @@ -1352,6 +1412,7 @@ identity_cb (void *cls, { if (NULL != zone_head) { + stdin_waiting = GNUNET_YES; t = GNUNET_SCHEDULER_add_now (&process_stdin, NULL); } diff --git a/src/namestore/plugin_namestore_postgres.c b/src/namestore/plugin_namestore_postgres.c index a9c19d517..e38fcafb1 100644 --- a/src/namestore/plugin_namestore_postgres.c +++ b/src/namestore/plugin_namestore_postgres.c @@ -296,6 +296,8 @@ parse_result_call_iterator (void *cls, { struct ParserContext *pc = cls; + if (NULL == pc->iter) + return; /* no need to do more work */ for (unsigned int i=0;i