/*
This file is part of GNUnet
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* 2) by executing (inside of mysql using the GNUnet database):
* @verbatim
mysql> REPAIR TABLE gn090;
- mysql> REPAIR TABLE gn072;
@endverbatim
*
* PROBLEMS?
* friend is probably the mysql manual. The first thing to check
* is that mysql is basically operational, that you can connect
* to it, create tables, issue queries etc.
- *
- * TODO:
- * - use FOREIGN KEY for 'uid/vkey'
- * - consistent naming of uid/vkey
*/
#include "platform.h"
-#include "plugin_datastore.h"
+#include "gnunet_datastore_plugin.h"
#include "gnunet_util_lib.h"
-#include <mysql/mysql.h>
+#include "gnunet_mysql_lib.h"
-#define DEBUG_MYSQL GNUNET_NO
#define MAX_DATUM_SIZE 65536
-/**
- * Maximum number of supported parameters for a prepared
- * statement. Increase if needed.
- */
-#define MAX_PARAM 16
-
-/**
- * Die with an error message that indicates
- * a failure of the command 'cmd' with the message given
- * by strerror(errno).
- */
-#define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
-
-/**
- * Log an error message at log-level 'level' that indicates
- * a failure of the command 'cmd' on file 'filename'
- * with the message given by strerror(errno).
- */
-#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
-
-
-/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle
- "OR" very well, so we need to use UNION instead. And UNION does not
- automatically apply a LIMIT on the outermost clause, so we need to
- repeat ourselves quite a bit. All hail the performance gods (and thanks
- to #mysql on freenode) */
-#define SELECT_IT_LOW_PRIORITY "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
- "ORDER BY prio ASC,vkey ASC LIMIT 1) " \
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
- "ORDER BY prio ASC,vkey ASC LIMIT 1)"\
- "ORDER BY prio ASC,vkey ASC LIMIT 1"
-
-#define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
- " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
- " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
- "ORDER BY prio DESC,vkey DESC LIMIT 1"
-
-#define SELECT_IT_EXPIRATION_TIME "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
- "ORDER BY expire ASC,vkey ASC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
- "ORDER BY expire ASC,vkey ASC LIMIT 1)"\
- "ORDER BY expire ASC,vkey ASC LIMIT 1"
-
-
-#define SELECT_IT_MIGRATION_ORDER "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
- " AND expire > ? AND type!=3"\
- " ORDER BY expire DESC,vkey DESC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
- " AND expire > ? AND type!=3"\
- " ORDER BY expire DESC,vkey DESC LIMIT 1)"\
- "ORDER BY expire DESC,vkey DESC LIMIT 1"
-
-
-
-struct GNUNET_MysqlStatementHandle
-{
- struct GNUNET_MysqlStatementHandle *next;
-
- struct GNUNET_MysqlStatementHandle *prev;
-
- char *query;
-
- MYSQL_STMT *statement;
-
- int valid;
-
-};
-
-/**
- * Context for the universal iterator.
- */
-struct NextRequestClosure;
-
-/**
- * Type of a function that will prepare
- * the next iteration.
- *
- * @param cls closure
- * @param nc the next context; NULL for the last
- * call which gives the callback a chance to
- * clean up the closure
- * @return GNUNET_OK on success, GNUNET_NO if there are
- * no more values, GNUNET_SYSERR on error
- */
-typedef int (*PrepareFunction)(void *cls,
- struct NextRequestClosure *nc);
-
-
-struct NextRequestClosure
-{
- struct Plugin *plugin;
-
- struct GNUNET_TIME_Absolute now;
-
- /**
- * Function to call to prepare the next
- * iteration.
- */
- PrepareFunction prep;
-
- /**
- * Closure for prep.
- */
- void *prep_cls;
-
- MYSQL_BIND rbind[6];
-
- unsigned int type;
-
- unsigned int iter_select;
-
- PluginIterator dviter;
-
- void *dviter_cls;
-
- unsigned int last_prio;
-
- unsigned long long last_expire;
-
- unsigned long long last_vkey;
-
- int end_it;
-};
-
/**
* Context for all functions in this plugin.
*/
-struct Plugin
+struct Plugin
{
/**
* Our execution environment.
*/
struct GNUNET_DATASTORE_PluginEnvironment *env;
- MYSQL *dbf;
-
- struct GNUNET_MysqlStatementHandle *shead;
-
- struct GNUNET_MysqlStatementHandle *stail;
-
- /**
- * Filename of "my.cnf" (msyql configuration).
- */
- char *cnffile;
-
- /**
- * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
- */
- struct NextRequestClosure *next_task_nc;
-
/**
- * Pending task with scheduler for running the next request.
+ * Handle to talk to MySQL.
*/
- GNUNET_SCHEDULER_TaskIdentifier next_task;
+ struct GNUNET_MYSQL_Context *mc;
/**
- * Statements dealing with gn072 table
+ * Prepared statements.
*/
-#define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *select_value;
-
-#define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *delete_value;
-
-#define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)"
- struct GNUNET_MysqlStatementHandle *insert_value;
-
- /**
- * Statements dealing with gn090 table
- */
-#define INSERT_ENTRY "INSERT INTO gn090 (type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?)"
- struct GNUNET_MysqlStatementHandle *insert_entry;
-
-#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn090 WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey;
-
-#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
-
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
-
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
-
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
-
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=?"
- struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
-
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
- struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
-
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=? AND type=?"
- struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
-
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
- struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
-
-#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *update_entry;
-
-#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072"
- struct GNUNET_MysqlStatementHandle *get_size;
-
- struct GNUNET_MysqlStatementHandle *iter[4];
-
-};
-
-
-/**
- * Obtain the location of ".my.cnf".
- *
- * @param cfg our configuration
- * @return NULL on error
- */
-static char *
-get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
- char *cnffile;
- char *home_dir;
- struct stat st;
-#ifndef WINDOWS
- struct passwd *pw;
-#endif
- int configured;
-
-#ifndef WINDOWS
- pw = getpwuid (getuid ());
- if (!pw)
- {
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
- "getpwuid");
- return NULL;
- }
- if (GNUNET_YES ==
- GNUNET_CONFIGURATION_have_value (cfg,
- "datastore-mysql", "CONFIG"))
- {
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_filename (cfg,
- "datastore-mysql", "CONFIG", &cnffile));
- configured = GNUNET_YES;
- }
- else
- {
- home_dir = GNUNET_strdup (pw->pw_dir);
-#else
- home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
- plibc_conv_to_win_path ("~/", home_dir);
-#endif
- GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
- GNUNET_free (home_dir);
- configured = GNUNET_NO;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Trying to use file `%s' for MySQL configuration.\n"),
- cnffile);
- if ((0 != STAT (cnffile, &st)) ||
- (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
- {
- if (configured == GNUNET_YES)
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Could not access file `%s': %s\n"), cnffile,
- STRERROR (errno));
- GNUNET_free (cnffile);
- return NULL;
- }
- return cnffile;
-}
-
-
-
-/**
- * Free a prepared statement.
- *
- * @param plugin plugin context
- * @param s prepared statement
- */
-static void
-prepared_statement_destroy (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle
- *s)
-{
- GNUNET_CONTAINER_DLL_remove (plugin->shead,
- plugin->stail,
- s);
- if (s->valid)
- mysql_stmt_close (s->statement);
- GNUNET_free (s->query);
- GNUNET_free (s);
-}
+#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
+ struct GNUNET_MYSQL_StatementHandle *insert_entry;
+#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
+ struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
-/**
- * Close database connection and all prepared statements (we got a DB
- * disconnect error).
- */
-static int
-iclose (struct Plugin *plugin)
-{
- struct GNUNET_MysqlStatementHandle *spos;
-
- spos = plugin->shead;
- while (NULL != plugin->shead)
- prepared_statement_destroy (plugin,
- plugin->shead);
- if (plugin->dbf != NULL)
- {
- mysql_close (plugin->dbf);
- plugin->dbf = NULL;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Open the connection with the database (and initialize
- * our default options).
- *
- * @return GNUNET_OK on success
- */
-static int
-iopen (struct Plugin *ret)
-{
- char *mysql_dbname;
- char *mysql_server;
- char *mysql_user;
- char *mysql_password;
- unsigned long long mysql_port;
- my_bool reconnect;
- unsigned int timeout;
-
- ret->dbf = mysql_init (NULL);
- if (ret->dbf == NULL)
- return GNUNET_SYSERR;
- if (ret->cnffile != NULL)
- mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
- mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
- reconnect = 0;
- mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
- mysql_options (ret->dbf,
- MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
- mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
- timeout = 60; /* in seconds */
- mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
- mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
- mysql_dbname = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
- "datastore-mysql", "DATABASE"))
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
- "datastore-mysql", "DATABASE",
- &mysql_dbname));
- else
- mysql_dbname = GNUNET_strdup ("gnunet");
- mysql_user = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
- "datastore-mysql", "USER"))
- {
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
- "datastore-mysql", "USER",
- &mysql_user));
- }
- mysql_password = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
- "datastore-mysql", "PASSWORD"))
- {
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
- "datastore-mysql", "PASSWORD",
- &mysql_password));
- }
- mysql_server = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
- "datastore-mysql", "HOST"))
- {
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
- "datastore-mysql", "HOST",
- &mysql_server));
- }
- mysql_port = 0;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
- "datastore-mysql", "PORT"))
- {
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
- "PORT", &mysql_port));
- }
-
- GNUNET_assert (mysql_dbname != NULL);
- mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
- mysql_dbname, (unsigned int) mysql_port, NULL,
- CLIENT_IGNORE_SIGPIPE);
- GNUNET_free_non_null (mysql_server);
- GNUNET_free_non_null (mysql_user);
- GNUNET_free_non_null (mysql_password);
- GNUNET_free (mysql_dbname);
- if (mysql_error (ret->dbf)[0])
- {
- LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
- "mysql_real_connect", ret);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
-}
+#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
+ struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+ struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
-/**
- * Run the given MySQL statement.
- *
- * @param plugin plugin context
- * @param statement SQL statement to run
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
- */
-static int
-run_statement (struct Plugin *plugin,
- const char *statement)
-{
- if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
- return GNUNET_SYSERR;
- mysql_query (plugin->dbf, statement);
- if (mysql_error (plugin->dbf)[0])
- {
- LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
- "mysql_query", plugin);
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
-}
-
+#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
+ struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
-/**
- * Create a prepared statement.
- *
- * @param plugin plugin context
- * @param statement SQL statement text to prepare
- * @return NULL on error
- */
-static struct GNUNET_MysqlStatementHandle *
-prepared_statement_create (struct Plugin *plugin,
- const char *statement)
-{
- struct GNUNET_MysqlStatementHandle *ret;
+#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
+ struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
- ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
- ret->query = GNUNET_strdup (statement);
- GNUNET_CONTAINER_DLL_insert (plugin->shead,
- plugin->stail,
- ret);
- return ret;
-}
+#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
+ struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
+ struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
-/**
- * Prepare a statement for running.
- *
- * @param plugin plugin context
- * @param ret handle to prepared statement
- * @return GNUNET_OK on success
- */
-static int
-prepare_statement (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle *ret)
-{
- if (GNUNET_YES == ret->valid)
- return GNUNET_OK;
- if ((NULL == plugin->dbf) &&
- (GNUNET_OK != iopen (plugin)))
- return GNUNET_SYSERR;
- ret->statement = mysql_stmt_init (plugin->dbf);
- if (ret->statement == NULL)
- {
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- if (mysql_stmt_prepare (ret->statement,
- ret->query,
- strlen (ret->query)))
- {
- LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
- "mysql_stmt_prepare",
- plugin);
- mysql_stmt_close (ret->statement);
- ret->statement = NULL;
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- ret->valid = GNUNET_YES;
- return GNUNET_OK;
+#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
+ struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
-}
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
+ struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
+#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
+ struct GNUNET_MYSQL_StatementHandle *update_entry;
-/**
- * Bind the parameters for the given MySQL statement
- * and run it.
- *
- * @param plugin plugin context
- * @param s statement to bind and run
- * @param ap arguments for the binding
- * @return GNUNET_SYSERR on error, GNUNET_OK on success
- */
-static int
-init_params (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle *s,
- va_list ap)
-{
- MYSQL_BIND qbind[MAX_PARAM];
- unsigned int pc;
- unsigned int off;
- enum enum_field_types ft;
+#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
+ struct GNUNET_MYSQL_StatementHandle *dec_repl;
- pc = mysql_stmt_param_count (s->statement);
- if (pc > MAX_PARAM)
- {
- /* increase internal constant! */
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- memset (qbind, 0, sizeof (qbind));
- off = 0;
- ft = 0;
- while ((pc > 0) && (-1 != (ft = va_arg (ap, enum enum_field_types))))
- {
- qbind[off].buffer_type = ft;
- switch (ft)
- {
- case MYSQL_TYPE_FLOAT:
- qbind[off].buffer = va_arg (ap, float *);
- break;
- case MYSQL_TYPE_LONGLONG:
- qbind[off].buffer = va_arg (ap, unsigned long long *);
- qbind[off].is_unsigned = va_arg (ap, int);
- break;
- case MYSQL_TYPE_LONG:
- qbind[off].buffer = va_arg (ap, unsigned int *);
- qbind[off].is_unsigned = va_arg (ap, int);
- break;
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_STRING:
- case MYSQL_TYPE_BLOB:
- qbind[off].buffer = va_arg (ap, void *);
- qbind[off].buffer_length = va_arg (ap, unsigned long);
- qbind[off].length = va_arg (ap, unsigned long *);
- break;
- default:
- /* unsupported type */
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- pc--;
- off++;
- }
- if (!((pc == 0) && (ft != -1) && (va_arg (ap, int) == -1)))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- if (mysql_stmt_bind_param (s->statement, qbind))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("`%s' failed at %s:%d with error: %s\n"),
- "mysql_stmt_bind_param",
- __FILE__, __LINE__, mysql_stmt_error (s->statement));
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- if (mysql_stmt_execute (s->statement))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("`%s' failed at %s:%d with error: %s\n"),
- "mysql_stmt_execute",
- __FILE__, __LINE__, mysql_stmt_error (s->statement));
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
-}
-
-/**
- * Type of a callback that will be called for each
- * data set returned from MySQL.
- *
- * @param cls user-defined argument
- * @param num_values number of elements in values
- * @param values values returned by MySQL
- * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
- */
-typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
- unsigned int num_values,
- MYSQL_BIND * values);
-
-
-/**
- * Run a prepared SELECT statement.
- *
- * @param plugin plugin context
- * @param s statement to run
- * @param result_size number of elements in results array
- * @param results pointer to already initialized MYSQL_BIND
- * array (of sufficient size) for passing results
- * @param processor function to call on each result
- * @param processor_cls extra argument to processor
- * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
- * values (size + buffer-reference for pointers); terminated
- * with "-1"
- * @return GNUNET_SYSERR on error, otherwise
- * the number of successfully affected (or queried) rows
- */
-static int
-prepared_statement_run_select (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle
- *s,
- unsigned int result_size,
- MYSQL_BIND * results,
- GNUNET_MysqlDataProcessor
- processor, void *processor_cls,
- ...)
-{
- va_list ap;
- int ret;
- unsigned int rsize;
- int total;
-
- if (GNUNET_OK != prepare_statement (plugin, s))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- va_start (ap, processor_cls);
- if (GNUNET_OK != init_params (plugin, s, ap))
- {
- GNUNET_break (0);
- va_end (ap);
- return GNUNET_SYSERR;
- }
- va_end (ap);
- rsize = mysql_stmt_field_count (s->statement);
- if (rsize > result_size)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- if (mysql_stmt_bind_result (s->statement, results))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("`%s' failed at %s:%d with error: %s\n"),
- "mysql_stmt_bind_result",
- __FILE__, __LINE__, mysql_stmt_error (s->statement));
- iclose (plugin);
- return GNUNET_SYSERR;
- }
-
- total = 0;
- while (1)
- {
- ret = mysql_stmt_fetch (s->statement);
- if (ret == MYSQL_NO_DATA)
- break;
- if (ret != 0)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("`%s' failed at %s:%d with error: %s\n"),
- "mysql_stmt_fetch",
- __FILE__, __LINE__, mysql_stmt_error (s->statement));
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- if (processor != NULL)
- if (GNUNET_OK != processor (processor_cls, rsize, results))
- break;
- total++;
- }
- mysql_stmt_reset (s->statement);
- return total;
-}
-
-
-/**
- * Run a prepared statement that does NOT produce results.
- *
- * @param plugin plugin context
- * @param s statement to run
- * @param insert_id NULL or address where to store the row ID of whatever
- * was inserted (only for INSERT statements!)
- * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
- * values (size + buffer-reference for pointers); terminated
- * with "-1"
- * @return GNUNET_SYSERR on error, otherwise
- * the number of successfully affected rows
- */
-static int
-prepared_statement_run (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle *s,
- unsigned long long *insert_id, ...)
-{
- va_list ap;
- int affected;
-
- if (GNUNET_OK != prepare_statement (plugin, s))
- return GNUNET_SYSERR;
- va_start (ap, insert_id);
- if (GNUNET_OK != init_params (plugin, s, ap))
- {
- va_end (ap);
- return GNUNET_SYSERR;
- }
- va_end (ap);
- affected = mysql_stmt_affected_rows (s->statement);
- if (NULL != insert_id)
- *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
- mysql_stmt_reset (s->statement);
- return affected;
-}
-
-
-/**
- * Delete an value from the gn072 table.
- *
- * @param plugin plugin context
- * @param vkey vkey identifying the value to delete
- * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
- */
-static int
-do_delete_value (struct Plugin *plugin,
- unsigned long long vkey)
-{
- int ret;
-
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Deleting value %llu from gn072 table\n",
- vkey);
-#endif
- ret = prepared_statement_run (plugin,
- plugin->delete_value,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1);
- if (ret > 0)
- {
- ret = GNUNET_OK;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Deleting value %llu from gn072 table failed\n",
- vkey);
- }
- return ret;
-}
-
-/**
- * Insert a value into the gn072 table.
- *
- * @param plugin plugin context
- * @param value the value to insert
- * @param size size of the value
- * @param vkey vkey identifying the value henceforth (set)
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
- */
-static int
-do_insert_value (struct Plugin *plugin,
- const void *value, unsigned int size,
- unsigned long long *vkey)
-{
- unsigned long length = size;
- int ret;
-
- ret = prepared_statement_run (plugin,
- plugin->insert_value,
- vkey,
- MYSQL_TYPE_BLOB,
- value, length, &length, -1);
- if (ret == GNUNET_OK)
- {
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Inserted value number %llu with length %u into gn072 table\n",
- *vkey,
- size);
-#endif
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to insert %u byte value into gn072 table\n",
- size);
- }
- return ret;
-}
-
-/**
- * Delete an entry from the gn090 table.
- *
- * @param plugin plugin context
- * @param vkey vkey identifying the entry to delete
- * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
- */
-static int
-do_delete_entry_by_vkey (struct Plugin *plugin,
- unsigned long long vkey)
-{
- int ret;
-
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Deleting value %llu from gn090 table\n",
- vkey);
-#endif
- ret = prepared_statement_run (plugin,
- plugin->delete_entry_by_vkey,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1);
- if (ret > 0)
- {
- ret = GNUNET_OK;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Deleting value %llu from gn090 table failed\n",
- vkey);
- }
- return ret;
-}
-
-
-/**
- * Function that simply returns GNUNET_OK
- *
- * @param cls closure, not used
- * @param num_values not used
- * @param values not used
- * @return GNUNET_OK
- */
-static int
-return_ok (void *cls,
- unsigned int num_values,
- MYSQL_BIND * values)
-{
- return GNUNET_OK;
-}
-
-
-/**
- * Run the prepared statement to get the next data item ready.
- *
- * @param cls not used
- * @param nrc closure for the next request iterator
- * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
- */
-static int
-iterator_helper_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct Plugin *plugin;
- int ret;
-
- if (nrc == NULL)
- return GNUNET_NO;
- plugin = nrc->plugin;
- ret = GNUNET_SYSERR;
- switch (nrc->iter_select)
- {
- case 0:
- case 1:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 6,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONG,
- &nrc->last_prio,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &nrc->last_prio,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES, -1);
- break;
- case 2:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 6,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES, -1);
- break;
- case 3:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 6,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->now.value,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->now.value,
- GNUNET_YES, -1);
- break;
- default:
- GNUNET_assert (0);
- }
- return ret;
-}
-
-
-/**
- * Continuation of "mysql_next_request".
- *
- * @param next_cls the next context
- * @param tc the task context (unused)
- */
-static void
-mysql_next_request_cont (void *next_cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct NextRequestClosure *nrc = next_cls;
- struct Plugin *plugin;
- int ret;
- unsigned int type;
- unsigned int priority;
- unsigned int anonymity;
- unsigned long long exp;
- unsigned long long vkey;
- unsigned long hashSize;
- GNUNET_HashCode key;
- struct GNUNET_TIME_Absolute expiration;
- unsigned long length;
- MYSQL_BIND *rbind; /* size 7 */
- MYSQL_BIND dbind[1];
- char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
-
- plugin = nrc->plugin;
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc = NULL;
-
- AGAIN:
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->now = GNUNET_TIME_absolute_get ();
- hashSize = sizeof (GNUNET_HashCode);
- memset (nrc->rbind, 0, sizeof (nrc->rbind));
- rbind = nrc->rbind;
- rbind[0].buffer_type = MYSQL_TYPE_LONG;
- rbind[0].buffer = &type;
- rbind[0].is_unsigned = 1;
- rbind[1].buffer_type = MYSQL_TYPE_LONG;
- rbind[1].buffer = &priority;
- rbind[1].is_unsigned = 1;
- rbind[2].buffer_type = MYSQL_TYPE_LONG;
- rbind[2].buffer = &anonymity;
- rbind[2].is_unsigned = 1;
- rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[3].buffer = &exp;
- rbind[3].is_unsigned = 1;
- rbind[4].buffer_type = MYSQL_TYPE_BLOB;
- rbind[4].buffer = &key;
- rbind[4].buffer_length = hashSize;
- rbind[4].length = &hashSize;
- rbind[5].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[5].buffer = &vkey;
- rbind[5].is_unsigned = GNUNET_YES;
-
- if ( (GNUNET_YES == nrc->end_it) ||
- (GNUNET_OK != nrc->prep (nrc->prep_cls,
- nrc)))
- goto END_SET;
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->last_vkey = vkey;
- nrc->last_prio = priority;
- nrc->last_expire = exp;
- if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
- (hashSize != sizeof (GNUNET_HashCode)) )
- {
- GNUNET_break (0);
- goto END_SET;
- }
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found value %llu with prio %u, anon %u, expire %llu selecting from gn090 table\n",
- vkey,
- priority,
- anonymity,
- exp);
-#endif
- /* now do query on gn072 */
- length = sizeof (datum);
- memset (dbind, 0, sizeof (dbind));
- dbind[0].buffer_type = MYSQL_TYPE_BLOB;
- dbind[0].buffer_length = length;
- dbind[0].length = &length;
- dbind[0].buffer = datum;
- ret = prepared_statement_run_select (plugin,
- plugin->select_value,
- 1,
- dbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1);
- GNUNET_break (ret <= 1); /* should only have one rbind! */
- if (ret > 0)
- ret = GNUNET_OK;
- if (ret != GNUNET_OK)
- {
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Failed to obtain value %llu from table `%s'\n"),
- vkey,
- "gn072");
- goto AGAIN;
- }
- GNUNET_break (length <= sizeof(datum));
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling iterator with value `%s' number %llu of size %u with type %u, priority %u, anonymity %u and expiration %llu\n",
- GNUNET_h2s (&key),
- vkey,
- length,
- type,
- priority,
- anonymity,
- exp);
-#endif
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- expiration.value = exp;
- ret = nrc->dviter (nrc->dviter_cls,
- nrc,
- &key,
- length,
- datum,
- type,
- priority,
- anonymity,
- expiration,
- vkey);
- if (ret == GNUNET_SYSERR)
- {
- nrc->end_it = GNUNET_YES;
- return;
- }
- if (ret == GNUNET_NO)
- {
- do_delete_value (plugin, vkey);
- do_delete_entry_by_vkey (plugin, vkey);
- if (length != 0)
- plugin->env->duc (plugin->env->cls,
- - length);
- }
- return;
- END_SET:
- /* call dviter with "end of set" */
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->dviter (nrc->dviter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->prep (nrc->prep_cls, NULL);
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- GNUNET_free (nrc);
-}
+#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
+ struct GNUNET_MYSQL_StatementHandle *get_size;
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
+ "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
+ "WHERE anonLevel=0 AND type=? AND "\
+ "(rvalue >= ? OR"\
+ " NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
+ "ORDER BY rvalue ASC LIMIT 1"
+ struct GNUNET_MYSQL_StatementHandle *zero_iter;
-/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
- */
-static void
-mysql_plugin_next_request (void *next_cls,
- int end_it)
-{
- struct NextRequestClosure *nrc = next_cls;
+#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
+ struct GNUNET_MYSQL_StatementHandle *select_expiration;
+
+#define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
+ struct GNUNET_MYSQL_StatementHandle *select_priority;
+
+#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
+ "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
+ "WHERE repl=? AND "\
+ " (rvalue>=? OR"\
+ " NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
+ "ORDER BY rvalue ASC "\
+ "LIMIT 1"
+ struct GNUNET_MYSQL_StatementHandle *select_replication;
+
+#define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
+ struct GNUNET_MYSQL_StatementHandle *max_repl;
+
+#define GET_ALL_KEYS "SELECT hash from gn090"
+ struct GNUNET_MYSQL_StatementHandle *get_all_keys;
- if (GNUNET_YES == end_it)
- nrc->end_it = GNUNET_YES;
- nrc->plugin->next_task_nc = nrc;
- nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
- &mysql_next_request_cont,
- nrc);
-}
+};
/**
- * Iterate over the items in the datastore
- * using the given query to select and order
- * the items.
+ * Delete an entry from the gn090 table.
*
* @param plugin plugin context
- * @param type entries of which type should be considered?
- * @param iter_select which iterator statement are we using
- * @param is_asc are we using ascending order?
- * @param dviter function to call on each matching item
- * @param dviter_cls closure for dviter
+ * @param uid unique ID of the entry to delete
+ * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
*/
-static void
-iterateHelper (struct Plugin *plugin,
- enum GNUNET_BLOCK_Type type,
- int is_asc,
- unsigned int iter_select,
- PluginIterator dviter,
- void *dviter_cls)
+static int
+do_delete_entry (struct Plugin *plugin, unsigned long long uid)
{
- struct NextRequestClosure *nrc;
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->type = type;
- nrc->iter_select = iter_select;
- nrc->dviter = dviter;
- nrc->dviter_cls = dviter_cls;
- nrc->prep = &iterator_helper_prepare;
- if (is_asc)
- {
- nrc->last_prio = 0;
- nrc->last_vkey = 0;
- nrc->last_expire = 0;
- }
- else
- {
- nrc->last_prio = 0x7FFFFFFFL;
- nrc->last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- nrc->last_expire = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- }
- mysql_plugin_next_request (nrc, GNUNET_NO);
+ int ret;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
+ uid);
+ ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
+ plugin->delete_entry_by_uid, NULL,
+ MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
+ if (ret >= 0)
+ return GNUNET_OK;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Deleting value %llu from gn090 table failed\n", uid);
+ return ret;
}
* @return number of bytes used on disk
*/
static unsigned long long
-mysql_plugin_get_size (void *cls)
+mysql_plugin_estimate_size (void *cls)
{
struct Plugin *plugin = cls;
MYSQL_BIND cbind[1];
cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
cbind[0].buffer = &total;
cbind[0].is_unsigned = GNUNET_NO;
- if (GNUNET_OK !=
- prepared_statement_run_select (plugin,
- plugin->get_size,
- 1, cbind,
- &return_ok, NULL,
- -1))
+ if (GNUNET_OK !=
+ GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
return 0;
return total;
}
* @param type type of the content
* @param priority priority of the content
* @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
* @param expiration expiration time for the content
* @param msg set to error message
* @return GNUNET_OK on success
*/
static int
-mysql_plugin_put (void *cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute expiration,
- char **msg)
+mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
+ const void *data, enum GNUNET_BLOCK_Type type,
+ uint32_t priority, uint32_t anonymity, uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration, char **msg)
{
struct Plugin *plugin = cls;
- unsigned int itype = type;
+ unsigned int irepl = replication;
unsigned int ipriority = priority;
unsigned int ianonymity = anonymity;
- unsigned long long lexpiration = expiration.value;
+ unsigned long long lexpiration = expiration.abs_value;
+ unsigned long long lrvalue =
+ (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
unsigned long hashSize;
unsigned long hashSize2;
- unsigned long long vkey;
- GNUNET_HashCode vhash;
+ unsigned long lsize;
+ struct GNUNET_HashCode vhash;
if (size > MAX_DATUM_SIZE)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- hashSize = sizeof (GNUNET_HashCode);
- hashSize2 = sizeof (GNUNET_HashCode);
- GNUNET_CRYPTO_hash (data, size, &vhash);
- if (GNUNET_OK != do_insert_value (plugin,
- data, size, &vkey))
+ {
+ GNUNET_break (0);
return GNUNET_SYSERR;
+ }
+ hashSize = sizeof (struct GNUNET_HashCode);
+ hashSize2 = sizeof (struct GNUNET_HashCode);
+ lsize = size;
+ GNUNET_CRYPTO_hash (data, size, &vhash);
if (GNUNET_OK !=
- prepared_statement_run (plugin,
- plugin->insert_entry,
- NULL,
- MYSQL_TYPE_LONG,
- &itype,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &ipriority,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &ianonymity,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &lexpiration,
- GNUNET_YES,
- MYSQL_TYPE_BLOB,
- key,
- hashSize,
- &hashSize,
- MYSQL_TYPE_BLOB,
- &vhash,
- hashSize2,
- &hashSize2,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1))
- {
- do_delete_value (plugin, vkey);
- return GNUNET_SYSERR;
- }
-#if DEBUG_MYSQL
+ GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
+ MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
+ MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
+ MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
+ MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
+ return GNUNET_SYSERR;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Inserted value `%s' number %llu with size %u into gn090 table\n",
- GNUNET_h2s (key),
- vkey,
- (unsigned int) size);
-#endif
+ "Inserted value `%s' with size %u into gn090 table\n",
+ GNUNET_h2s (key), (unsigned int) size);
if (size > 0)
- plugin->env->duc (plugin->env->cls,
- size);
+ plugin->env->duc (plugin->env->cls, size);
return GNUNET_OK;
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Update the priority for a particular key in the datastore. If
+ * the expiration time in value is different than the time found in
+ * the datastore, the higher value should be kept. For the
+ * anonymity level, the lower value is to be used. The specified
+ * priority should be added to the existing priority, ignoring the
+ * priority in value.
+ *
+ * Note that it is possible for multiple values to match this put.
+ * In that case, all of the respective values are updated.
*
* @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param uid unique identifier of the datum
+ * @param delta by how much should the priority
+ * change? If priority + delta < 0 the
+ * priority should be set to 0 (never go
+ * negative).
+ * @param expire new expiration time should be the
+ * MAX of any existing expiration time and
+ * this value
+ * @param msg set to error message
+ * @return GNUNET_OK on success
*/
-static void
-mysql_plugin_iter_low_priority (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+static int
+mysql_plugin_update (void *cls, uint64_t uid, int delta,
+ struct GNUNET_TIME_Absolute expire, char **msg)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_YES,
- 0, iter, iter_cls);
-}
-
-
-struct GetContext
-{
- GNUNET_HashCode key;
- GNUNET_HashCode vhash;
+ unsigned long long vkey = uid;
+ unsigned long long lexpire = expire.abs_value;
+ int ret;
- unsigned int prio;
- unsigned int anonymity;
- unsigned long long expiration;
- unsigned long long vkey;
- unsigned long long total;
- int off;
- int count;
- int have_vhash;
-};
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Updating value %llu adding %d to priority and maxing exp at %llu\n",
+ vkey, delta, lexpire);
+ ret =
+ GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
+ MYSQL_TYPE_LONG, &delta, GNUNET_NO,
+ MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
+ if (ret != GNUNET_OK)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
+ vkey);
+ }
+ return ret;
+}
-static int
-get_statement_prepare (void *cls,
- struct NextRequestClosure *nrc)
+/**
+ * Run the given select statement and call 'proc' on the resulting
+ * values (which must be in particular positions).
+ *
+ * @param plugin the plugin handle
+ * @param stmt select statement to run
+ * @param proc function to call on result
+ * @param proc_cls closure for proc
+ * @param ... arguments to initialize stmt
+ */
+static void
+execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
+ PluginDatumProcessor proc, void *proc_cls, ...)
{
- struct GetContext *gc = cls;
- struct Plugin *plugin;
+ va_list ap;
int ret;
- unsigned int limit_off;
+ unsigned int type;
+ unsigned int priority;
+ unsigned int anonymity;
+ unsigned long long exp;
unsigned long hashSize;
+ unsigned long size;
+ unsigned long long uid;
+ char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
+ struct GNUNET_HashCode key;
+ struct GNUNET_TIME_Absolute expiration;
+ MYSQL_BIND rbind[7];
- if (NULL == nrc)
- {
- GNUNET_free (gc);
- return GNUNET_NO;
- }
- if (gc->count == gc->total)
- return GNUNET_NO;
- plugin = nrc->plugin;
- hashSize = sizeof (GNUNET_HashCode);
- if (gc->count + gc->off == gc->total)
- nrc->last_vkey = 0; /* back to start */
- if (gc->count == 0)
- limit_off = gc->off;
- else
- limit_off = 0;
-#if DEBUG_MYSQL
+ hashSize = sizeof (struct GNUNET_HashCode);
+ memset (rbind, 0, sizeof (rbind));
+ rbind[0].buffer_type = MYSQL_TYPE_LONG;
+ rbind[0].buffer = &type;
+ rbind[0].is_unsigned = 1;
+ rbind[1].buffer_type = MYSQL_TYPE_LONG;
+ rbind[1].buffer = &priority;
+ rbind[1].is_unsigned = 1;
+ rbind[2].buffer_type = MYSQL_TYPE_LONG;
+ rbind[2].buffer = &anonymity;
+ rbind[2].is_unsigned = 1;
+ rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[3].buffer = &exp;
+ rbind[3].is_unsigned = 1;
+ rbind[4].buffer_type = MYSQL_TYPE_BLOB;
+ rbind[4].buffer = &key;
+ rbind[4].buffer_length = hashSize;
+ rbind[4].length = &hashSize;
+ rbind[5].buffer_type = MYSQL_TYPE_BLOB;
+ rbind[5].buffer = value;
+ rbind[5].buffer_length = size = sizeof (value);
+ rbind[5].length = &size;
+ rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[6].buffer = &uid;
+ rbind[6].is_unsigned = 1;
+
+ va_start (ap, proc_cls);
+ ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
+ va_end (ap);
+ if (ret <= 0)
+ {
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ GNUNET_assert (size <= sizeof (value));
+ if ((rbind[4].buffer_length != sizeof (struct GNUNET_HashCode)) ||
+ (hashSize != sizeof (struct GNUNET_HashCode)))
+ {
+ GNUNET_break (0);
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Obtaining result number %d/%lld at offset %d with lvc %llu for GET `%s'\n",
- gc->count+1,
- gc->total,
- limit_off,
- nrc->last_vkey,
- GNUNET_h2s (&gc->key));
-#endif
- if (nrc->type != 0)
- {
- if (gc->have_vhash)
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_vhash_and_type, 6, nrc->rbind, &return_ok,
- NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
- -1);
- }
- else
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_and_type, 6, nrc->rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
- -1);
- }
- }
- else
- {
- if (gc->have_vhash)
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_and_vhash, 6, nrc->rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
- GNUNET_YES, -1);
- }
- else
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash, 6, nrc->rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &limit_off, GNUNET_YES, -1);
- }
- }
- gc->count++;
- return ret;
+ "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
+ (unsigned int) size, GNUNET_h2s (&key), priority, anonymity, exp);
+ GNUNET_assert (size < MAX_DATUM_SIZE);
+ expiration.abs_value = exp;
+ ret =
+ proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
+ uid);
+ if (ret == GNUNET_NO)
+ {
+ do_delete_entry (plugin, uid);
+ if (size != 0)
+ plugin->env->duc (plugin->env->cls, -size);
+ }
}
+
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
*
* @param cls closure
- * @param key maybe NULL (to match all entries)
+ * @param offset offset of the result (modulo num-results);
+ * specific ordering does not matter for the offset
+ * @param key key to match, never NULL
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* Note that for DBlocks there is no difference
* there may be!
* @param type entries of which type are relevant?
* Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc function to call on the matching value,
+ * with NULL for if no value matches
+ * @param proc_cls closure for proc
*/
static void
-mysql_plugin_get (void *cls,
- const GNUNET_HashCode * key,
- const GNUNET_HashCode * vhash,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter, void *iter_cls)
+mysql_plugin_get_key (void *cls, uint64_t offset, const struct GNUNET_HashCode * key,
+ const struct GNUNET_HashCode * vhash,
+ enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- unsigned int itype = type;
int ret;
MYSQL_BIND cbind[1];
- struct GetContext *gc;
- struct NextRequestClosure *nrc;
long long total;
unsigned long hashSize;
+ unsigned long hashSize2;
+ unsigned long long off;
- if (iter == NULL)
- return;
- if (key == NULL)
- {
- mysql_plugin_iter_low_priority (plugin,
- type,
- iter, iter_cls);
- return;
- }
- hashSize = sizeof (GNUNET_HashCode);
+ GNUNET_assert (key != NULL);
+ GNUNET_assert (NULL != proc);
+ hashSize = sizeof (struct GNUNET_HashCode);
+ hashSize2 = sizeof (struct GNUNET_HashCode);
memset (cbind, 0, sizeof (cbind));
total = -1;
cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
cbind[0].buffer = &total;
cbind[0].is_unsigned = GNUNET_NO;
if (type != 0)
+ {
+ if (vhash != NULL)
+ {
+ ret =
+ GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
+ plugin->
+ count_entry_by_hash_vhash_and_type, 1,
+ cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
+ &hashSize, MYSQL_TYPE_BLOB, vhash,
+ hashSize2, &hashSize2, MYSQL_TYPE_LONG,
+ &type, GNUNET_YES, -1);
+ }
+ else
{
- if (vhash != NULL)
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES,
- -1);
- }
- else
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG,
- &itype, GNUNET_YES, -1);
-
- }
+ ret =
+ GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
+ plugin->count_entry_by_hash_and_type,
+ 1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
+ hashSize, &hashSize, MYSQL_TYPE_LONG,
+ &type, GNUNET_YES, -1);
}
+ }
else
+ {
+ if (vhash != NULL)
{
- if (vhash != NULL)
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- vhash, hashSize, &hashSize, -1);
-
- }
- else
- {
- ret =
- prepared_statement_run_select (plugin,
- plugin->count_entry_by_hash,
- 1, cbind, &return_ok,
- NULL, MYSQL_TYPE_BLOB,
- key, hashSize,
- &hashSize, -1);
- }
+ ret =
+ GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
+ plugin->count_entry_by_hash_and_vhash,
+ 1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
+ hashSize, &hashSize, MYSQL_TYPE_BLOB,
+ vhash, hashSize2, &hashSize2, -1);
+
}
- if ((ret != GNUNET_OK) || (0 >= total))
+ else
{
- iter (iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
+ ret =
+ GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
+ cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
+ &hashSize, -1);
}
-#if DEBUG_MYSQL
+ }
+ if ((ret != GNUNET_OK) || (0 >= total))
+ {
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ offset = offset % total;
+ off = (unsigned long long) offset;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Iterating over %lld results for GET `%s'\n",
- total,
- GNUNET_h2s (key));
-#endif
- gc = GNUNET_malloc (sizeof (struct GetContext));
- gc->key = *key;
- if (vhash != NULL)
+ "Obtaining %llu/%lld result for GET `%s'\n", off, total,
+ GNUNET_h2s (key));
+ if (type != GNUNET_BLOCK_TYPE_ANY)
+ {
+ if (NULL != vhash)
{
- gc->have_vhash = GNUNET_YES;
- gc->vhash = *vhash;
+ execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
+ proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+ &off, GNUNET_YES, -1);
}
- gc->total = total;
- gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
-
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->type = type;
- nrc->iter_select = -1;
- nrc->dviter = iter;
- nrc->dviter_cls = iter_cls;
- nrc->prep = &get_statement_prepare;
- nrc->prep_cls = gc;
- nrc->last_vkey = 0;
- mysql_plugin_next_request (nrc, GNUNET_NO);
+ else
+ {
+ execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
+ proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+ &off, GNUNET_YES, -1);
+ }
+ }
+ else
+ {
+ if (NULL != vhash)
+ {
+ execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
+ proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
+ }
+ else
+ {
+ execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
+ }
+ }
}
/**
- * Update the priority for a particular key in the datastore. If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept. For the
- * anonymity level, the lower value is to be used. The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
+ * Get a zero-anonymity datum from the datastore.
*
* @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- * change? If priority + delta < 0 the
- * priority should be set to 0 (never go
- * negative).
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param offset offset of the result
+ * @param type entries of which type should be considered?
+ * Use 0 for any type.
+ * @param proc function to call on a matching value or NULL
+ * @param proc_cls closure for iter
*/
-static int
-mysql_plugin_update (void *cls,
- uint64_t uid,
- int delta,
- struct GNUNET_TIME_Absolute expire,
- char **msg)
+static void
+mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- unsigned long long vkey = uid;
- unsigned long long lexpire = expire.value;
- int ret;
+ unsigned long long rvalue =
+ (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+
+ execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
+ &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+ &rvalue, GNUNET_YES, -1);
+}
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Updating value %llu adding %d to priority and maxing exp at %llu\n",
- vkey,
- delta,
- lexpire);
-#endif
- ret = prepared_statement_run (plugin,
- plugin->update_entry,
- NULL,
- MYSQL_TYPE_LONG,
- &delta,
- GNUNET_NO,
- MYSQL_TYPE_LONGLONG,
- &lexpire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &lexpire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &vkey,
- GNUNET_YES, -1);
- if (ret != GNUNET_OK)
+
+/**
+ * Context for 'repl_proc' function.
+ */
+struct ReplCtx
+{
+
+ /**
+ * Plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call for the result (or the NULL).
+ */
+ PluginDatumProcessor proc;
+
+ /**
+ * Closure for proc.
+ */
+ void *proc_cls;
+};
+
+
+/**
+ * Wrapper for the processor for 'mysql_plugin_get_replication'.
+ * Decrements the replication counter and calls the original
+ * iterator.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * (continue on call to "next", of course),
+ * GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
+ const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
+ uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
+{
+ struct ReplCtx *rc = cls;
+ struct Plugin *plugin = rc->plugin;
+ unsigned long long oid;
+ int ret;
+ int iret;
+
+ ret =
+ rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
+ expiration, uid);
+ if (NULL != key)
+ {
+ oid = (unsigned long long) uid;
+ iret =
+ GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
+ MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
+ if (iret == GNUNET_SYSERR)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to update value %llu\n",
- vkey);
+ "Failed to reduce replication counter\n");
+ return GNUNET_SYSERR;
}
+ }
return ret;
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Get a random item for replication. Returns a single, not expired,
+ * random item from those with the highest replication counters. The
+ * item's replication counter is decremented by one IF it was positive
+ * before. Call 'proc' with all values ZERO or NULL if the datastore
+ * is empty.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param cls closure
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-mysql_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls);
+ struct ReplCtx rc;
+ unsigned long long rvalue;
+ unsigned long repl;
+ MYSQL_BIND results;
+
+ rc.plugin = plugin;
+ rc.proc = proc;
+ rc.proc_cls = proc_cls;
+ memset (&results, 0, sizeof (results));
+ results.buffer_type = MYSQL_TYPE_LONG;
+ results.buffer = &repl;
+ results.is_unsigned = GNUNET_YES;
+
+ if (1 !=
+ GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
+ {
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+
+ rvalue =
+ (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
+ MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+ &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
+
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Get all of the keys in the datastore.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param cls closure
+ * @param proc function to call on each key
+ * @param proc_cls closure for proc
*/
static void
-mysql_plugin_iter_ascending_expiration (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_get_keys (void *cls,
+ PluginKeyProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_YES, 2, iter, iter_cls);
+ const char *query = "SELECT hash FROM gn090";
+ int ret;
+ MYSQL_STMT *statement;
+ struct GNUNET_HashCode key;
+ MYSQL_BIND cbind[1];
+ unsigned long length;
+
+ statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
+ plugin->get_all_keys);
+ if (statement == NULL)
+ {
+ GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ return;
+ }
+ if (mysql_stmt_prepare (statement, query, strlen (query)))
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
+ _("Failed to prepare statement `%s'\n"), query);
+ GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ return;
+ }
+ GNUNET_assert (proc != NULL);
+ if (mysql_stmt_execute (statement))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("`%s' for `%s' failed at %s:%d with error: %s\n"),
+ "mysql_stmt_execute", query, __FILE__, __LINE__,
+ mysql_stmt_error (statement));
+ GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ return;
+ }
+ memset (cbind, 0, sizeof (cbind));
+ cbind[0].buffer_type = MYSQL_TYPE_BLOB;
+ cbind[0].buffer = &key;
+ cbind[0].buffer_length = sizeof (key);
+ cbind[0].length = &length;
+ cbind[0].is_unsigned = GNUNET_NO;
+ if (mysql_stmt_bind_result (statement, cbind))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "mysql_stmt_bind_result", __FILE__, __LINE__,
+ mysql_stmt_error (statement));
+ GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ return;
+ }
+ while (0 == (ret = mysql_stmt_fetch (statement)))
+ {
+ if (sizeof (struct GNUNET_HashCode) == length)
+ proc (proc_cls, &key, 1);
+ }
+ if (ret != MYSQL_NO_DATA)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "mysql_stmt_fetch", __FILE__, __LINE__,
+ mysql_stmt_error (statement));
+ GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ return;
+ }
+ mysql_stmt_reset (statement);
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Context for 'expi_proc' function.
+ */
+struct ExpiCtx
+{
+
+ /**
+ * Plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call for the result (or the NULL).
+ */
+ PluginDatumProcessor proc;
+
+ /**
+ * Closure for proc.
+ */
+ void *proc_cls;
+};
+
+
+
+/**
+ * Wrapper for the processor for 'mysql_plugin_get_expiration'.
+ * If no expired value was found, we do a second query for
+ * low-priority content.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * (continue on call to "next", of course),
+ * GNUNET_NO to delete the item and continue (if supported)
*/
-static void
-mysql_plugin_iter_migration_order (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+static int
+expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
+ const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
+ uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
{
- struct Plugin *plugin = cls;
- iterateHelper (plugin, 0, GNUNET_NO, 3, iter, iter_cls);
+ struct ExpiCtx *rc = cls;
+ struct Plugin *plugin = rc->plugin;
+
+ if (NULL == key)
+ {
+ execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
+ -1);
+ return GNUNET_SYSERR;
+ }
+ return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
+ expiration, uid);
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Get a random item for expiration.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param cls closure
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-mysql_plugin_iter_all_now (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, 0, GNUNET_YES, 0, iter, iter_cls);
+ long long nt;
+ struct ExpiCtx rc;
+
+ rc.plugin = plugin;
+ rc.proc = proc;
+ rc.proc_cls = proc_cls;
+ nt = (long long) GNUNET_TIME_absolute_get ().abs_value;
+ execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
+ MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
+
}
/**
* Drop database.
+ *
+ * @param cls the "struct Plugin*"
*/
-static void
+static void
mysql_plugin_drop (void *cls)
{
struct Plugin *plugin = cls;
- if ((GNUNET_OK != run_statement (plugin,
- "DROP TABLE gn090")) ||
- (GNUNET_OK != run_statement (plugin,
- "DROP TABLE gn072")))
- return; /* error */
+ if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
+ return; /* error */
plugin->env->duc (plugin->env->cls, 0);
}
plugin = GNUNET_malloc (sizeof (struct Plugin));
plugin->env = env;
- plugin->cnffile = get_my_cnf_path (env->cfg);
- if (GNUNET_OK != iopen (plugin))
- {
- iclose (plugin);
- GNUNET_free_non_null (plugin->cnffile);
- GNUNET_free (plugin);
- return NULL;
- }
-#define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
-#define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
- if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
- " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
- " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
- " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
- " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
- " hash BINARY(64) NOT NULL DEFAULT '',"
- " vhash BINARY(64) NOT NULL DEFAULT '',"
- " vkey BIGINT UNSIGNED NOT NULL DEFAULT 0,"
- " INDEX hash (hash(64)),"
- " INDEX hash_vhash_vkey (hash(64),vhash(64),vkey),"
- " INDEX hash_vkey (hash(64),vkey),"
- " INDEX vkey (vkey),"
- " INDEX prio (prio,vkey),"
- " INDEX expire (expire,vkey,type),"
- " INDEX anonLevel (anonLevel,prio,vkey,type)"
- ") ENGINE=InnoDB") ||
- MRUNS ("CREATE TABLE IF NOT EXISTS gn072 ("
- " vkey BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,"
- " value BLOB NOT NULL DEFAULT '') ENGINE=MyISAM") ||
- MRUNS ("SET AUTOCOMMIT = 1") ||
- PINIT (plugin->select_value, SELECT_VALUE) ||
- PINIT (plugin->delete_value, DELETE_VALUE) ||
- PINIT (plugin->insert_value, INSERT_VALUE) ||
+ plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
+ if (NULL == plugin->mc)
+ {
+ GNUNET_free (plugin);
+ return NULL;
+ }
+#define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
+#define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
+ if (MRUNS
+ ("CREATE TABLE IF NOT EXISTS gn090 ("
+ " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+ " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+ " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+ " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+ " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
+ " rvalue BIGINT UNSIGNED NOT NULL,"
+ " hash BINARY(64) NOT NULL DEFAULT '',"
+ " vhash BINARY(64) NOT NULL DEFAULT '',"
+ " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
+ " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
+ " INDEX idx_hash_uid (hash(64),uid),"
+ " INDEX idx_hash_vhash (hash(64),vhash(64)),"
+ " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
+ " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
+ " INDEX idx_expire (expire),"
+ " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
+ ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
PINIT (plugin->insert_entry, INSERT_ENTRY) ||
- PINIT (plugin->delete_entry_by_vkey, DELETE_ENTRY_BY_VKEY) ||
+ PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
- PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
- || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
- || PINIT (plugin->select_entry_by_hash_vhash_and_type,
- SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
- || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
- || PINIT (plugin->get_size, SELECT_SIZE)
- || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
- || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
+ PINIT (plugin->select_entry_by_hash_and_vhash,
+ SELECT_ENTRY_BY_HASH_AND_VHASH) ||
+ PINIT (plugin->select_entry_by_hash_and_type,
+ SELECT_ENTRY_BY_HASH_AND_TYPE) ||
+ PINIT (plugin->select_entry_by_hash_vhash_and_type,
+ SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
+ PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
+ PINIT (plugin->get_size, SELECT_SIZE) ||
+ PINIT (plugin->count_entry_by_hash_and_vhash,
+ COUNT_ENTRY_BY_HASH_AND_VHASH) ||
+ PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
|| PINIT (plugin->count_entry_by_hash_vhash_and_type,
- COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
- || PINIT (plugin->update_entry, UPDATE_ENTRY)
- || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY)
- || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS)
- || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME)
- || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER))
- {
- iclose (plugin);
- GNUNET_free_non_null (plugin->cnffile);
- GNUNET_free (plugin);
- return NULL;
- }
+ COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
+ PINIT (plugin->update_entry, UPDATE_ENTRY) ||
+ PINIT (plugin->dec_repl, DEC_REPL) ||
+ PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
+ PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
+ PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
+ PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
+ PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
+ PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
+ {
+ GNUNET_MYSQL_context_destroy (plugin->mc);
+ GNUNET_free (plugin);
+ return NULL;
+ }
#undef PINIT
#undef MRUNS
api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
api->cls = plugin;
- api->get_size = &mysql_plugin_get_size;
+ api->estimate_size = &mysql_plugin_estimate_size;
api->put = &mysql_plugin_put;
- api->next_request = &mysql_plugin_next_request;
- api->get = &mysql_plugin_get;
api->update = &mysql_plugin_update;
- api->iter_low_priority = &mysql_plugin_iter_low_priority;
- api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
- api->iter_ascending_expiration = &mysql_plugin_iter_ascending_expiration;
- api->iter_migration_order = &mysql_plugin_iter_migration_order;
- api->iter_all_now = &mysql_plugin_iter_all_now;
+ api->get_key = &mysql_plugin_get_key;
+ api->get_replication = &mysql_plugin_get_replication;
+ api->get_expiration = &mysql_plugin_get_expiration;
+ api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
+ api->get_keys = &mysql_plugin_get_keys;
api->drop = &mysql_plugin_drop;
- GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
- "mysql", _("Mysql database running\n"));
+ GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
+ _("Mysql database running\n"));
return api;
}
struct GNUNET_DATASTORE_PluginFunctions *api = cls;
struct Plugin *plugin = api->cls;
- iclose (plugin);
- if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (plugin->env->sched,
- plugin->next_task);
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
- GNUNET_free (plugin->next_task_nc);
- plugin->next_task_nc = NULL;
- }
- GNUNET_free_non_null (plugin->cnffile);
+ GNUNET_MYSQL_context_destroy (plugin->mc);
GNUNET_free (plugin);
GNUNET_free (api);
- mysql_library_end ();
return NULL;
}