/**
* Prepared statements.
*/
-#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,RAND(),?,?,?)"
+#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
struct GNUNET_MysqlStatementHandle *insert_entry;
#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 WHERE hash=?"
+#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
-#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?"
+#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND type=?"
+#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?"
+#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
struct GNUNET_MysqlStatementHandle *get_size;
-#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?"
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
+ "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
+ "WHERE anonLevel=0 AND type=? AND "\
+ "(rvalue >= ? OR"\
+ " NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
+ "ORDER BY rvalue ASC LIMIT 1"
struct GNUNET_MysqlStatementHandle *zero_iter;
-#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY prio ASC LIMIT 1) "\
- "ORDER BY expire ASC LIMIT 1"
+#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
struct GNUNET_MysqlStatementHandle *select_expiration;
- // select type from (select rand() as v) AS t1 INNER JOIN gn090 ON expire>=t1.v limit 1;
+#define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
+ struct GNUNET_MysqlStatementHandle *select_priority;
-#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM "\
- "(SELECT RAND() AS v) AS t1 INNER JOIN "\
- "(SELECT MAX(repl) AS m FROM gn090) AS t2 INNER JOIN "\
- "gn090 ON repl=t2.m AND"\
- " (rvalue>=t1.v OR"\
- " NOT EXISTS (SELECT 1 FROM gn090 WHERE repl=t2.m AND rvalue>=t1.v))"\
+#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
+ "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
+ "WHERE repl=? AND "\
+ " (rvalue>=? OR"\
+ " NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
"ORDER BY rvalue ASC "\
"LIMIT 1"
struct GNUNET_MysqlStatementHandle *select_replication;
+#define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
+ struct GNUNET_MysqlStatementHandle *max_repl;
+
};
unsigned int ipriority = priority;
unsigned int ianonymity = anonymity;
unsigned long long lexpiration = expiration.abs_value;
+ unsigned long long lrvalue = (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
unsigned long hashSize;
unsigned long hashSize2;
unsigned long lsize;
MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
MYSQL_TYPE_BLOB, data, lsize, &lsize,
PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- unsigned long long off;
-
- off = (unsigned long long) offset;
+ unsigned long long rvalue = (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
execute_select (plugin,
plugin->zero_iter,
proc, proc_cls,
MYSQL_TYPE_LONG, &type, GNUNET_YES,
- MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
-1);
-
}
{
struct Plugin *plugin = cls;
struct ReplCtx rc;
+ unsigned long long rvalue;
+ unsigned long repl;
+ MYSQL_BIND results;
rc.plugin = plugin;
rc.proc = proc;
rc.proc_cls = proc_cls;
+ memset (&results, 0, sizeof (results));
+ results.buffer_type = MYSQL_TYPE_LONG;
+ results.buffer = &repl;
+ results.is_unsigned = GNUNET_YES;
+
+ if (1 !=
+ prepared_statement_run_select (plugin,
+ plugin->max_repl,
+ 1,
+ &results,
+ -1))
+ {
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+
+ rvalue = (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
execute_select (plugin,
plugin->select_replication,
&repl_proc, &rc,
+ MYSQL_TYPE_LONG, &repl, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
+ MYSQL_TYPE_LONG, &repl, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
-1);
}
+/**
+ * Context for 'expi_proc' function.
+ */
+struct ExpiCtx
+{
+
+ /**
+ * Plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call for the result (or the NULL).
+ */
+ PluginDatumProcessor proc;
+
+ /**
+ * Closure for proc.
+ */
+ void *proc_cls;
+};
+
+
+
+/**
+ * Wrapper for the processor for 'mysql_plugin_get_expiration'.
+ * If no expired value was found, we do a second query for
+ * low-priority content.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * (continue on call to "next", of course),
+ * GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+expi_proc (void *cls,
+ const GNUNET_HashCode *key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
+{
+ struct ExpiCtx *rc = cls;
+ struct Plugin *plugin = rc->plugin;
+
+ if (NULL == key)
+ {
+ execute_select (plugin,
+ plugin->select_priority,
+ rc->proc, rc->proc_cls,
+ -1);
+ return GNUNET_SYSERR;
+ }
+ return rc->proc (rc->proc_cls,
+ key,
+ size, data,
+ type, priority, anonymity, expiration,
+ uid);
+}
+
+
/**
* Get a random item for expiration.
* Call 'proc' with all values ZERO or NULL if the datastore is empty.
{
struct Plugin *plugin = cls;
long long nt;
-
+ struct ExpiCtx rc;
+
+ rc.plugin = plugin;
+ rc.proc = proc;
+ rc.proc_cls = proc_cls;
nt = (long long) GNUNET_TIME_absolute_get().abs_value;
execute_select (plugin,
plugin->select_expiration,
- proc, proc_cls,
+ expi_proc, &rc,
MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
-1);
" prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
- " rvalue DOUBLE UNSIGNED NOT NULL,"
+ " rvalue BIGINT UNSIGNED NOT NULL,"
" hash BINARY(64) NOT NULL DEFAULT '',"
" vhash BINARY(64) NOT NULL DEFAULT '',"
" value BLOB NOT NULL DEFAULT '',"
" INDEX idx_hash (hash(64)),"
" INDEX idx_hash_uid (hash(64),uid),"
" INDEX idx_hash_vhash (hash(64),vhash(64)),"
- " INDEX idx_hash_type_uid (hash(64),type,uid),"
+ " INDEX idx_hash_type_rvalue (hash(64),type,rvalue),"
" INDEX idx_prio (prio),"
" INDEX idx_repl_rvalue (repl,rvalue),"
- " INDEX idx_expire_prio (expire,prio),"
- " INDEX idx_anonLevel_uid (anonLevel,uid)"
+ " INDEX idx_expire (expire),"
+ " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
") ENGINE=InnoDB") ||
MRUNS ("SET AUTOCOMMIT = 1") ||
PINIT (plugin->insert_entry, INSERT_ENTRY) ||
|| PINIT (plugin->dec_repl, DEC_REPL)
|| PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS)
|| PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION)
+ || PINIT (plugin->select_priority, SELECT_IT_PRIORITY)
+ || PINIT (plugin->max_repl, SELECT_MAX_REPL)
|| PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
{
iclose (plugin);