fixes
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
index 03fe06b93302179bc926849e9aa3e19bb716d103..51a60a1dc36c77636c05bba845a5e31d7e331259 100644 (file)
  * MANUAL SETUP INSTRUCTIONS
  *
  * 1) in /etc/gnunet.conf, set
- *    <pre>
- *
- *     sqstore = "sqstore_mysql"
- *
- *    </pre>
+ * @verbatim
+ *     [datastore]
+ *     DATABASE = "mysql"
+ * @endverbatim
  * 2) Then access mysql as root,
- *    <pre>
- *
- *    $ mysql -u root -p
- *
- *    </pre>
+ * @verbatim
+     $ mysql -u root -p
+ * @endverbatim
  *    and do the following. [You should replace $USER with the username
  *    that will be running the gnunetd process].
- *    <pre>
- *
+ * @verbatim
       CREATE DATABASE gnunet;
       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
          ON gnunet.* TO $USER@localhost;
       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
       FLUSH PRIVILEGES;
- *
- *    </pre>
+ * @endverbatim
  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
  *    with the following lines
- *    <pre>
-
+ * @verbatim
       [client]
       user=$USER
       password=$the_password_you_like
-
- *    </pre>
+ * @endverbatim
  *
  * Thats it. Note that .my.cnf file is a security risk unless its on
  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
  * 4) Still, perhaps you should briefly try if the DB connection
  *    works. First, login as $USER. Then use,
  *
- *    <pre>
*    $ mysql -u $USER -p $the_password_you_like
*    mysql> use gnunet;
- *    </pre>
+ * @verbatim
+     $ mysql -u $USER -p $the_password_you_like
+     mysql> use gnunet;
+ * @endverbatim
  *
  *    If you get the message &quot;Database changed&quot; it probably works.
  *
  * - The tables can be verified/fixed in two ways;
  *   1) by running mysqlcheck -A, or
  *   2) by executing (inside of mysql using the GNUnet database):
- *   mysql> REPAIR TABLE gn080;
- *   mysql> REPAIR TABLE gn072;
+ * @verbatim
+     mysql> REPAIR TABLE gn090;
+     mysql> REPAIR TABLE gn072;
+ * @endverbatim
  *
  * PROBLEMS?
  *
  * is that mysql is basically operational, that you can connect
  * to it, create tables, issue queries etc.
  *
+ * TODO:
+ * - use FOREIGN KEY for 'uid/vkey'
+ * - consistent naming of uid/vkey
  */
 
 #include "platform.h"
 #include "plugin_datastore.h"
+#include "gnunet_util_lib.h"
+#include <mysql/mysql.h>
 
 #define DEBUG_MYSQL GNUNET_NO
 
    automatically apply a LIMIT on the outermost clause, so we need to
    repeat ourselves quite a bit.  All hail the performance gods (and thanks
    to #mysql on freenode) */
-#define SELECT_IT_LOW_PRIORITY "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
-                               "ORDER BY prio ASC,vkey ASC LIMIT 1) "\
+#define SELECT_IT_LOW_PRIORITY "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
+                               "ORDER BY prio ASC,vkey ASC LIMIT 1) "                          \
                                "UNION "\
-                               "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
+                               "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
                                "ORDER BY prio ASC,vkey ASC LIMIT 1)"\
                                "ORDER BY prio ASC,vkey ASC LIMIT 1"
 
-#define SELECT_IT_NON_ANONYMOUS "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
+#define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
                                 " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
                                 "UNION "\
-                                "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
+                                "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
                                 " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
                                 "ORDER BY prio DESC,vkey DESC LIMIT 1"
 
-#define SELECT_IT_EXPIRATION_TIME "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
+#define SELECT_IT_EXPIRATION_TIME "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
                                   "ORDER BY expire ASC,vkey ASC LIMIT 1) "\
                                   "UNION "\
-                                  "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
+                                  "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
                                   "ORDER BY expire ASC,vkey ASC LIMIT 1)"\
                                   "ORDER BY expire ASC,vkey ASC LIMIT 1"
 
 
-#define SELECT_IT_MIGRATION_ORDER "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
+#define SELECT_IT_MIGRATION_ORDER "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
                                   " AND expire > ? AND type!=3"\
                                   " ORDER BY expire DESC,vkey DESC LIMIT 1) "\
                                   "UNION "\
-                                  "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
+                                  "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
                                   " AND expire > ? AND type!=3"\
                                   " ORDER BY expire DESC,vkey DESC LIMIT 1)"\
                                   "ORDER BY expire DESC,vkey DESC LIMIT 1"
 
-#define SELECT_SIZE "SELECT sum(size) FROM gn080"
 
 
 struct GNUNET_MysqlStatementHandle
@@ -204,6 +203,62 @@ struct GNUNET_MysqlStatementHandle
 
 };
 
+/**
+ * Context for the universal iterator.
+ */
+struct NextRequestClosure;
+
+/**
+ * Type of a function that will prepare
+ * the next iteration.
+ *
+ * @param cls closure
+ * @param nc the next context; NULL for the last
+ *         call which gives the callback a chance to
+ *         clean up the closure
+ * @return GNUNET_OK on success, GNUNET_NO if there are
+ *         no more values, GNUNET_SYSERR on error
+ */
+typedef int (*PrepareFunction)(void *cls,
+                              struct NextRequestClosure *nc);
+
+
+struct NextRequestClosure
+{
+  struct Plugin *plugin;
+
+  struct GNUNET_TIME_Absolute now;
+
+  /**
+   * Function to call to prepare the next
+   * iteration.
+   */
+  PrepareFunction prep;
+
+  /**
+   * Closure for prep.
+   */
+  void *prep_cls;
+
+  MYSQL_BIND rbind[6];
+
+  unsigned int type;
+  
+  unsigned int iter_select;
+
+  PluginIterator dviter;
+
+  void *dviter_cls;
+
+  unsigned int last_prio;
+
+  unsigned long long last_expire;
+
+  unsigned long long last_vkey;
+
+  int end_it;
+};
+
 
 /**
  * Context for all functions in this plugin.
@@ -226,72 +281,80 @@ struct Plugin
    */
   char *cnffile;
 
+  /**
+   * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
+   */
+  struct NextRequestClosure *next_task_nc;
+
+  /**
+   * Pending task with scheduler for running the next request.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier next_task;
+
   /**
    * Statements dealing with gn072 table 
    */
 #define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?"
   struct GNUNET_MysqlStatementHandle *select_value;
 
-#define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"o
+#define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"
   struct GNUNET_MysqlStatementHandle *delete_value;
 
 #define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)"
   struct GNUNET_MysqlStatementHandle *insert_value;
 
   /**
-   * Statements dealing with gn080 table 
+   * Statements dealing with gn090 table 
    */
-#define INSERT_ENTRY "INSERT INTO gn080 (size,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)"
+#define INSERT_ENTRY "INSERT INTO gn090 (type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?)"
   struct GNUNET_MysqlStatementHandle *insert_entry;
   
-#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn080 WHERE vkey=?"
+#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn090 WHERE vkey=?"
   struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey;
   
-#define SELECT_ENTRY_BY_HASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
   
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
 
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
   
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
   
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=?"
+#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
 
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
+#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
 
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=? AND type=?"
+#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=? AND type=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
 
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
+#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
 
-#define UPDATE_ENTRY "UPDATE gn080 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
+#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
   struct GNUNET_MysqlStatementHandle *update_entry;
 
-  struct GNUNET_MysqlStatementHandle *iter[4];
-
-  //static unsigned int stat_size;
+#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072"
+  struct GNUNET_MysqlStatementHandle *get_size;
 
-  /**
-   * Size of the mysql database on disk.
-   */
-  unsigned long long content_size;
+  struct GNUNET_MysqlStatementHandle *iter[4];
 
 };
 
 
 /**
  * Obtain the location of ".my.cnf".
+ *
+ * @param cfg our configuration
  * @return NULL on error
  */
 static char *
-get_my_cnf_path (struct GNUNET_ConfigurationHandle *cfg)
+get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   char *cnffile;
   char *home_dir;
@@ -299,6 +362,7 @@ get_my_cnf_path (struct GNUNET_ConfigurationHandle *cfg)
 #ifndef WINDOWS
   struct passwd *pw;
 #endif
+  int configured;
 
 #ifndef WINDOWS
   pw = getpwuid (getuid ());
@@ -308,27 +372,36 @@ get_my_cnf_path (struct GNUNET_ConfigurationHandle *cfg)
                           "getpwuid");
       return NULL;
     }
-  home_dir = GNUNET_strdup (pw->pw_dir);
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_have_value (cfg,
+                                      "datastore-mysql", "CONFIG"))
+    {
+      GNUNET_assert (GNUNET_OK == 
+                    GNUNET_CONFIGURATION_get_value_filename (cfg,
+                                                             "datastore-mysql", "CONFIG", &cnffile));
+      configured = GNUNET_YES;
+    }
+  else
+    {
+      home_dir = GNUNET_strdup (pw->pw_dir);
 #else
-  home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
-  plibc_conv_to_win_path ("~/", home_dir);
+      home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
+      plibc_conv_to_win_path ("~/", home_dir);
 #endif
-  GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
-  GNUNET_free (home_dir);
-  GNUNET_CONFIUGRATION_get_value_filename (cfg,
-                                          "MYSQL", "CONFIG", cnffile,
-                                          &home_dir);
-  GNUNET_free (cnffile);
-  cnffile = home_dir;
+      GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
+      GNUNET_free (home_dir);
+      configured = GNUNET_NO;
+    }
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              _("Trying to use file `%s' for MySQL configuration.\n"),
              cnffile);
   if ((0 != STAT (cnffile, &st)) ||
       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("Could not access file `%s': %s\n"), cnffile,
-                 STRERROR (errno));
+      if (configured == GNUNET_YES)
+       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                   _("Could not access file `%s': %s\n"), cnffile,
+                   STRERROR (errno));
       GNUNET_free (cnffile);
       return NULL;
     }
@@ -336,6 +409,28 @@ get_my_cnf_path (struct GNUNET_ConfigurationHandle *cfg)
 }
 
 
+
+/**
+ * Free a prepared statement.
+ *
+ * @param plugin plugin context
+ * @param s prepared statement
+ */
+static void
+prepared_statement_destroy (struct Plugin *plugin, 
+                           struct GNUNET_MysqlStatementHandle
+                           *s)
+{
+  GNUNET_CONTAINER_DLL_remove (plugin->shead,
+                              plugin->stail,
+                              s);
+  if (s->valid)
+    mysql_stmt_close (s->statement);
+  GNUNET_free (s->query);
+  GNUNET_free (s);
+}
+
+
 /**
  * Close database connection and all prepared statements (we got a DB
  * disconnect error).
@@ -346,16 +441,9 @@ iclose (struct Plugin *plugin)
   struct GNUNET_MysqlStatementHandle *spos;
 
   spos = plugin->shead;
-  while (spos != NULL)
-    {
-      if (spos->statement != NULL)
-       {
-         mysql_stmt_close (spos->statement);
-         spos->statement = NULL;
-       }
-      spos->valid = GNUNET_NO;
-      spos = spos->next;
-    }
+  while (NULL != plugin->shead)
+    prepared_statement_destroy (plugin,
+                               plugin->shead);
   if (plugin->dbf != NULL)
     {
       mysql_close (plugin->dbf);
@@ -392,57 +480,62 @@ iopen (struct Plugin *ret)
   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
   mysql_options (ret->dbf,
                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
+  mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
   timeout = 60; /* in seconds */
   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
   mysql_dbname = NULL;
   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "MYSQL", "DATABASE"))
+                                                    "datastore-mysql", "DATABASE"))
     GNUNET_assert (GNUNET_OK == 
                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
-                                                         "MYSQL", "DATABASE", 
+                                                         "datastore-mysql", "DATABASE", 
                                                          &mysql_dbname));
   else
     mysql_dbname = GNUNET_strdup ("gnunet");
   mysql_user = NULL;
   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "MYSQL", "USER"))
+                                                    "datastore-mysql", "USER"))
     {
-      GNUNET_break (GNUNET_OK == 
+      GNUNET_assert (GNUNET_OK == 
                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
-                                                          "MYSQL", "USER", 
+                                                          "datastore-mysql", "USER", 
                                                           &mysql_user));
     }
   mysql_password = NULL;
   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
-                                                    "MYSQL", "PASSWORD"))
+                                                    "datastore-mysql", "PASSWORD"))
     {
-      GNUNET_break (GNUNET_OK ==
-                   GNUNET_CONFIGURATION_get_value_string (ret->cfg,
-                                                          "MYSQL", "PASSWORD",
+      GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+                                                          "datastore-mysql", "PASSWORD",
                                                           &mysql_password));
     }
   mysql_server = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->cfg,
-                                                    "MYSQL", "HOST"))
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+                                                    "datastore-mysql", "HOST"))
     {
-      GNUNET_break (GNUNET_OK == 
-                   GNUNET_CONFIGURATION_get_value_string (ret->cfg,
-                                                          "MYSQL", "HOST", "",
+      GNUNET_assert (GNUNET_OK == 
+                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+                                                          "datastore-mysql", "HOST", 
                                                           &mysql_server));
     }
   mysql_port = 0;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->cfg,
-                                                    "MYSQL", "PORT"))
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+                                                    "datastore-mysql", "PORT"))
     {
-      GNUNET_break (GNUNET_OK ==
-                   GNUNET_CONFIGURATION_get_value_number (ret->cfg, "MYSQL",
+      GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
                                                           "PORT", &mysql_port));
     }
 
   GNUNET_assert (mysql_dbname != NULL);
   mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
-                      mysql_dbname, (unsigned int) mysql_port, NULL, 0);
+                      mysql_dbname, (unsigned int) mysql_port, NULL,
+                     CLIENT_IGNORE_SIGPIPE);
+  GNUNET_free_non_null (mysql_server);
+  GNUNET_free_non_null (mysql_user);
+  GNUNET_free_non_null (mysql_password);
   GNUNET_free (mysql_dbname);
   if (mysql_error (ret->dbf)[0])
     {
@@ -450,7 +543,6 @@ iopen (struct Plugin *ret)
                  "mysql_real_connect", ret);
       return GNUNET_SYSERR;
     }
-  ret->valid = GNUNET_YES;
   return GNUNET_OK;
 }
 
@@ -458,13 +550,15 @@ iopen (struct Plugin *ret)
 /**
  * Run the given MySQL statement.
  *
+ * @param plugin plugin context
+ * @param statement SQL statement to run
  * @return GNUNET_OK on success, GNUNET_SYSERR on error
  */
 static int
 run_statement (struct Plugin *plugin,
               const char *statement)
 {
-  if ((NULL == plugin->dbh) && (GNUNET_OK != iopen (plugin)))
+  if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
     return GNUNET_SYSERR;
   mysql_query (plugin->dbf, statement);
   if (mysql_error (plugin->dbf)[0])
@@ -478,47 +572,11 @@ run_statement (struct Plugin *plugin,
 }
 
 
-/**
- * Run the given MySQL SELECT statement.  The statement
- * must have only a single result (one column, one row).
- *
- * @return result on success, NULL on error
- */
-static char *
-run_statement_select (struct Plugin *plugin,
-                     const char *statement)
-{
-  MYSQL_RES *sql_res;
-  MYSQL_ROW sql_row;
-  char *ret;
-  
-  if ((NULL == plugin->dbh) && (GNUNET_OK != iopen (plugin)))
-    return NULL;
-  mysql_query (plugin->dbf, statement);
-  if ((mysql_error (plugin->dbf)[0]) ||
-      (!(sql_res = mysql_use_result (plugin->dbf))) ||
-      (!(sql_row = mysql_fetch_row (sql_res))))
-    {
-      LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
-                 "mysql_query", plugin);
-      return NULL;
-    }
-  if ((mysql_num_fields (sql_res) != 1) || (sql_row[0] == NULL))
-    {
-      GNUNET_break (mysql_num_fields (sql_res) == 1);
-      if (sql_res != NULL)
-        mysql_free_result (sql_res);
-      return NULL;
-    }
-  ret = GNUNET_strdup (sql_row[0]);
-  mysql_free_result (sql_res);
-  return ret;
-}
-
-
 /**
  * Create a prepared statement.
  *
+ * @param plugin plugin context
+ * @param statement SQL statement text to prepare
  * @return NULL on error
  */
 static struct GNUNET_MysqlStatementHandle *
@@ -539,6 +597,8 @@ prepared_statement_create (struct Plugin *plugin,
 /**
  * Prepare a statement for running.
  *
+ * @param plugin plugin context
+ * @param ret handle to prepared statement
  * @return GNUNET_OK on success
  */
 static int
@@ -547,7 +607,7 @@ prepare_statement (struct Plugin *plugin,
 {
   if (GNUNET_YES == ret->valid)
     return GNUNET_OK;
-  if ((NULL == plugin->dbh) && 
+  if ((NULL == plugin->dbf) && 
       (GNUNET_OK != iopen (plugin)))
     return GNUNET_SYSERR;
   ret->statement = mysql_stmt_init (plugin->dbf);
@@ -570,24 +630,7 @@ prepare_statement (struct Plugin *plugin,
     }
   ret->valid = GNUNET_YES;
   return GNUNET_OK;
-}
-
 
-/**
- * Free a prepared statement.
- */
-static void
-prepared_statement_destroy (struct Plugin *plugin, 
-                           struct GNUNET_MysqlStatementHandle
-                           *s)
-{
-  GNUNET_CONTAINER_DLL_remove (plugin->shead,
-                              plugin->stail,
-                              s);
-  if (s->valid)
-    mysql_stmt_close (s->statement);
-  GNUNET_free (s->query);
-  GNUNET_free (s);
 }
 
 
@@ -595,6 +638,7 @@ prepared_statement_destroy (struct Plugin *plugin,
  * Bind the parameters for the given MySQL statement
  * and run it.
  *
+ * @param plugin plugin context
  * @param s statement to bind and run
  * @param ap arguments for the binding
  * @return GNUNET_SYSERR on error, GNUNET_OK on success
@@ -693,6 +737,8 @@ typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
 /**
  * Run a prepared SELECT statement.
  *
+ * @param plugin plugin context
+ * @param s statement to run
  * @param result_size number of elements in results array
  * @param results pointer to already initialized MYSQL_BIND
  *        array (of sufficient size) for passing results
@@ -776,11 +822,13 @@ prepared_statement_run_select (struct Plugin *plugin,
 /**
  * Run a prepared statement that does NOT produce results.
  *
+ * @param plugin plugin context
+ * @param s statement to run
+ * @param insert_id NULL or address where to store the row ID of whatever
+ *        was inserted (only for INSERT statements!)
  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
  *        values (size + buffer-reference for pointers); terminated
  *        with "-1"
- * @param insert_id NULL or address where to store the row ID of whatever
- *        was inserted (only for INSERT statements!)
  * @return GNUNET_SYSERR on error, otherwise
  *         the number of successfully affected rows
  */
@@ -809,363 +857,485 @@ prepared_statement_run (struct Plugin *plugin,
 }
 
 
-
-
 /**
  * Delete an value from the gn072 table.
  *
+ * @param plugin plugin context
  * @param vkey vkey identifying the value to delete
  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
  */
 static int
-do_delete_value (unsigned long long vkey)
+do_delete_value (struct Plugin *plugin,
+                unsigned long long vkey)
 {
   int ret;
 
-  ret = GNUNET_MYSQL_prepared_statement_run (delete_value,
-                                             NULL,
-                                             MYSQL_TYPE_LONGLONG,
-                                             &vkey, GNUNET_YES, -1);
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Deleting value %llu from gn072 table\n",
+             vkey);
+#endif
+  ret = prepared_statement_run (plugin,
+                               plugin->delete_value,
+                               NULL,
+                               MYSQL_TYPE_LONGLONG,
+                               &vkey, GNUNET_YES, -1);
   if (ret > 0)
-    ret = GNUNET_OK;
+    {
+      ret = GNUNET_OK;
+    }
+  else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Deleting value %llu from gn072 table failed\n",
+                 vkey);
+    }
   return ret;
 }
 
 /**
  * Insert a value into the gn072 table.
  *
+ * @param plugin plugin context
  * @param value the value to insert
  * @param size size of the value
  * @param vkey vkey identifying the value henceforth (set)
  * @return GNUNET_OK on success, GNUNET_SYSERR on error
  */
 static int
-do_insert_value (const void *value, unsigned int size,
+do_insert_value (struct Plugin *plugin,
+                const void *value, unsigned int size,
                  unsigned long long *vkey)
 {
   unsigned long length = size;
+  int ret;
 
-  return GNUNET_MYSQL_prepared_statement_run (insert_value,
-                                              vkey,
-                                              MYSQL_TYPE_BLOB,
-                                              value, length, &length, -1);
+  ret = prepared_statement_run (plugin,
+                               plugin->insert_value,
+                               vkey,
+                               MYSQL_TYPE_BLOB,
+                               value, length, &length, -1);
+  if (ret == GNUNET_OK)
+    {
+#if DEBUG_MYSQL
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Inserted value number %llu with length %u into gn072 table\n",
+                 *vkey,
+                 size);
+#endif
+    }
+  else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Failed to insert %u byte value into gn072 table\n",
+                 size);
+    }
+  return ret;
 }
 
 /**
- * Delete an entry from the gn080 table.
+ * Delete an entry from the gn090 table.
  *
+ * @param plugin plugin context
  * @param vkey vkey identifying the entry to delete
  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
  */
 static int
-do_delete_entry_by_vkey (unsigned long long vkey)
+do_delete_entry_by_vkey (struct Plugin *plugin,
+                        unsigned long long vkey)
 {
   int ret;
 
-  ret = GNUNET_MYSQL_prepared_statement_run (delete_entry_by_vkey,
-                                             NULL,
-                                             MYSQL_TYPE_LONGLONG,
-                                             &vkey, GNUNET_YES, -1);
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Deleting value %llu from gn090 table\n",
+             vkey);
+#endif
+  ret = prepared_statement_run (plugin,
+                               plugin->delete_entry_by_vkey,
+                               NULL,
+                               MYSQL_TYPE_LONGLONG,
+                               &vkey, GNUNET_YES, -1);
   if (ret > 0)
-    ret = GNUNET_OK;
+    {
+      ret = GNUNET_OK;
+    }
+  else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Deleting value %llu from gn090 table failed\n",
+                 vkey);
+    }
   return ret;
 }
 
+
+/**
+ * Function that simply returns GNUNET_OK
+ *
+ * @param cls closure, not used
+ * @param num_values not used
+ * @param values not used
+ * @return GNUNET_OK
+ */
 static int
-return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values)
+return_ok (void *cls, 
+          unsigned int num_values, 
+          MYSQL_BIND * values)
 {
   return GNUNET_OK;
 }
 
+
 /**
- * Given a full (SELECT *) result set from gn080 table,
- * assemble it into a GNUNET_DatastoreValue representation.
- *
- * Call *without* holding the lock, but while within
- * mysql_thread_start/end.
+ * FIXME.
+ * 
+ * @param cls FIXME
+ * @param ncr FIXME
+ * @return FIXME
+ */
+static int
+iterator_helper_prepare (void *cls,
+                        struct NextRequestClosure *nrc)
+{
+  struct Plugin *plugin;
+  int ret;
+
+  if (nrc == NULL)
+    return GNUNET_NO;
+  plugin = nrc->plugin;
+  ret = GNUNET_SYSERR;
+  switch (nrc->iter_select)
+    {
+    case 0:
+    case 1:
+      ret = prepared_statement_run_select (plugin,
+                                          plugin->iter[nrc->iter_select],
+                                          6,
+                                          nrc->rbind,
+                                          &return_ok,
+                                          NULL,
+                                          MYSQL_TYPE_LONG,
+                                          &nrc->last_prio,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_vkey,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONG,
+                                          &nrc->last_prio,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_vkey,
+                                          GNUNET_YES, -1);
+      break;
+    case 2:
+      ret = prepared_statement_run_select (plugin,
+                                          plugin->iter[nrc->iter_select],
+                                          6,
+                                          nrc->rbind,
+                                          &return_ok,
+                                          NULL,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_expire,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_vkey,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_expire,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_vkey,
+                                          GNUNET_YES, -1);
+      break;
+    case 3:
+      ret = prepared_statement_run_select (plugin,
+                                          plugin->iter[nrc->iter_select],
+                                          6,
+                                          nrc->rbind,
+                                          &return_ok,
+                                          NULL,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_expire,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_vkey,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->now.value,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_expire,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->last_vkey,
+                                          GNUNET_YES,
+                                          MYSQL_TYPE_LONGLONG,
+                                          &nrc->now.value,
+                                          GNUNET_YES, -1);
+      break;
+    default:
+      GNUNET_assert (0);
+    }
+  return ret;
+}
+
+
+/**
+ * Continuation of "mysql_next_request".
  *
- * @param result location where mysql_stmt_fetch stored the results
- * @return NULL on error
+ * @param next_cls the next context
+ * @param tc the task context (unused)
  */
-static GNUNET_DatastoreValue *
-assembleDatum (MYSQL_BIND * result)
+static void 
+mysql_next_request_cont (void *next_cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GNUNET_DatastoreValue *datum;
-  unsigned int contentSize;
+  struct NextRequestClosure *nrc = next_cls;
+  struct Plugin *plugin;
+  int ret;
   unsigned int type;
-  unsigned int prio;
-  unsigned int level;
+  unsigned int priority;
+  unsigned int anonymity;
   unsigned long long exp;
   unsigned long long vkey;
+  unsigned long hashSize;
+  GNUNET_HashCode key;
+  struct GNUNET_TIME_Absolute expiration;
   unsigned long length;
-  MYSQL_BIND rbind[1];
-  int ret;
+  MYSQL_BIND *rbind; /* size 7 */
+  MYSQL_BIND dbind[1];
+  char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
 
-  if ((result[0].buffer_type != MYSQL_TYPE_LONG) ||
-      (!result[0].is_unsigned) ||
-      (result[1].buffer_type != MYSQL_TYPE_LONG) ||
-      (!result[1].is_unsigned) ||
-      (result[2].buffer_type != MYSQL_TYPE_LONG) ||
-      (!result[2].is_unsigned) ||
-      (result[3].buffer_type != MYSQL_TYPE_LONG) ||
-      (!result[3].is_unsigned) ||
-      (result[4].buffer_type != MYSQL_TYPE_LONGLONG) ||
-      (!result[4].is_unsigned) ||
-      (result[5].buffer_type != MYSQL_TYPE_BLOB) ||
-      (result[5].buffer_length != sizeof (GNUNET_HashCode)) ||
-      (*result[5].length != sizeof (GNUNET_HashCode)) ||
-      (result[6].buffer_type != MYSQL_TYPE_LONGLONG) ||
-      (!result[6].is_unsigned))
-    {
-      GNUNET_break (0);
-      return NULL;              /* error */
-    }
+  plugin = nrc->plugin;
+  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+  plugin->next_task_nc = NULL;
 
-  contentSize = *(unsigned int *) result[0].buffer;
-  if (contentSize < sizeof (GNUNET_DatastoreValue))
-    return NULL;                /* error */
-  if (contentSize > GNUNET_MAX_BUFFER_SIZE)
+ AGAIN: 
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->now = GNUNET_TIME_absolute_get ();
+  hashSize = sizeof (GNUNET_HashCode);
+  memset (nrc->rbind, 0, sizeof (nrc->rbind));
+  rbind = nrc->rbind;
+  rbind[0].buffer_type = MYSQL_TYPE_LONG;
+  rbind[0].buffer = &type;
+  rbind[0].is_unsigned = 1;
+  rbind[1].buffer_type = MYSQL_TYPE_LONG;
+  rbind[1].buffer = &priority;
+  rbind[1].is_unsigned = 1;
+  rbind[2].buffer_type = MYSQL_TYPE_LONG;
+  rbind[2].buffer = &anonymity;
+  rbind[2].is_unsigned = 1;
+  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[3].buffer = &exp;
+  rbind[3].is_unsigned = 1;
+  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
+  rbind[4].buffer = &key;
+  rbind[4].buffer_length = hashSize;
+  rbind[4].length = &hashSize;
+  rbind[5].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[5].buffer = &vkey;
+  rbind[5].is_unsigned = GNUNET_YES;
+
+  if ( (GNUNET_YES == nrc->end_it) ||
+       (GNUNET_OK != nrc->prep (nrc->prep_cls,
+                               nrc)))
+    goto END_SET;
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->last_vkey = vkey;
+  nrc->last_prio = priority;
+  nrc->last_expire = exp;
+  if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
+       (hashSize != sizeof (GNUNET_HashCode)) )
     {
-       GNUNET_break (0); /* far too big */
-       return NULL;
-    }
-  contentSize -= sizeof (GNUNET_DatastoreValue);
-  type = *(unsigned int *) result[1].buffer;
-  prio = *(unsigned int *) result[2].buffer;
-  level = *(unsigned int *) result[3].buffer;
-  exp = *(unsigned long long *) result[4].buffer;
-  vkey = *(unsigned long long *) result[6].buffer;
-  datum = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + contentSize);
-  datum->size = htonl (contentSize + sizeof (GNUNET_DatastoreValue));
-  datum->type = htonl (type);
-  datum->priority = htonl (prio);
-  datum->anonymity_level = htonl (level);
-  datum->expiration_time = GNUNET_htonll (exp);
-
+      GNUNET_break (0);
+      goto END_SET;
+    }    
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Found value %llu with prio %u, anon %u, expire %llu selecting from gn090 table\n",
+             vkey,           
+             priority,
+             anonymity,
+             exp);
+#endif
   /* now do query on gn072 */
-  length = contentSize;
-  memset (rbind, 0, sizeof (rbind));
-  rbind[0].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[0].buffer_length = contentSize;
-  rbind[0].length = &length;
-  rbind[0].buffer = &datum[1];
-  ret = GNUNET_MYSQL_prepared_statement_run_select (select_value,
-                                                    1,
-                                                    rbind,
-                                                    &return_ok,
-                                                    NULL,
-                                                    MYSQL_TYPE_LONGLONG,
-                                                    &vkey, GNUNET_YES, -1);
-  GNUNET_break (ret <= 1);     /* should only have one result! */
+  length = sizeof (datum);
+  memset (dbind, 0, sizeof (dbind));
+  dbind[0].buffer_type = MYSQL_TYPE_BLOB;
+  dbind[0].buffer_length = length;
+  dbind[0].length = &length;
+  dbind[0].buffer = datum;
+  ret = prepared_statement_run_select (plugin,
+                                      plugin->select_value,
+                                      1,
+                                      dbind,
+                                      &return_ok,
+                                      NULL,
+                                      MYSQL_TYPE_LONGLONG,
+                                      &vkey, GNUNET_YES, -1);
+  GNUNET_break (ret <= 1);     /* should only have one rbind! */
   if (ret > 0)
     ret = GNUNET_OK;
-  if ((ret != GNUNET_OK) ||
-      (rbind[0].buffer_length != contentSize) || (length != contentSize))
+  if (ret != GNUNET_OK) 
     {
-      GNUNET_break (ret != 0); /* should have one result! */
-      GNUNET_break (length == contentSize);    /* length should match! */
-      GNUNET_break (rbind[0].buffer_length == contentSize);    /* length should be internally consistent! */
-      do_delete_value (vkey);
-      if (ret != 0)
-        do_delete_entry_by_vkey (vkey);
-      content_size -= ntohl (datum->size);
-      GNUNET_free (datum);
-      return NULL;
+      GNUNET_break (0);
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
+                 _("Failed to obtain value %llu from table `%s'\n"),
+                 vkey,
+                 "gn072");
+      goto AGAIN;
+    }
+  GNUNET_break (length <= sizeof(datum));
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Calling iterator with value `%s' number %llu of size %u with type %u, priority %u, anonymity %u and expiration %llu\n",
+             GNUNET_h2s (&key),
+             vkey,           
+             length,
+             type,
+             priority,
+             anonymity,
+             exp);
+#endif
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  expiration.value = exp;
+  ret = nrc->dviter (nrc->dviter_cls,
+                    nrc,
+                    &key,
+                    length,
+                    datum,
+                    type,
+                    priority,
+                    anonymity,
+                    expiration,
+                    vkey);
+  if (ret == GNUNET_SYSERR)
+    {
+      nrc->end_it = GNUNET_YES;
+      return;
+    }
+  if (ret == GNUNET_NO)
+    {
+      do_delete_value (plugin, vkey);
+      do_delete_entry_by_vkey (plugin, vkey);
+      if (length != 0)
+       plugin->env->duc (plugin->env->cls,
+                         - length);
     }
-  return datum;
+  return;
+ END_SET:
+  /* call dviter with "end of set" */
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->dviter (nrc->dviter_cls, 
+              NULL, NULL, 0, NULL, 0, 0, 0, 
+              GNUNET_TIME_UNIT_ZERO_ABS, 0);
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->prep (nrc->prep_cls, NULL);
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_free (nrc);
 }
 
 
+/**
+ * Function invoked on behalf of a "PluginIterator"
+ * asking the database plugin to call the iterator
+ * with the next item.
+ *
+ * @param next_cls whatever argument was given
+ *        to the PluginIterator as "next_cls".
+ * @param end_it set to GNUNET_YES if we
+ *        should terminate the iteration early
+ *        (iterator should be still called once more
+ *         to signal the end of the iteration).
+ */
+static void 
+mysql_plugin_next_request (void *next_cls,
+                          int end_it)
+{
+  struct NextRequestClosure *nrc = next_cls;
+
+  if (GNUNET_YES == end_it)
+    nrc->end_it = GNUNET_YES;
+  nrc->plugin->next_task_nc = nrc;
+  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
+                                                    &mysql_next_request_cont,
+                                                    nrc);
+}  
+
+
 /**
  * Iterate over the items in the datastore
  * using the given query to select and order
  * the items.
  *
+ * @param plugin plugin context
  * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter never NULL
+ * @param iter_select which iterator statement are we using
  * @param is_asc are we using ascending order?
- * @return the number of results, GNUNET_SYSERR if the
- *   iter is non-NULL and aborted the iteration
+ * @param dviter function to call on each matching item
+ * @param dviter_cls closure for dviter
  */
-static int
+static void
 iterateHelper (struct Plugin *plugin,
-              unsigned int type,
+              enum GNUNET_BLOCK_Type type,
                int is_asc,
-               unsigned int iter_select, GNUNET_DatastoreValueIterator dviter,
-               void *closure)
+               unsigned int iter_select, 
+              PluginIterator dviter,
+               void *dviter_cls)
 {
-  GNUNET_DatastoreValue *datum;
-  int count;
-  int ret;
-  unsigned int last_prio;
-  unsigned long long last_expire;
-  unsigned long long last_vkey;
-  unsigned int size;
-  unsigned int rtype;
-  unsigned int prio;
-  unsigned int level;
-  unsigned long long expiration;
-  unsigned long long vkey;
-  unsigned long hashSize;
-  GNUNET_HashCode key;
-  GNUNET_CronTime now;
-  MYSQL_BIND rbind[7];
-
+  struct NextRequestClosure *nrc;
+
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->type = type;  
+  nrc->iter_select = iter_select;
+  nrc->dviter = dviter;
+  nrc->dviter_cls = dviter_cls;
+  nrc->prep = &iterator_helper_prepare;
   if (is_asc)
     {
-      last_prio = 0;
-      last_vkey = 0;
-      last_expire = 0;
+      nrc->last_prio = 0;
+      nrc->last_vkey = 0;
+      nrc->last_expire = 0;
     }
   else
     {
-      last_prio = 0x7FFFFFFFL;
-      last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
-      last_expire = 0x7FFFFFFFFFFFFFFFLL;       /* MySQL only supports 63 bits */
-    }
-  hashSize = sizeof (GNUNET_HashCode);
-  memset (rbind, 0, sizeof (rbind));
-  rbind[0].buffer_type = MYSQL_TYPE_LONG;
-  rbind[0].buffer = &size;
-  rbind[0].is_unsigned = 1;
-  rbind[1].buffer_type = MYSQL_TYPE_LONG;
-  rbind[1].buffer = &rtype;
-  rbind[1].is_unsigned = 1;
-  rbind[2].buffer_type = MYSQL_TYPE_LONG;
-  rbind[2].buffer = &prio;
-  rbind[2].is_unsigned = 1;
-  rbind[3].buffer_type = MYSQL_TYPE_LONG;
-  rbind[3].buffer = &level;
-  rbind[3].is_unsigned = 1;
-  rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[4].buffer = &expiration;
-  rbind[4].is_unsigned = 1;
-  rbind[5].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[5].buffer = &key;
-  rbind[5].buffer_length = hashSize;
-  rbind[5].length = &hashSize;
-  rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[6].buffer = &vkey;
-  rbind[6].is_unsigned = GNUNET_YES;
-
-  now = GNUNET_get_time ();
-  count = 0;
-  while (1)
-    {
-      switch (iter_select)
-        {
-        case 0:
-        case 1:
-          ret = prepared_statement_run_select (iter[iter_select],
-                                              7,
-                                              rbind,
-                                              &return_ok,
-                                              NULL,
-                                              MYSQL_TYPE_LONG,
-                                              &last_prio,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_vkey,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONG,
-                                              &last_prio,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_vkey,
-                                              GNUNET_YES, -1);
-          break;
-        case 2:
-          ret = prepared_statement_run_select (iter[iter_select],
-                                              7,
-                                              rbind,
-                                              &return_ok,
-                                              NULL,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_expire,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_vkey,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_expire,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_vkey,
-                                              GNUNET_YES, -1);
-          break;
-        case 3:
-          ret = prepared_statement_run_select (iter[iter_select],
-                                              7,
-                                              rbind,
-                                              &return_ok,
-                                              NULL,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_expire,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_vkey,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &now,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_expire,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &last_vkey,
-                                              GNUNET_YES,
-                                              MYSQL_TYPE_LONGLONG,
-                                              &now,
-                                              GNUNET_YES, -1);
-          break;
-        default:
-          GNUNET_break (0);
-          return GNUNET_SYSERR;
-        }
-      if (ret != GNUNET_OK)
-        break;
-      last_vkey = vkey;
-      last_prio = prio;
-      last_expire = expiration;
-      count++;
-      if (dviter != NULL)
-        {
-          datum = assembleDatum (rbind);
-          if (datum == NULL)
-            continue;
-          ret = dviter (&key, datum, closure, vkey);
-          if (ret == GNUNET_SYSERR)
-            {
-              GNUNET_free (datum);
-              break;
-            }
-          if (ret == GNUNET_NO)
-            {
-              do_delete_value (vkey);
-              do_delete_entry_by_vkey (vkey);
-              content_size -= ntohl (datum->size);
-            }
-          GNUNET_free (datum);
-        }
+      nrc->last_prio = 0x7FFFFFFFL;
+      nrc->last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
+      nrc->last_expire = 0x7FFFFFFFFFFFFFFFLL;       /* MySQL only supports 63 bits */
     }
-  return count;
+  mysql_plugin_next_request (nrc, GNUNET_NO);
 }
 
 
-
 /**
  * Get an estimate of how much space the database is
  * currently using.
  *
- * @param cls our "struct Plugin*"
+ * @param cls our "struct Plugin *"
  * @return number of bytes used on disk
  */
 static unsigned long long
 mysql_plugin_get_size (void *cls)
 {
   struct Plugin *plugin = cls;
-  return plugin->content_size;
+  MYSQL_BIND cbind[1];
+  long long total;
+
+  memset (cbind, 0, sizeof (cbind));
+  total = 0;
+  cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
+  cbind[0].buffer = &total;
+  cbind[0].is_unsigned = GNUNET_NO;
+  if (GNUNET_OK != 
+      prepared_statement_run_select (plugin,
+                                    plugin->get_size,
+                                    1, cbind, 
+                                    &return_ok, NULL,
+                                    -1))
+    return 0;
+  return total;
 }
 
 
@@ -1185,64 +1355,51 @@ mysql_plugin_get_size (void *cls)
  */
 static int
 mysql_plugin_put (void *cls,
-                    const GNUNET_HashCode * key,
-                    uint32_t size,
-                    const void *data,
-                    enum GNUNET_BLOCK_Type type,
-                    uint32_t priority,
-                    uint32_t anonymity,
-                    struct GNUNET_TIME_Absolute expiration,
-                    char **msg)
+                 const GNUNET_HashCode * key,
+                 uint32_t size,
+                 const void *data,
+                 enum GNUNET_BLOCK_Type type,
+                 uint32_t priority,
+                 uint32_t anonymity,
+                 struct GNUNET_TIME_Absolute expiration,
+                 char **msg)
 {
   struct Plugin *plugin = cls;
-
-  unsigned long contentSize;
+  unsigned int itype = type;
+  unsigned int ipriority = priority;
+  unsigned int ianonymity = anonymity;
+  unsigned long long lexpiration = expiration.value;
   unsigned long hashSize;
   unsigned long hashSize2;
-  unsigned int size;
-  unsigned int type;
-  unsigned int prio;
-  unsigned int level;
-  unsigned long long expiration;
   unsigned long long vkey;
   GNUNET_HashCode vhash;
 
-  if (((ntohl (value->size) < sizeof (GNUNET_DatastoreValue))) ||
-      ((ntohl (value->size) - sizeof (GNUNET_DatastoreValue)) >
-       MAX_DATUM_SIZE))
+  if (size > MAX_DATUM_SIZE)
     {
       GNUNET_break (0);
       return GNUNET_SYSERR;
     }
   hashSize = sizeof (GNUNET_HashCode);
   hashSize2 = sizeof (GNUNET_HashCode);
-  size = ntohl (value->size);
-  type = ntohl (value->type);
-  prio = ntohl (value->priority);
-  level = ntohl (value->anonymity_level);
-  expiration = GNUNET_ntohll (value->expiration_time);
-  contentSize = ntohl (value->size) - sizeof (GNUNET_DatastoreValue);
-  GNUNET_hash (&value[1], contentSize, &vhash);
-
-  if (GNUNET_OK != do_insert_value (&value[1], contentSize, &vkey))
+  GNUNET_CRYPTO_hash (data, size, &vhash);
+  if (GNUNET_OK != do_insert_value (plugin,
+                                   data, size, &vkey))
     return GNUNET_SYSERR;
   if (GNUNET_OK !=
-      prepared_statement_run (insert_entry,
+      prepared_statement_run (plugin,
+                             plugin->insert_entry,
                              NULL,
                              MYSQL_TYPE_LONG,
-                             &size,
-                             GNUNET_YES,
-                             MYSQL_TYPE_LONG,
-                             &type,
+                             &itype,
                              GNUNET_YES,
                              MYSQL_TYPE_LONG,
-                             &prio,
+                             &ipriority,
                              GNUNET_YES,
                              MYSQL_TYPE_LONG,
-                             &level,
+                             &ianonymity,
                              GNUNET_YES,
                              MYSQL_TYPE_LONGLONG,
-                             &expiration,
+                             &lexpiration,
                              GNUNET_YES,
                              MYSQL_TYPE_BLOB,
                              key,
@@ -1255,32 +1412,148 @@ mysql_plugin_put (void *cls,
                              MYSQL_TYPE_LONGLONG,
                              &vkey, GNUNET_YES, -1))
     {
-      do_delete_value (vkey);
+      do_delete_value (plugin, vkey);
       return GNUNET_SYSERR;
     }
-  plugin->content_size += ntohl (value->size);
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Inserted value `%s' number %llu with size %u into gn090 table\n",
+             GNUNET_h2s (key),
+             vkey,
+             (unsigned int) size);
+#endif
+  if (size > 0)
+    plugin->env->duc (plugin->env->cls,
+                     size);
   return GNUNET_OK;
 }
 
 
 /**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
+ * Select a subset of the items in the datastore and call
+ * the given iterator for each of them.
  *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
+ * @param cls our "struct Plugin*"
+ * @param type entries of which type should be considered?
+ *        Use 0 for any type.
+ * @param iter function to call on each matching value;
+ *        will be called once with a NULL value at the end
+ * @param iter_cls closure for iter
  */
-static void 
-mysql_plugin_next_request (void *next_cls,
-                      int end_it)
+static void
+mysql_plugin_iter_low_priority (void *cls,
+                               enum GNUNET_BLOCK_Type type,
+                               PluginIterator iter,
+                               void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  GNUNET_break (0);
+  iterateHelper (plugin, type, GNUNET_YES, 
+                0, iter, iter_cls); 
+}
+
+
+struct GetContext
+{
+  GNUNET_HashCode key;
+  GNUNET_HashCode vhash;
+
+  unsigned int prio;
+  unsigned int anonymity;
+  unsigned long long expiration;
+  unsigned long long vkey;
+  unsigned long long total;
+  int off;
+  int count;
+  int have_vhash;
+};
+
+
+static int
+get_statement_prepare (void *cls,
+                      struct NextRequestClosure *nrc)
+{
+  struct GetContext *gc = cls;
+  struct Plugin *plugin;
+  int ret;
+  unsigned int limit_off;
+  unsigned long hashSize;
+
+  if (NULL == nrc)
+    {
+      GNUNET_free (gc);
+      return GNUNET_NO;
+    }
+  if (gc->count == gc->total)
+    return GNUNET_NO;
+  plugin = nrc->plugin;
+  hashSize = sizeof (GNUNET_HashCode);
+  if (gc->count + gc->off == gc->total)
+    nrc->last_vkey = 0;          /* back to start */
+  if (gc->count == 0)
+    limit_off = gc->off;
+  else
+    limit_off = 0;
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Obtaining result number %d/%lld at offset %d with lvc %llu for GET `%s'\n",
+             gc->count+1,
+             gc->total,
+             limit_off,
+             nrc->last_vkey,
+             GNUNET_h2s (&gc->key));  
+#endif
+  if (nrc->type != 0)
+    {
+      if (gc->have_vhash)
+       {
+         ret =
+           prepared_statement_run_select
+           (plugin,
+            plugin->select_entry_by_hash_vhash_and_type, 6, nrc->rbind, &return_ok,
+            NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
+            MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
+            &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
+            -1);
+       }
+      else
+       {
+         ret =
+           prepared_statement_run_select
+           (plugin,
+            plugin->select_entry_by_hash_and_type, 6, nrc->rbind, &return_ok, NULL,
+            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+            MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
+            &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
+            -1);
+       }
+    }
+  else
+    {
+      if (gc->have_vhash)
+       {
+         ret =
+           prepared_statement_run_select
+           (plugin,
+            plugin->select_entry_by_hash_and_vhash, 6, nrc->rbind, &return_ok, NULL,
+            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
+            &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG,
+            &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
+            GNUNET_YES, -1);
+       }
+      else
+       {
+         ret =
+           prepared_statement_run_select
+           (plugin,
+            plugin->select_entry_by_hash, 6, nrc->rbind, &return_ok, NULL,
+            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+            MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
+            &limit_off, GNUNET_YES, -1);
+       }
+    }
+  gc->count++;
+  return ret;
 }
 
 
@@ -1309,51 +1582,49 @@ mysql_plugin_get (void *cls,
                  PluginIterator iter, void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  int count;
-  unsigned long long total;
-  int off;
+  unsigned int itype = type;
   int ret;
-  unsigned int size;
-  unsigned int rtype;
-  unsigned int prio;
-  unsigned int level;
-  unsigned int limit_off;
-  unsigned long long expiration;
-  unsigned long long vkey;
-  unsigned long long last_vkey;
-  GNUNET_DatastoreValue *datum;
-  GNUNET_HashCode key;
+  MYSQL_BIND cbind[1];
+  struct GetContext *gc;
+  struct NextRequestClosure *nrc;
+  long long total;
   unsigned long hashSize;
-  unsigned long hashSize2;
-  MYSQL_BIND rbind[7];
 
+  if (iter == NULL) 
+    return;
   if (key == NULL)
-    return iterateLowPriority (type, iter, closure);
+    {
+      mysql_plugin_iter_low_priority (plugin,
+                                     type, 
+                                     iter, iter_cls);
+      return;
+    }
   hashSize = sizeof (GNUNET_HashCode);
-  hashSize2 = sizeof (GNUNET_HashCode);
-  memset (rbind, 0, sizeof (rbind));
+  memset (cbind, 0, sizeof (cbind));
   total = -1;
-  rbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[0].buffer = &total;
-  rbind[0].is_unsigned = GNUNET_YES;
+  cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
+  cbind[0].buffer = &total;
+  cbind[0].is_unsigned = GNUNET_NO;
   if (type != 0)
     {
       if (vhash != NULL)
         {
           ret =
             prepared_statement_run_select
-            (count_entry_by_hash_vhash_and_type, 1, rbind, &return_ok, NULL,
-             MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_BLOB,
-             vhash, hashSize2, &hashSize2, MYSQL_TYPE_LONG, &type, GNUNET_YES,
+            (plugin,
+            plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL,
+             MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
+             vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES,
              -1);
         }
       else
         {
           ret =
             prepared_statement_run_select
-            (count_entry_by_hash_and_type, 1, rbind, &return_ok, NULL,
-             MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_LONG,
-             &type, GNUNET_YES, -1);
+            (plugin,
+            plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL,
+             MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG,
+             &itype, GNUNET_YES, -1);
 
         }
     }
@@ -1363,130 +1634,57 @@ mysql_plugin_get (void *cls,
         {
           ret =
             prepared_statement_run_select
-            (count_entry_by_hash_and_vhash, 1, rbind, &return_ok, NULL,
-             MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_BLOB,
-             vhash, hashSize2, &hashSize2, -1);
+            (plugin,
+            plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL,
+             MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
+             vhash, hashSize, &hashSize, -1);
 
         }
       else
         {
           ret =
-            GNUNET_MYSQL_prepared_statement_run_select (count_entry_by_hash,
-                                                        1, rbind, &return_ok,
-                                                        NULL, MYSQL_TYPE_BLOB,
-                                                        key, hashSize2,
-                                                        &hashSize2, -1);
+            prepared_statement_run_select (plugin,
+                                          plugin->count_entry_by_hash,
+                                          1, cbind, &return_ok,
+                                          NULL, MYSQL_TYPE_BLOB,
+                                          key, hashSize,
+                                          &hashSize, -1);
         }
     }
-  if ((ret != GNUNET_OK) || (-1 == total))
-    return GNUNET_SYSERR;
-  if ((iter == NULL) || (total == 0))
-    return (int) total;
-
-  last_vkey = 0;
-  count = 0;
-  off = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, total);
-
-  memset (rbind, 0, sizeof (rbind));
-  rbind[0].buffer_type = MYSQL_TYPE_LONG;
-  rbind[0].buffer = &size;
-  rbind[0].is_unsigned = GNUNET_YES;
-  rbind[1].buffer_type = MYSQL_TYPE_LONG;
-  rbind[1].buffer = &rtype;
-  rbind[1].is_unsigned = GNUNET_YES;
-  rbind[2].buffer_type = MYSQL_TYPE_LONG;
-  rbind[2].buffer = &prio;
-  rbind[2].is_unsigned = GNUNET_YES;
-  rbind[3].buffer_type = MYSQL_TYPE_LONG;
-  rbind[3].buffer = &level;
-  rbind[3].is_unsigned = GNUNET_YES;
-  rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[4].buffer = &expiration;
-  rbind[4].is_unsigned = GNUNET_YES;
-  rbind[5].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[5].buffer = &key;
-  rbind[5].buffer_length = hashSize;
-  rbind[5].length = &hashSize;
-  rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[6].buffer = &vkey;
-  rbind[6].is_unsigned = GNUNET_YES;
-  while (1)
+  if ((ret != GNUNET_OK) || (0 >= total))
     {
-      if (count == 0)
-        limit_off = off;
-      else
-        limit_off = 0;
-      if (type != 0)
-        {
-          if (vhash != NULL)
-            {
-              ret =
-                prepared_statement_run_select
-                (select_entry_by_hash_vhash_and_type, 7, rbind, &return_ok,
-                 NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
-                 MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
-                 MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
-                 &type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
-                 -1);
-            }
-          else
-            {
-              ret =
-                prepared_statement_run_select
-                (select_entry_by_hash_and_type, 7, rbind, &return_ok, NULL,
-                 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
-                 MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
-                 &type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
-                 -1);
-            }
-        }
-      else
-        {
-          if (vhash != NULL)
-            {
-              ret =
-                prepared_statement_run_select
-                (select_entry_by_hash_and_vhash, 7, rbind, &return_ok, NULL,
-                 MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
-                 vhash, hashSize2, &hashSize2, MYSQL_TYPE_LONGLONG,
-                 &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
-                 GNUNET_YES, -1);
-            }
-          else
-            {
-              ret =
-                prepared_statement_run_select
-                (select_entry_by_hash, 7, rbind, &return_ok, NULL,
-                 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
-                 MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
-                 &limit_off, GNUNET_YES, -1);
-            }
-        }
-      if (ret != GNUNET_OK)
-        break;
-      last_vkey = vkey;
-      datum = assembleDatum (rbind);
-      if (datum == NULL)
-        continue;
-      count++;
-      ret = iter (&key, datum, closure, vkey);
-      if (ret == GNUNET_SYSERR)
-        {
-          GNUNET_free (datum);
-          break;
-        }
-      if (ret == GNUNET_NO)
-        {
-          do_delete_value (vkey);
-          do_delete_entry_by_vkey (vkey);
-          content_size -= ntohl (datum->size);
-        }
-      GNUNET_free (datum);
-      if (count + off == total)
-        last_vkey = 0;          /* back to start */
-      if (count == total)
-        break;
+      iter (iter_cls, 
+           NULL, NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
+    }
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Iterating over %lld results for GET `%s'\n",
+             total,
+             GNUNET_h2s (key));
+#endif
+  gc = GNUNET_malloc (sizeof (struct GetContext));
+  gc->key = *key;
+  if (vhash != NULL)
+    {
+      gc->have_vhash = GNUNET_YES;
+      gc->vhash = *vhash;
     }
+  gc->total = total;
+  gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
+  
+
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->type = type;  
+  nrc->iter_select = -1;
+  nrc->dviter = iter;
+  nrc->dviter_cls = iter_cls;
+  nrc->prep = &get_statement_prepare;
+  nrc->prep_cls = gc;
+  nrc->last_vkey = 0;
+  mysql_plugin_next_request (nrc, GNUNET_NO);
 }
 
 
@@ -1522,45 +1720,38 @@ mysql_plugin_update (void *cls,
 {
   struct Plugin *plugin = cls;
   unsigned long long vkey = uid;
+  unsigned long long lexpire = expire.value;
+  int ret;
 
-  return prepared_statement_run (plugin,
-                                plugin->update_entry,
-                                NULL,
-                                MYSQL_TYPE_LONG,
-                                &delta,
-                                GNUNET_NO,
-                                MYSQL_TYPE_LONGLONG,
-                                &expire.value,
-                                GNUNET_YES,
-                                MYSQL_TYPE_LONGLONG,
-                                &expire.value,
-                                GNUNET_YES,
-                                MYSQL_TYPE_LONGLONG,
-                                &vkey,
-                                GNUNET_YES, -1);
-}
-
-
-/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
- */
-static void
-mysql_plugin_iter_low_priority (void *cls,
-                                  enum GNUNET_BLOCK_Type type,
-                                  PluginIterator iter,
-                                  void *iter_cls)
-{
-  struct Plugin *plugin = cls;
-  iterateHelper (plugin, type, GNUNET_YES, 
-                0, iter, iter_cls); 
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Updating value %llu adding %d to priority and maxing exp at %llu\n",
+             vkey,
+             delta,
+             lexpire);
+#endif
+  ret = prepared_statement_run (plugin,
+                               plugin->update_entry,
+                               NULL,
+                               MYSQL_TYPE_LONG,
+                               &delta,
+                               GNUNET_NO,
+                               MYSQL_TYPE_LONGLONG,
+                               &lexpire,
+                               GNUNET_YES,
+                               MYSQL_TYPE_LONGLONG,
+                               &lexpire,
+                               GNUNET_YES,
+                               MYSQL_TYPE_LONGLONG,
+                               &vkey,
+                               GNUNET_YES, -1);
+  if (ret != GNUNET_OK)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Failed to update value %llu\n",
+                 vkey);
+    }
+  return ret;
 }
 
 
@@ -1582,7 +1773,7 @@ mysql_plugin_iter_zero_anonymity (void *cls,
                                     void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, type, GNUNET_NO, 1, iter, closure);
+  iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls);
 }
 
 
@@ -1599,12 +1790,12 @@ mysql_plugin_iter_zero_anonymity (void *cls,
  */
 static void
 mysql_plugin_iter_ascending_expiration (void *cls,
-                                          enum GNUNET_BLOCK_Type type,
-                                          PluginIterator iter,
+                                       enum GNUNET_BLOCK_Type type,
+                                       PluginIterator iter,
                                        void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, type, GNUNET_YES, 2, iter, closure);
+  iterateHelper (plugin, type, GNUNET_YES, 2, iter, iter_cls);
 }
 
 
@@ -1626,7 +1817,7 @@ mysql_plugin_iter_migration_order (void *cls,
                                      void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, 0, GNUNET_NO, 3, iter, closure);
+  iterateHelper (plugin, 0, GNUNET_NO, 3, iter, iter_cls);
 }
 
 
@@ -1643,12 +1834,12 @@ mysql_plugin_iter_migration_order (void *cls,
  */
 static void
 mysql_plugin_iter_all_now (void *cls,
-                             enum GNUNET_BLOCK_Type type,
-                             PluginIterator iter,
-                             void *iter_cls)
+                          enum GNUNET_BLOCK_Type type,
+                          PluginIterator iter,
+                          void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, 0, GNUNET_YES, 0, iter, closure);
+  iterateHelper (plugin, 0, GNUNET_YES, 0, iter, iter_cls);
 }
 
 
@@ -1661,11 +1852,11 @@ mysql_plugin_drop (void *cls)
   struct Plugin *plugin = cls;
 
   if ((GNUNET_OK != run_statement (plugin,
-                                  "DROP TABLE gn080")) ||
+                                  "DROP TABLE gn090")) ||
       (GNUNET_OK != run_statement (plugin,
                                   "DROP TABLE gn072")))
-    return;                     /* error */
-  plugin->content_size = 0;
+    return;           /* error */
+  plugin->env->duc (plugin->env->cls, 0);
 }
 
 
@@ -1694,8 +1885,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
     }
 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
-  if (MRUNS ("CREATE TABLE IF NOT EXISTS gn080 ("
-             " size INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+  if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
@@ -1726,6 +1916,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
+      || PINIT (plugin->get_size, SELECT_SIZE)
       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
@@ -1736,14 +1927,14 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
       || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME)
       || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER))
     {
-      GNUNET_MYSQL_database_close (db);
-      db = NULL;
-      return GNUNET_SYSERR;
+      iclose (plugin);
+      GNUNET_free_non_null (plugin->cnffile);
+      GNUNET_free (plugin);
+      return NULL;
     }
 #undef PINIT
 #undef MRUNS
 
-
   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
   api->cls = plugin;
   api->get_size = &mysql_plugin_get_size;
@@ -1775,9 +1966,17 @@ libgnunet_plugin_datastore_mysql_done (void *cls)
   struct Plugin *plugin = api->cls;
 
   iclose (plugin);
+  if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (plugin->env->sched,
+                              plugin->next_task);
+      plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+      plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
+      GNUNET_free (plugin->next_task_nc);
+      plugin->next_task_nc = NULL;
+    }
   GNUNET_free_non_null (plugin->cnffile);
   GNUNET_free (plugin);
-  GNUNET_free (plugin);
   GNUNET_free (api);
   mysql_library_end ();
   return NULL;