/*
This file is part of GNUnet
- (C) 2009 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
#define SELECT_IT_LOW_PRIORITY_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash > ?) "\
"ORDER BY hash ASC LIMIT 1"
#define SELECT_IT_LOW_PRIORITY_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio > ?) "\
"ORDER BY prio ASC, hash ASC LIMIT 1"
#define SELECT_IT_NON_ANONYMOUS_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
" ORDER BY hash DESC LIMIT 1"
#define SELECT_IT_NON_ANONYMOUS_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
" ORDER BY prio DESC, hash DESC LIMIT 1"
#define SELECT_IT_EXPIRATION_TIME_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash > ?) "\
" ORDER BY hash ASC LIMIT 1"
#define SELECT_IT_EXPIRATION_TIME_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\
" ORDER BY expire ASC, hash ASC LIMIT 1"
#define SELECT_IT_MIGRATION_ORDER_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash < ?) "\
" ORDER BY hash DESC LIMIT 1"
#define SELECT_IT_MIGRATION_ORDER_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ? AND expire > %llu) "\
" ORDER BY expire DESC, hash DESC LIMIT 1"
+
+#define SELECT_IT_REPLICATION_ORDER \
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\
+ " ORDER BY repl DESC, Random() LIMIT 1"
+
+
/**
* After how many ms "busy" should a DB operation fail for good?
* A low value makes sure that we are more responsive to requests
*/
sqlite3_stmt *updPrio;
+ /**
+ * Precompiled SQL for replication decrement.
+ */
+ sqlite3_stmt *updRepl;
+
+ /**
+ * Precompiled SQL for replication decrement.
+ */
+ sqlite3_stmt *selRepl;
+
/**
* Precompiled SQL for insertion.
*/
{
/* create indices */
sqlite3_exec (dbh,
- "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
+ "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
sqlite3_exec (dbh,
- "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
+ "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
NULL, NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
+ sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
+ sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL,
NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
+ sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL,
NULL, NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
+ sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
NULL, NULL, NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
+ sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL,
+ NULL, NULL);
+ sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL,
NULL, NULL);
}
/* We have to do it here, because otherwise precompiling SQL might fail */
CHECK (SQLITE_OK ==
sq_prepare (plugin->dbh,
- "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
+ "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
&stmt));
if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
(sqlite3_exec (plugin->dbh,
- "CREATE TABLE gn080 ("
- " size INT4 NOT NULL DEFAULT 0,"
+ "CREATE TABLE gn090 ("
+ " repl INT4 NOT NULL DEFAULT 0,"
" type INT4 NOT NULL DEFAULT 0,"
" prio INT4 NOT NULL DEFAULT 0,"
" anonLevel INT4 NOT NULL DEFAULT 0,"
sqlite3_finalize (stmt);
if ((sq_prepare (plugin->dbh,
- "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
+ "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
"_ROWID_ = ?",
&plugin->updPrio) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
- "INSERT INTO gn080 (size, type, prio, "
+ "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE "
+ "_ROWID_ = ?",
+ &plugin->updRepl) != SQLITE_OK) ||
+ (sq_prepare (plugin->dbh,
+ SELECT_IT_REPLICATION_ORDER,
+ &plugin->selRepl) != SQLITE_OK) ||
+ (sq_prepare (plugin->dbh,
+ "INSERT INTO gn090 (repl, type, prio, "
"anonLevel, expire, hash, vhash, value) VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?)",
&plugin->insertContent) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
- "DELETE FROM gn080 WHERE _ROWID_ = ?",
+ "DELETE FROM gn090 WHERE _ROWID_ = ?",
&plugin->delRow) != SQLITE_OK))
{
LOG_SQLITE (plugin, NULL,
sqlite3_finalize (plugin->delRow);
if (plugin->updPrio != NULL)
sqlite3_finalize (plugin->updPrio);
+ if (plugin->updRepl != NULL)
+ sqlite3_finalize (plugin->updRepl);
+ if (plugin->selRepl != NULL)
+ sqlite3_finalize (plugin->selRepl);
if (plugin->insertContent != NULL)
sqlite3_finalize (plugin->insertContent);
result = sqlite3_close(plugin->dbh);
delete_by_rowid (struct Plugin* plugin,
unsigned long long rid)
{
-
sqlite3_bind_int64 (plugin->delRow, 1, rid);
if (SQLITE_DONE != sqlite3_step (plugin->delRow))
{
return;
}
- rowid = sqlite3_column_int64 (nc->stmt, 7);
+ rowid = sqlite3_column_int64 (nc->stmt, 6);
nc->last_rowid = rowid;
- type = sqlite3_column_int (nc->stmt, 1);
- size = sqlite3_column_bytes (nc->stmt, 6);
- if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
+ type = sqlite3_column_int (nc->stmt, 0);
+ size = sqlite3_column_bytes (nc->stmt, 5);
+ if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode))
{
GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
"sqlite",
GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
if (sq_prepare
(nc->plugin->dbh,
- "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
+ "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?",
&stmtd) != SQLITE_OK)
{
LOG_SQLITE (nc->plugin, NULL,
goto END;
}
- priority = sqlite3_column_int (nc->stmt, 2);
- anonymity = sqlite3_column_int (nc->stmt, 3);
- expiration.abs_value = sqlite3_column_int64 (nc->stmt, 4);
- key = sqlite3_column_blob (nc->stmt, 5);
+ priority = sqlite3_column_int (nc->stmt, 1);
+ anonymity = sqlite3_column_int (nc->stmt, 2);
+ expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
+ key = sqlite3_column_blob (nc->stmt, 4);
nc->lastPriority = priority;
nc->lastExpiration = expiration;
memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
- data = sqlite3_column_blob (nc->stmt, 6);
+ data = sqlite3_column_blob (nc->stmt, 5);
nc->count++;
ret = nc->iter (nc->iter_cls,
nc,
}
-
/**
* Store an item in the datastore.
*
#endif
GNUNET_CRYPTO_hash (data, size, &vhash);
stmt = plugin->insertContent;
- if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
+ if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
(SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
(SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
(SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
sqlite3_stmt *stmt;
if (sq_prepare (plugin->dbh,
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090",
&stmt) != SQLITE_OK)
{
LOG_SQLITE (plugin, NULL,
return;
}
GNUNET_snprintf (scratch, sizeof (scratch),
- "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
+ "SELECT count(*) FROM gn090 WHERE hash=:1%s%s",
vhash == NULL ? "" : " AND vhash=:2",
type == 0 ? "" : (vhash ==
NULL) ? " AND type=:2" : " AND type=:3");
}
GNUNET_snprintf (scratch, sizeof (scratch),
- "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
- "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
+ "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
+ "FROM gn090 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
"ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
vhash == NULL ? "" : " AND vhash=:2",
type == 0 ? "" : (vhash ==
sqlite_plugin_replication_get (void *cls,
PluginIterator iter, void *iter_cls)
{
- /* FIXME: not implemented! */
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
+ struct Plugin *plugin = cls;
+ int n;
+ sqlite3_stmt *stmt;
+ struct GNUNET_TIME_Absolute expiration;
+ unsigned long long rowid;
+
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Getting random block based on replication order.\n");
+#endif
+ stmt = plugin->selRepl;
+ if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value))
+ {
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ n = sqlite3_step (stmt);
+ switch (n)
+ {
+ case SQLITE_ROW:
+ rowid = sqlite3_column_int64 (stmt, 6);
+ if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
+ "sqlite",
+ _("Invalid data in database. Trying to fix (by deletion).\n"));
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ delete_by_rowid (plugin, rowid);
+ break;
+ }
+ expiration.abs_value = sqlite3_column_int64 (stmt, 3);
+ (void) iter (iter_cls,
+ NULL,
+ sqlite3_column_blob (stmt, 4) /* key */,
+ sqlite3_column_bytes (stmt, 5) /* size of data */,
+ sqlite3_column_blob (stmt, 5) /* data */,
+ sqlite3_column_int (stmt, 0) /* type */,
+ sqlite3_column_int (stmt, 1) /* priority */,
+ sqlite3_column_int (stmt, 2) /* anonymity */,
+ expiration,
+ rowid);
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ return;
+ case SQLITE_DONE:
+ /* database must be empty */
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ break;
+ case SQLITE_BUSY:
+ case SQLITE_ERROR:
+ case SQLITE_MISUSE:
+ default:
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ (void) sqlite3_reset (stmt);
+ GNUNET_break (0);
+ database_shutdown (plugin);
+ database_setup (plugin->env->cfg,
+ plugin);
+ break;
+ }
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
}