-consistently use struct GNUNET_HashCode
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
index 84f9d313ca34747132eb226d73ade0f8d5eb45ca..b92877d9809e7eb7a1e9c6a78b56d1ac2b8c0305 100644 (file)
 #include "platform.h"
 #include "gnunet_datastore_plugin.h"
 #include "gnunet_util_lib.h"
-#include <mysql/mysql.h>
+#include "gnunet_mysql_lib.h"
 
-#define DEBUG_MYSQL GNUNET_NO
 
 #define MAX_DATUM_SIZE 65536
 
-/**
- * Maximum number of supported parameters for a prepared
- * statement.  Increase if needed.
- */
-#define MAX_PARAM 16
-
-/**
- * Die with an error message that indicates
- * a failure of the command 'cmd' with the message given
- * by strerror(errno).
- */
-#define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
-
-/**
- * Log an error message at log-level 'level' that indicates
- * a failure of the command 'cmd' on file 'filename'
- * with the message given by strerror(errno).
- */
-#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
-
-
-struct GNUNET_MysqlStatementHandle
-{
-  struct GNUNET_MysqlStatementHandle *next;
-
-  struct GNUNET_MysqlStatementHandle *prev;
-
-  char *query;
-
-  MYSQL_STMT *statement;
-
-  int valid;
-
-};
-
-/**
- * Context for the universal iterator.
- */
-struct NextRequestClosure;
-
-/**
- * Type of a function that will prepare
- * the next iteration.
- *
- * @param cls closure
- * @param nc the next context; NULL for the last
- *         call which gives the callback a chance to
- *         clean up the closure
- * @return GNUNET_OK on success, GNUNET_NO if there are
- *         no more values, GNUNET_SYSERR on error
- */
-typedef int (*PrepareFunction)(void *cls,
-                              struct NextRequestClosure *nc);
-
-
-struct NextRequestClosure
-{
-  struct Plugin *plugin;
-
-  struct GNUNET_TIME_Absolute now;
-
-  /**
-   * Function to call to prepare the next
-   * iteration.
-   */
-  PrepareFunction prep;
-
-  /**
-   * Closure for prep.
-   */
-  void *prep_cls;
-
-  MYSQL_BIND rbind[7];
-
-  enum GNUNET_BLOCK_Type type;
-
-  PluginIterator dviter;
-
-  void *dviter_cls;
-
-  unsigned int count;
-
-  int end_it;
-};
-
 
 /**
  * Context for all functions in this plugin.
  */
-struct Plugin 
+struct Plugin
 {
   /**
    * Our execution environment.
@@ -224,596 +138,80 @@ struct Plugin
   /**
    * Handle to talk to MySQL.
    */
-  MYSQL *dbf;
-  
-  /**
-   * We keep all prepared statements in a DLL.  This is the head.
-   */
-  struct GNUNET_MysqlStatementHandle *shead;
-
-  /**
-   * We keep all prepared statements in a DLL.  This is the tail.
-   */
-  struct GNUNET_MysqlStatementHandle *stail;
-
-  /**
-   * Filename of "my.cnf" (msyql configuration).
-   */
-  char *cnffile;
-
-  /**
-   * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
-   */
-  struct NextRequestClosure *next_task_nc;
-
-  /**
-   * Pending task with scheduler for running the next request.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier next_task;
+  struct GNUNET_MYSQL_Context *mc;
 
   /**
    * Prepared statements.
    */
-#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
-  struct GNUNET_MysqlStatementHandle *insert_entry;
-  
+#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
+  struct GNUNET_MYSQL_StatementHandle *insert_entry;
+
 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
-  struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
+  struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
 
 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
-  struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
-  
-#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_uid) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
+  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
+
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+  struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
 
 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
-  struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
+  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
 
 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
+  struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
+
 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
-  struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
+  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
 
 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
-  struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
-  
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
-
-#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1"
-  struct GNUNET_MysqlStatementHandle *update_entry;
-
-#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
-  struct GNUNET_MysqlStatementHandle *get_size;
-
-#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_anonLevel_uid) WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *zero_iter;
-
-#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_expire_prio) WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
-  "UNION "\
-  "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_prio) ORDER BY prio ASC LIMIT 1) "\
-  "ORDER BY expire ASC LIMIT 1"
-  struct GNUNET_MysqlStatementHandle *select_expiration;
-
-#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_repl) ORDER BY repl DESC,RAND() LIMIT 1"
-  struct GNUNET_MysqlStatementHandle *select_replication;
-
-};
-
-
-/**
- * Obtain the location of ".my.cnf".
- *
- * @param cfg our configuration
- * @return NULL on error
- */
-static char *
-get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-  char *cnffile;
-  char *home_dir;
-  struct stat st;
-#ifndef WINDOWS
-  struct passwd *pw;
-#endif
-  int configured;
-
-#ifndef WINDOWS
-  pw = getpwuid (getuid ());
-  if (!pw)
-    {
-      GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
-                          "getpwuid");
-      return NULL;
-    }
-  if (GNUNET_YES ==
-      GNUNET_CONFIGURATION_have_value (cfg,
-                                      "datastore-mysql", "CONFIG"))
-    {
-      GNUNET_assert (GNUNET_OK == 
-                    GNUNET_CONFIGURATION_get_value_filename (cfg,
-                                                             "datastore-mysql", "CONFIG", &cnffile));
-      configured = GNUNET_YES;
-    }
-  else
-    {
-      home_dir = GNUNET_strdup (pw->pw_dir);
-#else
-      home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
-      plibc_conv_to_win_path ("~/", home_dir);
-#endif
-      GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
-      GNUNET_free (home_dir);
-      configured = GNUNET_NO;
-    }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             _("Trying to use file `%s' for MySQL configuration.\n"),
-             cnffile);
-  if ((0 != STAT (cnffile, &st)) ||
-      (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
-    {
-      if (configured == GNUNET_YES)
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                   _("Could not access file `%s': %s\n"), cnffile,
-                   STRERROR (errno));
-      GNUNET_free (cnffile);
-      return NULL;
-    }
-  return cnffile;
-}
+  struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
 
+#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
+  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
 
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
+  struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
 
-/**
- * Free a prepared statement.
- *
- * @param plugin plugin context
- * @param s prepared statement
- */
-static void
-prepared_statement_destroy (struct Plugin *plugin, 
-                           struct GNUNET_MysqlStatementHandle
-                           *s)
-{
-  GNUNET_CONTAINER_DLL_remove (plugin->shead,
-                              plugin->stail,
-                              s);
-  if (s->valid)
-    mysql_stmt_close (s->statement);
-  GNUNET_free (s->query);
-  GNUNET_free (s);
-}
-
-
-/**
- * Close database connection and all prepared statements (we got a DB
- * disconnect error).
- */
-static int
-iclose (struct Plugin *plugin)
-{
-  struct GNUNET_MysqlStatementHandle *spos;
-
-  spos = plugin->shead;
-  while (NULL != plugin->shead)
-    prepared_statement_destroy (plugin,
-                               plugin->shead);
-  if (plugin->dbf != NULL)
-    {
-      mysql_close (plugin->dbf);
-      plugin->dbf = NULL;
-    }
-  return GNUNET_OK;
-}
-
-
-/**
- * Open the connection with the database (and initialize
- * our default options).
- *
- * @return GNUNET_OK on success
- */
-static int
-iopen (struct Plugin *ret)
-{
-  char *mysql_dbname;
-  char *mysql_server;
-  char *mysql_user;
-  char *mysql_password;
-  unsigned long long mysql_port;
-  my_bool reconnect;
-  unsigned int timeout;
-
-  ret->dbf = mysql_init (NULL);
-  if (ret->dbf == NULL)
-    return GNUNET_SYSERR;
-  if (ret->cnffile != NULL)
-    mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
-  mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
-  reconnect = 0;
-  mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
-  mysql_options (ret->dbf,
-                 MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
-  mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
-  timeout = 60; /* in seconds */
-  mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
-  mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
-  mysql_dbname = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "datastore-mysql", "DATABASE"))
-    GNUNET_assert (GNUNET_OK == 
-                  GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
-                                                         "datastore-mysql", "DATABASE", 
-                                                         &mysql_dbname));
-  else
-    mysql_dbname = GNUNET_strdup ("gnunet");
-  mysql_user = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "datastore-mysql", "USER"))
-    {
-      GNUNET_assert (GNUNET_OK == 
-                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
-                                                          "datastore-mysql", "USER", 
-                                                          &mysql_user));
-    }
-  mysql_password = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "datastore-mysql", "PASSWORD"))
-    {
-      GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
-                                                          "datastore-mysql", "PASSWORD",
-                                                          &mysql_password));
-    }
-  mysql_server = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "datastore-mysql", "HOST"))
-    {
-      GNUNET_assert (GNUNET_OK == 
-                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
-                                                          "datastore-mysql", "HOST", 
-                                                          &mysql_server));
-    }
-  mysql_port = 0;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "datastore-mysql", "PORT"))
-    {
-      GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
-                                                          "PORT", &mysql_port));
-    }
-
-  GNUNET_assert (mysql_dbname != NULL);
-  mysql_real_connect (ret->dbf, 
-                     mysql_server, 
-                     mysql_user, mysql_password,
-                      mysql_dbname, 
-                     (unsigned int) mysql_port, NULL,
-                     CLIENT_IGNORE_SIGPIPE);
-  GNUNET_free_non_null (mysql_server);
-  GNUNET_free_non_null (mysql_user);
-  GNUNET_free_non_null (mysql_password);
-  GNUNET_free (mysql_dbname);
-  if (mysql_error (ret->dbf)[0])
-    {
-      LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
-                 "mysql_real_connect", ret);
-      return GNUNET_SYSERR;
-    }
-  return GNUNET_OK;
-}
-
-
-/**
- * Run the given MySQL statement.
- *
- * @param plugin plugin context
- * @param statement SQL statement to run
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
- */
-static int
-run_statement (struct Plugin *plugin,
-              const char *statement)
-{
-  if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
-    return GNUNET_SYSERR;
-  mysql_query (plugin->dbf, statement);
-  if (mysql_error (plugin->dbf)[0])
-    {
-      LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
-                 "mysql_query", plugin);
-      iclose (plugin);
-      return GNUNET_SYSERR;
-    }
-  return GNUNET_OK;
-}
-
-
-/**
- * Create a prepared statement.
- *
- * @param plugin plugin context
- * @param statement SQL statement text to prepare
- * @return NULL on error
- */
-static struct GNUNET_MysqlStatementHandle *
-prepared_statement_create (struct Plugin *plugin, 
-                          const char *statement)
-{
-  struct GNUNET_MysqlStatementHandle *ret;
-
-  ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
-  ret->query = GNUNET_strdup (statement);
-  GNUNET_CONTAINER_DLL_insert (plugin->shead,
-                              plugin->stail,
-                              ret);
-  return ret;
-}
-
-
-/**
- * Prepare a statement for running.
- *
- * @param plugin plugin context
- * @param ret handle to prepared statement
- * @return GNUNET_OK on success
- */
-static int
-prepare_statement (struct Plugin *plugin, 
-                  struct GNUNET_MysqlStatementHandle *ret)
-{
-  if (GNUNET_YES == ret->valid)
-    return GNUNET_OK;
-  if ((NULL == plugin->dbf) && 
-      (GNUNET_OK != iopen (plugin)))
-    return GNUNET_SYSERR;
-  ret->statement = mysql_stmt_init (plugin->dbf);
-  if (ret->statement == NULL)
-    {
-      iclose (plugin);
-      return GNUNET_SYSERR;
-    }
-  if (mysql_stmt_prepare (ret->statement, 
-                         ret->query,
-                         strlen (ret->query)))
-    {
-      LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
-                 "mysql_stmt_prepare", 
-                plugin);
-      mysql_stmt_close (ret->statement);
-      ret->statement = NULL;
-      iclose (plugin);
-      return GNUNET_SYSERR;
-    }
-  ret->valid = GNUNET_YES;
-  return GNUNET_OK;
-
-}
-
+#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
+  struct GNUNET_MYSQL_StatementHandle *update_entry;
 
-/**
- * Bind the parameters for the given MySQL statement
- * and run it.
- *
- * @param plugin plugin context
- * @param s statement to bind and run
- * @param ap arguments for the binding
- * @return GNUNET_SYSERR on error, GNUNET_OK on success
- */
-static int
-init_params (struct Plugin *plugin,
-            struct GNUNET_MysqlStatementHandle *s,
-            va_list ap)
-{
-  MYSQL_BIND qbind[MAX_PARAM];
-  unsigned int pc;
-  unsigned int off;
-  enum enum_field_types ft;
+#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
+  struct GNUNET_MYSQL_StatementHandle *dec_repl;
 
-  pc = mysql_stmt_param_count (s->statement);
-  if (pc > MAX_PARAM)
-    {
-      /* increase internal constant! */
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  memset (qbind, 0, sizeof (qbind));
-  off = 0;
-  ft = 0;
-  while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
-    {
-      qbind[off].buffer_type = ft;
-      switch (ft)
-        {
-        case MYSQL_TYPE_FLOAT:
-          qbind[off].buffer = va_arg (ap, float *);
-          break;
-        case MYSQL_TYPE_LONGLONG:
-          qbind[off].buffer = va_arg (ap, unsigned long long *);
-          qbind[off].is_unsigned = va_arg (ap, int);
-          break;
-        case MYSQL_TYPE_LONG:
-          qbind[off].buffer = va_arg (ap, unsigned int *);
-          qbind[off].is_unsigned = va_arg (ap, int);
-          break;
-        case MYSQL_TYPE_VAR_STRING:
-        case MYSQL_TYPE_STRING:
-        case MYSQL_TYPE_BLOB:
-          qbind[off].buffer = va_arg (ap, void *);
-          qbind[off].buffer_length = va_arg (ap, unsigned long);
-          qbind[off].length = va_arg (ap, unsigned long *);
-          break;
-        default:
-          /* unsupported type */
-          GNUNET_break (0);
-          return GNUNET_SYSERR;
-        }
-      pc--;
-      off++;
-    }
-  if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  if (mysql_stmt_bind_param (s->statement, qbind))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("`%s' failed at %s:%d with error: %s\n"),
-                 "mysql_stmt_bind_param",
-                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
-      iclose (plugin);
-      return GNUNET_SYSERR;
-    }
-  if (mysql_stmt_execute (s->statement))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("`%s' failed at %s:%d with error: %s\n"),
-                 "mysql_stmt_execute",
-                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
-      iclose (plugin);
-      return GNUNET_SYSERR;
-    }
-  return GNUNET_OK;
-}
-
-/**
- * Type of a callback that will be called for each
- * data set returned from MySQL.
- *
- * @param cls user-defined argument
- * @param num_values number of elements in values
- * @param values values returned by MySQL
- * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
- */
-typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
-                                          unsigned int num_values,
-                                          MYSQL_BIND *values);
+#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
+  struct GNUNET_MYSQL_StatementHandle *get_size;
 
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
+   "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
+   "WHERE anonLevel=0 AND type=? AND "\
+   "(rvalue >= ? OR"\
+   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
+   "ORDER BY rvalue ASC LIMIT 1"
+  struct GNUNET_MYSQL_StatementHandle *zero_iter;
 
-/**
- * Run a prepared SELECT statement.
- *
- * @param plugin plugin context
- * @param s statement to run
- * @param result_size number of elements in results array
- * @param results pointer to already initialized MYSQL_BIND
- *        array (of sufficient size) for passing results
- * @param processor function to call on each result
- * @param processor_cls extra argument to processor
- * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
- *        values (size + buffer-reference for pointers); terminated
- *        with "-1"
- * @return GNUNET_SYSERR on error, otherwise
- *         the number of successfully affected (or queried) rows
- */
-static int
-prepared_statement_run_select (struct Plugin *plugin,
-                              struct GNUNET_MysqlStatementHandle *s,
-                              unsigned int result_size,
-                              MYSQL_BIND *results,
-                              GNUNET_MysqlDataProcessor processor, void *processor_cls,
-                              ...)
-{
-  va_list ap;
-  int ret;
-  unsigned int rsize;
-  int total;
+#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
+  struct GNUNET_MYSQL_StatementHandle *select_expiration;
 
-  if (GNUNET_OK != prepare_statement (plugin, s))
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  va_start (ap, processor_cls);
-  if (GNUNET_OK != init_params (plugin, s, ap))
-    {
-      GNUNET_break (0);
-      va_end (ap);
-      return GNUNET_SYSERR;
-    }
-  va_end (ap);
-  rsize = mysql_stmt_field_count (s->statement);
-  if (rsize > result_size)
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  if (mysql_stmt_bind_result (s->statement, results))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("`%s' failed at %s:%d with error: %s\n"),
-                 "mysql_stmt_bind_result",
-                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
-      iclose (plugin);
-      return GNUNET_SYSERR;
-    }
+#define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
+  struct GNUNET_MYSQL_StatementHandle *select_priority;
 
-  total = 0;
-  while (1)
-    {
-      ret = mysql_stmt_fetch (s->statement);
-      if (ret == MYSQL_NO_DATA)
-        break;
-      if (ret != 0)
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                     _("`%s' failed at %s:%d with error: %s\n"),
-                     "mysql_stmt_fetch",
-                     __FILE__, __LINE__, mysql_stmt_error (s->statement));
-          iclose (plugin);
-          return GNUNET_SYSERR;
-        }
-      if (processor != NULL)
-        if (GNUNET_OK != processor (processor_cls, rsize, results))
-          break;
-      total++;
-    }
-  mysql_stmt_reset (s->statement);
-  return total;
-}
+#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
+  "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
+  "WHERE repl=? AND "\
+  " (rvalue>=? OR"\
+  "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
+  "ORDER BY rvalue ASC "\
+  "LIMIT 1"
+  struct GNUNET_MYSQL_StatementHandle *select_replication;
 
+#define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
+  struct GNUNET_MYSQL_StatementHandle *max_repl;
 
-/**
- * Run a prepared statement that does NOT produce results.
- *
- * @param plugin plugin context
- * @param s statement to run
- * @param insert_id NULL or address where to store the row ID of whatever
- *        was inserted (only for INSERT statements!)
- * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
- *        values (size + buffer-reference for pointers); terminated
- *        with "-1"
- * @return GNUNET_SYSERR on error, otherwise
- *         the number of successfully affected rows
- */
-static int
-prepared_statement_run (struct Plugin *plugin,
-                       struct GNUNET_MysqlStatementHandle *s,
-                       unsigned long long *insert_id, ...)
-{
-  va_list ap;
-  int affected;
+#define GET_ALL_KEYS "SELECT hash from gn090"
+  struct GNUNET_MYSQL_StatementHandle *get_all_keys;
 
-  if (GNUNET_OK != prepare_statement (plugin, s))
-    return GNUNET_SYSERR;
-  va_start (ap, insert_id);
-  if (GNUNET_OK != init_params (plugin, s, ap))
-    {
-      va_end (ap);
-      return GNUNET_SYSERR;
-    }
-  va_end (ap);
-  affected = mysql_stmt_affected_rows (s->statement);
-  if (NULL != insert_id)
-    *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
-  mysql_stmt_reset (s->statement);
-  return affected;
-}
+};
 
 
 /**
@@ -824,185 +222,23 @@ prepared_statement_run (struct Plugin *plugin,
  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
  */
 static int
-do_delete_entry (struct Plugin *plugin,
-                unsigned long long uid)
+do_delete_entry (struct Plugin *plugin, unsigned long long uid)
 {
   int ret;
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Deleting value %llu from gn090 table\n",
-             uid);
-#endif
-  ret = prepared_statement_run (plugin,
-                               plugin->delete_entry_by_uid,
-                               NULL,
-                               MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
-                               -1);
-  if (ret > 0)
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
+              uid);
+  ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
+                                            plugin->delete_entry_by_uid, NULL,
+                                            MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
+  if (ret >= 0)
     return GNUNET_OK;
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-             "Deleting value %llu from gn090 table failed\n",
-             uid);
+              "Deleting value %llu from gn090 table failed\n", uid);
   return ret;
 }
 
 
-/**
- * Function that simply returns GNUNET_OK
- *
- * @param cls closure, not used
- * @param num_values not used
- * @param values not used
- * @return GNUNET_OK
- */
-static int
-return_ok (void *cls, 
-          unsigned int num_values, 
-          MYSQL_BIND * values)
-{
-  return GNUNET_OK;
-}
-
-
-/**
- * Continuation of "mysql_next_request".
- *
- * @param next_cls the next context
- * @param tc the task context (unused)
- */
-static void 
-mysql_next_request_cont (void *next_cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct NextRequestClosure *nrc = next_cls;
-  struct Plugin *plugin;
-  int ret;
-  unsigned int type;
-  unsigned int priority;
-  unsigned int anonymity;
-  unsigned long long exp;
-  unsigned long hashSize;
-  unsigned long size;
-  unsigned long long uid;
-  char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
-  GNUNET_HashCode key;
-  struct GNUNET_TIME_Absolute expiration;
-  MYSQL_BIND *rbind = nrc->rbind;
-
-  plugin = nrc->plugin;
-  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-  plugin->next_task_nc = NULL;
-
-  if (GNUNET_YES == nrc->end_it) 
-    goto END_SET;
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->now = GNUNET_TIME_absolute_get ();
-  hashSize = sizeof (GNUNET_HashCode);
-  memset (nrc->rbind, 0, sizeof (nrc->rbind));
-  rbind = nrc->rbind;
-  rbind[0].buffer_type = MYSQL_TYPE_LONG;
-  rbind[0].buffer = &type;
-  rbind[0].is_unsigned = 1;
-  rbind[1].buffer_type = MYSQL_TYPE_LONG;
-  rbind[1].buffer = &priority;
-  rbind[1].is_unsigned = 1;
-  rbind[2].buffer_type = MYSQL_TYPE_LONG;
-  rbind[2].buffer = &anonymity;
-  rbind[2].is_unsigned = 1;
-  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[3].buffer = &exp;
-  rbind[3].is_unsigned = 1;
-  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[4].buffer = &key;
-  rbind[4].buffer_length = hashSize;
-  rbind[4].length = &hashSize;
-  rbind[5].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[5].buffer = value;
-  rbind[5].buffer_length = size = sizeof (value);
-  rbind[5].length = &size;
-  rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[6].buffer = &uid;
-  rbind[6].is_unsigned = 1;
-
-  if (GNUNET_OK != nrc->prep (nrc->prep_cls,
-                             nrc))
-    goto END_SET;
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  GNUNET_assert (size <= sizeof(value));
-  if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
-       (hashSize != sizeof (GNUNET_HashCode)) )
-    {
-      GNUNET_break (0);
-      goto END_SET;
-    }    
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
-             (unsigned int) size,
-             GNUNET_h2s (&key),
-             priority,
-             anonymity,
-             exp);
-#endif
-  expiration.abs_value = exp;
-  ret = nrc->dviter (nrc->dviter_cls, nrc,
-                    &key,
-                    size, value,
-                    type, priority, anonymity, expiration,
-                    uid);
-  if (ret == GNUNET_SYSERR)
-    {
-      nrc->end_it = GNUNET_YES;
-      return;
-    }
-  if (ret == GNUNET_NO)
-    {
-      do_delete_entry (plugin, uid);
-      if (size != 0)
-       plugin->env->duc (plugin->env->cls,
-                         - size);
-    }
-  return;
- END_SET:
-  /* call dviter with "end of set" */
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->dviter (nrc->dviter_cls, 
-              NULL, NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->prep (nrc->prep_cls, NULL);
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  GNUNET_free (nrc);
-}
-
-
-/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-static void 
-mysql_plugin_next_request (void *next_cls,
-                          int end_it)
-{
-  struct NextRequestClosure *nrc = next_cls;
-
-  if (GNUNET_YES == end_it)
-    nrc->end_it = GNUNET_YES;
-  nrc->plugin->next_task_nc = nrc;
-  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
-                                                    nrc);
-}  
-
-
 /**
  * Get an estimate of how much space the database is
  * currently using.
@@ -1011,7 +247,7 @@ mysql_plugin_next_request (void *next_cls,
  * @return number of bytes used on disk
  */
 static unsigned long long
-mysql_plugin_get_size (void *cls)
+mysql_plugin_estimate_size (void *cls)
 {
   struct Plugin *plugin = cls;
   MYSQL_BIND cbind[1];
@@ -1022,12 +258,8 @@ mysql_plugin_get_size (void *cls)
   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
   cbind[0].buffer = &total;
   cbind[0].is_unsigned = GNUNET_NO;
-  if (GNUNET_OK != 
-      prepared_statement_run_select (plugin,
-                                    plugin->get_size,
-                                    1, cbind, 
-                                    &return_ok, NULL,
-                                    -1))
+  if (GNUNET_OK !=
+      GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
     return 0;
   return total;
 }
@@ -1049,60 +281,50 @@ mysql_plugin_get_size (void *cls)
  * @return GNUNET_OK on success
  */
 static int
-mysql_plugin_put (void *cls,
-                 const GNUNET_HashCode * key,
-                 uint32_t size,
-                 const void *data,
-                 enum GNUNET_BLOCK_Type type,
-                 uint32_t priority,
-                 uint32_t anonymity,
-                 uint32_t replication,
-                 struct GNUNET_TIME_Absolute expiration,
-                 char **msg)
+mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
+                  const void *data, enum GNUNET_BLOCK_Type type,
+                  uint32_t priority, uint32_t anonymity, uint32_t replication,
+                  struct GNUNET_TIME_Absolute expiration, char **msg)
 {
   struct Plugin *plugin = cls;
   unsigned int irepl = replication;
-  unsigned int itype = type;
   unsigned int ipriority = priority;
   unsigned int ianonymity = anonymity;
   unsigned long long lexpiration = expiration.abs_value;
+  unsigned long long lrvalue =
+      (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                     UINT64_MAX);
   unsigned long hashSize;
   unsigned long hashSize2;
   unsigned long lsize;
-  GNUNET_HashCode vhash;
+  struct GNUNET_HashCode vhash;
 
   if (size > MAX_DATUM_SIZE)
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  hashSize = sizeof (GNUNET_HashCode);
-  hashSize2 = sizeof (GNUNET_HashCode);
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  hashSize = sizeof (struct GNUNET_HashCode);
+  hashSize2 = sizeof (struct GNUNET_HashCode);
   lsize = size;
   GNUNET_CRYPTO_hash (data, size, &vhash);
   if (GNUNET_OK !=
-      prepared_statement_run (plugin,
-                             plugin->insert_entry,
-                             NULL,
-                             MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
-                             MYSQL_TYPE_LONG, &itype, GNUNET_YES,
-                             MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
-                             MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
-                             MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
-                             MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
-                             MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
-                             MYSQL_TYPE_BLOB, data, lsize, &lsize, 
-                             -1))
-    return GNUNET_SYSERR;    
-#if DEBUG_MYSQL
+      GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
+                              MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
+                              MYSQL_TYPE_LONG, &type, GNUNET_YES,
+                              MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
+                              MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
+                              MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
+                              MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
+                              MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                              MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
+                              MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
+    return GNUNET_SYSERR;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Inserted value `%s' with size %u into gn090 table\n",
-             GNUNET_h2s (key),
-             (unsigned int) size);
-#endif
+              "Inserted value `%s' with size %u into gn090 table\n",
+              GNUNET_h2s (key), (unsigned int) size);
   if (size > 0)
-    plugin->env->duc (plugin->env->cls,
-                     size);
+    plugin->env->duc (plugin->env->cls, size);
   return GNUNET_OK;
 }
 
@@ -1131,435 +353,564 @@ mysql_plugin_put (void *cls,
  * @return GNUNET_OK on success
  */
 static int
-mysql_plugin_update (void *cls,
-                    uint64_t uid,
-                    int delta, 
-                    struct GNUNET_TIME_Absolute expire,
-                    char **msg)
+mysql_plugin_update (void *cls, uint64_t uid, int delta,
+                     struct GNUNET_TIME_Absolute expire, char **msg)
 {
   struct Plugin *plugin = cls;
   unsigned long long vkey = uid;
   unsigned long long lexpire = expire.abs_value;
   int ret;
 
-#if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Updating value %llu adding %d to priority and maxing exp at %llu\n",
-             vkey,
-             delta,
-             lexpire);
-#endif
-  ret = prepared_statement_run (plugin,
-                               plugin->update_entry,
-                               NULL,
-                               MYSQL_TYPE_LONG, &delta, GNUNET_NO,
-                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
-                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
-                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
-                               -1);
+              "Updating value %llu adding %d to priority and maxing exp at %llu\n",
+              vkey, delta, lexpire);
+  ret =
+    GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
+                                        MYSQL_TYPE_LONG, &delta, GNUNET_NO,
+                              MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+                              MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+                              MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
   if (ret != GNUNET_OK)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 "Failed to update value %llu\n",
-                 vkey);
-    }
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
+                vkey);
+  }
   return ret;
 }
 
 
-struct GetContext
+/**
+ * Run the given select statement and call 'proc' on the resulting
+ * values (which must be in particular positions).
+ *
+ * @param plugin the plugin handle
+ * @param stmt select statement to run
+ * @param proc function to call on result
+ * @param proc_cls closure for proc
+ * @param ... arguments to initialize stmt
+ */
+static void
+execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
+                PluginDatumProcessor proc, void *proc_cls, ...)
 {
-  GNUNET_HashCode key;
-  GNUNET_HashCode vhash;
-
-  unsigned int prio;
+  va_list ap;
+  int ret;
+  unsigned int type;
+  unsigned int priority;
   unsigned int anonymity;
-  unsigned long long expiration;
-  unsigned long long vkey;
-  unsigned long long total;
-  unsigned int off;
-  unsigned int count;
-  int have_vhash;
-};
+  unsigned long long exp;
+  unsigned long hashSize;
+  unsigned long size;
+  unsigned long long uid;
+  char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
+  struct GNUNET_HashCode key;
+  struct GNUNET_TIME_Absolute expiration;
+  MYSQL_BIND rbind[7];
 
+  hashSize = sizeof (struct GNUNET_HashCode);
+  memset (rbind, 0, sizeof (rbind));
+  rbind[0].buffer_type = MYSQL_TYPE_LONG;
+  rbind[0].buffer = &type;
+  rbind[0].is_unsigned = 1;
+  rbind[1].buffer_type = MYSQL_TYPE_LONG;
+  rbind[1].buffer = &priority;
+  rbind[1].is_unsigned = 1;
+  rbind[2].buffer_type = MYSQL_TYPE_LONG;
+  rbind[2].buffer = &anonymity;
+  rbind[2].is_unsigned = 1;
+  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[3].buffer = &exp;
+  rbind[3].is_unsigned = 1;
+  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
+  rbind[4].buffer = &key;
+  rbind[4].buffer_length = hashSize;
+  rbind[4].length = &hashSize;
+  rbind[5].buffer_type = MYSQL_TYPE_BLOB;
+  rbind[5].buffer = value;
+  rbind[5].buffer_length = size = sizeof (value);
+  rbind[5].length = &size;
+  rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[6].buffer = &uid;
+  rbind[6].is_unsigned = 1;
 
-static int
-get_statement_prepare (void *cls,
-                      struct NextRequestClosure *nrc)
-{
-  struct GetContext *gc = cls;
-  struct Plugin *plugin;
-  int ret;
-  unsigned long hashSize;
-  
-  if (NULL == nrc)
-    {
-      GNUNET_free (gc);
-      return GNUNET_NO;
-    }
-  if (gc->count == gc->total)
-    return GNUNET_NO;
-  plugin = nrc->plugin;
-  hashSize = sizeof (GNUNET_HashCode);
-  if (++gc->off >= gc->total)
-    gc->off = 0;
-#if DEBUG_MYSQL
+  va_start (ap, proc_cls);
+  ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
+  va_end (ap);
+  if (ret <= 0)
+  {
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    return;
+  }
+  GNUNET_assert (size <= sizeof (value));
+  if ((rbind[4].buffer_length != sizeof (struct GNUNET_HashCode)) ||
+      (hashSize != sizeof (struct GNUNET_HashCode)))
+  {
+    GNUNET_break (0);
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
-             gc->count+1,
-             gc->total,
-             gc->off,
-             GNUNET_h2s (&gc->key));  
-#endif
-  if (nrc->type != 0)
-    {
-      if (gc->have_vhash)
-       {
-         ret = prepared_statement_run_select (plugin,
-                                              plugin->select_entry_by_hash_vhash_and_type, 
-                                              7, nrc->rbind, 
-                                              &return_ok, NULL, 
-                                              MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-                                              MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
-                                              MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
-                                              MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
-                                              -1);
-       }
-      else
-       {
-         ret =
-           prepared_statement_run_select (plugin,
-                                          plugin->select_entry_by_hash_and_type, 
-                                          7, nrc->rbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-                                          MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
-                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
-                                          -1);
-       }
-    }
-  else
-    {
-      if (gc->have_vhash)
-       {
-         ret =
-           prepared_statement_run_select (plugin,
-                                          plugin->select_entry_by_hash_and_vhash, 
-                                          7, nrc->rbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
-                                          MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
-                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
-                                          -1);
-       }
-      else
-       {
-         ret =
-           prepared_statement_run_select (plugin,
-                                          plugin->select_entry_by_hash, 
-                                          7, nrc->rbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
-                                          -1);
-       }
-    }
-  gc->count++;
-  return ret;
+              "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
+              (unsigned int) size, GNUNET_h2s (&key), priority, anonymity, exp);
+  GNUNET_assert (size < MAX_DATUM_SIZE);
+  expiration.abs_value = exp;
+  ret =
+      proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
+            uid);
+  if (ret == GNUNET_NO)
+  {
+    do_delete_entry (plugin, uid);
+    if (size != 0)
+      plugin->env->duc (plugin->env->cls, -size);
+  }
 }
 
 
+
 /**
- * Iterate over the results for a particular key in the datastore.
+ * Get one of the results for a particular key in the datastore.
  *
  * @param cls closure
- * @param key maybe NULL (to match all entries)
+ * @param offset offset of the result (modulo num-results);
+ *               specific ordering does not matter for the offset
+ * @param key key to match, never NULL
  * @param vhash hash of the value, maybe NULL (to
  *        match all values that have the right key).
  *        Note that for DBlocks there is no difference
  *        betwen key and vhash, but for other blocks
  *        there may be!
  * @param type entries of which type are relevant?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ *     Use 0 for any type.
+ * @param proc function to call on the matching value,
+ *        with NULL for if no value matches
+ * @param proc_cls closure for proc
  */
 static void
-mysql_plugin_get (void *cls,
-                 const GNUNET_HashCode *key,
-                 const GNUNET_HashCode *vhash,
-                 enum GNUNET_BLOCK_Type type,
-                 PluginIterator iter, void *iter_cls)
+mysql_plugin_get_key (void *cls, uint64_t offset, const struct GNUNET_HashCode * key,
+                      const struct GNUNET_HashCode * vhash,
+                      enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
+                      void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  unsigned int itype = type;
   int ret;
   MYSQL_BIND cbind[1];
-  struct GetContext *gc;
-  struct NextRequestClosure *nrc;
   long long total;
   unsigned long hashSize;
+  unsigned long hashSize2;
+  unsigned long long off;
 
   GNUNET_assert (key != NULL);
-  if (iter == NULL) 
-    return;
-  hashSize = sizeof (GNUNET_HashCode);
+  GNUNET_assert (NULL != proc);
+  hashSize = sizeof (struct GNUNET_HashCode);
+  hashSize2 = sizeof (struct GNUNET_HashCode);
   memset (cbind, 0, sizeof (cbind));
   total = -1;
   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
   cbind[0].buffer = &total;
   cbind[0].is_unsigned = GNUNET_NO;
   if (type != 0)
+  {
+    if (vhash != NULL)
+    {
+      ret =
+       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
+                                         plugin->
+                                         count_entry_by_hash_vhash_and_type, 1,
+                                                   cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
+                                         &hashSize, MYSQL_TYPE_BLOB, vhash,
+                                         hashSize2, &hashSize2, MYSQL_TYPE_LONG,
+                                         &type, GNUNET_YES, -1);
+    }
+    else
     {
-      if (vhash != NULL)
-        {
-          ret =
-            prepared_statement_run_select (plugin,
-                                          plugin->count_entry_by_hash_vhash_and_type, 
-                                          1, cbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
-                                          MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
-                                          MYSQL_TYPE_LONG, &itype, GNUNET_YES,
-                                          -1);
-        }
-      else
-        {
-          ret =
-            prepared_statement_run_select (plugin,
-                                          plugin->count_entry_by_hash_and_type, 
-                                          1, cbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
-                                          MYSQL_TYPE_LONG, &itype, GNUNET_YES,
-                                          -1);
-        }
+      ret =
+       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
+                                         plugin->count_entry_by_hash_and_type,
+                                                   1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
+                                         hashSize, &hashSize, MYSQL_TYPE_LONG,
+                                         &type, GNUNET_YES, -1);
     }
+  }
   else
+  {
+    if (vhash != NULL)
     {
-      if (vhash != NULL)
-        {
-          ret =
-            prepared_statement_run_select (plugin,
-                                          plugin->count_entry_by_hash_and_vhash, 
-                                          1, cbind,
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
-                                          MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
-                                          -1);
-
-        }
-      else
-        {
-          ret =
-            prepared_statement_run_select (plugin,
-                                          plugin->count_entry_by_hash,
-                                          1, cbind, 
-                                          &return_ok, NULL, 
-                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
-                                          -1);
-        }
+      ret =
+       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
+                                         plugin->count_entry_by_hash_and_vhash,
+                                                   1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
+                                         hashSize, &hashSize, MYSQL_TYPE_BLOB,
+                                         vhash, hashSize2, &hashSize2, -1);
+
     }
-  if ((ret != GNUNET_OK) || (0 >= total))
+    else
     {
-      iter (iter_cls, 
-           NULL, NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      return;
+      ret =
+       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
+                                                   cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
+                                         &hashSize, -1);
     }
-#if DEBUG_MYSQL
+  }
+  if ((ret != GNUNET_OK) || (0 >= total))
+  {
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    return;
+  }
+  offset = offset % total;
+  off = (unsigned long long) offset;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Iterating over %lld results for GET `%s'\n",
-             total,
-             GNUNET_h2s (key));
-#endif
-  gc = GNUNET_malloc (sizeof (struct GetContext));
-  gc->key = *key;
-  if (vhash != NULL)
+              "Obtaining %llu/%lld result for GET `%s'\n", off, total,
+              GNUNET_h2s (key));
+  if (type != GNUNET_BLOCK_TYPE_ANY)
+  {
+    if (NULL != vhash)
     {
-      gc->have_vhash = GNUNET_YES;
-      gc->vhash = *vhash;
+      execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
+                      proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                      MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+                      MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+                      &off, GNUNET_YES, -1);
     }
-  gc->total = total;
-  gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
-  
-
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->type = type;  
-  nrc->dviter = iter;
-  nrc->dviter_cls = iter_cls;
-  nrc->prep = &get_statement_prepare;
-  nrc->prep_cls = gc;
-  mysql_plugin_next_request (nrc, GNUNET_NO);
+    else
+    {
+      execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
+                      proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                      MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+                      &off, GNUNET_YES, -1);
+    }
+  }
+  else
+  {
+    if (NULL != vhash)
+    {
+      execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
+                      proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                      MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+                      MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
+    }
+    else
+    {
+      execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
+                      MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                      MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
+    }
+  }
 }
 
 
 /**
- * Run the prepared statement to get the next data item ready.
- * 
- * @param cls not used
- * @param nrc closure for the next request iterator
- * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
+ * Get a zero-anonymity datum from the datastore.
+ *
+ * @param cls our "struct Plugin*"
+ * @param offset offset of the result
+ * @param type entries of which type should be considered?
+ *        Use 0 for any type.
+ * @param proc function to call on a matching value or NULL
+ * @param proc_cls closure for iter
  */
-static int
-iterator_zero_prepare (void *cls,
-                      struct NextRequestClosure *nrc)
+static void
+mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
+                                 enum GNUNET_BLOCK_Type type,
+                                 PluginDatumProcessor proc, void *proc_cls)
 {
+  struct Plugin *plugin = cls;
+  unsigned long long rvalue =
+      (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                     UINT64_MAX);
+
+  execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
+                  &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
+                  MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+                  &rvalue, GNUNET_YES, -1);
+}
+
+
+/**
+ * Context for 'repl_proc' function.
+ */
+struct ReplCtx
+{
+
+  /**
+   * Plugin handle.
+   */
   struct Plugin *plugin;
-  int ret;
 
-  if (nrc == NULL)
-    return GNUNET_NO;
-  plugin = nrc->plugin;
-  ret = prepared_statement_run_select (plugin,
-                                      plugin->zero_iter,
-                                      7, nrc->rbind,
-                                      &return_ok, NULL,
-                                      MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
-                                      -1);
-  nrc->count++;
-  return ret;
-}
+  /**
+   * Function to call for the result (or the NULL).
+   */
+  PluginDatumProcessor proc;
+
+  /**
+   * Closure for proc.
+   */
+  void *proc_cls;
+};
 
 
 /**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Wrapper for the processor for 'mysql_plugin_get_replication'.
+ * Decrements the replication counter and calls the original
+ * iterator.
  *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ *         (continue on call to "next", of course),
+ *         GNUNET_NO to delete the item and continue (if supported)
  */
-static void
-mysql_plugin_iter_zero_anonymity (void *cls,
-                                 enum GNUNET_BLOCK_Type type,
-                                 PluginIterator iter,
-                                 void *iter_cls)
+static int
+repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
+           const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
+           uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
+           uint64_t uid)
 {
-  struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
-
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->type = type;  
-  nrc->dviter = iter;
-  nrc->dviter_cls = iter_cls;
-  nrc->prep = &iterator_zero_prepare;
-  mysql_plugin_next_request (nrc, GNUNET_NO);
+  struct ReplCtx *rc = cls;
+  struct Plugin *plugin = rc->plugin;
+  unsigned long long oid;
+  int ret;
+  int iret;
+
+  ret =
+      rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
+                expiration, uid);
+  if (NULL != key)
+  {
+    oid = (unsigned long long) uid;
+    iret =
+      GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
+                                          MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
+    if (iret == GNUNET_SYSERR)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Failed to reduce replication counter\n");
+      return GNUNET_SYSERR;
+    }
+  }
+  return ret;
 }
 
 
 /**
- * Run the SELECT statement for the replication function.
- * 
- * @param cls the 'struct Plugin'
- * @param nrc the context (not used)
+ * Get a random item for replication.  Returns a single, not expired,
+ * random item from those with the highest replication counters.  The
+ * item's replication counter is decremented by one IF it was positive
+ * before.  Call 'proc' with all values ZERO or NULL if the datastore
+ * is empty.
+ *
+ * @param cls closure
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
-static int
-replication_prepare (void *cls,
-                    struct NextRequestClosure *nrc)
+static void
+mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
+                              void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  unsigned long long nt;
-
-  nt = (unsigned long long) nrc->now.abs_value;
-  return prepared_statement_run_select (plugin,
-                                       plugin->select_replication, 
-                                       6, nrc->rbind, 
-                                       &return_ok, NULL,
-                                       MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
-                                       -1);
+  struct ReplCtx rc;
+  unsigned long long rvalue;
+  unsigned long repl;
+  MYSQL_BIND results;
+
+  rc.plugin = plugin;
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+  memset (&results, 0, sizeof (results));
+  results.buffer_type = MYSQL_TYPE_LONG;
+  results.buffer = &repl;
+  results.is_unsigned = GNUNET_YES;
+
+  if (1 !=
+      GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
+  {
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    return;
+  }
+
+  rvalue =
+      (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                     UINT64_MAX);
+  execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
+                  MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
+                  &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
+                  MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
+
 }
 
 
 /**
- * Get a random item for replication.  Returns a single, not expired, random item
- * from those with the highest replication counters.  The item's 
- * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get all of the keys in the datastore.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call on each key
+ * @param proc_cls closure for proc
  */
 static void
-mysql_plugin_replication_get (void *cls,
-                             PluginIterator iter, void *iter_cls)
+mysql_plugin_get_keys (void *cls,
+                       PluginKeyProcessor proc,
+                       void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure nrc;
-
-  memset (&nrc, 0, sizeof (nrc));
-  nrc.plugin = plugin;
-  nrc.now = GNUNET_TIME_absolute_get ();
-  nrc.prep = &replication_prepare;
-  nrc.prep_cls = plugin;
-  nrc.type = 0;
-  nrc.dviter = iter;
-  nrc.dviter_cls = iter_cls;
-  nrc.end_it = GNUNET_NO;
-  mysql_next_request_cont (&nrc, NULL);
+  const char *query = "SELECT hash FROM gn090";
+  int ret;
+  MYSQL_STMT *statement;
+  struct GNUNET_HashCode key;
+  MYSQL_BIND cbind[1];
+  unsigned long length;
+  statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
+                                              plugin->get_all_keys);
+  if (statement == NULL)
+  {
+    GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    return;
+  }
+  if (mysql_stmt_prepare (statement, query, strlen (query)))
+  {
+    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
+                     _("Failed to prepare statement `%s'\n"), query);
+    GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    return;
+  }
+  GNUNET_assert (proc != NULL);
+  if (mysql_stmt_execute (statement))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("`%s' for `%s' failed at %s:%d with error: %s\n"),
+                "mysql_stmt_execute", query, __FILE__, __LINE__,
+                mysql_stmt_error (statement));
+    GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    return;
+  }
+  memset (cbind, 0, sizeof (cbind));
+  cbind[0].buffer_type = MYSQL_TYPE_BLOB;
+  cbind[0].buffer = &key;
+  cbind[0].buffer_length = sizeof (key);
+  cbind[0].length = &length;
+  cbind[0].is_unsigned = GNUNET_NO;
+  if (mysql_stmt_bind_result (statement, cbind))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("`%s' failed at %s:%d with error: %s\n"),
+                "mysql_stmt_bind_result", __FILE__, __LINE__,
+                mysql_stmt_error (statement));
+    GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    return;
+  }
+  while (0 == (ret = mysql_stmt_fetch (statement)))
+  {
+    if (sizeof (struct GNUNET_HashCode) == length)
+      proc (proc_cls, &key, 1);    
+  }
+  if (ret != MYSQL_NO_DATA)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("`%s' failed at %s:%d with error: %s\n"),
+                    "mysql_stmt_fetch", __FILE__, __LINE__,
+                    mysql_stmt_error (statement));    
+    GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    return;
+  }
+  mysql_stmt_reset (statement);
 }
 
 
 /**
- * Run the SELECT statement for the expiration function.
- * 
- * @param cls the 'struct Plugin'
- * @param nrc the context (not used)
- * @return GNUNET_OK on success, GNUNET_NO if there are
- *         no more values, GNUNET_SYSERR on error
+ * Context for 'expi_proc' function.
+ */
+struct ExpiCtx
+{
+
+  /**
+   * Plugin handle.
+   */
+  struct Plugin *plugin;
+
+  /**
+   * Function to call for the result (or the NULL).
+   */
+  PluginDatumProcessor proc;
+
+  /**
+   * Closure for proc.
+   */
+  void *proc_cls;
+};
+
+
+
+/**
+ * Wrapper for the processor for 'mysql_plugin_get_expiration'.
+ * If no expired value was found, we do a second query for
+ * low-priority content.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ *         (continue on call to "next", of course),
+ *         GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-expiration_prepare (void *cls,
-                   struct NextRequestClosure *nrc)
+expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
+           const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
+           uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
+           uint64_t uid)
 {
-  struct Plugin *plugin = cls;
-  long long nt;
+  struct ExpiCtx *rc = cls;
+  struct Plugin *plugin = rc->plugin;
 
-  if (NULL == nrc)
-    return GNUNET_NO;
-  nt = (long long) nrc->now.abs_value;
-  return prepared_statement_run_select
-    (plugin,
-     plugin->select_expiration, 
-     7, nrc->rbind, 
-     &return_ok, NULL,
-     MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
-     -1);
+  if (NULL == key)
+  {
+    execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
+                    -1);
+    return GNUNET_SYSERR;
+  }
+  return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
+                   expiration, uid);
 }
 
 
 /**
  * Get a random item for expiration.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-mysql_plugin_expiration_get (void *cls,
-                            PluginIterator iter, void *iter_cls)
+mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
+                             void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure nrc;
-
-  memset (&nrc, 0, sizeof (nrc));
-  nrc.plugin = plugin;
-  nrc.now = GNUNET_TIME_absolute_get ();
-  nrc.prep = &expiration_prepare;
-  nrc.prep_cls = plugin;
-  nrc.type = 0;
-  nrc.dviter = iter;
-  nrc.dviter_cls = iter_cls;
-  nrc.end_it = GNUNET_NO;
-  mysql_next_request_cont (&nrc, NULL);
+  long long nt;
+  struct ExpiCtx rc;
+
+  rc.plugin = plugin;
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+  nt = (long long) GNUNET_TIME_absolute_get ().abs_value;
+  execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
+                  MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
+
 }
 
 
@@ -1568,14 +919,13 @@ mysql_plugin_expiration_get (void *cls,
  *
  * @param cls the "struct Plugin*"
  */
-static void 
+static void
 mysql_plugin_drop (void *cls)
 {
   struct Plugin *plugin = cls;
 
-  if (GNUNET_OK != run_statement (plugin,
-                                 "DROP TABLE gn090"))
-    return;           /* error */
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
+    return;                     /* error */
   plugin->env->duc (plugin->env->cls, 0);
 }
 
@@ -1595,76 +945,78 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
 
   plugin = GNUNET_malloc (sizeof (struct Plugin));
   plugin->env = env;
-  plugin->cnffile = get_my_cnf_path (env->cfg);
-  if (GNUNET_OK != iopen (plugin))
-    {
-      iclose (plugin);
-      GNUNET_free_non_null (plugin->cnffile);
-      GNUNET_free (plugin);
-      return NULL;
-    }
-#define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
-#define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
-  if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
-             " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
-             " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
-             " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
-             " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
-             " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
-             " hash BINARY(64) NOT NULL DEFAULT '',"
-             " vhash BINARY(64) NOT NULL DEFAULT '',"
-             " value BLOB NOT NULL DEFAULT '',"
-             " uid BIGINT NOT NULL AUTO_INCREMENT,"
-             " PRIMARY KEY (uid),"
-             " INDEX idx_hash (hash(64)),"
-             " INDEX idx_hash_uid (hash(64),uid),"
-             " INDEX idx_hash_vhash (hash(64),vhash(64)),"
-             " INDEX idx_hash_type_uid (hash(64),type,uid),"
-             " INDEX idx_prio (prio),"
-             " INDEX idx_repl (repl),"
-             " INDEX idx_expire_prio (expire,prio),"
-             " INDEX idx_anonLevel_uid (anonLevel,uid)"
-             ") ENGINE=InnoDB") ||
-      MRUNS ("SET AUTOCOMMIT = 1") ||
+  plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
+  if (NULL == plugin->mc)
+  {
+    GNUNET_free (plugin);
+    return NULL;
+  }
+#define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
+#define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
+  if (MRUNS
+      ("CREATE TABLE IF NOT EXISTS gn090 ("
+       " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+       " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+       " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+       " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+       " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
+       " rvalue BIGINT UNSIGNED NOT NULL,"
+       " hash BINARY(64) NOT NULL DEFAULT '',"
+       " vhash BINARY(64) NOT NULL DEFAULT '',"
+       " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
+       " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
+       " INDEX idx_hash_uid (hash(64),uid),"
+       " INDEX idx_hash_vhash (hash(64),vhash(64)),"
+       " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
+       " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
+       " INDEX idx_expire (expire),"
+       " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
+       ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
-      PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
-      || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
-      || PINIT (plugin->select_entry_by_hash_vhash_and_type,
-                SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
-      || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
-      || PINIT (plugin->get_size, SELECT_SIZE)
-      || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
-      || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
+      PINIT (plugin->select_entry_by_hash_and_vhash,
+             SELECT_ENTRY_BY_HASH_AND_VHASH) ||
+      PINIT (plugin->select_entry_by_hash_and_type,
+             SELECT_ENTRY_BY_HASH_AND_TYPE) ||
+      PINIT (plugin->select_entry_by_hash_vhash_and_type,
+             SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
+      PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
+      PINIT (plugin->get_size, SELECT_SIZE) ||
+      PINIT (plugin->count_entry_by_hash_and_vhash,
+             COUNT_ENTRY_BY_HASH_AND_VHASH) ||
+      PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
-                COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
-      || PINIT (plugin->update_entry, UPDATE_ENTRY)
-      || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
-      || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
-      || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
-    {
-      iclose (plugin);
-      GNUNET_free_non_null (plugin->cnffile);
-      GNUNET_free (plugin);
-      return NULL;
-    }
+                COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
+      PINIT (plugin->update_entry, UPDATE_ENTRY) ||
+      PINIT (plugin->dec_repl, DEC_REPL) ||
+      PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
+      PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
+      PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
+      PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
+      PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
+      PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
+  {
+    GNUNET_MYSQL_context_destroy (plugin->mc);
+    GNUNET_free (plugin);
+    return NULL;
+  }
 #undef PINIT
 #undef MRUNS
 
   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
   api->cls = plugin;
-  api->get_size = &mysql_plugin_get_size;
+  api->estimate_size = &mysql_plugin_estimate_size;
   api->put = &mysql_plugin_put;
-  api->next_request = &mysql_plugin_next_request;
-  api->get = &mysql_plugin_get;
-  api->replication_get = &mysql_plugin_replication_get;
-  api->expiration_get = &mysql_plugin_expiration_get;
   api->update = &mysql_plugin_update;
-  api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
+  api->get_key = &mysql_plugin_get_key;
+  api->get_replication = &mysql_plugin_get_replication;
+  api->get_expiration = &mysql_plugin_get_expiration;
+  api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
+  api->get_keys = &mysql_plugin_get_keys;
   api->drop = &mysql_plugin_drop;
-  GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
-                   "mysql", _("Mysql database running\n"));
+  GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
+                   _("Mysql database running\n"));
   return api;
 }
 
@@ -1680,19 +1032,9 @@ libgnunet_plugin_datastore_mysql_done (void *cls)
   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
   struct Plugin *plugin = api->cls;
 
-  iclose (plugin);
-  if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (plugin->next_task);
-      plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-      plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
-      GNUNET_free (plugin->next_task_nc);
-      plugin->next_task_nc = NULL;
-    }
-  GNUNET_free_non_null (plugin->cnffile);
+  GNUNET_MYSQL_context_destroy (plugin->mc);
   GNUNET_free (plugin);
   GNUNET_free (api);
-  mysql_library_end ();
   return NULL;
 }