/*
This file is part of GNUnet
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* MANUAL SETUP INSTRUCTIONS
*
* 1) in /etc/gnunet.conf, set
- * <pre>
- * [datastore]
- * DATABASE = "mysql"
- * </pre>
+ * @verbatim
+ [datastore]
+ DATABASE = "mysql"
+ @endverbatim
* 2) Then access mysql as root,
- * <pre>
- *
- * $ mysql -u root -p
- *
- * </pre>
+ * @verbatim
+ $ mysql -u root -p
+ @endverbatim
* and do the following. [You should replace $USER with the username
* that will be running the gnunetd process].
- * <pre>
- *
+ * @verbatim
CREATE DATABASE gnunet;
GRANT select,insert,update,delete,create,alter,drop,create temporary tables
ON gnunet.* TO $USER@localhost;
SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
FLUSH PRIVILEGES;
- *
- * </pre>
+ @endverbatim
* 3) In the $HOME directory of $USER, create a ".my.cnf" file
* with the following lines
- * <pre>
-
+ * @verbatim
[client]
user=$USER
password=$the_password_you_like
-
- * </pre>
+ @endverbatim
*
* Thats it. Note that .my.cnf file is a security risk unless its on
* a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
* 4) Still, perhaps you should briefly try if the DB connection
* works. First, login as $USER. Then use,
*
- * <pre>
- * $ mysql -u $USER -p $the_password_you_like
- * mysql> use gnunet;
- * </pre>
+ * @verbatim
+ $ mysql -u $USER -p $the_password_you_like
+ mysql> use gnunet;
+ @endverbatim
*
* If you get the message "Database changed" it probably works.
*
* - The tables can be verified/fixed in two ways;
* 1) by running mysqlcheck -A, or
* 2) by executing (inside of mysql using the GNUnet database):
- * mysql> REPAIR TABLE gn080;
- * mysql> REPAIR TABLE gn072;
+ * @verbatim
+ mysql> REPAIR TABLE gn090;
+ @endverbatim
*
* PROBLEMS?
*
* friend is probably the mysql manual. The first thing to check
* is that mysql is basically operational, that you can connect
* to it, create tables, issue queries etc.
- *
- * TODO:
- * - implement GET
- * - remove 'size' field in gn080.
- * - use FOREIGN KEY for 'uid/vkey'
- * - consistent naming of uid/vkey
*/
#include "platform.h"
-#include "plugin_datastore.h"
+#include "gnunet_datastore_plugin.h"
#include "gnunet_util_lib.h"
#include <mysql/mysql.h>
#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
-/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle
- "OR" very well, so we need to use UNION instead. And UNION does not
- automatically apply a LIMIT on the outermost clause, so we need to
- repeat ourselves quite a bit. All hail the performance gods (and thanks
- to #mysql on freenode) */
-#define SELECT_IT_LOW_PRIORITY "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
- "ORDER BY prio ASC,vkey ASC LIMIT 1) "\
- "UNION "\
- "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
- "ORDER BY prio ASC,vkey ASC LIMIT 1)"\
- "ORDER BY prio ASC,vkey ASC LIMIT 1"
-
-#define SELECT_IT_NON_ANONYMOUS "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
- " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
- "UNION "\
- "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
- " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
- "ORDER BY prio DESC,vkey DESC LIMIT 1"
-
-#define SELECT_IT_EXPIRATION_TIME "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
- "ORDER BY expire ASC,vkey ASC LIMIT 1) "\
- "UNION "\
- "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
- "ORDER BY expire ASC,vkey ASC LIMIT 1)"\
- "ORDER BY expire ASC,vkey ASC LIMIT 1"
-
-
-#define SELECT_IT_MIGRATION_ORDER "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
- " AND expire > ? AND type!=3"\
- " ORDER BY expire DESC,vkey DESC LIMIT 1) "\
- "UNION "\
- "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
- " AND expire > ? AND type!=3"\
- " ORDER BY expire DESC,vkey DESC LIMIT 1)"\
- "ORDER BY expire DESC,vkey DESC LIMIT 1"
-
-#define SELECT_SIZE "SELECT sum(size) FROM gn080"
-
-
struct GNUNET_MysqlStatementHandle
{
struct GNUNET_MysqlStatementHandle *next;
MYSQL_BIND rbind[7];
- unsigned int type;
-
- unsigned int iter_select;
+ enum GNUNET_BLOCK_Type type;
PluginIterator dviter;
void *dviter_cls;
- unsigned int last_prio;
-
- unsigned long long last_expire;
-
- unsigned long long last_vkey;
+ unsigned int count;
int end_it;
+
+ int one_shot;
};
*/
struct GNUNET_DATASTORE_PluginEnvironment *env;
+ /**
+ * Handle to talk to MySQL.
+ */
MYSQL *dbf;
+ /**
+ * We keep all prepared statements in a DLL. This is the head.
+ */
struct GNUNET_MysqlStatementHandle *shead;
+ /**
+ * We keep all prepared statements in a DLL. This is the tail.
+ */
struct GNUNET_MysqlStatementHandle *stail;
/**
GNUNET_SCHEDULER_TaskIdentifier next_task;
/**
- * Statements dealing with gn072 table
- */
-#define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *select_value;
-
-#define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *delete_value;
-
-#define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)"
- struct GNUNET_MysqlStatementHandle *insert_value;
-
- /**
- * Statements dealing with gn080 table
+ * Prepared statements.
*/
-#define INSERT_ENTRY "INSERT INTO gn080 (size,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)"
+#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
struct GNUNET_MysqlStatementHandle *insert_entry;
-#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn080 WHERE vkey=?"
- struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey;
-
-#define SELECT_ENTRY_BY_HASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
-
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
+#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
+ struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
-
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
- struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
-
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=?"
+#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 WHERE hash=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
+
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+ struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
+#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=? AND type=?"
+#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
+ struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
+
+#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND type=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
+ struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
+
+#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
+
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
+ struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
-#define UPDATE_ENTRY "UPDATE gn080 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
+#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
struct GNUNET_MysqlStatementHandle *update_entry;
- struct GNUNET_MysqlStatementHandle *iter[4];
+#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
+ struct GNUNET_MysqlStatementHandle *dec_repl;
- //static unsigned int stat_size;
+#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
+ struct GNUNET_MysqlStatementHandle *get_size;
- /**
- * Size of the mysql database on disk.
- */
- unsigned long long content_size;
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
+ struct GNUNET_MysqlStatementHandle *zero_iter;
+
+#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
+ "UNION "\
+ "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY prio ASC LIMIT 1) "\
+ "ORDER BY expire ASC LIMIT 1"
+ struct GNUNET_MysqlStatementHandle *select_expiration;
+
+#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY repl DESC,RAND() LIMIT 1"
+ struct GNUNET_MysqlStatementHandle *select_replication;
};
/**
* Obtain the location of ".my.cnf".
+ *
+ * @param cfg our configuration
* @return NULL on error
*/
static char *
/**
* Free a prepared statement.
+ *
+ * @param plugin plugin context
+ * @param s prepared statement
*/
static void
prepared_statement_destroy (struct Plugin *plugin,
}
GNUNET_assert (mysql_dbname != NULL);
- mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
- mysql_dbname, (unsigned int) mysql_port, NULL,
+ mysql_real_connect (ret->dbf,
+ mysql_server,
+ mysql_user, mysql_password,
+ mysql_dbname,
+ (unsigned int) mysql_port, NULL,
CLIENT_IGNORE_SIGPIPE);
GNUNET_free_non_null (mysql_server);
GNUNET_free_non_null (mysql_user);
/**
* Run the given MySQL statement.
*
+ * @param plugin plugin context
+ * @param statement SQL statement to run
* @return GNUNET_OK on success, GNUNET_SYSERR on error
*/
static int
}
-#if 0
-/**
- * Run the given MySQL SELECT statement. The statement
- * must have only a single result (one column, one row).
- *
- * @return result on success, NULL on error
- */
-static char *
-run_statement_select (struct Plugin *plugin,
- const char *statement)
-{
- MYSQL_RES *sql_res;
- MYSQL_ROW sql_row;
- char *ret;
-
- if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
- return NULL;
- mysql_query (plugin->dbf, statement);
- if ((mysql_error (plugin->dbf)[0]) ||
- (!(sql_res = mysql_use_result (plugin->dbf))) ||
- (!(sql_row = mysql_fetch_row (sql_res))))
- {
- LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
- "mysql_query", plugin);
- return NULL;
- }
- if ((mysql_num_fields (sql_res) != 1) || (sql_row[0] == NULL))
- {
- GNUNET_break (mysql_num_fields (sql_res) == 1);
- if (sql_res != NULL)
- mysql_free_result (sql_res);
- return NULL;
- }
- ret = GNUNET_strdup (sql_row[0]);
- mysql_free_result (sql_res);
- return ret;
-}
-#endif
-
-
/**
* Create a prepared statement.
*
+ * @param plugin plugin context
+ * @param statement SQL statement text to prepare
* @return NULL on error
*/
static struct GNUNET_MysqlStatementHandle *
/**
* Prepare a statement for running.
*
+ * @param plugin plugin context
+ * @param ret handle to prepared statement
* @return GNUNET_OK on success
*/
static int
* Bind the parameters for the given MySQL statement
* and run it.
*
+ * @param plugin plugin context
* @param s statement to bind and run
* @param ap arguments for the binding
* @return GNUNET_SYSERR on error, GNUNET_OK on success
memset (qbind, 0, sizeof (qbind));
off = 0;
ft = 0;
- while ((pc > 0) && (-1 != (ft = va_arg (ap, enum enum_field_types))))
+ while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
{
qbind[off].buffer_type = ft;
switch (ft)
pc--;
off++;
}
- if (!((pc == 0) && (ft != -1) && (va_arg (ap, int) == -1)))
+ if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
{
- GNUNET_break (0);
+ GNUNET_assert (0);
return GNUNET_SYSERR;
}
if (mysql_stmt_bind_param (s->statement, qbind))
*/
typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
unsigned int num_values,
- MYSQL_BIND * values);
+ MYSQL_BIND *values);
/**
* Run a prepared SELECT statement.
*
+ * @param plugin plugin context
+ * @param s statement to run
* @param result_size number of elements in results array
* @param results pointer to already initialized MYSQL_BIND
* array (of sufficient size) for passing results
*/
static int
prepared_statement_run_select (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle
- *s,
+ struct GNUNET_MysqlStatementHandle *s,
unsigned int result_size,
- MYSQL_BIND * results,
- GNUNET_MysqlDataProcessor
- processor, void *processor_cls,
+ MYSQL_BIND *results,
+ GNUNET_MysqlDataProcessor processor, void *processor_cls,
...)
{
va_list ap;
/**
* Run a prepared statement that does NOT produce results.
*
+ * @param plugin plugin context
+ * @param s statement to run
+ * @param insert_id NULL or address where to store the row ID of whatever
+ * was inserted (only for INSERT statements!)
* @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
* values (size + buffer-reference for pointers); terminated
* with "-1"
- * @param insert_id NULL or address where to store the row ID of whatever
- * was inserted (only for INSERT statements!)
* @return GNUNET_SYSERR on error, otherwise
* the number of successfully affected rows
*/
/**
- * Delete an value from the gn072 table.
+ * Delete an entry from the gn090 table.
*
- * @param vkey vkey identifying the value to delete
+ * @param plugin plugin context
+ * @param uid unique ID of the entry to delete
* @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
*/
static int
-do_delete_value (struct Plugin *plugin,
- unsigned long long vkey)
+do_delete_entry (struct Plugin *plugin,
+ unsigned long long uid)
{
int ret;
-
+
#if DEBUG_MYSQL
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Deleting value %llu from gn072 table\n",
- vkey);
+ "Deleting value %llu from gn090 table\n",
+ uid);
#endif
ret = prepared_statement_run (plugin,
- plugin->delete_value,
+ plugin->delete_entry_by_uid,
NULL,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1);
- if (ret > 0)
- {
- ret = GNUNET_OK;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Deleting value %llu from gn072 table failed\n",
- vkey);
- }
+ MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
+ -1);
+ if (ret >= 0)
+ return GNUNET_OK;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Deleting value %llu from gn090 table failed\n",
+ uid);
return ret;
}
+
/**
- * Insert a value into the gn072 table.
+ * Function that simply returns GNUNET_OK
*
- * @param value the value to insert
- * @param size size of the value
- * @param vkey vkey identifying the value henceforth (set)
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @param cls closure, not used
+ * @param num_values not used
+ * @param values not used
+ * @return GNUNET_OK
*/
static int
-do_insert_value (struct Plugin *plugin,
- const void *value, unsigned int size,
- unsigned long long *vkey)
+return_ok (void *cls,
+ unsigned int num_values,
+ MYSQL_BIND *values)
{
- unsigned long length = size;
- int ret;
+ return GNUNET_OK;
+}
- ret = prepared_statement_run (plugin,
- plugin->insert_value,
- vkey,
- MYSQL_TYPE_BLOB,
- value, length, &length, -1);
- if (ret == GNUNET_OK)
+
+/**
+ * Get an estimate of how much space the database is
+ * currently using.
+ *
+ * @param cls our "struct Plugin *"
+ * @return number of bytes used on disk
+ */
+static unsigned long long
+mysql_plugin_get_size (void *cls)
+{
+ struct Plugin *plugin = cls;
+ MYSQL_BIND cbind[1];
+ long long total;
+
+ memset (cbind, 0, sizeof (cbind));
+ total = 0;
+ cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
+ cbind[0].buffer = &total;
+ cbind[0].is_unsigned = GNUNET_NO;
+ if (GNUNET_OK !=
+ prepared_statement_run_select (plugin,
+ plugin->get_size,
+ 1, cbind,
+ &return_ok, NULL,
+ -1))
+ return 0;
+ return total;
+}
+
+
+/**
+ * Store an item in the datastore.
+ *
+ * @param cls closure
+ * @param key key for the item
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
+ * @param expiration expiration time for the content
+ * @param msg set to error message
+ * @return GNUNET_OK on success
+ */
+static int
+mysql_plugin_put (void *cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ char **msg)
+{
+ struct Plugin *plugin = cls;
+ unsigned int irepl = replication;
+ unsigned int itype = type;
+ unsigned int ipriority = priority;
+ unsigned int ianonymity = anonymity;
+ unsigned long long lexpiration = expiration.abs_value;
+ unsigned long hashSize;
+ unsigned long hashSize2;
+ unsigned long lsize;
+ GNUNET_HashCode vhash;
+
+ if (size > MAX_DATUM_SIZE)
{
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ hashSize = sizeof (GNUNET_HashCode);
+ hashSize2 = sizeof (GNUNET_HashCode);
+ lsize = size;
+ GNUNET_CRYPTO_hash (data, size, &vhash);
+ if (GNUNET_OK !=
+ prepared_statement_run (plugin,
+ plugin->insert_entry,
+ NULL,
+ MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
+ MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+ MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
+ MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
+ MYSQL_TYPE_BLOB, data, lsize, &lsize,
+ -1))
+ return GNUNET_SYSERR;
#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Inserted value number %llu with length %u into gn072 table\n",
- *vkey,
- size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Inserted value `%s' with size %u into gn090 table\n",
+ GNUNET_h2s (key),
+ (unsigned int) size);
#endif
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to insert %u byte value into gn072 table\n",
- size);
- }
- return ret;
+ if (size > 0)
+ plugin->env->duc (plugin->env->cls,
+ size);
+ return GNUNET_OK;
}
+
/**
- * Delete an entry from the gn080 table.
+ * Update the priority for a particular key in the datastore. If
+ * the expiration time in value is different than the time found in
+ * the datastore, the higher value should be kept. For the
+ * anonymity level, the lower value is to be used. The specified
+ * priority should be added to the existing priority, ignoring the
+ * priority in value.
*
- * @param vkey vkey identifying the entry to delete
- * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
+ * Note that it is possible for multiple values to match this put.
+ * In that case, all of the respective values are updated.
+ *
+ * @param cls our "struct Plugin*"
+ * @param uid unique identifier of the datum
+ * @param delta by how much should the priority
+ * change? If priority + delta < 0 the
+ * priority should be set to 0 (never go
+ * negative).
+ * @param expire new expiration time should be the
+ * MAX of any existing expiration time and
+ * this value
+ * @param msg set to error message
+ * @return GNUNET_OK on success
*/
static int
-do_delete_entry_by_vkey (struct Plugin *plugin,
- unsigned long long vkey)
+mysql_plugin_update (void *cls,
+ uint64_t uid,
+ int delta,
+ struct GNUNET_TIME_Absolute expire,
+ char **msg)
{
+ struct Plugin *plugin = cls;
+ unsigned long long vkey = uid;
+ unsigned long long lexpire = expire.abs_value;
int ret;
#if DEBUG_MYSQL
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Deleting value %llu from gn080 table\n",
- vkey);
+ "Updating value %llu adding %d to priority and maxing exp at %llu\n",
+ vkey,
+ delta,
+ lexpire);
#endif
ret = prepared_statement_run (plugin,
- plugin->delete_entry_by_vkey,
+ plugin->update_entry,
NULL,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1);
- if (ret > 0)
- {
- ret = GNUNET_OK;
- }
- else
+ MYSQL_TYPE_LONG, &delta, GNUNET_NO,
+ MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES,
+ -1);
+ if (ret != GNUNET_OK)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Deleting value %llu from gn080 table failed\n",
+ "Failed to update value %llu\n",
vkey);
}
return ret;
}
-static int
-return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values)
-{
- return GNUNET_OK;
-}
-
-
-static int
-iterator_helper_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct Plugin *plugin;
- int ret;
- if (nrc == NULL)
- return GNUNET_NO;
- plugin = nrc->plugin;
- ret = GNUNET_SYSERR;
- switch (nrc->iter_select)
- {
- case 0:
- case 1:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 7,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONG,
- &nrc->last_prio,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &nrc->last_prio,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES, -1);
- break;
- case 2:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 7,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES, -1);
- break;
- case 3:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 7,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->now.value,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->now.value,
- GNUNET_YES, -1);
- break;
- default:
- GNUNET_assert (0);
- }
- return ret;
-}
/**
*/
static void
mysql_next_request_cont (void *next_cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct NextRequestClosure *nrc = next_cls;
struct Plugin *plugin;
int ret;
- unsigned int size;
unsigned int type;
unsigned int priority;
unsigned int anonymity;
unsigned long long exp;
- unsigned long long vkey;
unsigned long hashSize;
+ unsigned long size;
+ unsigned long long uid;
+ char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
GNUNET_HashCode key;
struct GNUNET_TIME_Absolute expiration;
- unsigned long length;
- MYSQL_BIND *rbind; /* size 7 */
- MYSQL_BIND dbind[1];
- char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+ MYSQL_BIND *rbind = nrc->rbind;
plugin = nrc->plugin;
plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
plugin->next_task_nc = NULL;
- AGAIN:
+ if (GNUNET_YES == nrc->end_it)
+ goto END_SET;
GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
nrc->now = GNUNET_TIME_absolute_get ();
hashSize = sizeof (GNUNET_HashCode);
memset (nrc->rbind, 0, sizeof (nrc->rbind));
rbind = nrc->rbind;
rbind[0].buffer_type = MYSQL_TYPE_LONG;
- rbind[0].buffer = &size;
+ rbind[0].buffer = &type;
rbind[0].is_unsigned = 1;
rbind[1].buffer_type = MYSQL_TYPE_LONG;
- rbind[1].buffer = &type;
+ rbind[1].buffer = &priority;
rbind[1].is_unsigned = 1;
rbind[2].buffer_type = MYSQL_TYPE_LONG;
- rbind[2].buffer = &priority;
+ rbind[2].buffer = &anonymity;
rbind[2].is_unsigned = 1;
- rbind[3].buffer_type = MYSQL_TYPE_LONG;
- rbind[3].buffer = &anonymity;
+ rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[3].buffer = &exp;
rbind[3].is_unsigned = 1;
- rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[4].buffer = &exp;
- rbind[4].is_unsigned = 1;
+ rbind[4].buffer_type = MYSQL_TYPE_BLOB;
+ rbind[4].buffer = &key;
+ rbind[4].buffer_length = hashSize;
+ rbind[4].length = &hashSize;
rbind[5].buffer_type = MYSQL_TYPE_BLOB;
- rbind[5].buffer = &key;
- rbind[5].buffer_length = hashSize;
- rbind[5].length = &hashSize;
+ rbind[5].buffer = value;
+ rbind[5].buffer_length = size = sizeof (value);
+ rbind[5].length = &size;
rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[6].buffer = &vkey;
- rbind[6].is_unsigned = GNUNET_YES;
+ rbind[6].buffer = &uid;
+ rbind[6].is_unsigned = 1;
- if ( (GNUNET_YES == nrc->end_it) ||
- (GNUNET_OK != nrc->prep (nrc->prep_cls,
- nrc)))
+ if (GNUNET_OK != nrc->prep (nrc->prep_cls,
+ nrc))
goto END_SET;
GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
- {
- GNUNET_break (0); /* far too big */
- goto END_SET;
- }
- nrc->last_vkey = vkey;
- nrc->last_prio = priority;
- nrc->last_expire = exp;
- if ((rbind[0].buffer_type != MYSQL_TYPE_LONG) ||
- (!rbind[0].is_unsigned) ||
- (rbind[1].buffer_type != MYSQL_TYPE_LONG) ||
- (!rbind[1].is_unsigned) ||
- (rbind[2].buffer_type != MYSQL_TYPE_LONG) ||
- (!rbind[2].is_unsigned) ||
- (rbind[3].buffer_type != MYSQL_TYPE_LONG) ||
- (!rbind[3].is_unsigned) ||
- (rbind[4].buffer_type != MYSQL_TYPE_LONGLONG) ||
- (!rbind[4].is_unsigned) ||
- (rbind[5].buffer_type != MYSQL_TYPE_BLOB) ||
- (rbind[5].buffer_length != sizeof (GNUNET_HashCode)) ||
- (*rbind[5].length != sizeof (GNUNET_HashCode)) ||
- (rbind[6].buffer_type != MYSQL_TYPE_LONGLONG) ||
- (!rbind[6].is_unsigned))
+ GNUNET_assert (size <= sizeof(value));
+ if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
+ (hashSize != sizeof (GNUNET_HashCode)) )
{
GNUNET_break (0);
goto END_SET;
}
#if DEBUG_MYSQL
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found value %llu with size %u, prio %u, anon %u, expire %llu selecting from gn080 table\n",
- vkey,
- size,
- priority,
- anonymity,
- exp);
-#endif
- /* now do query on gn072 */
- length = size;
- memset (dbind, 0, sizeof (dbind));
- dbind[0].buffer_type = MYSQL_TYPE_BLOB;
- dbind[0].buffer_length = size;
- dbind[0].length = &length;
- dbind[0].buffer = datum;
- ret = prepared_statement_run_select (plugin,
- plugin->select_value,
- 1,
- dbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1);
- GNUNET_break (ret <= 1); /* should only have one rbind! */
- if (ret > 0)
- ret = GNUNET_OK;
- if ((ret != GNUNET_OK) ||
- (dbind[0].buffer_length != size) || (length != size))
- {
- GNUNET_break (ret != 0); /* should have one rbind! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to obtain %llu from gn072\n",
- vkey);
- GNUNET_break (length == size); /* length should match! */
- GNUNET_break (dbind[0].buffer_length == size); /* length should be internally consistent! */
- if (ret != 0)
- {
- do_delete_value (plugin, vkey);
- do_delete_entry_by_vkey (plugin, vkey);
- plugin->content_size -= size;
- }
- goto AGAIN;
- }
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling iterator with value `%s' number %llu of size %u with type %u, priority %u, anonymity %u and expiration %llu\n",
+ "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
+ (unsigned int) size,
GNUNET_h2s (&key),
- vkey,
- size,
- type,
priority,
anonymity,
exp);
#endif
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- expiration.value = exp;
- ret = nrc->dviter (nrc->dviter_cls,
- nrc,
+ expiration.abs_value = exp;
+ ret = nrc->dviter (nrc->dviter_cls,
+ (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
&key,
- size,
- datum,
- type,
- priority,
- anonymity,
- expiration,
- vkey);
+ size, value,
+ type, priority, anonymity, expiration,
+ uid);
if (ret == GNUNET_SYSERR)
{
nrc->end_it = GNUNET_YES;
}
if (ret == GNUNET_NO)
{
- do_delete_value (plugin, vkey);
- do_delete_entry_by_vkey (plugin, vkey);
- plugin->content_size -= size;
+ do_delete_entry (plugin, uid);
+ if (size != 0)
+ plugin->env->duc (plugin->env->cls,
+ - size);
}
+ if (nrc->one_shot == GNUNET_YES)
+ GNUNET_free (nrc);
return;
END_SET:
/* call dviter with "end of set" */
if (GNUNET_YES == end_it)
nrc->end_it = GNUNET_YES;
nrc->plugin->next_task_nc = nrc;
- nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
- &mysql_next_request_cont,
+ nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
nrc);
}
/**
- * Iterate over the items in the datastore
- * using the given query to select and order
- * the items.
- *
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter never NULL
- * @param is_asc are we using ascending order?
+ * Context for 'get_statement_prepare'.
*/
-static void
-iterateHelper (struct Plugin *plugin,
- unsigned int type,
- int is_asc,
- unsigned int iter_select,
- PluginIterator dviter,
- void *dviter_cls)
-{
- struct NextRequestClosure *nrc;
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->type = type;
- nrc->iter_select = iter_select;
- nrc->dviter = dviter;
- nrc->dviter_cls = dviter_cls;
- nrc->prep = &iterator_helper_prepare;
- if (is_asc)
- {
- nrc->last_prio = 0;
- nrc->last_vkey = 0;
- nrc->last_expire = 0;
- }
- else
- {
- nrc->last_prio = 0x7FFFFFFFL;
- nrc->last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- nrc->last_expire = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- }
- mysql_plugin_next_request (nrc, GNUNET_NO);
-}
-
-
-/**
- * Get an estimate of how much space the database is
- * currently using.
- *
- * @param cls our "struct Plugin*"
- * @return number of bytes used on disk
- */
-static unsigned long long
-mysql_plugin_get_size (void *cls)
-{
- struct Plugin *plugin = cls;
- return plugin->content_size;
-}
-
-
-/**
- * Store an item in the datastore.
- *
- * @param cls closure
- * @param key key for the item
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
- */
-static int
-mysql_plugin_put (void *cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute expiration,
- char **msg)
-{
- struct Plugin *plugin = cls;
- unsigned int itype = type;
- unsigned int isize = size;
- unsigned int ipriority = priority;
- unsigned int ianonymity = anonymity;
- unsigned long long lexpiration = expiration.value;
- unsigned long hashSize;
- unsigned long hashSize2;
- unsigned long long vkey;
- GNUNET_HashCode vhash;
-
- if (size > MAX_DATUM_SIZE)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- hashSize = sizeof (GNUNET_HashCode);
- hashSize2 = sizeof (GNUNET_HashCode);
- GNUNET_CRYPTO_hash (data, size, &vhash);
- if (GNUNET_OK != do_insert_value (plugin,
- data, size, &vkey))
- return GNUNET_SYSERR;
- if (GNUNET_OK !=
- prepared_statement_run (plugin,
- plugin->insert_entry,
- NULL,
- MYSQL_TYPE_LONG,
- &isize,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &itype,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &ipriority,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &ianonymity,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &lexpiration,
- GNUNET_YES,
- MYSQL_TYPE_BLOB,
- key,
- hashSize,
- &hashSize,
- MYSQL_TYPE_BLOB,
- &vhash,
- hashSize2,
- &hashSize2,
- MYSQL_TYPE_LONGLONG,
- &vkey, GNUNET_YES, -1))
- {
- do_delete_value (plugin, vkey);
- return GNUNET_SYSERR;
- }
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Inserted value `%s' number %llu with size %u into gn080 table\n",
- GNUNET_h2s (key),
- vkey,
- isize);
-#endif
- plugin->content_size += size;
- return GNUNET_OK;
-}
-
-
-/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
- */
-static void
-mysql_plugin_iter_low_priority (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
-{
- struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_YES,
- 0, iter, iter_cls);
-}
-
-
struct GetContext
{
GNUNET_HashCode key;
unsigned long long expiration;
unsigned long long vkey;
unsigned long long total;
- int off;
- int count;
+ unsigned int off;
+ unsigned int count;
int have_vhash;
- unsigned long size; /* OBSOLETE! */
};
struct GetContext *gc = cls;
struct Plugin *plugin;
int ret;
- unsigned int limit_off;
unsigned long hashSize;
-
+
if (NULL == nrc)
{
GNUNET_free (gc);
return GNUNET_NO;
plugin = nrc->plugin;
hashSize = sizeof (GNUNET_HashCode);
- if (gc->count + gc->off == gc->total)
- nrc->last_vkey = 0; /* back to start */
- if (gc->count == 0)
- limit_off = gc->off;
- else
- limit_off = 0;
+ if (++gc->off >= gc->total)
+ gc->off = 0;
#if DEBUG_MYSQL
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Obtaining result number %d/%lld at offset %d with lvc %llu for GET `%s'\n",
+ "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
gc->count+1,
gc->total,
- limit_off,
- nrc->last_vkey,
+ gc->off,
GNUNET_h2s (&gc->key));
#endif
if (nrc->type != 0)
{
if (gc->have_vhash)
{
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_vhash_and_type, 7, nrc->rbind, &return_ok,
- NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
- -1);
+ ret = prepared_statement_run_select (plugin,
+ plugin->select_entry_by_hash_vhash_and_type,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES,
+ MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
+ -1);
}
else
{
ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_and_type, 7, nrc->rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
- -1);
+ prepared_statement_run_select (plugin,
+ plugin->select_entry_by_hash_and_type,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES,
+ MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
+ -1);
}
}
else
if (gc->have_vhash)
{
ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_and_vhash, 7, nrc->rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
- GNUNET_YES, -1);
+ prepared_statement_run_select (plugin,
+ plugin->select_entry_by_hash_and_vhash,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
+ -1);
}
else
{
ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash, 7, nrc->rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &limit_off, GNUNET_YES, -1);
+ prepared_statement_run_select (plugin,
+ plugin->select_entry_by_hash,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
+ -1);
}
}
gc->count++;
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Iterate over the results for a particular key in the datastore.
*
* @param cls closure
* @param key maybe NULL (to match all entries)
* betwen key and vhash, but for other blocks
* there may be!
* @param type entries of which type are relevant?
- * Use 0 for any type.
+ * Use 0 for any type.
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
*/
static void
mysql_plugin_get (void *cls,
- const GNUNET_HashCode * key,
- const GNUNET_HashCode * vhash,
+ const GNUNET_HashCode *key,
+ const GNUNET_HashCode *vhash,
enum GNUNET_BLOCK_Type type,
PluginIterator iter, void *iter_cls)
{
struct NextRequestClosure *nrc;
long long total;
unsigned long hashSize;
+ unsigned long hashSize2;
+ GNUNET_assert (key != NULL);
if (iter == NULL)
return;
- if (key == NULL)
- {
- mysql_plugin_iter_low_priority (plugin,
- type,
- iter, iter_cls);
- return;
- }
hashSize = sizeof (GNUNET_HashCode);
+ hashSize2 = sizeof (GNUNET_HashCode);
memset (cbind, 0, sizeof (cbind));
total = -1;
cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
if (vhash != NULL)
{
ret =
- prepared_statement_run_select
- (plugin,
- plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES,
- -1);
+ prepared_statement_run_select (plugin,
+ plugin->count_entry_by_hash_vhash_and_type,
+ 1, cbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
+ MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+ -1);
}
else
{
ret =
- prepared_statement_run_select
- (plugin,
- plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG,
- &itype, GNUNET_YES, -1);
-
+ prepared_statement_run_select (plugin,
+ plugin->count_entry_by_hash_and_type,
+ 1, cbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+ -1);
}
}
else
if (vhash != NULL)
{
ret =
- prepared_statement_run_select
- (plugin,
- plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- vhash, hashSize, &hashSize, -1);
+ prepared_statement_run_select (plugin,
+ plugin->count_entry_by_hash_and_vhash,
+ 1, cbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
+ -1);
}
else
ret =
prepared_statement_run_select (plugin,
plugin->count_entry_by_hash,
- 1, cbind, &return_ok,
- NULL, MYSQL_TYPE_BLOB,
- key, hashSize,
- &hashSize, -1);
+ 1, cbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ -1);
}
}
if ((ret != GNUNET_OK) || (0 >= total))
nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
nrc->plugin = plugin;
nrc->type = type;
- nrc->iter_select = -1;
nrc->dviter = iter;
nrc->dviter_cls = iter_cls;
nrc->prep = &get_statement_prepare;
nrc->prep_cls = gc;
- nrc->last_vkey = 0;
mysql_plugin_next_request (nrc, GNUNET_NO);
}
/**
- * Update the priority for a particular key in the datastore. If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept. For the
- * anonymity level, the lower value is to be used. The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- * change? If priority + delta < 0 the
- * priority should be set to 0 (never go
- * negative).
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * Run the prepared statement to get the next data item ready.
+ *
+ * @param cls not used
+ * @param nrc closure for the next request iterator
+ * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
*/
static int
-mysql_plugin_update (void *cls,
- uint64_t uid,
- int delta,
- struct GNUNET_TIME_Absolute expire,
- char **msg)
+iterator_zero_prepare (void *cls,
+ struct NextRequestClosure *nrc)
{
- struct Plugin *plugin = cls;
- unsigned long long vkey = uid;
- unsigned long long lexpire = expire.value;
+ struct Plugin *plugin;
int ret;
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Updating value %llu adding %d to priority and maxing exp at %llu\n",
- vkey,
- delta,
- lexpire);
-#endif
- ret = prepared_statement_run (plugin,
- plugin->update_entry,
- NULL,
- MYSQL_TYPE_LONG,
- &delta,
- GNUNET_NO,
- MYSQL_TYPE_LONGLONG,
- &lexpire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &lexpire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &vkey,
- GNUNET_YES, -1);
- if (ret != GNUNET_OK)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to update value %llu\n",
- vkey);
- }
+ if (nrc == NULL)
+ return GNUNET_NO;
+ plugin = nrc->plugin;
+ ret = prepared_statement_run_select (plugin,
+ plugin->zero_iter,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
+ -1);
+ nrc->count++;
return ret;
}
*/
static void
mysql_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+ enum GNUNET_BLOCK_Type type,
+ PluginIterator iter,
+ void *iter_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls);
+ struct NextRequestClosure *nrc;
+
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->plugin = plugin;
+ nrc->type = type;
+ nrc->dviter = iter;
+ nrc->dviter_cls = iter_cls;
+ nrc->prep = &iterator_zero_prepare;
+ mysql_plugin_next_request (nrc, GNUNET_NO);
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * Run the SELECT statement for the replication function.
+ *
+ * @param cls the 'struct Plugin'
+ * @param nrc the context (not used)
*/
-static void
-mysql_plugin_iter_ascending_expiration (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+static int
+replication_prepare (void *cls,
+ struct NextRequestClosure *nrc)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_YES, 2, iter, iter_cls);
+
+ return prepared_statement_run_select (plugin,
+ plugin->select_replication,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ -1);
}
+
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Context for 'repl_iter' function.
+ */
+struct ReplCtx
+{
+
+ /**
+ * Plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call for the result (or the NULL).
+ */
+ PluginIterator iter;
+
+ /**
+ * Closure for iter.
+ */
+ void *iter_cls;
+};
+
+
+/**
+ * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Decrements the replication counter and calls the original
+ * iterator.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
+ * @param cls closure
+ * @param next_cls closure to pass to the "next" function.
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * (continue on call to "next", of course),
+ * GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+repl_iter (void *cls,
+ void *next_cls,
+ const GNUNET_HashCode *key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
+{
+ struct ReplCtx *rc = cls;
+ struct Plugin *plugin = rc->plugin;
+ unsigned long long oid;
+ int ret;
+ int iret;
+
+ ret = rc->iter (rc->iter_cls,
+ next_cls, key,
+ size, data,
+ type, priority, anonymity, expiration,
+ uid);
+ if (NULL != key)
+ {
+ oid = (unsigned long long) uid;
+ iret = prepared_statement_run (plugin,
+ plugin->dec_repl,
+ NULL,
+ MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES,
+ -1);
+ if (iret == GNUNET_SYSERR)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to reduce replication counter\n");
+ return GNUNET_SYSERR;
+ }
+ }
+ return ret;
+}
+
+
+/**
+ * Get a random item for replication. Returns a single, not expired, random item
+ * from those with the highest replication counters. The item's
+ * replication counter is decremented by one IF it was positive before.
+ * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ *
+ * @param cls closure
+ * @param iter function to call the value (once only).
* @param iter_cls closure for iter
*/
static void
-mysql_plugin_iter_migration_order (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_replication_get (void *cls,
+ PluginIterator iter, void *iter_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, 0, GNUNET_NO, 3, iter, iter_cls);
+ struct NextRequestClosure *nrc;
+ struct ReplCtx rc;
+
+ rc.plugin = plugin;
+ rc.iter = iter;
+ rc.iter_cls = iter_cls;
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->plugin = plugin;
+ nrc->now = GNUNET_TIME_absolute_get ();
+ nrc->prep = &replication_prepare;
+ nrc->prep_cls = plugin;
+ nrc->type = 0;
+ nrc->dviter = &repl_iter;
+ nrc->dviter_cls = &rc;
+ nrc->end_it = GNUNET_NO;
+ nrc->one_shot = GNUNET_YES;
+ mysql_next_request_cont (nrc, NULL);
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Run the SELECT statement for the expiration function.
+ *
+ * @param cls the 'struct Plugin'
+ * @param nrc the context (not used)
+ * @return GNUNET_OK on success, GNUNET_NO if there are
+ * no more values, GNUNET_SYSERR on error
+ */
+static int
+expiration_prepare (void *cls,
+ struct NextRequestClosure *nrc)
+{
+ struct Plugin *plugin = cls;
+ long long nt;
+
+ if (NULL == nrc)
+ return GNUNET_NO;
+ nt = (long long) nrc->now.abs_value;
+ return prepared_statement_run_select
+ (plugin,
+ plugin->select_expiration,
+ 7, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
+ -1);
+}
+
+
+/**
+ * Get a random item for expiration.
+ * Call 'iter' with all values ZERO or NULL if the datastore is empty.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
+ * @param cls closure
+ * @param iter function to call the value (once only).
* @param iter_cls closure for iter
*/
static void
-mysql_plugin_iter_all_now (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_expiration_get (void *cls,
+ PluginIterator iter, void *iter_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, 0, GNUNET_YES, 0, iter, iter_cls);
+ struct NextRequestClosure *nrc;
+
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->plugin = plugin;
+ nrc->now = GNUNET_TIME_absolute_get ();
+ nrc->prep = &expiration_prepare;
+ nrc->prep_cls = plugin;
+ nrc->type = 0;
+ nrc->dviter = iter;
+ nrc->dviter_cls = iter_cls;
+ nrc->end_it = GNUNET_NO;
+ nrc->one_shot = GNUNET_YES;
+ mysql_next_request_cont (nrc, NULL);
}
/**
* Drop database.
+ *
+ * @param cls the "struct Plugin*"
*/
static void
mysql_plugin_drop (void *cls)
{
struct Plugin *plugin = cls;
- if ((GNUNET_OK != run_statement (plugin,
- "DROP TABLE gn080")) ||
- (GNUNET_OK != run_statement (plugin,
- "DROP TABLE gn072")))
- return; /* error */
- plugin->content_size = 0;
+ if (GNUNET_OK != run_statement (plugin,
+ "DROP TABLE gn090"))
+ return; /* error */
+ plugin->env->duc (plugin->env->cls, 0);
}
}
#define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
#define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
- if (MRUNS ("CREATE TABLE IF NOT EXISTS gn080 ("
- " size INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+ if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
+ " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
" hash BINARY(64) NOT NULL DEFAULT '',"
" vhash BINARY(64) NOT NULL DEFAULT '',"
- " vkey BIGINT UNSIGNED NOT NULL DEFAULT 0,"
- " INDEX hash (hash(64)),"
- " INDEX hash_vhash_vkey (hash(64),vhash(64),vkey),"
- " INDEX hash_vkey (hash(64),vkey),"
- " INDEX vkey (vkey),"
- " INDEX prio (prio,vkey),"
- " INDEX expire (expire,vkey,type),"
- " INDEX anonLevel (anonLevel,prio,vkey,type)"
+ " value BLOB NOT NULL DEFAULT '',"
+ " uid BIGINT NOT NULL AUTO_INCREMENT,"
+ " PRIMARY KEY (uid)"
+#if 0
+ " INDEX idx_hash (hash(64)),"
+ " INDEX idx_hash_uid (hash(64),uid),"
+ " INDEX idx_hash_vhash (hash(64),vhash(64)),"
+ " INDEX idx_hash_type_uid (hash(64),type,uid),"
+ " INDEX idx_prio (prio),"
+ " INDEX idx_repl (repl),"
+ " INDEX idx_expire_prio (expire,prio),"
+ " INDEX idx_anonLevel_uid (anonLevel,uid)"
+#endif
") ENGINE=InnoDB") ||
- MRUNS ("CREATE TABLE IF NOT EXISTS gn072 ("
- " vkey BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,"
- " value BLOB NOT NULL DEFAULT '') ENGINE=MyISAM") ||
MRUNS ("SET AUTOCOMMIT = 1") ||
- PINIT (plugin->select_value, SELECT_VALUE) ||
- PINIT (plugin->delete_value, DELETE_VALUE) ||
- PINIT (plugin->insert_value, INSERT_VALUE) ||
PINIT (plugin->insert_entry, INSERT_ENTRY) ||
- PINIT (plugin->delete_entry_by_vkey, DELETE_ENTRY_BY_VKEY) ||
+ PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
|| PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
|| PINIT (plugin->select_entry_by_hash_vhash_and_type,
SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
|| PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
+ || PINIT (plugin->get_size, SELECT_SIZE)
|| PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
|| PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
|| PINIT (plugin->count_entry_by_hash_vhash_and_type,
COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
|| PINIT (plugin->update_entry, UPDATE_ENTRY)
- || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY)
- || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS)
- || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME)
- || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER))
+ || PINIT (plugin->dec_repl, DEC_REPL)
+ || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS)
+ || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION)
+ || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
{
iclose (plugin);
GNUNET_free_non_null (plugin->cnffile);
api->put = &mysql_plugin_put;
api->next_request = &mysql_plugin_next_request;
api->get = &mysql_plugin_get;
+ api->replication_get = &mysql_plugin_replication_get;
+ api->expiration_get = &mysql_plugin_expiration_get;
api->update = &mysql_plugin_update;
- api->iter_low_priority = &mysql_plugin_iter_low_priority;
api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
- api->iter_ascending_expiration = &mysql_plugin_iter_ascending_expiration;
- api->iter_migration_order = &mysql_plugin_iter_migration_order;
- api->iter_all_now = &mysql_plugin_iter_all_now;
api->drop = &mysql_plugin_drop;
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"mysql", _("Mysql database running\n"));
iclose (plugin);
if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
{
- GNUNET_SCHEDULER_cancel (plugin->env->sched,
- plugin->next_task);
+ GNUNET_SCHEDULER_cancel (plugin->next_task);
plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
GNUNET_free (plugin->next_task_nc);