From c0ef1c0414caf5d3f3e886df18584665026f9af8 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 5 Apr 2011 14:07:10 +0000 Subject: [PATCH] insanity --- src/datastore/perf_datastore_api.c | 2 +- src/datastore/perf_plugin_datastore.c | 235 ++++++++--- src/datastore/plugin_datastore_mysql.c | 538 ++++++++++-------------- src/datastore/plugin_datastore_sqlite.c | 42 +- 4 files changed, 421 insertions(+), 396 deletions(-) diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c index f8d68a37a..be2bc477b 100644 --- a/src/datastore/perf_datastore_api.c +++ b/src/datastore/perf_datastore_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c index c21d9551b..940dd9b97 100644 --- a/src/datastore/perf_plugin_datastore.c +++ b/src/datastore/perf_plugin_datastore.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -27,6 +27,7 @@ #include "gnunet_util_lib.h" #include "gnunet_protocols.h" #include "gnunet_datastore_plugin.h" +#include #define VERBOSE GNUNET_NO @@ -45,6 +46,10 @@ */ #define PUT_10 (MAX_SIZE / 32 / 1024 / ITERATIONS) +static char category[256]; + +static unsigned int hits[PUT_10 / 8 + 1]; + static unsigned long long stored_bytes; static unsigned long long stored_entries; @@ -57,11 +62,12 @@ static int ok; enum RunPhase { - RP_DONE = 0, + RP_ERROR = 0, RP_PUT, - RP_LP_GET, - RP_AE_GET, - RP_ZA_GET + RP_REP_GET, + RP_ZA_GET, + RP_EXP_GET, + RP_DONE }; @@ -72,7 +78,6 @@ struct CpsRunContext struct GNUNET_TIME_Absolute end; const struct GNUNET_CONFIGURATION_Handle *cfg; struct GNUNET_DATASTORE_PluginFunctions * api; - const char *msg; enum RunPhase phase; unsigned int cnt; }; @@ -105,9 +110,8 @@ putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k) /* most content is 32k */ size = 32 * 1024; - if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0) /* but some of it is less! */ - size = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 32 * 1024); + size = 8 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 32 * 1024); size = size - (size & 7); /* always multiple of 8 */ /* generate random key */ @@ -117,15 +121,16 @@ putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k) if (i > 255) memset (value, i - 255, size / 2); value[0] = k; + memcpy (&value[4], &i, sizeof (i)); msg = NULL; prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100); if (GNUNET_OK != api->put (api->cls, &key, size, value, - i /* type */, + 1 + i % 4 /* type */, prio, - i /* anonymity */, + i % 4 /* anonymity */, 0 /* replication */, GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, @@ -149,7 +154,7 @@ test (void *cls, static int -iterateDummy (void *cls, +iterate_zeros (void *cls, void *next_cls, const GNUNET_HashCode * key, uint32_t size, @@ -162,31 +167,49 @@ iterateDummy (void *cls, uint64_t uid) { struct CpsRunContext *crc = cls; - + int i; + const char *cdata = data; + if (key == NULL) { + char buf[256]; + unsigned int bc; + + bc = 0; + for (i = 0;iend = GNUNET_TIME_absolute_get(); - printf (crc->msg, - crc->i, + GNUNET_snprintf (buf, sizeof (buf), + "Iteration over %u zero-anonymity items", + crc->cnt); + printf ("%s took %llu ms yielding %u/%u items\n", + buf, (unsigned long long) (crc->end.abs_value - crc->start.abs_value), + bc, crc->cnt); - if (crc->phase != RP_ZA_GET) - { - crc->phase++; - } - else + GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms"); + memset (hits, 0, sizeof (hits)); + if ( (int) (PUT_10 / 4 - crc->cnt) > 2) { - if (crc->i == ITERATIONS) - crc->phase = RP_DONE; - else - crc->phase = RP_PUT; + fprintf (stderr, + "Got %d items, expected %d\n", + (int) crc->cnt, (int) PUT_10 / 4); + GNUNET_break (0); + crc->phase = RP_ERROR; } + crc->phase++; crc->cnt = 0; crc->start = GNUNET_TIME_absolute_get (); GNUNET_SCHEDULER_add_now (&test, crc); return GNUNET_OK; } -#if VERBOSE + GNUNET_assert (size >= 8); + memcpy (&i, &cdata[4], sizeof (i)); + hits[i/8] |= (1 << (i % 8)); + +#if VERBOSE fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n", type, priority, size, (unsigned long long) expiration.abs_value); @@ -198,38 +221,110 @@ iterateDummy (void *cls, } +static int +expiration_get (void *cls, + void *next_cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid) +{ + struct CpsRunContext *crc = cls; + int i; + const char *cdata = data; + + GNUNET_assert (size >= 8); + memcpy (&i, &cdata[4], sizeof (i)); + hits[i/8] |= (1 << (i % 8)); + crc->cnt++; + if (PUT_10 == crc->cnt) + { + char buf[256]; + unsigned int bc; + + bc = 0; + for (i = 0;iend = GNUNET_TIME_absolute_get(); + GNUNET_snprintf (buf, sizeof (buf), + "Execution of %u expiration+deletion-GET requests", + PUT_10); + printf ("%s took %llu ms yielding %u/%u items\n", + buf, + (unsigned long long) (crc->end.abs_value - crc->start.abs_value), + bc, + (unsigned int) PUT_10); + GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms"); + memset (hits, 0, sizeof (hits)); + crc->phase++; + crc->cnt = 0; + crc->start = GNUNET_TIME_absolute_get (); + } + GNUNET_SCHEDULER_add_now (&test, crc); + return GNUNET_NO; +} + static int -dummy_get (void *cls, - void *next_cls, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid) +replication_get (void *cls, + void *next_cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid) { struct CpsRunContext *crc = cls; + int i; + const char *cdata = data; + GNUNET_assert (NULL != key); + GNUNET_assert (size >= 8); + memcpy (&i, &cdata[4], sizeof (i)); + hits[i/8] |= (1 << (i % 8)); crc->cnt++; - if (1000 == crc->cnt) + if (PUT_10 == crc->cnt) { + char buf[256]; + unsigned int bc; + + bc = 0; + for (i = 0;iend = GNUNET_TIME_absolute_get(); - printf (crc->msg, - crc->i, + GNUNET_snprintf (buf, sizeof (buf), + "Execution of %u replication-GET requests", + PUT_10); + printf ("%s took %llu ms yielding %u/%u items\n", + buf, (unsigned long long) (crc->end.abs_value - crc->start.abs_value), - crc->cnt); + bc, + (unsigned int) PUT_10); + GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms"); + memset (hits, 0, sizeof (hits)); crc->phase++; - crc->cnt = 0; + crc->cnt = 0; crc->start = GNUNET_TIME_absolute_get (); } + GNUNET_SCHEDULER_add_now (&test, crc); return GNUNET_OK; } + /** * Function called when the service shuts * down. Unloads our datastore plugin. @@ -284,47 +379,57 @@ test (void *cls, int j; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - { - crc->phase = RP_DONE; - ok = 1; - } + crc->phase = RP_ERROR; switch (crc->phase) { + case RP_ERROR: + GNUNET_break (0); + crc->api->drop (crc->api->cls); + ok = 1; + GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, + &cleaning_task, crc); + break; case RP_PUT: crc->start = GNUNET_TIME_absolute_get (); for (j=0;japi, j, crc->i); crc->end = GNUNET_TIME_absolute_get (); - printf ("%3u insertion took %20llums for %u\n", - crc->i, - (unsigned long long) (crc->end.abs_value - crc->start.abs_value), - (unsigned int) PUT_10); + { + char buf[256]; + + GNUNET_snprintf (buf, sizeof (buf), + "Execution of %u PUT requests", + PUT_10); + printf ("%s took %llu ms\n", + buf, + (unsigned long long) (crc->end.abs_value - crc->start.abs_value)); + GAUGER (category, + buf, crc->end.abs_value - crc->start.abs_value, "ms"); + } crc->i++; crc->start = GNUNET_TIME_absolute_get (); - crc->phase = RP_LP_GET; - GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK, - &test, crc); + crc->phase++; + GNUNET_SCHEDULER_add_now (&test, crc); break; - case RP_LP_GET: - crc->msg = "%3u replication iteration took %20llums for %u\n"; + case RP_REP_GET: crc->api->replication_get (crc->api->cls, - &dummy_get, + &replication_get, crc); break; - case RP_AE_GET: - crc->msg = "%3u expiration iteration took %20llums for %u\n"; - crc->api->expiration_get (crc->api->cls, - &dummy_get, - crc); - break; case RP_ZA_GET: - crc->msg = "%3u zero anonymity iteration took %20llums for %u\n"; - crc->api->iter_zero_anonymity (crc->api->cls, 0, - &iterateDummy, + crc->api->iter_zero_anonymity (crc->api->cls, 1, + &iterate_zeros, crc); break; + case RP_EXP_GET: + crc->api->expiration_get (crc->api->cls, + &expiration_get, + crc); + break; case RP_DONE: + exit (0); crc->api->drop (crc->api->cls); + ok = 0; GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, &cleaning_task, crc); break; @@ -392,6 +497,7 @@ run (void *cls, crc->api = api; crc->cfg = c; crc->phase = RP_PUT; + ok = 2; GNUNET_SCHEDULER_add_now (&test, crc); } @@ -413,6 +519,9 @@ check () GNUNET_GETOPT_OPTION_END }; + GNUNET_snprintf (category, sizeof (category), + "DATASTORE-%s", + plugin_name); GNUNET_snprintf (cfg_name, sizeof (cfg_name), "perf_plugin_datastore_data_%s.conf", diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index 773a40ba3..ddef3fb14 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -151,44 +151,6 @@ #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0); -/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle - "OR" very well, so we need to use UNION instead. And UNION does not - automatically apply a LIMIT on the outermost clause, so we need to - repeat ourselves quite a bit. All hail the performance gods (and thanks - to #mysql on freenode) */ -#define SELECT_IT_LOW_PRIORITY "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\ - "ORDER BY prio ASC,vkey ASC LIMIT 1) " \ - "UNION "\ - "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\ - "ORDER BY prio ASC,vkey ASC LIMIT 1)"\ - "ORDER BY prio ASC,vkey ASC LIMIT 1" - -#define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\ - " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ - "UNION "\ - "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\ - " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ - "ORDER BY prio DESC,vkey DESC LIMIT 1" - -#define SELECT_IT_EXPIRATION_TIME "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\ - "ORDER BY expire ASC,vkey ASC LIMIT 1) "\ - "UNION "\ - "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\ - "ORDER BY expire ASC,vkey ASC LIMIT 1)"\ - "ORDER BY expire ASC,vkey ASC LIMIT 1" - - -#define SELECT_IT_MIGRATION_ORDER "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\ - " AND expire > ? AND type!=3"\ - " ORDER BY expire DESC,vkey DESC LIMIT 1) "\ - "UNION "\ - "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\ - " AND expire > ? AND type!=3"\ - " ORDER BY expire DESC,vkey DESC LIMIT 1)"\ - "ORDER BY expire DESC,vkey DESC LIMIT 1" - - - struct GNUNET_MysqlStatementHandle { struct GNUNET_MysqlStatementHandle *next; @@ -242,18 +204,12 @@ struct NextRequestClosure MYSQL_BIND rbind[6]; - unsigned int type; - - unsigned int iter_select; + enum GNUNET_BLOCK_Type type; PluginIterator dviter; void *dviter_cls; - unsigned int last_prio; - - unsigned long long last_expire; - unsigned long long last_vkey; int end_it; @@ -306,7 +262,7 @@ struct Plugin /** * Statements dealing with gn090 table */ -#define INSERT_ENTRY "INSERT INTO gn090 (type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?)" +#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)" struct GNUNET_MysqlStatementHandle *insert_entry; #define DELETE_ENTRY_BY_VKEY "DELETE FROM gn090 WHERE vkey=?" @@ -342,7 +298,30 @@ struct Plugin #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072" struct GNUNET_MysqlStatementHandle *get_size; - struct GNUNET_MysqlStatementHandle *iter[4]; +/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle + "OR" very well, so we need to use UNION instead. And UNION does not + automatically apply a LIMIT on the outermost clause, so we need to + repeat ourselves quite a bit. All hail the performance gods (and thanks + to #mysql on freenode) */ +#define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\ + " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ + "UNION "\ + "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\ + " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ + "ORDER BY prio DESC,vkey DESC LIMIT 1" + struct GNUNET_MysqlStatementHandle *zero_iter; + +#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ?) "\ + "ORDER BY prio ASC LIMIT 1) "\ + "UNION "\ + "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) "\ + "ORDER BY prio ASC LIMIT 1) ORDER BY expire ASC LIMIT 1" + struct GNUNET_MysqlStatementHandle *select_expiration; + +#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) "\ + "WHERE expire > ?"\ + " ORDER BY repl DESC,RAND() LIMIT 1) " + struct GNUNET_MysqlStatementHandle *select_replication; }; @@ -731,7 +710,7 @@ init_params (struct Plugin *plugin, */ typedef int (*GNUNET_MysqlDataProcessor) (void *cls, unsigned int num_values, - MYSQL_BIND * values); + MYSQL_BIND *values); /** @@ -752,12 +731,10 @@ typedef int (*GNUNET_MysqlDataProcessor) (void *cls, */ static int prepared_statement_run_select (struct Plugin *plugin, - struct GNUNET_MysqlStatementHandle - *s, + struct GNUNET_MysqlStatementHandle *s, unsigned int result_size, - MYSQL_BIND * results, - GNUNET_MysqlDataProcessor - processor, void *processor_cls, + MYSQL_BIND *results, + GNUNET_MysqlDataProcessor processor, void *processor_cls, ...) { va_list ap; @@ -987,100 +964,6 @@ return_ok (void *cls, } -/** - * Run the prepared statement to get the next data item ready. - * - * @param cls not used - * @param nrc closure for the next request iterator - * @return GNUNET_OK on success, GNUNET_NO if there is no additional item - */ -static int -iterator_helper_prepare (void *cls, - struct NextRequestClosure *nrc) -{ - struct Plugin *plugin; - int ret; - - if (nrc == NULL) - return GNUNET_NO; - plugin = nrc->plugin; - ret = GNUNET_SYSERR; - switch (nrc->iter_select) - { - case 0: - case 1: - ret = prepared_statement_run_select (plugin, - plugin->iter[nrc->iter_select], - 6, - nrc->rbind, - &return_ok, - NULL, - MYSQL_TYPE_LONG, - &nrc->last_prio, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_vkey, - GNUNET_YES, - MYSQL_TYPE_LONG, - &nrc->last_prio, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_vkey, - GNUNET_YES, -1); - break; - case 2: - ret = prepared_statement_run_select (plugin, - plugin->iter[nrc->iter_select], - 6, - nrc->rbind, - &return_ok, - NULL, - MYSQL_TYPE_LONGLONG, - &nrc->last_expire, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_vkey, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_expire, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_vkey, - GNUNET_YES, -1); - break; - case 3: - ret = prepared_statement_run_select (plugin, - plugin->iter[nrc->iter_select], - 6, - nrc->rbind, - &return_ok, - NULL, - MYSQL_TYPE_LONGLONG, - &nrc->last_expire, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_vkey, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->now.abs_value, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_expire, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->last_vkey, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &nrc->now.abs_value, - GNUNET_YES, -1); - break; - default: - GNUNET_assert (0); - } - return ret; -} - - /** * Continuation of "mysql_next_request". * @@ -1141,10 +1024,8 @@ mysql_next_request_cont (void *next_cls, (GNUNET_OK != nrc->prep (nrc->prep_cls, nrc))) goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); nrc->last_vkey = vkey; - nrc->last_prio = priority; - nrc->last_expire = exp; + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || (hashSize != sizeof (GNUNET_HashCode)) ) { @@ -1263,51 +1144,6 @@ mysql_plugin_next_request (void *next_cls, } -/** - * Iterate over the items in the datastore - * using the given query to select and order - * the items. - * - * @param plugin plugin context - * @param type entries of which type should be considered? - * @param iter_select which iterator statement are we using - * @param is_asc are we using ascending order? - * @param dviter function to call on each matching item - * @param dviter_cls closure for dviter - */ -static void -iterateHelper (struct Plugin *plugin, - enum GNUNET_BLOCK_Type type, - int is_asc, - unsigned int iter_select, - PluginIterator dviter, - void *dviter_cls) -{ - struct NextRequestClosure *nrc; - - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->plugin = plugin; - nrc->type = type; - nrc->iter_select = iter_select; - nrc->dviter = dviter; - nrc->dviter_cls = dviter_cls; - nrc->prep = &iterator_helper_prepare; - if (is_asc) - { - nrc->last_prio = 0; - nrc->last_vkey = 0; - nrc->last_expire = 0; - } - else - { - nrc->last_prio = 0x7FFFFFFFL; - nrc->last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */ - nrc->last_expire = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */ - } - mysql_plugin_next_request (nrc, GNUNET_NO); -} - - /** * Get an estimate of how much space the database is * currently using. @@ -1366,6 +1202,7 @@ mysql_plugin_put (void *cls, char **msg) { struct Plugin *plugin = cls; + unsigned int irepl = replication; unsigned int itype = type; unsigned int ipriority = priority; unsigned int ianonymity = anonymity; @@ -1391,6 +1228,9 @@ mysql_plugin_put (void *cls, plugin->insert_entry, NULL, MYSQL_TYPE_LONG, + &irepl, + GNUNET_YES, + MYSQL_TYPE_LONG, &itype, GNUNET_YES, MYSQL_TYPE_LONG, @@ -1431,25 +1271,69 @@ mysql_plugin_put (void *cls, /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * Update the priority for a particular key in the datastore. If + * the expiration time in value is different than the time found in + * the datastore, the higher value should be kept. For the + * anonymity level, the lower value is to be used. The specified + * priority should be added to the existing priority, ignoring the + * priority in value. + * + * Note that it is possible for multiple values to match this put. + * In that case, all of the respective values are updated. * * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param uid unique identifier of the datum + * @param delta by how much should the priority + * change? If priority + delta < 0 the + * priority should be set to 0 (never go + * negative). + * @param expire new expiration time should be the + * MAX of any existing expiration time and + * this value + * @param msg set to error message + * @return GNUNET_OK on success */ -static void -mysql_plugin_iter_low_priority (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +static int +mysql_plugin_update (void *cls, + uint64_t uid, + int delta, + struct GNUNET_TIME_Absolute expire, + char **msg) { struct Plugin *plugin = cls; - iterateHelper (plugin, type, GNUNET_YES, - 0, iter, iter_cls); + unsigned long long vkey = uid; + unsigned long long lexpire = expire.abs_value; + int ret; + +#if DEBUG_MYSQL + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Updating value %llu adding %d to priority and maxing exp at %llu\n", + vkey, + delta, + lexpire); +#endif + ret = prepared_statement_run (plugin, + plugin->update_entry, + NULL, + MYSQL_TYPE_LONG, + &delta, + GNUNET_NO, + MYSQL_TYPE_LONGLONG, + &lexpire, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &lexpire, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &vkey, + GNUNET_YES, -1); + if (ret != GNUNET_OK) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to update value %llu\n", + vkey); + } + return ret; } @@ -1577,8 +1461,8 @@ get_statement_prepare (void *cls, */ static void mysql_plugin_get (void *cls, - const GNUNET_HashCode * key, - const GNUNET_HashCode * vhash, + const GNUNET_HashCode *key, + const GNUNET_HashCode *vhash, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { @@ -1591,15 +1475,9 @@ mysql_plugin_get (void *cls, long long total; unsigned long hashSize; + GNUNET_assert (key != NULL); if (iter == NULL) return; - if (key == NULL) - { - mysql_plugin_iter_low_priority (plugin, - type, - iter, iter_cls); - return; - } hashSize = sizeof (GNUNET_HashCode); memset (cbind, 0, sizeof (cbind)); total = -1; @@ -1679,7 +1557,6 @@ mysql_plugin_get (void *cls, nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); nrc->plugin = plugin; nrc->type = type; - nrc->iter_select = -1; nrc->dviter = iter; nrc->dviter_cls = iter_cls; nrc->prep = &get_statement_prepare; @@ -1690,28 +1567,95 @@ mysql_plugin_get (void *cls, /** - * Get a random item for replication. Returns a single, not expired, random item - * from those with the highest replication counters. The item's - * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Run the prepared statement to get the next data item ready. + * + * @param cls not used + * @param nrc closure for the next request iterator + * @return GNUNET_OK on success, GNUNET_NO if there is no additional item + */ +static int +iterator_zero_prepare (void *cls, + struct NextRequestClosure *nrc) +{ + struct Plugin *plugin; + + if (nrc == NULL) + return GNUNET_NO; + plugin = nrc->plugin; + return prepared_statement_run_select (plugin, + plugin->zero_iter, + 6, + nrc->rbind, + &return_ok, + NULL, + MYSQL_TYPE_LONGLONG, + &nrc->now.abs_value, + GNUNET_YES, + MYSQL_TYPE_LONG, + &nrc->type, + GNUNET_YES, -1); +} + + +/** + * Select a subset of the items in the datastore and call + * the given iterator for each of them. * - * @param cls closure - * @param iter function to call the value (once only). + * @param cls our "struct Plugin*" + * @param type entries of which type should be considered? + * Use 0 for any type. + * @param iter function to call on each matching value; + * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void -mysql_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +mysql_plugin_iter_zero_anonymity (void *cls, + enum GNUNET_BLOCK_Type type, + PluginIterator iter, + void *iter_cls) { - /* FIXME: not implemented! */ - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + struct Plugin *plugin = cls; + struct NextRequestClosure *nrc; + + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->plugin = plugin; + nrc->type = type; + nrc->dviter = iter; + nrc->dviter_cls = iter_cls; + nrc->prep = &iterator_zero_prepare; + nrc->last_vkey = INT64_MAX; /* MySQL only supports 63 bits, hence signed */ + mysql_plugin_next_request (nrc, GNUNET_NO); } +/** + * Run the SELECT statement for the replication function. + * + * @param cls the 'struct Plugin' + * @param nrc the context (not used) + */ +static int +replication_prepare (void *cls, + struct NextRequestClosure *nrc) +{ + struct Plugin *plugin = cls; + long long nt; + + nt = (long long) nrc->now.abs_value; + return prepared_statement_run_select + (plugin, + plugin->select_replication, + 6, nrc->rbind, + &return_ok, NULL, + MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, + -1); +} + /** - * Get a random item for expiration. + * Get a random item for replication. Returns a single, not expired, random item + * from those with the highest replication counters. The item's + * replication counter is decremented by one IF it was positive before. * Call 'iter' with all values ZERO or NULL if the datastore is empty. * * @param cls closure @@ -1719,106 +1663,78 @@ mysql_plugin_replication_get (void *cls, * @param iter_cls closure for iter */ static void -mysql_plugin_expiration_get (void *cls, - PluginIterator iter, void *iter_cls) +mysql_plugin_replication_get (void *cls, + PluginIterator iter, void *iter_cls) { - /* FIXME: not implemented! */ - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + struct Plugin *plugin = cls; + struct NextRequestClosure nrc; + + memset (&nrc, 0, sizeof (nrc)); + nrc.plugin = plugin; + nrc.now = GNUNET_TIME_absolute_get (); + nrc.prep = &replication_prepare; + nrc.prep_cls = plugin; + nrc.type = 0; + nrc.dviter = iter; + nrc.dviter_cls = iter_cls; + nrc.end_it = GNUNET_NO; + mysql_next_request_cont (&nrc, NULL); } /** - * Update the priority for a particular key in the datastore. If - * the expiration time in value is different than the time found in - * the datastore, the higher value should be kept. For the - * anonymity level, the lower value is to be used. The specified - * priority should be added to the existing priority, ignoring the - * priority in value. - * - * Note that it is possible for multiple values to match this put. - * In that case, all of the respective values are updated. - * - * @param cls our "struct Plugin*" - * @param uid unique identifier of the datum - * @param delta by how much should the priority - * change? If priority + delta < 0 the - * priority should be set to 0 (never go - * negative). - * @param expire new expiration time should be the - * MAX of any existing expiration time and - * this value - * @param msg set to error message - * @return GNUNET_OK on success + * Run the SELECT statement for the expiration function. + * + * @param cls the 'struct Plugin' + * @param nrc the context (not used) */ static int -mysql_plugin_update (void *cls, - uint64_t uid, - int delta, - struct GNUNET_TIME_Absolute expire, - char **msg) +expiration_prepare (void *cls, + struct NextRequestClosure *nrc) { struct Plugin *plugin = cls; - unsigned long long vkey = uid; - unsigned long long lexpire = expire.abs_value; - int ret; -#if DEBUG_MYSQL - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Updating value %llu adding %d to priority and maxing exp at %llu\n", - vkey, - delta, - lexpire); -#endif - ret = prepared_statement_run (plugin, - plugin->update_entry, - NULL, - MYSQL_TYPE_LONG, - &delta, - GNUNET_NO, - MYSQL_TYPE_LONGLONG, - &lexpire, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &lexpire, - GNUNET_YES, - MYSQL_TYPE_LONGLONG, - &vkey, - GNUNET_YES, -1); - if (ret != GNUNET_OK) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to update value %llu\n", - vkey); - } - return ret; + return prepared_statement_run_select + (plugin, + plugin->select_expiration, + 6, nrc->rbind, + &return_ok, NULL, + -1); } /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * Get a random item for expiration. + * Call 'iter' with all values ZERO or NULL if the datastore is empty. * - * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end + * @param cls closure + * @param iter function to call the value (once only). * @param iter_cls closure for iter */ static void -mysql_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +mysql_plugin_expiration_get (void *cls, + PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; - iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls); + struct NextRequestClosure nrc; + + memset (&nrc, 0, sizeof (nrc)); + nrc.plugin = plugin; + nrc.now = GNUNET_TIME_absolute_get (); + nrc.prep = &expiration_prepare; + nrc.prep_cls = plugin; + nrc.type = 0; + nrc.dviter = iter; + nrc.dviter_cls = iter_cls; + nrc.end_it = GNUNET_NO; + mysql_next_request_cont (&nrc, NULL); } /** * Drop database. + * + * @param cls the "struct Plugin*" */ static void mysql_plugin_drop (void *cls) @@ -1860,6 +1776,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls) #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) ) #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b))) if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 (" + " repl INT(11) UNSIGNED NOT NULL DEFAULT 0," " type INT(11) UNSIGNED NOT NULL DEFAULT 0," " prio INT(11) UNSIGNED NOT NULL DEFAULT 0," " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0," @@ -1896,10 +1813,9 @@ libgnunet_plugin_datastore_mysql_init (void *cls) || PINIT (plugin->count_entry_by_hash_vhash_and_type, COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) || PINIT (plugin->update_entry, UPDATE_ENTRY) - || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY) - || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS) - || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME) - || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER)) + || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) + || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) + || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ) { iclose (plugin); GNUNET_free_non_null (plugin->cnffile); diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index eca82a6fb..216d9b7cf 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -31,7 +31,7 @@ /** * Enable or disable logging debug messages. */ -#define DEBUG_SQLITE GNUNET_NO +#define DEBUG_SQLITE GNUNET_YES /** * We allocate items on the stack at times. To prevent a stack @@ -174,14 +174,14 @@ create_indices (sqlite3 * dbh) /* create indices */ sqlite3_exec (dbh, "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL, - NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL, - NULL); sqlite3_exec (dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)", + sqlite3_exec (dbh, "CREATE INDEX idx_expire_repl ON gn090 (expire ASC,repl DESC)", NULL, NULL, + NULL); + sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (anonLevel ASC,expire ASC,prio,type,hash)", + NULL, NULL, NULL); + sqlite3_exec (dbh, "CREATE INDEX expire ON gn090 (expire)", NULL, NULL, NULL); } @@ -263,9 +263,15 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, CHECK (SQLITE_OK == sqlite3_exec (plugin->dbh, "PRAGMA synchronous=OFF", NULL, NULL, ENULL)); + CHECK (SQLITE_OK == + sqlite3_exec (plugin->dbh, + "PRAGMA legacy_file_format=OFF", NULL, NULL, ENULL)); CHECK (SQLITE_OK == sqlite3_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL)); + CHECK (SQLITE_OK == + sqlite3_exec (plugin->dbh, + "PRAGMA locking_mode=EXCLUSIVE", NULL, NULL, ENULL)); CHECK (SQLITE_OK == sqlite3_exec (plugin->dbh, "PRAGMA count_changes=OFF", NULL, NULL, ENULL)); @@ -322,26 +328,24 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, sqlite3_finalize (stmt); if ((sq_prepare (plugin->dbh, - "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE " - "_ROWID_ = ?", + "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE _ROWID_ = ?", &plugin->updPrio) != SQLITE_OK) || (sq_prepare (plugin->dbh, - "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE " - "_ROWID_ = ?", + "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?", &plugin->updRepl) != SQLITE_OK) || (sq_prepare (plugin->dbh, - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) " + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE expire > ?" " ORDER BY repl DESC, Random() LIMIT 1", &plugin->selRepl) != SQLITE_OK) || (sq_prepare (plugin->dbh, - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) " - " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) " + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " + " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 " " ORDER BY prio ASC LIMIT 1", &plugin->selExpi) != SQLITE_OK) || (sq_prepare (plugin->dbh, "INSERT INTO gn090 (repl, type, prio, " - "anonLevel, expire, hash, vhash, value) VALUES " - "(?, ?, ?, ?, ?, ?, ?, ?)", + "anonLevel, expire, hash, vhash, value) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", &plugin->insertContent) != SQLITE_OK) || (sq_prepare (plugin->dbh, "DELETE FROM gn090 WHERE _ROWID_ = ?", @@ -1032,7 +1036,7 @@ sqlite_plugin_iter_zero_anonymity (void *cls, now = GNUNET_TIME_absolute_get (); GNUNET_asprintf (&q, "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " - "WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND type=%d AND hash < ?2) " + "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) " "ORDER BY hash DESC LIMIT 1", (unsigned long long) now.abs_value, type); @@ -1048,7 +1052,7 @@ sqlite_plugin_iter_zero_anonymity (void *cls, GNUNET_free (q); GNUNET_asprintf (&q, "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " - "WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0 AND type=%d) " + "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) " "ORDER BY prio DESC, hash DESC LIMIT 1", (unsigned long long) now.abs_value, type); @@ -1531,10 +1535,6 @@ sqlite_plugin_get_size (void *cls) _("sqlite version to old to determine size, assuming zero\n")); return 0; } - if (SQLITE_OK != - sqlite3_exec (plugin->dbh, - "VACUUM", NULL, NULL, ENULL)) - abort (); CHECK (SQLITE_OK == sqlite3_exec (plugin->dbh, "VACUUM", NULL, NULL, ENULL)); -- 2.25.1