GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
+ by the Free Software Foundation; either version 3, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
*/
#include "platform.h"
-#include "gnunet_statistics_service.h"
-#include "plugin_datastore.h"
+#include "gnunet_datastore_plugin.h"
#include <sqlite3.h>
-#define DEBUG_SQLITE GNUNET_YES
-
-/**
- * After how many payload-changing operations
- * do we sync our statistics?
- */
-#define MAX_STAT_SYNC_LAG 50
-
-#define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
+#define DEBUG_SQLITE GNUNET_NO
-/**
- * Die with an error message that indicates
- * a failure of the command 'cmd' with the message given
- * by strerror(errno).
- */
-#define DIE_SQLITE(db, cmd) do { GNUNET_log_from(GNUNET_ERROR_TYPE_ERROR, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); abort(); } while(0)
-
/**
* Log an error message at log-level 'level' that indicates
* a failure of the command 'cmd' on file 'filename'
* with the message given by strerror(errno).
*/
-#define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s\n"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
+#define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
#define SELECT_IT_LOW_PRIORITY_1 \
"SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
#define BUSY_TIMEOUT_MS 250
+
/**
* Context for all functions in this plugin.
*/
*/
sqlite3 *dbh;
+ /**
+ * Precompiled SQL for deletion.
+ */
+ sqlite3_stmt *delRow;
+
/**
* Precompiled SQL for update.
*/
sqlite3_stmt *insertContent;
/**
- * Handle to the statistics service.
- */
- struct GNUNET_STATISTICS_Handle *statistics;
-
- /**
- * How much data are we currently storing
- * in the database?
+ * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
*/
- unsigned long long payload;
+ struct NextContext *next_task_nc;
/**
- * Number of updates that were made to the
- * payload value since we last synchronized
- * it with the statistics service.
+ * Pending task with scheduler for running the next request.
*/
- unsigned int lastSync;
+ GNUNET_SCHEDULER_TaskIdentifier next_task;
/**
* Should the database be dropped on shutdown?
*/
int drop_on_shutdown;
+
};
/**
* @brief Prepare a SQL statement
*
+ * @param dbh handle to the database
* @param zSql SQL statement, UTF-8 encoded
+ * @param ppStmt set to the prepared statement
+ * @return 0 on success
*/
static int
sq_prepare (sqlite3 * dbh, const char *zSql,
sqlite3_stmt ** ppStmt)
{
char *dummy;
- return sqlite3_prepare (dbh,
- zSql,
- strlen (zSql), ppStmt, (const char **) &dummy);
+ int result;
+ result = sqlite3_prepare_v2 (dbh,
+ zSql,
+ strlen (zSql), ppStmt, (const char **) &dummy);
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Prepared %p: %d\n", *ppStmt, result);
+#endif
+ return result;
}
/**
* Create our database indices.
+ *
+ * @param dbh handle to the database
*/
static void
create_indices (sqlite3 * dbh)
-#if 1
+#if 0
#define CHECK(a) GNUNET_break(a)
#define ENULL NULL
#else
#define ENULL &e
#define ENULL_DEFINED 1
-#define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
+#define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
#endif
* data structures (create tables and indices
* as needed as well).
*
+ * @param cfg our configuration
+ * @param plugin the plugin context (state for this module)
* @return GNUNET_OK on success
*/
static int
-database_setup (struct GNUNET_CONFIGURATION_Handle *cfg,
+database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
struct Plugin *plugin)
{
sqlite3_stmt *stmt;
"datastore-sqlite");
return GNUNET_SYSERR;
}
- if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
+ if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
{
- GNUNET_break (0);
- GNUNET_free (afsdir);
- return GNUNET_SYSERR;
+ if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
+ {
+ GNUNET_break (0);
+ GNUNET_free (afsdir);
+ return GNUNET_SYSERR;
+ }
+ /* database is new or got deleted, reset payload to zero! */
+ plugin->env->duc (plugin->env->cls, 0);
}
- plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
#ifdef ENABLE_NLS
- nl_langinfo (CODESET)
+ plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
+ nl_langinfo (CODESET));
#else
- "UTF-8" /* good luck */
+ plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
+ "UTF-8"); /* good luck */
#endif
- );
GNUNET_free (afsdir);
/* Open database and precompile statements */
CHECK (SQLITE_OK ==
sqlite3_exec (plugin->dbh,
"PRAGMA synchronous=OFF", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
CHECK (SQLITE_OK ==
sqlite3_exec (plugin->dbh,
"PRAGMA count_changes=OFF", NULL, NULL, ENULL));
CHECK (SQLITE_OK ==
- sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
+ sqlite3_exec (plugin->dbh,
+ "PRAGMA page_size=4092", NULL, NULL, ENULL));
CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
(sqlite3_exec (plugin->dbh,
"CREATE TABLE gn080 ("
- " size INTEGER NOT NULL DEFAULT 0,"
- " type INTEGER NOT NULL DEFAULT 0,"
- " prio INTEGER NOT NULL DEFAULT 0,"
- " anonLevel INTEGER NOT NULL DEFAULT 0,"
- " expire INTEGER NOT NULL DEFAULT 0,"
+ " size INT4 NOT NULL DEFAULT 0,"
+ " type INT4 NOT NULL DEFAULT 0,"
+ " prio INT4 NOT NULL DEFAULT 0,"
+ " anonLevel INT4 NOT NULL DEFAULT 0,"
+ " expire INT8 NOT NULL DEFAULT 0,"
" hash TEXT NOT NULL DEFAULT '',"
" vhash TEXT NOT NULL DEFAULT '',"
" value BLOB NOT NULL DEFAULT '')", NULL, NULL,
"INSERT INTO gn080 (size, type, prio, "
"anonLevel, expire, hash, vhash, value) VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?)",
- &plugin->insertContent) != SQLITE_OK))
+ &plugin->insertContent) != SQLITE_OK) ||
+ (sq_prepare (plugin->dbh,
+ "DELETE FROM gn080 WHERE _ROWID_ = ?",
+ &plugin->delRow) != SQLITE_OK))
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR, "precompiling");
return GNUNET_SYSERR;
}
- return GNUNET_OK;
-}
-
-/**
- * Synchronize our utilization statistics with the
- * statistics service.
- */
-static void
-sync_stats (struct Plugin *plugin)
-{
- GNUNET_STATISTICS_set (plugin->statistics,
- QUOTA_STAT_NAME,
- plugin->payload,
- GNUNET_YES);
- plugin->lastSync = 0;
+ return GNUNET_OK;
}
/**
* Shutdown database connection and associate data
* structures.
+ * @param plugin the plugin context (state for this module)
*/
static void
database_shutdown (struct Plugin *plugin)
{
- if (plugin->lastSync > 0)
- sync_stats (plugin);
+ int result;
+ sqlite3_stmt *stmt;
+ stmt = NULL;
+
+ if (plugin->delRow != NULL)
+ sqlite3_finalize (plugin->delRow);
if (plugin->updPrio != NULL)
sqlite3_finalize (plugin->updPrio);
if (plugin->insertContent != NULL)
sqlite3_finalize (plugin->insertContent);
- sqlite3_close (plugin->dbh);
- GNUNET_free_non_null (plugin->fn);
-}
-
+ result = sqlite3_close(plugin->dbh);
+#if SQLITE_VERSION_NUMBER >= 3007000
+ if (result == SQLITE_BUSY)
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
+ "sqlite",
+ _("Tried to close sqlite without finalizing all prepared statements.\n"));
+ stmt = sqlite3_next_stmt(plugin->dbh, NULL);
+ while (stmt != NULL)
+ {
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite", "Closing statement %p\n", stmt);
+#endif
+ result = sqlite3_finalize(stmt);
+#if DEBUG_SQLITE
+ if (result != SQLITE_OK)
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Failed to close statement %p: %d\n", stmt, result);
+#endif
+ stmt = sqlite3_next_stmt(plugin->dbh, NULL);
+ }
+ result = sqlite3_close(plugin->dbh);
+ }
+#endif
+ if (SQLITE_OK != result)
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR,
+ "sqlite3_close");
-/**
- * Get an estimate of how much space the database is
- * currently using.
- * @return number of bytes used on disk
- */
-static unsigned long long sqlite_plugin_get_size (void *cls)
-{
- struct Plugin *plugin = cls;
- return plugin->payload;
+ GNUNET_free_non_null (plugin->fn);
}
/**
* Delete the database entry with the given
* row identifier.
+ *
+ * @param plugin the plugin context (state for this module)
+ * @param rid the ID of the row to delete
*/
static int
delete_by_rowid (struct Plugin* plugin,
unsigned long long rid)
{
- sqlite3_stmt *stmt;
- if (sq_prepare (plugin->dbh,
- "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
+ sqlite3_bind_int64 (plugin->delRow, 1, rid);
+ if (SQLITE_DONE != sqlite3_step (plugin->delRow))
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sq_prepare");
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
+ if (SQLITE_OK != sqlite3_reset (plugin->delRow))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
return GNUNET_SYSERR;
}
- sqlite3_bind_int64 (stmt, 1, rid);
- if (SQLITE_DONE != sqlite3_step (stmt))
- {
+ if (SQLITE_OK != sqlite3_reset (plugin->delRow))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
- sqlite3_finalize (stmt);
- return GNUNET_SYSERR;
- }
- sqlite3_finalize (stmt);
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
return GNUNET_OK;
}
* call which gives the callback a chance to
* clean up the closure
* @return GNUNET_OK on success, GNUNET_NO if there are
- * no more values, GNUNET_SYSERR on error
+ * no more values, GNUNET_SYSERR on error
*/
typedef int (*PrepareFunction)(void *cls,
struct NextContext *nc);
*/
unsigned long long last_rowid;
+ /**
+ * Key of the last result.
+ */
+ GNUNET_HashCode lastKey;
+
/**
* Expiration time of the last value visited.
*/
/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
+ * Continuation of "sqlite_next_request".
*
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
+ * @param cls the next context
+ * @param tc the task context (unused)
*/
static void
-sqlite_next_request (void *next_cls,
- int end_it)
+sqlite_next_request_cont (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- static struct GNUNET_TIME_Absolute zero;
- struct NextContext * nc= next_cls;
+ struct NextContext * nc = cls;
struct Plugin *plugin;
unsigned long long rowid;
sqlite3_stmt *stmtd;
struct GNUNET_TIME_Absolute expiration;
const GNUNET_HashCode *key;
const void *data;
-
+
plugin = nc->plugin;
- sqlite3_reset (nc->stmt);
- if ( (GNUNET_YES == end_it) ||
- (GNUNET_YES == nc->end_it) ||
+ plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+ plugin->next_task_nc = NULL;
+ if ( (GNUNET_YES == nc->end_it) ||
(GNUNET_OK != (nc->prep(nc->prep_cls,
- nc))) ||
- (SQLITE_ROW != sqlite3_step (nc->stmt)) )
+ nc))) )
{
END:
nc->iter (nc->iter_cls,
NULL, NULL, 0, NULL, 0, 0, 0,
- zero, 0);
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
nc->prep (nc->prep_cls, NULL);
GNUNET_free (nc);
return;
}
priority = sqlite3_column_int (nc->stmt, 2);
- nc->lastPriority = priority;
anonymity = sqlite3_column_int (nc->stmt, 3);
- expiration.value = sqlite3_column_int64 (nc->stmt, 4);
- nc->lastExpiration = expiration;
+ expiration.abs_value = sqlite3_column_int64 (nc->stmt, 4);
key = sqlite3_column_blob (nc->stmt, 5);
+ nc->lastPriority = priority;
+ nc->lastExpiration = expiration;
+ memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
data = sqlite3_column_blob (nc->stmt, 6);
nc->count++;
ret = nc->iter (nc->iter_cls,
nc->end_it = GNUNET_YES;
return;
}
+#if DEBUG_SQLITE
+ if (ret == GNUNET_NO)
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Asked to remove entry %llu (%u bytes)\n",
+ (unsigned long long) rowid,
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+#endif
if ( (ret == GNUNET_NO) &&
(GNUNET_OK == delete_by_rowid (plugin, rowid)) )
{
- plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
- plugin->lastSync++;
- if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
- sync_stats (plugin);
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Removed entry %llu (%u bytes)\n",
+ (unsigned long long) rowid,
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+#endif
}
}
+/**
+ * Function invoked on behalf of a "PluginIterator"
+ * asking the database plugin to call the iterator
+ * with the next item.
+ *
+ * @param next_cls whatever argument was given
+ * to the PluginIterator as "next_cls".
+ * @param end_it set to GNUNET_YES if we
+ * should terminate the iteration early
+ * (iterator should be still called once more
+ * to signal the end of the iteration).
+ */
+static void
+sqlite_next_request (void *next_cls,
+ int end_it)
+{
+ struct NextContext * nc= next_cls;
+
+ if (GNUNET_YES == end_it)
+ nc->end_it = GNUNET_YES;
+ nc->plugin->next_task_nc = nc;
+ nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
+ nc);
+}
+
+
+
/**
* Store an item in the datastore.
*
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute expiration,
#if DEBUG_SQLITE
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"sqlite",
- "Storing in database block with type %u/key `%s'/priority %u/expiration %llu.\n",
+ "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
type,
GNUNET_h2s(key),
priority,
- GNUNET_TIME_absolute_get_remaining (expiration).value);
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
+ (long long) expiration.abs_value);
#endif
GNUNET_CRYPTO_hash (data, size, &vhash);
stmt = plugin->insertContent;
(SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
(SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
(SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
- (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.value)) ||
+ (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
(SQLITE_OK !=
sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
SQLITE_TRANSIENT)) ||
return GNUNET_SYSERR;
}
n = sqlite3_step (stmt);
- if (n != SQLITE_DONE)
+ if (n != SQLITE_DONE)
{
if (n == SQLITE_BUSY)
{
LOG_SQLITE (plugin, msg,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
sqlite3_reset (stmt);
+ database_shutdown (plugin);
+ database_setup (plugin->env->cfg,
+ plugin);
return GNUNET_SYSERR;
}
if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- plugin->lastSync++;
- plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
- if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
- sync_stats (plugin);
+ plugin->env->duc (plugin->env->cls,
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Stored new entry (%u bytes)\n",
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+#endif
return GNUNET_OK;
}
* Note that it is possible for multiple values to match this put.
* In that case, all of the respective values are updated.
*
+ * @param cls the plugin context (state for this module)
* @param uid unique identifier of the datum
* @param delta by how much should the priority
* change? If priority + delta < 0 the
int n;
sqlite3_bind_int (plugin->updPrio, 1, delta);
- sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
+ sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
sqlite3_bind_int64 (plugin->updPrio, 3, uid);
n = sqlite3_step (plugin->updPrio);
- if (n != SQLITE_OK)
+ if (n != SQLITE_DONE)
LOG_SQLITE (plugin, msg,
GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
if (n == SQLITE_BUSY)
return GNUNET_NO;
- return n == SQLITE_OK ? GNUNET_OK : GNUNET_SYSERR;
+ return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
}
+/**
+ * Internal context for an iteration.
+ */
struct IterContext
{
+ /**
+ * FIXME.
+ */
sqlite3_stmt *stmt_1;
+
+ /**
+ * FIXME.
+ */
sqlite3_stmt *stmt_2;
+
+ /**
+ * FIXME.
+ */
int is_asc;
+
+ /**
+ * FIXME.
+ */
int is_prio;
+
+ /**
+ * FIXME.
+ */
int is_migr;
+
+ /**
+ * FIXME.
+ */
int limit_nonanonymous;
- uint32_t type;
- GNUNET_HashCode key;
+
+ /**
+ * Desired type for blocks returned by this iterator.
+ */
+ enum GNUNET_BLOCK_Type type;
};
+/**
+ * Prepare our SQL query to obtain the next record from the database.
+ *
+ * @param cls our "struct IterContext"
+ * @param nc NULL to terminate the iteration, otherwise our context for
+ * getting the next result.
+ * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
+ * GNUNET_SYSERR on error (or end of iteration)
+ */
static int
iter_next_prepare (void *cls,
struct NextContext *nc)
if (nc == NULL)
{
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to clean up iterator state.\n");
+#endif
sqlite3_finalize (ic->stmt_1);
sqlite3_finalize (ic->stmt_2);
return GNUNET_SYSERR;
}
+ sqlite3_reset (ic->stmt_1);
+ sqlite3_reset (ic->stmt_2);
plugin = nc->plugin;
if (ic->is_prio)
{
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Restricting to results larger than the last priority %u\n",
+ nc->lastPriority);
+#endif
sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
}
else
{
- sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
- sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Restricting to results larger than the last expiration %llu\n",
+ (unsigned long long) nc->lastExpiration.abs_value);
+#endif
+ sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value);
+ sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value);
}
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Restricting to results larger than the last key `%s'\n",
+ GNUNET_h2s(&nc->lastKey));
+#endif
sqlite3_bind_blob (ic->stmt_1, 2,
- &ic->key,
+ &nc->lastKey,
sizeof (GNUNET_HashCode),
SQLITE_TRANSIENT);
if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
- {
+ {
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Result found using iterator 1\n");
+#endif
nc->stmt = ic->stmt_1;
return GNUNET_OK;
}
"sqlite3_reset");
if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2)))
{
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Result found using iterator 2\n");
+#endif
nc->stmt = ic->stmt_2;
return GNUNET_OK;
}
GNUNET_ERROR_TYPE_ERROR |
GNUNET_ERROR_TYPE_BULK,
"sqlite3_reset");
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No result found using either iterator\n");
+#endif
return GNUNET_NO;
}
* Call a method for each key in the database and
* call the callback method on it.
*
+ * @param plugin our plugin context
* @param type entries of which type should be considered?
+ * @param is_asc are we iterating in ascending order?
+ * @param is_prio are we iterating by priority (otherwise by expiration)
+ * @param is_migr are we iterating in migration order?
+ * @param limit_nonanonymous are we restricting results to those with anonymity
+ * level zero?
+ * @param stmt_str_1 first SQL statement to execute
+ * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
- * @return the number of results processed,
- * GNUNET_SYSERR on error
*/
static void
basic_iter (struct Plugin *plugin,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
int is_asc,
int is_prio,
int is_migr,
PluginIterator iter,
void *iter_cls)
{
- static struct GNUNET_TIME_Absolute zero;
struct NextContext *nc;
struct IterContext *ic;
sqlite3_stmt *stmt_1;
sqlite3_stmt *stmt_2;
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "At %llu, using queries `%s' and `%s'\n",
+ (unsigned long long) GNUNET_TIME_absolute_get ().abs_value,
+ stmt_str_1,
+ stmt_str_2);
+#endif
if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
sqlite3_finalize (stmt_1);
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
nc = GNUNET_malloc (sizeof(struct NextContext) +
if (is_asc)
{
nc->lastPriority = 0;
- nc->lastExpiration.value = 0;
- memset (&ic->key, 0, sizeof (GNUNET_HashCode));
+ nc->lastExpiration.abs_value = 0;
+ memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
}
else
{
nc->lastPriority = 0x7FFFFFFF;
- nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
- memset (&ic->key, 255, sizeof (GNUNET_HashCode));
+ nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL;
+ memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
}
sqlite_next_request (nc, GNUNET_NO);
}
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*
+ * @param cls our plugin context
* @param type entries of which type should be considered?
* Use 0 for any type.
* @param iter function to call on each matching value;
*/
static void
sqlite_plugin_iter_low_priority (void *cls,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
PluginIterator iter,
void *iter_cls)
{
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*
+ * @param cls our plugin context
* @param type entries of which type should be considered?
* Use 0 for any type.
* @param iter function to call on each matching value;
*/
static void
sqlite_plugin_iter_zero_anonymity (void *cls,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
PluginIterator iter,
void *iter_cls)
{
char *q2;
now = GNUNET_TIME_absolute_get ();
- GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
- now.value);
- GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
- now.value);
+ GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
+ (unsigned long long) now.abs_value);
+ GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
+ (unsigned long long) now.abs_value);
basic_iter (cls,
type,
GNUNET_NO, GNUNET_YES,
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*
+ * @param cls our plugin context
* @param type entries of which type should be considered?
* Use 0 for any type.
* @param iter function to call on each matching value;
*/
static void
sqlite_plugin_iter_ascending_expiration (void *cls,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
PluginIterator iter,
void *iter_cls)
{
now = GNUNET_TIME_absolute_get ();
GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
- now.value);
+ (unsigned long long) 0*now.abs_value);
GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
- now.value);
+ (unsigned long long) 0*now.abs_value);
basic_iter (cls,
type,
GNUNET_YES, GNUNET_NO,
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*
+ * @param cls our plugin context
* @param type entries of which type should be considered?
* Use 0 for any type.
* @param iter function to call on each matching value;
*/
static void
sqlite_plugin_iter_migration_order (void *cls,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
PluginIterator iter,
void *iter_cls)
{
now = GNUNET_TIME_absolute_get ();
GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
- now.value);
+ (unsigned long long) now.abs_value);
basic_iter (cls,
type,
GNUNET_NO, GNUNET_NO,
}
+/**
+ * Call sqlite using the already prepared query to get
+ * the next result.
+ *
+ * @param cls context with the prepared query
+ * @param nc context with the prepared query
+ * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
+ * there are no more results
+ */
+static int
+all_next_prepare (void *cls,
+ struct NextContext *nc)
+{
+ struct Plugin *plugin;
+ int ret;
+
+ if (nc == NULL)
+ {
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to clean up iterator state.\n");
+#endif
+ nc = (struct NextContext *)cls;
+ if (nc->stmt)
+ sqlite3_finalize (nc->stmt);
+ nc->stmt = NULL;
+ return GNUNET_SYSERR;
+ }
+ plugin = nc->plugin;
+ if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
+ {
+ return GNUNET_OK;
+ }
+ if (ret != SQLITE_DONE)
+ {
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_NO;
+}
+
/**
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*
+ * @param cls our plugin context
* @param type entries of which type should be considered?
* Use 0 for any type.
* @param iter function to call on each matching value;
*/
static void
sqlite_plugin_iter_all_now (void *cls,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
PluginIterator iter,
void *iter_cls)
{
- static struct GNUNET_TIME_Absolute zero;
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ struct Plugin *plugin = cls;
+ struct NextContext *nc;
+ sqlite3_stmt *stmt;
+
+ if (sq_prepare (plugin->dbh,
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
+ &stmt) != SQLITE_OK)
+ {
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ nc = GNUNET_malloc (sizeof(struct NextContext));
+ nc->plugin = plugin;
+ nc->iter = iter;
+ nc->iter_cls = iter_cls;
+ nc->stmt = stmt;
+ nc->prep = &all_next_prepare;
+ nc->prep_cls = nc;
+ sqlite_next_request (nc, GNUNET_NO);
}
+/**
+ * FIXME.
+ */
struct GetNextContext
{
+
+ /**
+ * FIXME.
+ */
int total;
+
+ /**
+ * FIXME.
+ */
int off;
+
+ /**
+ * FIXME.
+ */
int have_vhash;
+
+ /**
+ * FIXME.
+ */
unsigned int type;
+
+ /**
+ * FIXME.
+ */
sqlite3_stmt *stmt;
+
+ /**
+ * FIXME.
+ */
GNUNET_HashCode key;
+
+ /**
+ * FIXME.
+ */
GNUNET_HashCode vhash;
};
+
+/**
+ * FIXME.
+ *
+ * @param cls our "struct GetNextContext*"
+ * @param nc FIXME
+ * @return GNUNET_YES if there are more results,
+ * GNUNET_NO if there are no more results,
+ * GNUNET_SYSERR on internal error
+ */
static int
get_next_prepare (void *cls,
struct NextContext *nc)
else
limit_off = 0;
sqoff = 1;
+ sqlite3_reset (nc->stmt);
ret = sqlite3_bind_blob (nc->stmt,
sqoff++,
&gnc->key,
sqlite_plugin_get (void *cls,
const GNUNET_HashCode * key,
const GNUNET_HashCode * vhash,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
PluginIterator iter, void *iter_cls)
{
- static struct GNUNET_TIME_Absolute zero;
struct Plugin *plugin = cls;
struct GetNextContext *gpc;
struct NextContext *nc;
sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
return;
}
- GNUNET_snprintf (scratch, 256,
+ GNUNET_snprintf (scratch, sizeof (scratch),
"SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
vhash == NULL ? "" : " AND vhash=:2",
type == 0 ? "" : (vhash ==
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
sqoff = 1;
GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
sqlite3_reset (stmt);
sqlite3_finalize (stmt);
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
ret = sqlite3_step (stmt);
"sqlite_step");
sqlite3_reset (stmt);
sqlite3_finalize (stmt);
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
total = sqlite3_column_int (stmt, 0);
sqlite3_finalize (stmt);
if (0 == total)
{
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- GNUNET_snprintf (scratch, 256,
+ GNUNET_snprintf (scratch, sizeof (scratch),
"SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
"FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
"ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
nc = GNUNET_malloc (sizeof(struct NextContext) +
/**
* Drop database.
+ *
+ * @param cls our plugin context
*/
static void
sqlite_plugin_drop (void *cls)
}
-/**
- * Callback function to process statistic values.
- *
- * @param cls closure
- * @param subsystem name of subsystem that created the statistic
- * @param name the name of the datum
- * @param value the current value
- * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
- * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
- */
-static int
-process_stat_in (void *cls,
- const char *subsystem,
- const char *name,
- unsigned long long value,
- int is_persistent)
+static unsigned long long
+sqlite_plugin_get_size (void *cls)
{
struct Plugin *plugin = cls;
- plugin->payload += value;
- return GNUNET_OK;
+ sqlite3_stmt *stmt;
+ uint64_t pages;
+ uint64_t page_size;
+#if ENULL_DEFINED
+ char *e;
+#endif
+
+ if (SQLITE_VERSION_NUMBER < 3006000)
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
+ "datastore-sqlite",
+ _("sqlite version to old to determine size, assuming zero\n"));
+ return 0;
+ }
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "VACUUM", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sq_prepare (plugin->dbh,
+ "PRAGMA page_count",
+ &stmt));
+ if (SQLITE_ROW ==
+ sqlite3_step (stmt))
+ pages = sqlite3_column_int64 (stmt, 0);
+ else
+ pages = 0;
+ sqlite3_finalize (stmt);
+ CHECK (SQLITE_OK ==
+ sq_prepare (plugin->dbh,
+ "PRAGMA page_size",
+ &stmt));
+ CHECK (SQLITE_ROW ==
+ sqlite3_step (stmt));
+ page_size = sqlite3_column_int64 (stmt, 0);
+ sqlite3_finalize (stmt);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
+ (unsigned long long) pages,
+ (unsigned long long) page_size);
+ return pages * page_size;
}
/**
* Entry point for the plugin.
+ *
+ * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
+ * @return NULL on error, othrewise the plugin context
*/
void *
libgnunet_plugin_datastore_sqlite_init (void *cls)
return NULL; /* can only initialize once! */
memset (&plugin, 0, sizeof(struct Plugin));
plugin.env = env;
- plugin.statistics = GNUNET_STATISTICS_create (env->sched,
- "sqlite",
- env->cfg);
- GNUNET_STATISTICS_get (plugin.statistics,
- "sqlite",
- QUOTA_STAT_NAME,
- GNUNET_TIME_UNIT_MINUTES,
- NULL,
- &process_stat_in,
- &plugin);
if (GNUNET_OK !=
database_setup (env->cfg, &plugin))
{
/**
* Exit point from the plugin.
+ *
+ * @param cls the plugin context (as returned by "init")
+ * @return always NULL
*/
void *
libgnunet_plugin_datastore_sqlite_done (void *cls)
struct GNUNET_DATASTORE_PluginFunctions *api = cls;
struct Plugin *plugin = api->cls;
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "sqlite plugin is doneing\n");
+#endif
+
+ if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Canceling next task\n");
+#endif
+ GNUNET_SCHEDULER_cancel (plugin->next_task);
+ plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Prep'ing next task\n");
+#endif
+ plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
+ GNUNET_free (plugin->next_task_nc);
+ plugin->next_task_nc = NULL;
+ }
fn = NULL;
if (plugin->drop_on_shutdown)
fn = GNUNET_strdup (plugin->fn);
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Shutting down database\n");
+#endif
database_shutdown (plugin);
plugin->env = NULL;
- plugin->payload = 0;
GNUNET_free (api);
if (fn != NULL)
{
fn);
GNUNET_free (fn);
}
+#if DEBUG_SQLITE
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "sqlite plugin is finished doneing\n");
+#endif
return NULL;
}