fixes
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
index 5fc3f68785726210d1478f711a403d39f0d4799e..005026c92a2af7563b870907e1b4873889521541 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
  * MANUAL SETUP INSTRUCTIONS
  *
  * 1) in /etc/gnunet.conf, set
- *    <pre>
*     [datastore]
*     DATABASE = "mysql"
- *    </pre>
+ * @verbatim
      [datastore]
      DATABASE = "mysql"
+   @endverbatim
  * 2) Then access mysql as root,
- *    <pre>
- *
- *    $ mysql -u root -p
- *
- *    </pre>
+ * @verbatim
+     $ mysql -u root -p
+   @endverbatim
  *    and do the following. [You should replace $USER with the username
  *    that will be running the gnunetd process].
- *    <pre>
- *
+ * @verbatim
       CREATE DATABASE gnunet;
       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
          ON gnunet.* TO $USER@localhost;
       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
       FLUSH PRIVILEGES;
- *
- *    </pre>
+   @endverbatim
  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
  *    with the following lines
- *    <pre>
-
+ * @verbatim
       [client]
       user=$USER
       password=$the_password_you_like
-
- *    </pre>
+   @endverbatim
  *
  * Thats it. Note that .my.cnf file is a security risk unless its on
  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
  * 4) Still, perhaps you should briefly try if the DB connection
  *    works. First, login as $USER. Then use,
  *
- *    <pre>
*    $ mysql -u $USER -p $the_password_you_like
*    mysql> use gnunet;
- *    </pre>
+ * @verbatim
+     $ mysql -u $USER -p $the_password_you_like
+     mysql> use gnunet;
+   @endverbatim
  *
  *    If you get the message &quot;Database changed&quot; it probably works.
  *
  * - The tables can be verified/fixed in two ways;
  *   1) by running mysqlcheck -A, or
  *   2) by executing (inside of mysql using the GNUnet database):
- *   mysql> REPAIR TABLE gn080;
- *   mysql> REPAIR TABLE gn072;
+ * @verbatim
+     mysql> REPAIR TABLE gn090;
+   @endverbatim
  *
  * PROBLEMS?
  *
  * friend is probably the mysql manual. The first thing to check
  * is that mysql is basically operational, that you can connect
  * to it, create tables, issue queries etc.
- *
- * TODO:
- * - implement GET
- * - remove 'size' field in gn080.
- * - use FOREIGN KEY for 'uid/vkey'
- * - consistent naming of uid/vkey
  */
 
 #include "platform.h"
-#include "plugin_datastore.h"
+#include "gnunet_datastore_plugin.h"
 #include "gnunet_util_lib.h"
 #include <mysql/mysql.h>
 
 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
 
 
-/* warning, slighly crazy mysql statements ahead.  Essentially, MySQL does not handle
-   "OR" very well, so we need to use UNION instead.  And UNION does not
-   automatically apply a LIMIT on the outermost clause, so we need to
-   repeat ourselves quite a bit.  All hail the performance gods (and thanks
-   to #mysql on freenode) */
-#define SELECT_IT_LOW_PRIORITY "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
-                               "ORDER BY prio ASC,vkey ASC LIMIT 1) "\
-                               "UNION "\
-                               "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
-                               "ORDER BY prio ASC,vkey ASC LIMIT 1)"\
-                               "ORDER BY prio ASC,vkey ASC LIMIT 1"
-
-#define SELECT_IT_NON_ANONYMOUS "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
-                                " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
-                                "UNION "\
-                                "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
-                                " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
-                                "ORDER BY prio DESC,vkey DESC LIMIT 1"
-
-#define SELECT_IT_EXPIRATION_TIME "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
-                                  "ORDER BY expire ASC,vkey ASC LIMIT 1) "\
-                                  "UNION "\
-                                  "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
-                                  "ORDER BY expire ASC,vkey ASC LIMIT 1)"\
-                                  "ORDER BY expire ASC,vkey ASC LIMIT 1"
-
-
-#define SELECT_IT_MIGRATION_ORDER "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
-                                  " AND expire > ? AND type!=3"\
-                                  " ORDER BY expire DESC,vkey DESC LIMIT 1) "\
-                                  "UNION "\
-                                  "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
-                                  " AND expire > ? AND type!=3"\
-                                  " ORDER BY expire DESC,vkey DESC LIMIT 1)"\
-                                  "ORDER BY expire DESC,vkey DESC LIMIT 1"
-
-#define SELECT_SIZE "SELECT sum(size) FROM gn080"
-
-
 struct GNUNET_MysqlStatementHandle
 {
   struct GNUNET_MysqlStatementHandle *next;
@@ -249,21 +199,17 @@ struct NextRequestClosure
 
   MYSQL_BIND rbind[7];
 
-  unsigned int type;
-  
-  unsigned int iter_select;
+  enum GNUNET_BLOCK_Type type;
 
   PluginIterator dviter;
 
   void *dviter_cls;
 
-  unsigned int last_prio;
-
-  unsigned long long last_expire;
-
-  unsigned long long last_vkey;
+  unsigned int count;
 
   int end_it;
+
+  int one_shot;
 };
 
 
@@ -277,10 +223,19 @@ struct Plugin
    */
   struct GNUNET_DATASTORE_PluginEnvironment *env;
 
+  /**
+   * Handle to talk to MySQL.
+   */
   MYSQL *dbf;
   
+  /**
+   * We keep all prepared statements in a DLL.  This is the head.
+   */
   struct GNUNET_MysqlStatementHandle *shead;
 
+  /**
+   * We keep all prepared statements in a DLL.  This is the tail.
+   */
   struct GNUNET_MysqlStatementHandle *stail;
 
   /**
@@ -299,67 +254,66 @@ struct Plugin
   GNUNET_SCHEDULER_TaskIdentifier next_task;
 
   /**
-   * Statements dealing with gn072 table 
-   */
-#define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?"
-  struct GNUNET_MysqlStatementHandle *select_value;
-
-#define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"
-  struct GNUNET_MysqlStatementHandle *delete_value;
-
-#define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)"
-  struct GNUNET_MysqlStatementHandle *insert_value;
-
-  /**
-   * Statements dealing with gn080 table 
+   * Prepared statements.
    */
-#define INSERT_ENTRY "INSERT INTO gn080 (size,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)"
+#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
   struct GNUNET_MysqlStatementHandle *insert_entry;
   
-#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn080 WHERE vkey=?"
-  struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey;
-  
-#define SELECT_ENTRY_BY_HASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
-  
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
+#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
+  struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
 
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
-  
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
-  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
-  
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=?"
+#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 WHERE hash=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
+  
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+  struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
 
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
+#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
 
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=? AND type=?"
+#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
+  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
+#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND type=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
 
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
+  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
+#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?"
   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
+  
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
+  struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
 
-#define UPDATE_ENTRY "UPDATE gn080 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
+#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
   struct GNUNET_MysqlStatementHandle *update_entry;
 
-  struct GNUNET_MysqlStatementHandle *iter[4];
+#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
+  struct GNUNET_MysqlStatementHandle *dec_repl;
 
-  //static unsigned int stat_size;
+#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
+  struct GNUNET_MysqlStatementHandle *get_size;
 
-  /**
-   * Size of the mysql database on disk.
-   */
-  unsigned long long content_size;
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
+  struct GNUNET_MysqlStatementHandle *zero_iter;
+
+#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
+  "UNION "\
+  "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY prio ASC LIMIT 1) "\
+  "ORDER BY expire ASC LIMIT 1"
+  struct GNUNET_MysqlStatementHandle *select_expiration;
+
+#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY repl DESC,RAND() LIMIT 1"
+  struct GNUNET_MysqlStatementHandle *select_replication;
 
 };
 
 
 /**
  * Obtain the location of ".my.cnf".
+ *
+ * @param cfg our configuration
  * @return NULL on error
  */
 static char *
@@ -421,6 +375,9 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
 
 /**
  * Free a prepared statement.
+ *
+ * @param plugin plugin context
+ * @param s prepared statement
  */
 static void
 prepared_statement_destroy (struct Plugin *plugin, 
@@ -536,8 +493,11 @@ iopen (struct Plugin *ret)
     }
 
   GNUNET_assert (mysql_dbname != NULL);
-  mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
-                      mysql_dbname, (unsigned int) mysql_port, NULL,
+  mysql_real_connect (ret->dbf, 
+                     mysql_server, 
+                     mysql_user, mysql_password,
+                      mysql_dbname, 
+                     (unsigned int) mysql_port, NULL,
                      CLIENT_IGNORE_SIGPIPE);
   GNUNET_free_non_null (mysql_server);
   GNUNET_free_non_null (mysql_user);
@@ -556,6 +516,8 @@ iopen (struct Plugin *ret)
 /**
  * Run the given MySQL statement.
  *
+ * @param plugin plugin context
+ * @param statement SQL statement to run
  * @return GNUNET_OK on success, GNUNET_SYSERR on error
  */
 static int
@@ -576,49 +538,11 @@ run_statement (struct Plugin *plugin,
 }
 
 
-#if 0
-/**
- * Run the given MySQL SELECT statement.  The statement
- * must have only a single result (one column, one row).
- *
- * @return result on success, NULL on error
- */
-static char *
-run_statement_select (struct Plugin *plugin,
-                     const char *statement)
-{
-  MYSQL_RES *sql_res;
-  MYSQL_ROW sql_row;
-  char *ret;
-  
-  if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
-    return NULL;
-  mysql_query (plugin->dbf, statement);
-  if ((mysql_error (plugin->dbf)[0]) ||
-      (!(sql_res = mysql_use_result (plugin->dbf))) ||
-      (!(sql_row = mysql_fetch_row (sql_res))))
-    {
-      LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
-                 "mysql_query", plugin);
-      return NULL;
-    }
-  if ((mysql_num_fields (sql_res) != 1) || (sql_row[0] == NULL))
-    {
-      GNUNET_break (mysql_num_fields (sql_res) == 1);
-      if (sql_res != NULL)
-        mysql_free_result (sql_res);
-      return NULL;
-    }
-  ret = GNUNET_strdup (sql_row[0]);
-  mysql_free_result (sql_res);
-  return ret;
-}
-#endif
-
-
 /**
  * Create a prepared statement.
  *
+ * @param plugin plugin context
+ * @param statement SQL statement text to prepare
  * @return NULL on error
  */
 static struct GNUNET_MysqlStatementHandle *
@@ -639,6 +563,8 @@ prepared_statement_create (struct Plugin *plugin,
 /**
  * Prepare a statement for running.
  *
+ * @param plugin plugin context
+ * @param ret handle to prepared statement
  * @return GNUNET_OK on success
  */
 static int
@@ -678,6 +604,7 @@ prepare_statement (struct Plugin *plugin,
  * Bind the parameters for the given MySQL statement
  * and run it.
  *
+ * @param plugin plugin context
  * @param s statement to bind and run
  * @param ap arguments for the binding
  * @return GNUNET_SYSERR on error, GNUNET_OK on success
@@ -702,7 +629,7 @@ init_params (struct Plugin *plugin,
   memset (qbind, 0, sizeof (qbind));
   off = 0;
   ft = 0;
-  while ((pc > 0) && (-1 != (ft = va_arg (ap, enum enum_field_types))))
+  while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
     {
       qbind[off].buffer_type = ft;
       switch (ft)
@@ -733,9 +660,9 @@ init_params (struct Plugin *plugin,
       pc--;
       off++;
     }
-  if (!((pc == 0) && (ft != -1) && (va_arg (ap, int) == -1)))
+  if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
     {
-      GNUNET_break (0);
+      GNUNET_assert (0);
       return GNUNET_SYSERR;
     }
   if (mysql_stmt_bind_param (s->statement, qbind))
@@ -770,12 +697,14 @@ init_params (struct Plugin *plugin,
  */
 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
                                           unsigned int num_values,
-                                          MYSQL_BIND * values);
+                                          MYSQL_BIND *values);
 
 
 /**
  * Run a prepared SELECT statement.
  *
+ * @param plugin plugin context
+ * @param s statement to run
  * @param result_size number of elements in results array
  * @param results pointer to already initialized MYSQL_BIND
  *        array (of sufficient size) for passing results
@@ -789,12 +718,10 @@ typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
  */
 static int
 prepared_statement_run_select (struct Plugin *plugin,
-                              struct GNUNET_MysqlStatementHandle
-                              *s,
+                              struct GNUNET_MysqlStatementHandle *s,
                               unsigned int result_size,
-                              MYSQL_BIND * results,
-                              GNUNET_MysqlDataProcessor
-                              processor, void *processor_cls,
+                              MYSQL_BIND *results,
+                              GNUNET_MysqlDataProcessor processor, void *processor_cls,
                               ...)
 {
   va_list ap;
@@ -859,11 +786,13 @@ prepared_statement_run_select (struct Plugin *plugin,
 /**
  * Run a prepared statement that does NOT produce results.
  *
+ * @param plugin plugin context
+ * @param s statement to run
+ * @param insert_id NULL or address where to store the row ID of whatever
+ *        was inserted (only for INSERT statements!)
  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
  *        values (size + buffer-reference for pointers); terminated
  *        with "-1"
- * @param insert_id NULL or address where to store the row ID of whatever
- *        was inserted (only for INSERT statements!)
  * @return GNUNET_SYSERR on error, otherwise
  *         the number of successfully affected rows
  */
@@ -893,206 +822,218 @@ prepared_statement_run (struct Plugin *plugin,
 
 
 /**
- * Delete an value from the gn072 table.
+ * Delete an entry from the gn090 table.
  *
- * @param vkey vkey identifying the value to delete
+ * @param plugin plugin context
+ * @param uid unique ID of the entry to delete
  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
  */
 static int
-do_delete_value (struct Plugin *plugin,
-                unsigned long long vkey)
+do_delete_entry (struct Plugin *plugin,
+                unsigned long long uid)
 {
   int ret;
-
 #if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Deleting value %llu from gn072 table\n",
-             vkey);
+             "Deleting value %llu from gn090 table\n",
+             uid);
 #endif
   ret = prepared_statement_run (plugin,
-                               plugin->delete_value,
+                               plugin->delete_entry_by_uid,
                                NULL,
-                               MYSQL_TYPE_LONGLONG,
-                               &vkey, GNUNET_YES, -1);
-  if (ret > 0)
-    {
-      ret = GNUNET_OK;
-    }
-  else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 "Deleting value %llu from gn072 table failed\n",
-                 vkey);
-    }
+                               MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
+                               -1);
+  if (ret >= 0)
+    return GNUNET_OK;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+             "Deleting value %llu from gn090 table failed\n",
+             uid);
   return ret;
 }
 
+
 /**
- * Insert a value into the gn072 table.
+ * Function that simply returns GNUNET_OK
  *
- * @param value the value to insert
- * @param size size of the value
- * @param vkey vkey identifying the value henceforth (set)
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @param cls closure, not used
+ * @param num_values not used
+ * @param values not used
+ * @return GNUNET_OK
  */
 static int
-do_insert_value (struct Plugin *plugin,
-                const void *value, unsigned int size,
-                 unsigned long long *vkey)
+return_ok (void *cls, 
+          unsigned int num_values, 
+          MYSQL_BIND *values)
 {
-  unsigned long length = size;
-  int ret;
+  return GNUNET_OK;
+}
 
-  ret = prepared_statement_run (plugin,
-                               plugin->insert_value,
-                               vkey,
-                               MYSQL_TYPE_BLOB,
-                               value, length, &length, -1);
-  if (ret == GNUNET_OK)
+
+/**
+ * Get an estimate of how much space the database is
+ * currently using.
+ *
+ * @param cls our "struct Plugin *"
+ * @return number of bytes used on disk
+ */
+static unsigned long long
+mysql_plugin_get_size (void *cls)
+{
+  struct Plugin *plugin = cls;
+  MYSQL_BIND cbind[1];
+  long long total;
+
+  memset (cbind, 0, sizeof (cbind));
+  total = 0;
+  cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
+  cbind[0].buffer = &total;
+  cbind[0].is_unsigned = GNUNET_NO;
+  if (GNUNET_OK != 
+      prepared_statement_run_select (plugin,
+                                    plugin->get_size,
+                                    1, cbind, 
+                                    &return_ok, NULL,
+                                    -1))
+    return 0;
+  return total;
+}
+
+
+/**
+ * Store an item in the datastore.
+ *
+ * @param cls closure
+ * @param key key for the item
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
+ * @param expiration expiration time for the content
+ * @param msg set to error message
+ * @return GNUNET_OK on success
+ */
+static int
+mysql_plugin_put (void *cls,
+                 const GNUNET_HashCode * key,
+                 uint32_t size,
+                 const void *data,
+                 enum GNUNET_BLOCK_Type type,
+                 uint32_t priority,
+                 uint32_t anonymity,
+                 uint32_t replication,
+                 struct GNUNET_TIME_Absolute expiration,
+                 char **msg)
+{
+  struct Plugin *plugin = cls;
+  unsigned int irepl = replication;
+  unsigned int itype = type;
+  unsigned int ipriority = priority;
+  unsigned int ianonymity = anonymity;
+  unsigned long long lexpiration = expiration.abs_value;
+  unsigned long hashSize;
+  unsigned long hashSize2;
+  unsigned long lsize;
+  GNUNET_HashCode vhash;
+
+  if (size > MAX_DATUM_SIZE)
     {
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
+  hashSize = sizeof (GNUNET_HashCode);
+  hashSize2 = sizeof (GNUNET_HashCode);
+  lsize = size;
+  GNUNET_CRYPTO_hash (data, size, &vhash);
+  if (GNUNET_OK !=
+      prepared_statement_run (plugin,
+                             plugin->insert_entry,
+                             NULL,
+                             MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
+                             MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+                             MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
+                             MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
+                             MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
+                             MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                             MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
+                             MYSQL_TYPE_BLOB, data, lsize, &lsize, 
+                             -1))
+    return GNUNET_SYSERR;    
 #if DEBUG_MYSQL
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Inserted value number %llu with length %u into gn072 table\n",
-                 *vkey,
-                 size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Inserted value `%s' with size %u into gn090 table\n",
+             GNUNET_h2s (key),
+             (unsigned int) size);
 #endif
-    }
-  else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 "Failed to insert %u byte value into gn072 table\n",
-                 size);
-    }
-  return ret;
+  if (size > 0)
+    plugin->env->duc (plugin->env->cls,
+                     size);
+  return GNUNET_OK;
 }
 
+
 /**
- * Delete an entry from the gn080 table.
+ * Update the priority for a particular key in the datastore.  If
+ * the expiration time in value is different than the time found in
+ * the datastore, the higher value should be kept.  For the
+ * anonymity level, the lower value is to be used.  The specified
+ * priority should be added to the existing priority, ignoring the
+ * priority in value.
  *
- * @param vkey vkey identifying the entry to delete
- * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
+ * Note that it is possible for multiple values to match this put.
+ * In that case, all of the respective values are updated.
+ *
+ * @param cls our "struct Plugin*"
+ * @param uid unique identifier of the datum
+ * @param delta by how much should the priority
+ *     change?  If priority + delta < 0 the
+ *     priority should be set to 0 (never go
+ *     negative).
+ * @param expire new expiration time should be the
+ *     MAX of any existing expiration time and
+ *     this value
+ * @param msg set to error message
+ * @return GNUNET_OK on success
  */
 static int
-do_delete_entry_by_vkey (struct Plugin *plugin,
-                        unsigned long long vkey)
+mysql_plugin_update (void *cls,
+                    uint64_t uid,
+                    int delta, 
+                    struct GNUNET_TIME_Absolute expire,
+                    char **msg)
 {
+  struct Plugin *plugin = cls;
+  unsigned long long vkey = uid;
+  unsigned long long lexpire = expire.abs_value;
   int ret;
 
 #if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Deleting value %llu from gn080 table\n",
-             vkey);
+             "Updating value %llu adding %d to priority and maxing exp at %llu\n",
+             vkey,
+             delta,
+             lexpire);
 #endif
   ret = prepared_statement_run (plugin,
-                               plugin->delete_entry_by_vkey,
+                               plugin->update_entry,
                                NULL,
-                               MYSQL_TYPE_LONGLONG,
-                               &vkey, GNUNET_YES, -1);
-  if (ret > 0)
-    {
-      ret = GNUNET_OK;
-    }
-  else
+                               MYSQL_TYPE_LONG, &delta, GNUNET_NO,
+                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
+                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
+                               -1);
+  if (ret != GNUNET_OK)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 "Deleting value %llu from gn080 table failed\n",
+                 "Failed to update value %llu\n",
                  vkey);
     }
   return ret;
 }
 
-static int
-return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values)
-{
-  return GNUNET_OK;
-}
-
-
-static int
-iterator_helper_prepare (void *cls,
-                        struct NextRequestClosure *nrc)
-{
-  struct Plugin *plugin;
-  int ret;
 
-  if (nrc == NULL)
-    return GNUNET_NO;
-  plugin = nrc->plugin;
-  ret = GNUNET_SYSERR;
-  switch (nrc->iter_select)
-    {
-    case 0:
-    case 1:
-      ret = prepared_statement_run_select (plugin,
-                                          plugin->iter[nrc->iter_select],
-                                          7,
-                                          nrc->rbind,
-                                          &return_ok,
-                                          NULL,
-                                          MYSQL_TYPE_LONG,
-                                          &nrc->last_prio,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_vkey,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONG,
-                                          &nrc->last_prio,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_vkey,
-                                          GNUNET_YES, -1);
-      break;
-    case 2:
-      ret = prepared_statement_run_select (plugin,
-                                          plugin->iter[nrc->iter_select],
-                                          7,
-                                          nrc->rbind,
-                                          &return_ok,
-                                          NULL,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_expire,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_vkey,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_expire,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_vkey,
-                                          GNUNET_YES, -1);
-      break;
-    case 3:
-      ret = prepared_statement_run_select (plugin,
-                                          plugin->iter[nrc->iter_select],
-                                          7,
-                                          nrc->rbind,
-                                          &return_ok,
-                                          NULL,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_expire,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_vkey,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->now.value,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_expire,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->last_vkey,
-                                          GNUNET_YES,
-                                          MYSQL_TYPE_LONGLONG,
-                                          &nrc->now.value,
-                                          GNUNET_YES, -1);
-      break;
-    default:
-      GNUNET_assert (0);
-    }
-  return ret;
-}
 
 
 /**
@@ -1103,157 +1044,85 @@ iterator_helper_prepare (void *cls,
  */
 static void 
 mysql_next_request_cont (void *next_cls,
-                         const struct GNUNET_SCHEDULER_TaskContext *tc)
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct NextRequestClosure *nrc = next_cls;
   struct Plugin *plugin;
   int ret;
-  unsigned int size;
   unsigned int type;
   unsigned int priority;
   unsigned int anonymity;
   unsigned long long exp;
-  unsigned long long vkey;
   unsigned long hashSize;
+  unsigned long size;
+  unsigned long long uid;
+  char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
   GNUNET_HashCode key;
   struct GNUNET_TIME_Absolute expiration;
-  unsigned long length;
-  MYSQL_BIND *rbind; /* size 7 */
-  MYSQL_BIND dbind[1];
-  char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+  MYSQL_BIND *rbind = nrc->rbind;
 
   plugin = nrc->plugin;
   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
   plugin->next_task_nc = NULL;
 
- AGAIN: 
+  if (GNUNET_YES == nrc->end_it) 
+    goto END_SET;
   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
   nrc->now = GNUNET_TIME_absolute_get ();
   hashSize = sizeof (GNUNET_HashCode);
   memset (nrc->rbind, 0, sizeof (nrc->rbind));
   rbind = nrc->rbind;
   rbind[0].buffer_type = MYSQL_TYPE_LONG;
-  rbind[0].buffer = &size;
+  rbind[0].buffer = &type;
   rbind[0].is_unsigned = 1;
   rbind[1].buffer_type = MYSQL_TYPE_LONG;
-  rbind[1].buffer = &type;
+  rbind[1].buffer = &priority;
   rbind[1].is_unsigned = 1;
   rbind[2].buffer_type = MYSQL_TYPE_LONG;
-  rbind[2].buffer = &priority;
+  rbind[2].buffer = &anonymity;
   rbind[2].is_unsigned = 1;
-  rbind[3].buffer_type = MYSQL_TYPE_LONG;
-  rbind[3].buffer = &anonymity;
+  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[3].buffer = &exp;
   rbind[3].is_unsigned = 1;
-  rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[4].buffer = &exp;
-  rbind[4].is_unsigned = 1;
+  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
+  rbind[4].buffer = &key;
+  rbind[4].buffer_length = hashSize;
+  rbind[4].length = &hashSize;
   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[5].buffer = &key;
-  rbind[5].buffer_length = hashSize;
-  rbind[5].length = &hashSize;
+  rbind[5].buffer = value;
+  rbind[5].buffer_length = size = sizeof (value);
+  rbind[5].length = &size;
   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[6].buffer = &vkey;
-  rbind[6].is_unsigned = GNUNET_YES;
+  rbind[6].buffer = &uid;
+  rbind[6].is_unsigned = 1;
 
-  if ( (GNUNET_YES == nrc->end_it) ||
-       (GNUNET_OK != nrc->prep (nrc->prep_cls,
-                               nrc)))
+  if (GNUNET_OK != nrc->prep (nrc->prep_cls,
+                             nrc))
     goto END_SET;
   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
-    {
-      GNUNET_break (0); /* far too big */
-      goto END_SET;
-    }
-  nrc->last_vkey = vkey;
-  nrc->last_prio = priority;
-  nrc->last_expire = exp;
-  if ((rbind[0].buffer_type != MYSQL_TYPE_LONG) ||
-      (!rbind[0].is_unsigned) ||
-      (rbind[1].buffer_type != MYSQL_TYPE_LONG) ||
-      (!rbind[1].is_unsigned) ||
-      (rbind[2].buffer_type != MYSQL_TYPE_LONG) ||
-      (!rbind[2].is_unsigned) ||
-      (rbind[3].buffer_type != MYSQL_TYPE_LONG) ||
-      (!rbind[3].is_unsigned) ||
-      (rbind[4].buffer_type != MYSQL_TYPE_LONGLONG) ||
-      (!rbind[4].is_unsigned) ||
-      (rbind[5].buffer_type != MYSQL_TYPE_BLOB) ||
-      (rbind[5].buffer_length != sizeof (GNUNET_HashCode)) ||
-      (*rbind[5].length != sizeof (GNUNET_HashCode)) ||
-      (rbind[6].buffer_type != MYSQL_TYPE_LONGLONG) ||
-      (!rbind[6].is_unsigned))
+  GNUNET_assert (size <= sizeof(value));
+  if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
+       (hashSize != sizeof (GNUNET_HashCode)) )
     {
       GNUNET_break (0);
       goto END_SET;
     }    
 #if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Found value %llu with size %u, prio %u, anon %u, expire %llu selecting from gn080 table\n",
-             vkey,
-             size,
-             priority,
-             anonymity,
-             exp);
-#endif
-  /* now do query on gn072 */
-  length = size;
-  memset (dbind, 0, sizeof (dbind));
-  dbind[0].buffer_type = MYSQL_TYPE_BLOB;
-  dbind[0].buffer_length = size;
-  dbind[0].length = &length;
-  dbind[0].buffer = datum;
-  ret = prepared_statement_run_select (plugin,
-                                      plugin->select_value,
-                                      1,
-                                      dbind,
-                                      &return_ok,
-                                      NULL,
-                                      MYSQL_TYPE_LONGLONG,
-                                      &vkey, GNUNET_YES, -1);
-  GNUNET_break (ret <= 1);     /* should only have one rbind! */
-  if (ret > 0)
-    ret = GNUNET_OK;
-  if ((ret != GNUNET_OK) ||
-      (dbind[0].buffer_length != size) || (length != size))
-    {
-      GNUNET_break (ret != 0); /* should have one rbind! */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
-                 "Failed to obtain %llu from gn072\n",
-                 vkey);
-      GNUNET_break (length == size);    /* length should match! */
-      GNUNET_break (dbind[0].buffer_length == size);    /* length should be internally consistent! */
-      if (ret != 0)
-       {
-         do_delete_value (plugin, vkey);
-         do_delete_entry_by_vkey (plugin, vkey);
-         plugin->content_size -= size;
-       }
-      goto AGAIN;
-    }
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Calling iterator with value `%s' number %llu of size %u with type %u, priority %u, anonymity %u and expiration %llu\n",
+             "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
+             (unsigned int) size,
              GNUNET_h2s (&key),
-             vkey,
-             size,
-             type,
              priority,
              anonymity,
              exp);
 #endif
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  expiration.value = exp;
-  ret = nrc->dviter (nrc->dviter_cls,
-                    nrc,
+  expiration.abs_value = exp;
+  ret = nrc->dviter (nrc->dviter_cls, 
+                    (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
                     &key,
-                    size,
-                    datum,
-                    type,
-                    priority,
-                    anonymity,
-                    expiration,
-                    vkey);
+                    size, value,
+                    type, priority, anonymity, expiration,
+                    uid);
   if (ret == GNUNET_SYSERR)
     {
       nrc->end_it = GNUNET_YES;
@@ -1261,10 +1130,13 @@ mysql_next_request_cont (void *next_cls,
     }
   if (ret == GNUNET_NO)
     {
-      do_delete_value (plugin, vkey);
-      do_delete_entry_by_vkey (plugin, vkey);
-      plugin->content_size -= size;
+      do_delete_entry (plugin, uid);
+      if (size != 0)
+       plugin->env->duc (plugin->env->cls,
+                         - size);
     }
+  if (nrc->one_shot == GNUNET_YES)
+    GNUNET_free (nrc);
   return;
  END_SET:
   /* call dviter with "end of set" */
@@ -1300,185 +1172,14 @@ mysql_plugin_next_request (void *next_cls,
   if (GNUNET_YES == end_it)
     nrc->end_it = GNUNET_YES;
   nrc->plugin->next_task_nc = nrc;
-  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
-                                                    &mysql_next_request_cont,
+  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
                                                     nrc);
 }  
 
 
 /**
- * Iterate over the items in the datastore
- * using the given query to select and order
- * the items.
- *
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter never NULL
- * @param is_asc are we using ascending order?
+ * Context for 'get_statement_prepare'.
  */
-static void
-iterateHelper (struct Plugin *plugin,
-              unsigned int type,
-               int is_asc,
-               unsigned int iter_select, 
-              PluginIterator dviter,
-               void *dviter_cls)
-{
-  struct NextRequestClosure *nrc;
-
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->type = type;  
-  nrc->iter_select = iter_select;
-  nrc->dviter = dviter;
-  nrc->dviter_cls = dviter_cls;
-  nrc->prep = &iterator_helper_prepare;
-  if (is_asc)
-    {
-      nrc->last_prio = 0;
-      nrc->last_vkey = 0;
-      nrc->last_expire = 0;
-    }
-  else
-    {
-      nrc->last_prio = 0x7FFFFFFFL;
-      nrc->last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
-      nrc->last_expire = 0x7FFFFFFFFFFFFFFFLL;       /* MySQL only supports 63 bits */
-    }
-  mysql_plugin_next_request (nrc, GNUNET_NO);
-}
-
-
-/**
- * Get an estimate of how much space the database is
- * currently using.
- *
- * @param cls our "struct Plugin*"
- * @return number of bytes used on disk
- */
-static unsigned long long
-mysql_plugin_get_size (void *cls)
-{
-  struct Plugin *plugin = cls;
-  return plugin->content_size;
-}
-
-
-/**
- * Store an item in the datastore.
- *
- * @param cls closure
- * @param key key for the item
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
- */
-static int
-mysql_plugin_put (void *cls,
-                    const GNUNET_HashCode * key,
-                    uint32_t size,
-                    const void *data,
-                    enum GNUNET_BLOCK_Type type,
-                    uint32_t priority,
-                    uint32_t anonymity,
-                    struct GNUNET_TIME_Absolute expiration,
-                    char **msg)
-{
-  struct Plugin *plugin = cls;
-  unsigned int itype = type;
-  unsigned int isize = size;
-  unsigned int ipriority = priority;
-  unsigned int ianonymity = anonymity;
-  unsigned long long lexpiration = expiration.value;
-  unsigned long hashSize;
-  unsigned long hashSize2;
-  unsigned long long vkey;
-  GNUNET_HashCode vhash;
-
-  if (size > MAX_DATUM_SIZE)
-    {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-  hashSize = sizeof (GNUNET_HashCode);
-  hashSize2 = sizeof (GNUNET_HashCode);
-  GNUNET_CRYPTO_hash (data, size, &vhash);
-  if (GNUNET_OK != do_insert_value (plugin,
-                                   data, size, &vkey))
-    return GNUNET_SYSERR;
-  if (GNUNET_OK !=
-      prepared_statement_run (plugin,
-                             plugin->insert_entry,
-                             NULL,
-                             MYSQL_TYPE_LONG,
-                             &isize,
-                             GNUNET_YES,
-                             MYSQL_TYPE_LONG,
-                             &itype,
-                             GNUNET_YES,
-                             MYSQL_TYPE_LONG,
-                             &ipriority,
-                             GNUNET_YES,
-                             MYSQL_TYPE_LONG,
-                             &ianonymity,
-                             GNUNET_YES,
-                             MYSQL_TYPE_LONGLONG,
-                             &lexpiration,
-                             GNUNET_YES,
-                             MYSQL_TYPE_BLOB,
-                             key,
-                             hashSize,
-                             &hashSize,
-                             MYSQL_TYPE_BLOB,
-                             &vhash,
-                             hashSize2,
-                             &hashSize2,
-                             MYSQL_TYPE_LONGLONG,
-                             &vkey, GNUNET_YES, -1))
-    {
-      do_delete_value (plugin, vkey);
-      return GNUNET_SYSERR;
-    }
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Inserted value `%s' number %llu with size %u into gn080 table\n",
-             GNUNET_h2s (key),
-             vkey,
-             isize);
-#endif
-  plugin->content_size += size;
-  return GNUNET_OK;
-}
-
-
-/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
- */
-static void
-mysql_plugin_iter_low_priority (void *cls,
-                               enum GNUNET_BLOCK_Type type,
-                               PluginIterator iter,
-                               void *iter_cls)
-{
-  struct Plugin *plugin = cls;
-  iterateHelper (plugin, type, GNUNET_YES, 
-                0, iter, iter_cls); 
-}
-
-
 struct GetContext
 {
   GNUNET_HashCode key;
@@ -1489,10 +1190,9 @@ struct GetContext
   unsigned long long expiration;
   unsigned long long vkey;
   unsigned long long total;
-  int off;
-  int count;
+  unsigned int off;
+  unsigned int count;
   int have_vhash;
-  unsigned long size; /* OBSOLETE! */
 };
 
 
@@ -1503,9 +1203,8 @@ get_statement_prepare (void *cls,
   struct GetContext *gc = cls;
   struct Plugin *plugin;
   int ret;
-  unsigned int limit_off;
   unsigned long hashSize;
-
+  
   if (NULL == nrc)
     {
       GNUNET_free (gc);
@@ -1515,45 +1214,41 @@ get_statement_prepare (void *cls,
     return GNUNET_NO;
   plugin = nrc->plugin;
   hashSize = sizeof (GNUNET_HashCode);
-  if (gc->count + gc->off == gc->total)
-    nrc->last_vkey = 0;          /* back to start */
-  if (gc->count == 0)
-    limit_off = gc->off;
-  else
-    limit_off = 0;
+  if (++gc->off >= gc->total)
+    gc->off = 0;
 #if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Obtaining result number %d/%lld at offset %d with lvc %llu for GET `%s'\n",
+             "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
              gc->count+1,
              gc->total,
-             limit_off,
-             nrc->last_vkey,
+             gc->off,
              GNUNET_h2s (&gc->key));  
 #endif
   if (nrc->type != 0)
     {
       if (gc->have_vhash)
        {
-         ret =
-           prepared_statement_run_select
-           (plugin,
-            plugin->select_entry_by_hash_vhash_and_type, 7, nrc->rbind, &return_ok,
-            NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
-            MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
-            &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
-            -1);
+         ret = prepared_statement_run_select (plugin,
+                                              plugin->select_entry_by_hash_vhash_and_type, 
+                                              7, nrc->rbind, 
+                                              &return_ok, NULL, 
+                                              MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+                                              MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
+                                              MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
+                                              MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
+                                              -1);
        }
       else
        {
          ret =
-           prepared_statement_run_select
-           (plugin,
-            plugin->select_entry_by_hash_and_type, 7, nrc->rbind, &return_ok, NULL,
-            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-            MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
-            &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
-            -1);
+           prepared_statement_run_select (plugin,
+                                          plugin->select_entry_by_hash_and_type, 
+                                          7, nrc->rbind, 
+                                          &return_ok, NULL,
+                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+                                          MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
+                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
+                                          -1);
        }
     }
   else
@@ -1561,23 +1256,25 @@ get_statement_prepare (void *cls,
       if (gc->have_vhash)
        {
          ret =
-           prepared_statement_run_select
-           (plugin,
-            plugin->select_entry_by_hash_and_vhash, 7, nrc->rbind, &return_ok, NULL,
-            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
-            &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG,
-            &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
-            GNUNET_YES, -1);
+           prepared_statement_run_select (plugin,
+                                          plugin->select_entry_by_hash_and_vhash, 
+                                          7, nrc->rbind, 
+                                          &return_ok, NULL,
+                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
+                                          MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
+                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
+                                          -1);
        }
       else
        {
          ret =
-           prepared_statement_run_select
-           (plugin,
-            plugin->select_entry_by_hash, 7, nrc->rbind, &return_ok, NULL,
-            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-            MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
-            &limit_off, GNUNET_YES, -1);
+           prepared_statement_run_select (plugin,
+                                          plugin->select_entry_by_hash, 
+                                          7, nrc->rbind, 
+                                          &return_ok, NULL,
+                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
+                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
+                                          -1);
        }
     }
   gc->count++;
@@ -1586,8 +1283,7 @@ get_statement_prepare (void *cls,
 
 
 /**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Iterate over the results for a particular key in the datastore.
  *
  * @param cls closure
  * @param key maybe NULL (to match all entries)
@@ -1597,15 +1293,15 @@ get_statement_prepare (void *cls,
  *        betwen key and vhash, but for other blocks
  *        there may be!
  * @param type entries of which type are relevant?
- *     Use 0 for any type.
+ *        Use 0 for any type.
  * @param iter function to call on each matching value;
  *        will be called once with a NULL value at the end
  * @param iter_cls closure for iter
  */
 static void
 mysql_plugin_get (void *cls,
-                 const GNUNET_HashCode * key,
-                 const GNUNET_HashCode * vhash,
+                 const GNUNET_HashCode *key,
+                 const GNUNET_HashCode *vhash,
                  enum GNUNET_BLOCK_Type type,
                  PluginIterator iter, void *iter_cls)
 {
@@ -1617,17 +1313,13 @@ mysql_plugin_get (void *cls,
   struct NextRequestClosure *nrc;
   long long total;
   unsigned long hashSize;
+  unsigned long hashSize2;
 
+  GNUNET_assert (key != NULL);
   if (iter == NULL) 
     return;
-  if (key == NULL)
-    {
-      mysql_plugin_iter_low_priority (plugin,
-                                     type, 
-                                     iter, iter_cls);
-      return;
-    }
   hashSize = sizeof (GNUNET_HashCode);
+  hashSize2 = sizeof (GNUNET_HashCode);
   memset (cbind, 0, sizeof (cbind));
   total = -1;
   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
@@ -1638,22 +1330,25 @@ mysql_plugin_get (void *cls,
       if (vhash != NULL)
         {
           ret =
-            prepared_statement_run_select
-            (plugin,
-            plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL,
-             MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
-             vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES,
-             -1);
+            prepared_statement_run_select (plugin,
+                                          plugin->count_entry_by_hash_vhash_and_type, 
+                                          1, cbind, 
+                                          &return_ok, NULL,
+                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
+                                          MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
+                                          MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+                                          -1);
         }
       else
         {
           ret =
-            prepared_statement_run_select
-            (plugin,
-            plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL,
-             MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG,
-             &itype, GNUNET_YES, -1);
-
+            prepared_statement_run_select (plugin,
+                                          plugin->count_entry_by_hash_and_type, 
+                                          1, cbind, 
+                                          &return_ok, NULL,
+                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
+                                          MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+                                          -1);
         }
     }
   else
@@ -1661,11 +1356,13 @@ mysql_plugin_get (void *cls,
       if (vhash != NULL)
         {
           ret =
-            prepared_statement_run_select
-            (plugin,
-            plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL,
-             MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
-             vhash, hashSize, &hashSize, -1);
+            prepared_statement_run_select (plugin,
+                                          plugin->count_entry_by_hash_and_vhash, 
+                                          1, cbind,
+                                          &return_ok, NULL,
+                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
+                                          MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
+                                          -1);
 
         }
       else
@@ -1673,10 +1370,10 @@ mysql_plugin_get (void *cls,
           ret =
             prepared_statement_run_select (plugin,
                                           plugin->count_entry_by_hash,
-                                          1, cbind, &return_ok,
-                                          NULL, MYSQL_TYPE_BLOB,
-                                          key, hashSize,
-                                          &hashSize, -1);
+                                          1, cbind, 
+                                          &return_ok, NULL, 
+                                          MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
+                                          -1);
         }
     }
   if ((ret != GNUNET_OK) || (0 >= total))
@@ -1706,79 +1403,38 @@ mysql_plugin_get (void *cls,
   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
   nrc->plugin = plugin;
   nrc->type = type;  
-  nrc->iter_select = -1;
   nrc->dviter = iter;
   nrc->dviter_cls = iter_cls;
   nrc->prep = &get_statement_prepare;
   nrc->prep_cls = gc;
-  nrc->last_vkey = 0;
   mysql_plugin_next_request (nrc, GNUNET_NO);
 }
 
 
 /**
- * Update the priority for a particular key in the datastore.  If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept.  For the
- * anonymity level, the lower value is to be used.  The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- *     change?  If priority + delta < 0 the
- *     priority should be set to 0 (never go
- *     negative).
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * Run the prepared statement to get the next data item ready.
+ * 
+ * @param cls not used
+ * @param nrc closure for the next request iterator
+ * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
  */
 static int
-mysql_plugin_update (void *cls,
-                    uint64_t uid,
-                    int delta, 
-                    struct GNUNET_TIME_Absolute expire,
-                    char **msg)
+iterator_zero_prepare (void *cls,
+                      struct NextRequestClosure *nrc)
 {
-  struct Plugin *plugin = cls;
-  unsigned long long vkey = uid;
-  unsigned long long lexpire = expire.value;
+  struct Plugin *plugin;
   int ret;
 
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Updating value %llu adding %d to priority and maxing exp at %llu\n",
-             vkey,
-             delta,
-             lexpire);
-#endif
-  ret = prepared_statement_run (plugin,
-                               plugin->update_entry,
-                               NULL,
-                               MYSQL_TYPE_LONG,
-                               &delta,
-                               GNUNET_NO,
-                               MYSQL_TYPE_LONGLONG,
-                               &lexpire,
-                               GNUNET_YES,
-                               MYSQL_TYPE_LONGLONG,
-                               &lexpire,
-                               GNUNET_YES,
-                               MYSQL_TYPE_LONGLONG,
-                               &vkey,
-                               GNUNET_YES, -1);
-  if (ret != GNUNET_OK)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 "Failed to update value %llu\n",
-                 vkey);
-    }
+  if (nrc == NULL)
+    return GNUNET_NO;
+  plugin = nrc->plugin;
+  ret = prepared_statement_run_select (plugin,
+                                      plugin->zero_iter,
+                                      7, nrc->rbind,
+                                      &return_ok, NULL,
+                                      MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
+                                      -1);
+  nrc->count++;
   return ret;
 }
 
@@ -1796,95 +1452,236 @@ mysql_plugin_update (void *cls,
  */
 static void
 mysql_plugin_iter_zero_anonymity (void *cls,
-                                    enum GNUNET_BLOCK_Type type,
-                                    PluginIterator iter,
-                                    void *iter_cls)
+                                 enum GNUNET_BLOCK_Type type,
+                                 PluginIterator iter,
+                                 void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls);
+  struct NextRequestClosure *nrc;
+
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->type = type;  
+  nrc->dviter = iter;
+  nrc->dviter_cls = iter_cls;
+  nrc->prep = &iterator_zero_prepare;
+  mysql_plugin_next_request (nrc, GNUNET_NO);
 }
 
 
 /**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * Run the SELECT statement for the replication function.
+ * 
+ * @param cls the 'struct Plugin'
+ * @param nrc the context (not used)
  */
-static void
-mysql_plugin_iter_ascending_expiration (void *cls,
-                                       enum GNUNET_BLOCK_Type type,
-                                       PluginIterator iter,
-                                       void *iter_cls)
+static int
+replication_prepare (void *cls,
+                    struct NextRequestClosure *nrc)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, type, GNUNET_YES, 2, iter, iter_cls);
+
+  return prepared_statement_run_select (plugin,
+                                       plugin->select_replication, 
+                                       7, nrc->rbind, 
+                                       &return_ok, NULL,
+                                       -1);
 }
 
 
+
 /**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Context for 'repl_iter' function.
+ */
+struct ReplCtx
+{
+  
+  /**
+   * Plugin handle.
+   */
+  struct Plugin *plugin;
+  
+  /**
+   * Function to call for the result (or the NULL).
+   */
+  PluginIterator iter;
+  
+  /**
+   * Closure for iter.
+   */
+  void *iter_cls;
+};
+
+
+/**
+ * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Decrements the replication counter and calls the original
+ * iterator.
  *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
+ * @param cls closure
+ * @param next_cls closure to pass to the "next" function.
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ *         (continue on call to "next", of course),
+ *         GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+repl_iter (void *cls,
+          void *next_cls,
+          const GNUNET_HashCode *key,
+          uint32_t size,
+          const void *data,
+          enum GNUNET_BLOCK_Type type,
+          uint32_t priority,
+          uint32_t anonymity,
+          struct GNUNET_TIME_Absolute expiration, 
+          uint64_t uid)
+{
+  struct ReplCtx *rc = cls;
+  struct Plugin *plugin = rc->plugin;
+  unsigned long long oid;
+  int ret;
+  int iret;
+
+  ret = rc->iter (rc->iter_cls,
+                 next_cls, key,
+                 size, data, 
+                 type, priority, anonymity, expiration,
+                 uid);
+  if (NULL != key)
+    {
+      oid = (unsigned long long) uid;
+      iret = prepared_statement_run (plugin,
+                                   plugin->dec_repl,
+                                   NULL,
+                                   MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
+                                   -1);
+      if (iret == GNUNET_SYSERR)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     "Failed to reduce replication counter\n");
+         return GNUNET_SYSERR;
+       }
+    }
+  return ret;
+}
+
+
+/**
+ * Get a random item for replication.  Returns a single, not expired, random item
+ * from those with the highest replication counters.  The item's 
+ * replication counter is decremented by one IF it was positive before.
+ * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ *
+ * @param cls closure
+ * @param iter function to call the value (once only).
  * @param iter_cls closure for iter
  */
 static void
-mysql_plugin_iter_migration_order (void *cls,
-                                     enum GNUNET_BLOCK_Type type,
-                                     PluginIterator iter,
-                                     void *iter_cls)
+mysql_plugin_replication_get (void *cls,
+                             PluginIterator iter, void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, 0, GNUNET_NO, 3, iter, iter_cls);
+  struct NextRequestClosure *nrc;
+  struct ReplCtx rc;
+  
+  rc.plugin = plugin;
+  rc.iter = iter;
+  rc.iter_cls = iter_cls;
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->now = GNUNET_TIME_absolute_get ();
+  nrc->prep = &replication_prepare;
+  nrc->prep_cls = plugin;
+  nrc->type = 0;
+  nrc->dviter = &repl_iter;
+  nrc->dviter_cls = &rc;
+  nrc->end_it = GNUNET_NO;
+  nrc->one_shot = GNUNET_YES;
+  mysql_next_request_cont (nrc, NULL);
 }
 
 
 /**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Run the SELECT statement for the expiration function.
+ * 
+ * @param cls the 'struct Plugin'
+ * @param nrc the context (not used)
+ * @return GNUNET_OK on success, GNUNET_NO if there are
+ *         no more values, GNUNET_SYSERR on error
+ */
+static int
+expiration_prepare (void *cls,
+                   struct NextRequestClosure *nrc)
+{
+  struct Plugin *plugin = cls;
+  long long nt;
+
+  if (NULL == nrc)
+    return GNUNET_NO;
+  nt = (long long) nrc->now.abs_value;
+  return prepared_statement_run_select
+    (plugin,
+     plugin->select_expiration, 
+     7, nrc->rbind, 
+     &return_ok, NULL,
+     MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
+     -1);
+}
+
+
+/**
+ * Get a random item for expiration.
+ * Call 'iter' with all values ZERO or NULL if the datastore is empty.
  *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
+ * @param cls closure
+ * @param iter function to call the value (once only).
  * @param iter_cls closure for iter
  */
 static void
-mysql_plugin_iter_all_now (void *cls,
-                          enum GNUNET_BLOCK_Type type,
-                          PluginIterator iter,
-                          void *iter_cls)
+mysql_plugin_expiration_get (void *cls,
+                            PluginIterator iter, void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  iterateHelper (plugin, 0, GNUNET_YES, 0, iter, iter_cls);
+  struct NextRequestClosure *nrc;
+
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->now = GNUNET_TIME_absolute_get ();
+  nrc->prep = &expiration_prepare;
+  nrc->prep_cls = plugin;
+  nrc->type = 0;
+  nrc->dviter = iter;
+  nrc->dviter_cls = iter_cls;
+  nrc->end_it = GNUNET_NO;
+  nrc->one_shot = GNUNET_YES;
+  mysql_next_request_cont (nrc, NULL);
 }
 
 
 /**
  * Drop database.
+ *
+ * @param cls the "struct Plugin*"
  */
 static void 
 mysql_plugin_drop (void *cls)
 {
   struct Plugin *plugin = cls;
 
-  if ((GNUNET_OK != run_statement (plugin,
-                                  "DROP TABLE gn080")) ||
-      (GNUNET_OK != run_statement (plugin,
-                                  "DROP TABLE gn072")))
-    return;                     /* error */
-  plugin->content_size = 0;
+  if (GNUNET_OK != run_statement (plugin,
+                                 "DROP TABLE gn090"))
+    return;           /* error */
+  plugin->env->duc (plugin->env->cls, 0);
 }
 
 
@@ -1913,47 +1710,47 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
     }
 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
-  if (MRUNS ("CREATE TABLE IF NOT EXISTS gn080 ("
-             " size INT(11) UNSIGNED NOT NULL DEFAULT 0,"
+  if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
+             " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
              " hash BINARY(64) NOT NULL DEFAULT '',"
              " vhash BINARY(64) NOT NULL DEFAULT '',"
-             " vkey BIGINT UNSIGNED NOT NULL DEFAULT 0,"
-             " INDEX hash (hash(64)),"
-             " INDEX hash_vhash_vkey (hash(64),vhash(64),vkey),"
-             " INDEX hash_vkey (hash(64),vkey),"
-             " INDEX vkey (vkey),"
-             " INDEX prio (prio,vkey),"
-             " INDEX expire (expire,vkey,type),"
-             " INDEX anonLevel (anonLevel,prio,vkey,type)"
+             " value BLOB NOT NULL DEFAULT '',"
+             " uid BIGINT NOT NULL AUTO_INCREMENT,"
+             " PRIMARY KEY (uid)"
+#if 0
+             " INDEX idx_hash (hash(64)),"
+             " INDEX idx_hash_uid (hash(64),uid),"
+             " INDEX idx_hash_vhash (hash(64),vhash(64)),"
+             " INDEX idx_hash_type_uid (hash(64),type,uid),"
+             " INDEX idx_prio (prio),"
+             " INDEX idx_repl (repl),"
+             " INDEX idx_expire_prio (expire,prio),"
+             " INDEX idx_anonLevel_uid (anonLevel,uid)"
+#endif
              ") ENGINE=InnoDB") ||
-      MRUNS ("CREATE TABLE IF NOT EXISTS gn072 ("
-             " vkey BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,"
-             " value BLOB NOT NULL DEFAULT '') ENGINE=MyISAM") ||
       MRUNS ("SET AUTOCOMMIT = 1") ||
-      PINIT (plugin->select_value, SELECT_VALUE) ||
-      PINIT (plugin->delete_value, DELETE_VALUE) ||
-      PINIT (plugin->insert_value, INSERT_VALUE) ||
       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
-      PINIT (plugin->delete_entry_by_vkey, DELETE_ENTRY_BY_VKEY) ||
+      PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
+      || PINIT (plugin->get_size, SELECT_SIZE)
       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
       || PINIT (plugin->update_entry, UPDATE_ENTRY)
-      || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY)
-      || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS)
-      || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME)
-      || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER))
+      || PINIT (plugin->dec_repl, DEC_REPL)
+      || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
+      || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
+      || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
     {
       iclose (plugin);
       GNUNET_free_non_null (plugin->cnffile);
@@ -1969,12 +1766,10 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
   api->put = &mysql_plugin_put;
   api->next_request = &mysql_plugin_next_request;
   api->get = &mysql_plugin_get;
+  api->replication_get = &mysql_plugin_replication_get;
+  api->expiration_get = &mysql_plugin_expiration_get;
   api->update = &mysql_plugin_update;
-  api->iter_low_priority = &mysql_plugin_iter_low_priority;
   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
-  api->iter_ascending_expiration = &mysql_plugin_iter_ascending_expiration;
-  api->iter_migration_order = &mysql_plugin_iter_migration_order;
-  api->iter_all_now = &mysql_plugin_iter_all_now;
   api->drop = &mysql_plugin_drop;
   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                    "mysql", _("Mysql database running\n"));
@@ -1996,8 +1791,7 @@ libgnunet_plugin_datastore_mysql_done (void *cls)
   iclose (plugin);
   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (plugin->env->sched,
-                              plugin->next_task);
+      GNUNET_SCHEDULER_cancel (plugin->next_task);
       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
       GNUNET_free (plugin->next_task_nc);