From 9f871785d57da57ba128ac2279fda1db1d9b8bfb Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 3 Apr 2011 16:30:06 +0000 Subject: [PATCH] first hack at implementing new replication select code --- src/datastore/plugin_datastore_sqlite.c | 185 ++++++++++++++++++------ 1 file changed, 144 insertions(+), 41 deletions(-) diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index 260bd54cc..b8661f46d 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2009 Christian Grothoff (and other contributing authors) + (C) 2009, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -39,37 +39,43 @@ #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0) #define SELECT_IT_LOW_PRIORITY_1 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash > ?) "\ "ORDER BY hash ASC LIMIT 1" #define SELECT_IT_LOW_PRIORITY_2 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio > ?) "\ "ORDER BY prio ASC, hash ASC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS_1 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash < ? 
AND anonLevel = 0 AND expire > %llu) "\ " ORDER BY hash DESC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS_2 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\ " ORDER BY prio DESC, hash DESC LIMIT 1" #define SELECT_IT_EXPIRATION_TIME_1 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash > ?) "\ " ORDER BY hash ASC LIMIT 1" #define SELECT_IT_EXPIRATION_TIME_2 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ " ORDER BY expire ASC, hash ASC LIMIT 1" #define SELECT_IT_MIGRATION_ORDER_1 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash < ?) "\ " ORDER BY hash DESC LIMIT 1" #define SELECT_IT_MIGRATION_ORDER_2 \ - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ? AND expire > %llu) "\ " ORDER BY expire DESC, hash DESC LIMIT 1" + +#define SELECT_IT_REPLICATION_ORDER \ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ + " ORDER BY repl DESC, Random() LIMIT 1" + + /** * After how many ms "busy" should a DB operation fail for good? * A low value makes sure that we are more responsive to requests @@ -114,6 +120,16 @@ struct Plugin */ sqlite3_stmt *updPrio; + /** + * Precompiled SQL for replication decrement. 
+ */ + sqlite3_stmt *updRepl; + + /** + * Precompiled SQL for replication selection. + */ + sqlite3_stmt *selRepl; + /** * Precompiled SQL for insertion. */ @@ -173,19 +189,21 @@ create_indices (sqlite3 * dbh) { /* create indices */ sqlite3_exec (dbh, - "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL); + "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL); sqlite3_exec (dbh, - "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL, + "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL, + sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL, + sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL, + sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)", + sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL, + sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL, + NULL, NULL); + sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL, NULL, NULL); } @@ -286,12 +304,12 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, /* We have to do it here, because otherwise precompiling SQL might fail */ CHECK (SQLITE_OK == sq_prepare (plugin->dbh, - "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'", + "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'", &stmt)); if ( (sqlite3_step (stmt) == SQLITE_DONE) && (sqlite3_exec (plugin->dbh, - "CREATE TABLE gn080 (" - " size INT4 NOT NULL DEFAULT 0," + "CREATE TABLE gn090 (" + " repl INT4 NOT NULL DEFAULT 0," " type INT4 NOT NULL DEFAULT 
0," " prio INT4 NOT NULL DEFAULT 0," " anonLevel INT4 NOT NULL DEFAULT 0," @@ -329,16 +347,23 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, sqlite3_finalize (stmt); if ((sq_prepare (plugin->dbh, - "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE " + "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE " "_ROWID_ = ?", &plugin->updPrio) != SQLITE_OK) || (sq_prepare (plugin->dbh, - "INSERT INTO gn080 (size, type, prio, " + "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE " + "_ROWID_ = ?", + &plugin->updRepl) != SQLITE_OK) || + (sq_prepare (plugin->dbh, + SELECT_IT_REPLICATION_ORDER, + &plugin->selRepl) != SQLITE_OK) || + (sq_prepare (plugin->dbh, + "INSERT INTO gn090 (repl, type, prio, " "anonLevel, expire, hash, vhash, value) VALUES " "(?, ?, ?, ?, ?, ?, ?, ?)", &plugin->insertContent) != SQLITE_OK) || (sq_prepare (plugin->dbh, - "DELETE FROM gn080 WHERE _ROWID_ = ?", + "DELETE FROM gn090 WHERE _ROWID_ = ?", &plugin->delRow) != SQLITE_OK)) { LOG_SQLITE (plugin, NULL, @@ -367,6 +392,10 @@ database_shutdown (struct Plugin *plugin) sqlite3_finalize (plugin->delRow); if (plugin->updPrio != NULL) sqlite3_finalize (plugin->updPrio); + if (plugin->updRepl != NULL) + sqlite3_finalize (plugin->updRepl); + if (plugin->selRepl != NULL) + sqlite3_finalize (plugin->selRepl); if (plugin->insertContent != NULL) sqlite3_finalize (plugin->insertContent); result = sqlite3_close(plugin->dbh); @@ -415,7 +444,6 @@ static int delete_by_rowid (struct Plugin* plugin, unsigned long long rid) { - sqlite3_bind_int64 (plugin->delRow, 1, rid); if (SQLITE_DONE != sqlite3_step (plugin->delRow)) { @@ -564,11 +592,11 @@ sqlite_next_request_cont (void *cls, return; } - rowid = sqlite3_column_int64 (nc->stmt, 7); + rowid = sqlite3_column_int64 (nc->stmt, 6); nc->last_rowid = rowid; - type = sqlite3_column_int (nc->stmt, 1); - size = sqlite3_column_bytes (nc->stmt, 6); - if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode)) + type = 
sqlite3_column_int (nc->stmt, 0); + size = sqlite3_column_bytes (nc->stmt, 5); + if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode)) { GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "sqlite", @@ -579,7 +607,7 @@ sqlite_next_request_cont (void *cls, GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); if (sq_prepare (nc->plugin->dbh, - "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?", + "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?", &stmtd) != SQLITE_OK) { LOG_SQLITE (nc->plugin, NULL, @@ -604,14 +632,14 @@ sqlite_next_request_cont (void *cls, goto END; } - priority = sqlite3_column_int (nc->stmt, 2); - anonymity = sqlite3_column_int (nc->stmt, 3); - expiration.abs_value = sqlite3_column_int64 (nc->stmt, 4); - key = sqlite3_column_blob (nc->stmt, 5); + priority = sqlite3_column_int (nc->stmt, 1); + anonymity = sqlite3_column_int (nc->stmt, 2); + expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3); + key = sqlite3_column_blob (nc->stmt, 4); nc->lastPriority = priority; nc->lastExpiration = expiration; memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode)); - data = sqlite3_column_blob (nc->stmt, 6); + data = sqlite3_column_blob (nc->stmt, 5); nc->count++; ret = nc->iter (nc->iter_cls, nc, @@ -678,7 +706,6 @@ sqlite_next_request (void *next_cls, } - /** * Store an item in the datastore. 
* @@ -723,7 +750,7 @@ sqlite_plugin_put (void *cls, #endif GNUNET_CRYPTO_hash (data, size, &vhash); stmt = plugin->insertContent; - if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) || + if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) || (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) || (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) || (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) || @@ -1282,7 +1309,7 @@ sqlite_plugin_iter_all_now (void *cls, sqlite3_stmt *stmt; if (sq_prepare (plugin->dbh, - "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080", + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090", &stmt) != SQLITE_OK) { LOG_SQLITE (plugin, NULL, @@ -1443,7 +1470,7 @@ sqlite_plugin_get (void *cls, return; } GNUNET_snprintf (scratch, sizeof (scratch), - "SELECT count(*) FROM gn080 WHERE hash=:1%s%s", + "SELECT count(*) FROM gn090 WHERE hash=:1%s%s", vhash == NULL ? "" : " AND vhash=:2", type == 0 ? "" : (vhash == NULL) ? " AND type=:2" : " AND type=:3"); @@ -1495,8 +1522,8 @@ sqlite_plugin_get (void *cls, } GNUNET_snprintf (scratch, sizeof (scratch), - "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ " - "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d " + "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ " + "FROM gn090 WHERE hash=:1%s%s AND _ROWID_ >= :%d " "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d", vhash == NULL ? "" : " AND vhash=:2", type == 0 ? "" : (vhash == @@ -1547,8 +1574,84 @@ static void sqlite_plugin_replication_get (void *cls, PluginIterator iter, void *iter_cls) { - /* FIXME: not implemented! 
*/
-  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
+  struct Plugin *plugin = cls;
+  int n;                        /* result code of sqlite3_step */
+  sqlite3_stmt *stmt;
+  struct GNUNET_TIME_Absolute expiration = GNUNET_TIME_absolute_get (); /* "now"; bound to `expire > ?' so only unexpired rows match */
+  unsigned long long rowid;
+
+#if DEBUG_SQLITE
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+                   "sqlite",
+                   "Getting random block based on replication order.\n");
+#endif
+  stmt = plugin->selRepl;
+  if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value))
+    {
+      LOG_SQLITE (plugin, NULL,
+                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
+      if (SQLITE_OK != sqlite3_reset (stmt))
+        LOG_SQLITE (plugin, NULL,
+                    GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,     /* report "no result" to the caller */
+            GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
+    }
+  n = sqlite3_step (stmt);
+  switch (n)
+    {
+    case SQLITE_ROW:
+      rowid = sqlite3_column_int64 (stmt, 6);
+      if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))   /* column 4 is the hash */
+        {
+          GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
+                           "sqlite",
+                           _("Invalid data in database. Trying to fix (by deletion).\n"));
+          if (SQLITE_OK != sqlite3_reset (stmt))
+            LOG_SQLITE (plugin, NULL,
+                        GNUNET_ERROR_TYPE_ERROR |
+                        GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+          delete_by_rowid (plugin, rowid);
+          break;                /* falls through to the final "no result" callback */
+        }
+      expiration.abs_value = sqlite3_column_int64 (stmt, 3);    /* from here on: the row's own expiration */
+      (void) iter (iter_cls,
+                   NULL,
+                   sqlite3_column_blob (stmt, 4) /* key */,
+                   sqlite3_column_bytes (stmt, 5) /* size of data */,
+                   sqlite3_column_blob (stmt, 5) /* data */,
+                   sqlite3_column_int (stmt, 0) /* type */,
+                   sqlite3_column_int (stmt, 1) /* priority */,
+                   sqlite3_column_int (stmt, 2) /* anonymity */,
+                   expiration,
+                   rowid);
+      if (SQLITE_OK != sqlite3_reset (stmt))    /* reset only after iter is done with the column blobs */
+        LOG_SQLITE (plugin, NULL,
+                    GNUNET_ERROR_TYPE_ERROR |
+                    GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      return;                   /* NOTE(review): plugin->updRepl is prepared but not executed in the visible hunks, so repl is not decremented here -- confirm */
+    case SQLITE_DONE:
+      /* no unexpired row matched (the database may be empty) */
+      if (SQLITE_OK != sqlite3_reset (stmt))
+        LOG_SQLITE (plugin, NULL,
+                    GNUNET_ERROR_TYPE_ERROR |
+                    GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      break;
+    case SQLITE_BUSY:
+    case SQLITE_ERROR:
+    case SQLITE_MISUSE:
+    default:
+      LOG_SQLITE (plugin, NULL,
+                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                  "sqlite3_step");
+      (void) sqlite3_reset (stmt);
+      GNUNET_break (0);
+      database_shutdown (plugin);       /* close and reopen the database after a failed step */
+      database_setup (plugin->env->cfg,
+                      plugin);
+      break;
+    }
+  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
 	GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
-- 
2.25.1