first batch of license fixes (boring)
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
index 09dbdc5b6206a2f77052fbc18c73a2ae82611d82..93586e2528a80e6042bc53fa09e5d6bc722d33db 100644 (file)
@@ -2,20 +2,15 @@
      This file is part of GNUnet
      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
+     Affero General Public License for more details.
 */
 
 /**
@@ -150,55 +145,71 @@ struct Plugin
 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
 
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
-  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
-
-#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define DELETE_ENTRY_BY_HASH_VALUE "DELETE FROM gn090 "\
+  "WHERE hash = ? AND "\
+  "value = ? "\
+  "LIMIT 1"
+  struct GNUNET_MYSQL_StatementHandle *delete_entry_by_hash_value;
+
+#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, uid"
+
+#define SELECT_ENTRY "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "WHERE uid >= ? AND "\
+  "(rvalue >= ? OR 0 = ?) "\
+  "ORDER BY uid LIMIT 1"
+  struct GNUNET_MYSQL_StatementHandle *select_entry;
+
+#define SELECT_ENTRY_BY_HASH "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "FORCE INDEX (idx_hash_type_uid) "\
+  "WHERE hash=? AND "\
+  "uid >= ? AND "\
+  "(rvalue >= ? OR 0 = ?) "\
+  "ORDER BY uid LIMIT 1"
   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
 
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
-  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
-
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
-  struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
-
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
-  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
-
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "FORCE INDEX (idx_hash_type_uid) "\
+  "WHERE hash = ? AND "\
+  "type = ? AND "\
+  "uid >= ? AND "\
+  "(rvalue >= ? OR 0 = ?) "\
+  "ORDER BY uid LIMIT 1"
   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
 
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
-  struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
-
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
-  struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
-
-#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
+#define UPDATE_ENTRY "UPDATE gn090 SET "\
+  "prio = prio + ?, "\
+  "repl = repl + ?, "\
+  "expire = GREATEST(expire, ?) "\
+  "WHERE hash = ? AND vhash = ?"
   struct GNUNET_MYSQL_StatementHandle *update_entry;
 
 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
   struct GNUNET_MYSQL_StatementHandle *dec_repl;
 
-#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
+#define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
   struct GNUNET_MYSQL_StatementHandle *get_size;
 
-#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
-   "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
-   "WHERE anonLevel=0 AND type=? AND "\
-   "(rvalue >= ? OR"\
-   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
-   "ORDER BY rvalue ASC LIMIT 1"
+#define SELECT_IT_NON_ANONYMOUS "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "FORCE INDEX (idx_anonLevel_type_rvalue) "\
+  "WHERE anonLevel=0 AND "\
+  "type=? AND "\
+  "uid >= ? "\
+  "ORDER BY uid LIMIT 1"
   struct GNUNET_MYSQL_StatementHandle *zero_iter;
 
-#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
+#define SELECT_IT_EXPIRATION "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "FORCE INDEX (idx_expire) "\
+  "WHERE expire < ? "\
+  "ORDER BY expire ASC LIMIT 1"
   struct GNUNET_MYSQL_StatementHandle *select_expiration;
 
-#define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
+#define SELECT_IT_PRIORITY "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "FORCE INDEX (idx_prio) "\
+  "ORDER BY prio ASC LIMIT 1"
   struct GNUNET_MYSQL_StatementHandle *select_priority;
 
-#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
-  "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
+#define SELECT_IT_REPLICATION "SELECT " RESULT_COLUMNS " FROM gn090 "\
+  "FORCE INDEX (idx_repl_rvalue) "\
   "WHERE repl=? AND "\
   " (rvalue>=? OR"\
   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
@@ -221,23 +232,22 @@ struct Plugin
  *
  * @param plugin plugin context
  * @param uid unique ID of the entry to delete
- * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
  */
 static int
-do_delete_entry (struct Plugin *plugin, unsigned long long uid)
+do_delete_entry (struct Plugin *plugin,
+                 unsigned long long uid)
 {
   int ret;
   uint64_t uid64 = (uint64_t) uid;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Deleting value %llu from gn090 table\n",
-              uid);
-
   struct GNUNET_MY_QueryParam params_delete[] = {
     GNUNET_MY_query_param_uint64 (&uid64),
     GNUNET_MY_query_param_end
   };
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Deleting value %llu from gn090 table\n",
+              uid);
   ret = GNUNET_MY_exec_prepared (plugin->mc,
                                  plugin->delete_entry_by_uid,
                                  params_delete);
@@ -247,7 +257,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid)
   }
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Deleting value %llu from gn090 table failed\n",
-              uid);
+              (unsigned long long) uid);
   return ret;
 }
 
@@ -256,7 +266,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid)
  * Get an estimate of how much space the database is
  * currently using.
  *
- * @param cls our "struct Plugin *"
+ * @param cls our `struct Plugin *`
  * @return number of bytes used on disk
  */
 static void
@@ -265,26 +275,34 @@ mysql_plugin_estimate_size (void *cls,
 {
   struct Plugin *plugin = cls;
   uint64_t total;
+  int ret;
   struct GNUNET_MY_QueryParam params_get[] = {
     GNUNET_MY_query_param_end
   };
-
   struct GNUNET_MY_ResultSpec results_get[] = {
     GNUNET_MY_result_spec_uint64 (&total),
     GNUNET_MY_result_spec_end
   };
-  int ret;
 
-  ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->get_size, params_get);
-  if (GNUNET_OK == ret)
-    {
-      if (GNUNET_OK == GNUNET_MY_extract_result (plugin->get_size, results_get))
-      {
-        *estimate = (unsigned long long)total;
-      }
-    }
-  else
-    *estimate = 0;
+  ret = GNUNET_MY_exec_prepared (plugin->mc,
+                                 plugin->get_size,
+                                 params_get);
+  *estimate = 0;
+  total = UINT64_MAX;
+  if ( (GNUNET_OK == ret) &&
+       (GNUNET_OK ==
+        GNUNET_MY_extract_result (plugin->get_size,
+                                  results_get)) )
+  {
+    *estimate = (unsigned long long) total;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Size estimate for MySQL payload is %lld\n",
+                (long long) total);
+    GNUNET_assert (UINT64_MAX != total);
+    GNUNET_break (GNUNET_NO ==
+                  GNUNET_MY_extract_result (plugin->get_size,
+                                            NULL));
+  }
 }
 
 
@@ -293,7 +311,8 @@ mysql_plugin_estimate_size (void *cls,
  *
  * @param cls closure
  * @param key key for the item
- * @param size number of bytes in data
+ * @param absent true if the key was not found in the bloom filter
+ * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
  * @param priority priority of the content
@@ -301,11 +320,12 @@ mysql_plugin_estimate_size (void *cls,
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
  * @param cont continuation called with success or failure status
- * @param cont_cls continuation closure
+ * @param cont_cls closure for @a cont
  */
 static void
 mysql_plugin_put (void *cls,
                   const struct GNUNET_HashCode *key,
+                  bool absent,
                   uint32_t size,
                   const void *data,
                   enum GNUNET_BLOCK_Type type,
@@ -317,24 +337,65 @@ mysql_plugin_put (void *cls,
                   void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  unsigned int irepl = replication;
-  unsigned int ipriority = priority;
-  unsigned int ianonymity = anonymity;
   uint64_t lexpiration = expiration.abs_value_us;
+  struct GNUNET_HashCode vhash;
+
+  GNUNET_CRYPTO_hash (data,
+                      size,
+                      &vhash);
+  if (!absent)
+  {
+    struct GNUNET_MY_QueryParam params_update[] = {
+      GNUNET_MY_query_param_uint32 (&priority),
+      GNUNET_MY_query_param_uint32 (&replication),
+      GNUNET_MY_query_param_uint64 (&lexpiration),
+      GNUNET_MY_query_param_auto_from_type (key),
+      GNUNET_MY_query_param_auto_from_type (&vhash),
+      GNUNET_MY_query_param_end
+    };
+
+    if (GNUNET_OK !=
+        GNUNET_MY_exec_prepared (plugin->mc,
+                                 plugin->update_entry,
+                                 params_update))
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_SYSERR,
+            _("MySQL statement run failure"));
+      return;
+    }
+
+    MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->update_entry);
+    my_ulonglong rows = mysql_stmt_affected_rows (stmt);
+
+    GNUNET_break (GNUNET_NO ==
+                  GNUNET_MY_extract_result (plugin->update_entry,
+                                            NULL));
+    if (0 != rows)
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_NO,
+            NULL);
+      return;
+    }
+  }
+
   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                UINT64_MAX);
-  unsigned long lsize;
-  struct GNUNET_HashCode vhash;
   struct GNUNET_MY_QueryParam params_insert[] = {
-    GNUNET_MY_query_param_uint32 (&irepl),
+    GNUNET_MY_query_param_uint32 (&replication),
     GNUNET_MY_query_param_uint32 (&type),
-    GNUNET_MY_query_param_uint32 (&ipriority),
-    GNUNET_MY_query_param_uint32 (&ianonymity),
+    GNUNET_MY_query_param_uint32 (&priority),
+    GNUNET_MY_query_param_uint32 (&anonymity),
     GNUNET_MY_query_param_uint64 (&lexpiration),
     GNUNET_MY_query_param_uint64 (&lrvalue),
     GNUNET_MY_query_param_auto_from_type (key),
     GNUNET_MY_query_param_auto_from_type (&vhash),
-    GNUNET_MY_query_param_fixed_size (data, lsize),
+    GNUNET_MY_query_param_fixed_size (data, size),
     GNUNET_MY_query_param_end
   };
 
@@ -344,92 +405,34 @@ mysql_plugin_put (void *cls,
     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
     return;
   }
-  lsize = size;
-  GNUNET_CRYPTO_hash (data,
-                      size,
-                      &vhash);
 
   if (GNUNET_OK !=
       GNUNET_MY_exec_prepared (plugin->mc,
                                plugin->insert_entry,
                                params_insert))
   {
-    cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure"));
+    cont (cont_cls,
+          key,
+          size,
+          GNUNET_SYSERR,
+          _("MySQL statement run failure"));
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Inserted value `%s' with size %u into gn090 table\n",
-              GNUNET_h2s (key), (unsigned int) size);
+              GNUNET_h2s (key),
+              (unsigned int) size);
   if (size > 0)
-    plugin->env->duc (plugin->env->cls, size);
-  cont (cont_cls, key, size, GNUNET_OK, NULL);
-}
-
-
-/**
- * Update the priority for a particular key in the datastore.  If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept.  For the
- * anonymity level, the lower value is to be used.  The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- *     change?  If priority + delta < 0 the
- *     priority should be set to 0 (never go
- *     negative).
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-mysql_plugin_update (void *cls, uint64_t uid, int delta,
-                     struct GNUNET_TIME_Absolute expire,
-                     PluginUpdateCont cont, void *cont_cls)
-{
-  struct Plugin *plugin = cls;
-  uint32_t delta1 = (uint32_t)delta;
-  unsigned long long vkey = uid;
-//  unsigned long long lexpire = expire.abs_value_us;
-  uint64_t lexpire = expire.abs_value_us;
-  int ret;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Updating value %llu adding %d to priority and maxing exp at %s\n",
-              vkey, delta,
-             GNUNET_STRINGS_absolute_time_to_string (expire));
-/*  ret =
-    GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
-                                        MYSQL_TYPE_LONG, &delta, GNUNET_NO,
-                              MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
-                              MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
-                              MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
-*/
-  struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint32 (&delta1),
-    GNUNET_MY_query_param_uint64 (&lexpire),
-    GNUNET_MY_query_param_uint64 (&lexpire),
-    GNUNET_MY_query_param_uint64 (&uid),
-    GNUNET_MY_query_param_end
-  };
-
-  ret = GNUNET_MY_exec_prepared (plugin->mc,
-                                 plugin->update_entry,
-                                 params_update);
-
-  if (ret != GNUNET_OK)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
-                vkey);
-  }
-  cont (cont_cls, ret, NULL);
+    plugin->env->duc (plugin->env->cls,
+                      size);
+  GNUNET_break (GNUNET_NO ==
+                GNUNET_MY_extract_result (plugin->insert_entry,
+                                          NULL));
+  cont (cont_cls,
+        key,
+        size,
+        GNUNET_OK,
+        NULL);
 }
 
 
@@ -440,7 +443,7 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta,
  * @param plugin the plugin handle
  * @param stmt select statement to run
  * @param proc function to call on result
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  * @param params_select arguments to initialize stmt
  */
 static void
@@ -451,6 +454,7 @@ execute_select (struct Plugin *plugin,
                 struct GNUNET_MY_QueryParam *params_select)
 {
   int ret;
+  uint32_t replication;
   uint32_t type;
   uint32_t priority;
   uint32_t anonymity;
@@ -460,6 +464,7 @@ execute_select (struct Plugin *plugin,
   struct GNUNET_HashCode key;
   struct GNUNET_TIME_Absolute expiration;
   struct GNUNET_MY_ResultSpec results_select[] = {
+    GNUNET_MY_result_spec_uint32 (&replication),
     GNUNET_MY_result_spec_uint32 (&type),
     GNUNET_MY_result_spec_uint32 (&priority),
     GNUNET_MY_result_spec_uint32 (&anonymity),
@@ -476,16 +481,16 @@ execute_select (struct Plugin *plugin,
   if (GNUNET_OK != ret)
   {
     proc (proc_cls,
-          NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+          NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
   }
 
   ret = GNUNET_MY_extract_result (stmt,
                                   results_select);
-  if (ret <= 0)
+  if (GNUNET_OK != ret)
   {
     proc (proc_cls,
-          NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+          NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
   }
 
@@ -497,6 +502,9 @@ execute_select (struct Plugin *plugin,
               (unsigned int) anonymity,
              GNUNET_STRINGS_absolute_time_to_string (expiration));
   GNUNET_assert (value_size < MAX_DATUM_SIZE);
+  GNUNET_break (GNUNET_NO ==
+                GNUNET_MY_extract_result (stmt,
+                                          NULL));
   ret = proc (proc_cls,
               &key,
               value_size,
@@ -504,9 +512,11 @@ execute_select (struct Plugin *plugin,
               type,
               priority,
               anonymity,
+              replication,
               expiration,
               uid);
-  if (ret == GNUNET_NO)
+  GNUNET_MY_cleanup_result (results_select);
+  if (GNUNET_NO == ret)
   {
     do_delete_entry (plugin, uid);
     if (0 != value_size)
@@ -520,14 +530,9 @@ execute_select (struct Plugin *plugin,
  * Get one of the results for a particular key in the datastore.
  *
  * @param cls closure
- * @param offset offset of the result (modulo num-results);
- *               specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
  * @param key key to match, never NULL
- * @param vhash hash of the value, maybe NULL (to
- *        match all values that have the right key).
- *        Note that for DBlocks there is no difference
- *        betwen key and vhash, but for other blocks
- *        there may be!
  * @param type entries of which type are relevant?
  *     Use 0 for any type.
  * @param proc function to call on the matching value,
@@ -536,167 +541,72 @@ execute_select (struct Plugin *plugin,
  */
 static void
 mysql_plugin_get_key (void *cls,
-                      uint64_t offset,
+                      uint64_t next_uid,
+                      bool random,
                       const struct GNUNET_HashCode *key,
-                      const struct GNUNET_HashCode *vhash,
                       enum GNUNET_BLOCK_Type type,
                       PluginDatumProcessor proc,
                       void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  int ret;
-  uint64_t total;
-  unsigned long hashSize;
-  unsigned long hashSize2;
-
-  total = -1;
-  struct GNUNET_MY_ResultSpec results_get[] = {
-    GNUNET_MY_result_spec_uint64 (&total),
-    GNUNET_MY_result_spec_end
-  };
+  uint64_t rvalue;
 
-  if (type != 0)
+  if (random)
   {
-    if (vhash != NULL)
-    {
-      struct GNUNET_MY_QueryParam params_get[] = {
-        GNUNET_MY_query_param_fixed_size (key, hashSize),
-        GNUNET_MY_query_param_fixed_size (vhash, hashSize2),
-        GNUNET_MY_query_param_uint32 (&type),
-        GNUNET_MY_query_param_end
-      };
-
-      ret =
-        GNUNET_MY_exec_prepared (plugin->mc,
-                                 plugin->count_entry_by_hash_vhash_and_type,
-                                 params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
-                                  results_get);
-    }
-    else
-    {
-      struct GNUNET_MY_QueryParam params_get[] = {
-        GNUNET_MY_query_param_fixed_size (key, hashSize),
-        GNUNET_MY_query_param_uint32 (&type),
-        GNUNET_MY_query_param_end
-      };
-
-      ret =
-        GNUNET_MY_exec_prepared (plugin->mc,
-                                 plugin->count_entry_by_hash_and_type,
-                                 params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
-                                  results_get);
-    }
+    rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       UINT64_MAX);
+    next_uid = 0;
   }
   else
-  {
-    if (vhash != NULL)
-    {
-      struct GNUNET_MY_QueryParam params_get[] = {
-        GNUNET_MY_query_param_fixed_size (key, hashSize),
-        GNUNET_MY_query_param_fixed_size (vhash, hashSize2),
-        GNUNET_MY_query_param_end
-      };
+    rvalue = 0;
 
-      ret =
-        GNUNET_MY_exec_prepared (plugin->mc,
-                                 plugin->count_entry_by_hash_and_vhash,
-                                 params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
-                                  results_get);
-    }
-    else
-    {
-      struct GNUNET_MY_QueryParam params_get[] = {
-        GNUNET_MY_query_param_fixed_size (key, hashSize),
-        GNUNET_MY_query_param_end
-      };
-
-      ret =
-        GNUNET_MY_exec_prepared (plugin->mc,
-                                 plugin->count_entry_by_hash,
-                                 params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash,
-                                  results_get);
-    }
-  }
-  if ((ret != GNUNET_OK) || (0 >= total))
+  if (NULL == key)
   {
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
+    struct GNUNET_MY_QueryParam params_select[] = {
+      GNUNET_MY_query_param_uint64 (&next_uid),
+      GNUNET_MY_query_param_uint64 (&rvalue),
+      GNUNET_MY_query_param_uint64 (&rvalue),
+      GNUNET_MY_query_param_end
+    };
+
+    execute_select (plugin,
+                    plugin->select_entry,
+                    proc,
+                    proc_cls,
+                    params_select);
   }
-  offset = offset % total;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Obtaining %llu/%lld result for GET `%s'\n",
-              (unsigned long long) offset,
-              (unsigned long long) total,
-              GNUNET_h2s (key));
-  if (type != GNUNET_BLOCK_TYPE_ANY)
+  else if (type != GNUNET_BLOCK_TYPE_ANY)
   {
-    if (NULL != vhash)
-    {
-      struct GNUNET_MY_QueryParam params_select[] = {
-        GNUNET_MY_query_param_auto_from_type (key),
-        GNUNET_MY_query_param_auto_from_type (vhash),
-        GNUNET_MY_query_param_uint32 (&type),
-        GNUNET_MY_query_param_uint64 (&offset),
-        GNUNET_MY_query_param_end
-      };
-
-      execute_select (plugin,
-                      plugin->select_entry_by_hash_vhash_and_type,
-                      proc, proc_cls,
-                      params_select);
-    }
-    else
-    {
-      struct GNUNET_MY_QueryParam params_select[] = {
-        GNUNET_MY_query_param_auto_from_type (key),
-        GNUNET_MY_query_param_uint32 (&type),
-        GNUNET_MY_query_param_uint64 (&offset),
-        GNUNET_MY_query_param_end
-      };
-
-      execute_select (plugin,
-                      plugin->select_entry_by_hash_and_type,
-                      proc,  proc_cls,
-                      params_select);
-    }
+    struct GNUNET_MY_QueryParam params_select[] = {
+      GNUNET_MY_query_param_auto_from_type (key),
+      GNUNET_MY_query_param_uint32 (&type),
+      GNUNET_MY_query_param_uint64 (&next_uid),
+      GNUNET_MY_query_param_uint64 (&rvalue),
+      GNUNET_MY_query_param_uint64 (&rvalue),
+      GNUNET_MY_query_param_end
+    };
+
+    execute_select (plugin,
+                    plugin->select_entry_by_hash_and_type,
+                    proc,
+                    proc_cls,
+                    params_select);
   }
   else
   {
-    if (NULL != vhash)
-    {
-      struct GNUNET_MY_QueryParam params_select[] = {
-        GNUNET_MY_query_param_auto_from_type (key),
-        GNUNET_MY_query_param_auto_from_type (vhash),
-        GNUNET_MY_query_param_uint64 (&offset),
-        GNUNET_MY_query_param_end
-      };
-
-      execute_select (plugin,
-                      plugin->select_entry_by_hash_and_vhash,
-                      proc, proc_cls,
-                      params_select);
-    }
-    else
-    {
-      struct GNUNET_MY_QueryParam params_select[] = {
-        GNUNET_MY_query_param_auto_from_type (key),
-        GNUNET_MY_query_param_uint64 (&offset),
-        GNUNET_MY_query_param_end
-      };
-
-      execute_select (plugin,
-                      plugin->select_entry_by_hash,
-                      proc, proc_cls,
-                      params_select);
-    }
+    struct GNUNET_MY_QueryParam params_select[] = {
+      GNUNET_MY_query_param_auto_from_type (key),
+      GNUNET_MY_query_param_uint64 (&next_uid),
+      GNUNET_MY_query_param_uint64 (&rvalue),
+      GNUNET_MY_query_param_uint64 (&rvalue),
+      GNUNET_MY_query_param_end
+    };
+
+    execute_select (plugin,
+                    plugin->select_entry_by_hash,
+                    proc,
+                    proc_cls,
+                    params_select);
   }
 }
 
@@ -704,32 +614,34 @@ mysql_plugin_get_key (void *cls,
 /**
  * Get a zero-anonymity datum from the datastore.
  *
- * @param cls our "struct Plugin*"
- * @param offset offset of the result
+ * @param cls our `struct Plugin *`
+ * @param next_uid return the result with lowest uid >= next_uid
  * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param proc function to call on a matching value or NULL
- * @param proc_cls closure for iter
+ *        Must not be zero (ANY).
+ * @param proc function to call on a matching value;
+ *        will be called with NULL if no value matches
+ * @param proc_cls closure for @a proc
  */
 static void
-mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
+mysql_plugin_get_zero_anonymity (void *cls,
+                                 uint64_t next_uid,
                                  enum GNUNET_BLOCK_Type type,
-                                 PluginDatumProcessor proc, void *proc_cls)
+                                 PluginDatumProcessor proc,
+                                 void *proc_cls)
 {
   struct Plugin *plugin = cls;
   uint32_t typei = (uint32_t) type;
-  uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                              UINT64_MAX);
+
   struct GNUNET_MY_QueryParam params_zero_iter[] = {
     GNUNET_MY_query_param_uint32 (&typei),
-    GNUNET_MY_query_param_uint64 (&rvalue),
-    GNUNET_MY_query_param_uint32 (&typei),
-    GNUNET_MY_query_param_uint64 (&rvalue),
+    GNUNET_MY_query_param_uint64 (&next_uid),
     GNUNET_MY_query_param_end
   };
 
-  execute_select (plugin, plugin->zero_iter,
-                  proc, proc_cls,
+  execute_select (plugin,
+                  plugin->zero_iter,
+                  proc,
+                  proc_cls,
                   params_zero_iter);
 }
 
@@ -758,29 +670,35 @@ struct ReplCtx
 
 
 /**
- * Wrapper for the processor for 'mysql_plugin_get_replication'.
+ * Wrapper for the processor for #mysql_plugin_get_replication().
  * Decrements the replication counter and calls the original
  * iterator.
  *
  * @param cls closure
  * @param key key for the content
- * @param size number of bytes in data
+ * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
  * @param expiration expiration time for the content
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
  *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ *         #GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
-           const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
-           uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
+repl_proc (void *cls,
+           const struct GNUNET_HashCode *key,
+           uint32_t size,
+           const void *data,
+           enum GNUNET_BLOCK_Type type,
+           uint32_t priority,
+           uint32_t anonymity,
+           uint32_t replication,
+           struct GNUNET_TIME_Absolute expiration,
            uint64_t uid)
 {
   struct ReplCtx *rc = cls;
@@ -788,21 +706,27 @@ repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
   int ret;
   int iret;
 
-  ret =
-      rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
-                expiration, uid);
+  ret = rc->proc (rc->proc_cls,
+                  key,
+                  size,
+                  data,
+                  type,
+                  priority,
+                  anonymity,
+                  replication,
+                  expiration,
+                  uid);
   if (NULL != key)
   {
-      struct GNUNET_MY_QueryParam params_proc[] = {
-        GNUNET_MY_query_param_uint64 (&uid),
-        GNUNET_MY_query_param_end
-      };
-
-      iret =
-      GNUNET_MY_exec_prepared (plugin->mc,
-                               plugin->dec_repl,
-                               params_proc);
-    if (iret == GNUNET_SYSERR)
+    struct GNUNET_MY_QueryParam params_proc[] = {
+      GNUNET_MY_query_param_uint64 (&uid),
+      GNUNET_MY_query_param_end
+    };
+
+    iret = GNUNET_MY_exec_prepared (plugin->mc,
+                                    plugin->dec_repl,
+                                    params_proc);
+    if (GNUNET_SYSERR == iret)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Failed to reduce replication counter\n");
@@ -817,70 +741,68 @@ repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
  * Get a random item for replication.  Returns a single, not expired,
  * random item from those with the highest replication counters.  The
  * item's replication counter is decremented by one IF it was positive
- * before.  Call 'proc' with all values ZERO or NULL if the datastore
+ * before.  Call @a proc with all values ZERO or NULL if the datastore
  * is empty.
  *
  * @param cls closure
  * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
+mysql_plugin_get_replication (void *cls,
+                              PluginDatumProcessor proc,
                               void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct ReplCtx rc;
-  unsigned long long rvalue;
-  //unsigned long repl;
+  uint64_t rvalue;
   uint32_t repl;
-  MYSQL_BIND results;
-
-  rc.plugin = plugin;
-  rc.proc = proc;
-  rc.proc_cls = proc_cls;
-  memset (&results, 0, sizeof (results));
-  results.buffer_type = MYSQL_TYPE_LONG;
-  results.buffer = &repl;
-  results.is_unsigned = GNUNET_YES;
-
+  struct ReplCtx rc;
   struct GNUNET_MY_QueryParam params_get[] = {
     GNUNET_MY_query_param_end
   };
-
   struct GNUNET_MY_ResultSpec results_get[] = {
     GNUNET_MY_result_spec_uint32 (&repl),
     GNUNET_MY_result_spec_end
   };
-/*
-  if (1 !=
-      GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
-  {
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-*/
+  struct GNUNET_MY_QueryParam params_select[] = {
+    GNUNET_MY_query_param_uint32 (&repl),
+    GNUNET_MY_query_param_uint64 (&rvalue),
+    GNUNET_MY_query_param_uint32 (&repl),
+    GNUNET_MY_query_param_uint64 (&rvalue),
+    GNUNET_MY_query_param_end
+  };
+
+  rc.plugin = plugin;
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+
   if (1 !=
-      GNUNET_MY_exec_prepared (plugin->mc, plugin->max_repl, params_get))
+      GNUNET_MY_exec_prepared (plugin->mc,
+                               plugin->max_repl,
+                               params_get))
   {
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
   }
 
-  if (1 !=
-      GNUNET_MY_extract_result (plugin->max_repl, results_get))
+  if (GNUNET_OK !=
+      GNUNET_MY_extract_result (plugin->max_repl,
+                                results_get))
   {
-      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      return;
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    return;
   }
-
-  rvalue =
-      (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                                     UINT64_MAX);
-  execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
-                  MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
-                  &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
-                  MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
-
+  GNUNET_break (GNUNET_NO ==
+                GNUNET_MY_extract_result (plugin->max_repl,
+                                          NULL));
+  rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                     UINT64_MAX);
+
+  execute_select (plugin,
+                  plugin->select_replication,
+                  &repl_proc,
+                  &rc,
+                  params_select);
 }
 
 
@@ -889,122 +811,92 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
  *
  * @param cls closure
  * @param proc function to call on each key
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
 mysql_plugin_get_keys (void *cls,
-                       PluginKeyProcessor proc,
-                       void *proc_cls)
+                       PluginKeyProcessor proc,
+                       void *proc_cls)
 {
   struct Plugin *plugin = cls;
-//  const char *query = "SELECT hash FROM gn090";
-  char *query = "SELECT hash FROM gn090";
   int ret;
   MYSQL_STMT *statement;
-  struct GNUNET_MYSQL_StatementHandle *statements_handle_select = NULL;
-
-
+  unsigned int cnt;
   struct GNUNET_HashCode key;
-//  MYSQL_BIND cbind[1];
-//  unsigned long length;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
-
-
-  statements_handle_select = GNUNET_MYSQL_statement_prepare (plugin->mc,
-                                                             query);
-/*
-  if (statement == NULL)
-  {
-    GNUNET_MYSQL_statements_invalidate (plugin->mc);
-    proc (proc_cls, NULL, 0);
-    return;
-  }
-
-  if (mysql_stmt_prepare (statement, query, strlen (query)))
-  {
-    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
-                     _("Failed to prepare statement `%s'\n"), query);
-    GNUNET_MYSQL_statements_invalidate (plugin->mc);
-    proc (proc_cls, NULL, 0);
-    return;
-  }
-*/
-  GNUNET_assert (proc != NULL);
-
+  struct GNUNET_HashCode last;
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_end
   };
-
   struct GNUNET_MY_ResultSpec results_select[] = {
     GNUNET_MY_result_spec_auto_from_type (&key),
     GNUNET_MY_result_spec_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                              statements_handle_select,
-                              params_select))
+  GNUNET_assert (NULL != proc);
+  statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
+  if (GNUNET_OK !=
+      GNUNET_MY_exec_prepared (plugin->mc,
+                               plugin->get_all_keys,
+                               params_select))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
-                "mysql_stmt_execute", query, __FILE__, __LINE__,
+                "mysql_stmt_execute",
+                GET_ALL_KEYS,
+                __FILE__,
+                __LINE__,
                 mysql_stmt_error (statement));
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
     proc (proc_cls, NULL, 0);
     return;
   }
-
-  ret = GNUNET_MY_extract_result (statements_handle_select,
-                                  results_select);
-/*  if (mysql_stmt_execute (statement))
+  memset (&last, 0, sizeof (last)); /* make static analysis happy */
+  ret = GNUNET_YES;
+  cnt = 0;
+  while (ret == GNUNET_YES)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("`%s' for `%s' failed at %s:%d with error: %s\n"),
-                "mysql_stmt_execute", query, __FILE__, __LINE__,
-                mysql_stmt_error (statement));
-    GNUNET_MYSQL_statements_invalidate (plugin->mc);
-    proc (proc_cls, NULL, 0);
-    return;
+    ret = GNUNET_MY_extract_result (plugin->get_all_keys,
+                                    results_select);
+    if (0 != memcmp (&last,
+                     &key,
+                     sizeof (key)))
+    {
+      if (0 != cnt)
+        proc (proc_cls,
+              &last,
+              cnt);
+      cnt = 1;
+      last = key;
+    }
+    else
+    {
+      cnt++;
+    }
   }
-  memset (cbind, 0, sizeof (cbind));
-  cbind[0].buffer_type = MYSQL_TYPE_BLOB;
-  cbind[0].buffer = &key;
-  cbind[0].buffer_length = sizeof (key);
-  cbind[0].length = &length;
-  cbind[0].is_unsigned = GNUNET_NO;
-  if (mysql_stmt_bind_result (statement, cbind))
+  if (0 != cnt)
+    proc (proc_cls,
+          &last,
+          cnt);
+  /* finally, let app know we are done */
+  proc (proc_cls,
+        NULL,
+        0);
+  if (GNUNET_SYSERR == ret)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("`%s' failed at %s:%d with error: %s\n"),
-                "mysql_stmt_bind_result", __FILE__, __LINE__,
+                "mysql_stmt_fetch",
+                __FILE__,
+                __LINE__,
                 mysql_stmt_error (statement));
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
-    proc (proc_cls, NULL, 0);
     return;
   }
-  while (0 == (ret = mysql_stmt_fetch (statement)))
-  {
-    if (sizeof (struct GNUNET_HashCode) == length)
-      proc (proc_cls, &key, 1);
-  }
-  proc (proc_cls, NULL, 0);
-*/
-  if (ret != MYSQL_NO_DATA)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("`%s' failed at %s:%d with error: %s\n"),
-                    "mysql_stmt_fetch", __FILE__, __LINE__,
-                    mysql_stmt_error (statement));
-    GNUNET_MYSQL_statements_invalidate (plugin->mc);
-    return;
-  }
-
-  mysql_stmt_reset (statement);
 }
 
 
 /**
- * Context for 'expi_proc' function.
+ * Context for #expi_proc() function.
  */
 struct ExpiCtx
 {
@@ -1020,7 +912,7 @@ struct ExpiCtx
   PluginDatumProcessor proc;
 
   /**
-   * Closure for proc.
+   * Closure for @e proc.
    */
   void *proc_cls;
 };
@@ -1028,7 +920,7 @@ struct ExpiCtx
 
 
 /**
- * Wrapper for the processor for 'mysql_plugin_get_expiration'.
+ * Wrapper for the processor for #mysql_plugin_get_expiration().
  * If no expired value was found, we do a second query for
  * low-priority content.
  *
@@ -1039,81 +931,173 @@ struct ExpiCtx
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
  * @param expiration expiration time for the content
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
  *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ *         #GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
-           const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
-           uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
+expi_proc (void *cls,
+           const struct GNUNET_HashCode *key,
+           uint32_t size,
+           const void *data,
+           enum GNUNET_BLOCK_Type type,
+           uint32_t priority,
+           uint32_t anonymity,
+           uint32_t replication,
+           struct GNUNET_TIME_Absolute expiration,
            uint64_t uid)
 {
   struct ExpiCtx *rc = cls;
   struct Plugin *plugin = rc->plugin;
+  struct GNUNET_MY_QueryParam params_select[] = {
+    GNUNET_MY_query_param_end
+  };
 
   if (NULL == key)
   {
-    execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
-                    -1);
+    execute_select (plugin,
+                    plugin->select_priority,
+                    rc->proc,
+                    rc->proc_cls,
+                    params_select);
     return GNUNET_SYSERR;
   }
-  return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
-                   expiration, uid);
+  return rc->proc (rc->proc_cls,
+                   key,
+                   size,
+                   data,
+                   type,
+                   priority,
+                   anonymity,
+                   replication,
+                   expiration,
+                   uid);
 }
 
 
 /**
  * Get a random item for expiration.
- * Call 'proc' with all values ZERO or NULL if the datastore is empty.
+ * Call @a proc with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
  * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
+mysql_plugin_get_expiration (void *cls,
+                             PluginDatumProcessor proc,
                              void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  long long nt;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_MY_QueryParam params_select[] = {
+    GNUNET_MY_query_param_absolute_time (&now),
+    GNUNET_MY_query_param_end
+  };
   struct ExpiCtx rc;
 
   rc.plugin = plugin;
   rc.proc = proc;
   rc.proc_cls = proc_cls;
-  nt = (long long) GNUNET_TIME_absolute_get ().abs_value_us;
-  execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
-                  MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
-
+  now = GNUNET_TIME_absolute_get ();
+  execute_select (plugin,
+                  plugin->select_expiration,
+                  expi_proc,
+                  &rc,
+                  params_select);
 }
 
 
 /**
  * Drop database.
  *
- * @param cls the "struct Plugin*"
+ * @param cls the `struct Plugin *`
  */
 static void
 mysql_plugin_drop (void *cls)
 {
   struct Plugin *plugin = cls;
 
-  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
+  if (GNUNET_OK !=
+      GNUNET_MYSQL_statement_run (plugin->mc,
+                                  "DROP TABLE gn090"))
     return;                     /* error */
   plugin->env->duc (plugin->env->cls, 0);
 }
 
 
+/**
+ * Remove a particular key in the datastore.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure for @a cont
+ */
+static void
+mysql_plugin_remove_key (void *cls,
+                         const struct GNUNET_HashCode *key,
+                         uint32_t size,
+                         const void *data,
+                         PluginRemoveCont cont,
+                         void *cont_cls)
+{
+  struct Plugin *plugin = cls;
+  struct GNUNET_MY_QueryParam params_delete[] = {
+    GNUNET_MY_query_param_auto_from_type (key),
+    GNUNET_MY_query_param_fixed_size (data, size),
+    GNUNET_MY_query_param_end
+  };
+
+  if (GNUNET_OK !=
+      GNUNET_MY_exec_prepared (plugin->mc,
+                               plugin->delete_entry_by_hash_value,
+                               params_delete))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Removing key `%s' from gn090 table failed\n",
+                GNUNET_h2s (key));
+    cont (cont_cls,
+          key,
+          size,
+          GNUNET_SYSERR,
+          _("MySQL statement run failure"));
+    return;
+  }
+
+  MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->delete_entry_by_hash_value);
+  my_ulonglong rows = mysql_stmt_affected_rows (stmt);
+
+  if (0 == rows)
+  {
+    cont (cont_cls,
+          key,
+          size,
+          GNUNET_NO,
+          NULL);
+    return;
+  }
+  plugin->env->duc (plugin->env->cls,
+                    -size);
+  cont (cont_cls,
+        key,
+        size,
+        GNUNET_OK,
+        NULL);
+}
+
+
 /**
  * Entry point for the plugin.
  *
- * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
- * @return our "struct Plugin*"
+ * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
+ * @return our `struct Plugin *`
  */
 void *
 libgnunet_plugin_datastore_mysql_init (void *cls)
@@ -1124,7 +1108,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
 
   plugin = GNUNET_new (struct Plugin);
   plugin->env = env;
-  plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
+  plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
+                                            "datastore-mysql");
   if (NULL == plugin->mc)
   {
     GNUNET_free (plugin);
@@ -1143,30 +1128,21 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
        " hash BINARY(64) NOT NULL DEFAULT '',"
        " vhash BINARY(64) NOT NULL DEFAULT '',"
        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
-       " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
-       " INDEX idx_hash_uid (hash(64),uid),"
-       " INDEX idx_hash_vhash (hash(64),vhash(64)),"
+       " PRIMARY KEY (uid),"
        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
-       " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
+       " INDEX idx_prio (prio),"
+       " INDEX idx_repl_rvalue (repl,rvalue),"
        " INDEX idx_expire (expire),"
        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
+      PINIT (plugin->delete_entry_by_hash_value, DELETE_ENTRY_BY_HASH_VALUE) ||
+      PINIT (plugin->select_entry, SELECT_ENTRY) ||
       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
-      PINIT (plugin->select_entry_by_hash_and_vhash,
-             SELECT_ENTRY_BY_HASH_AND_VHASH) ||
       PINIT (plugin->select_entry_by_hash_and_type,
              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
-      PINIT (plugin->select_entry_by_hash_vhash_and_type,
-             SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
-      PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
       PINIT (plugin->get_size, SELECT_SIZE) ||
-      PINIT (plugin->count_entry_by_hash_and_vhash,
-             COUNT_ENTRY_BY_HASH_AND_VHASH) ||
-      PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
-      || PINIT (plugin->count_entry_by_hash_vhash_and_type,
-                COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
       PINIT (plugin->dec_repl, DEC_REPL) ||
       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
@@ -1174,7 +1150,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
-      PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
+      PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ||
+      false)
   {
     GNUNET_MYSQL_context_destroy (plugin->mc);
     GNUNET_free (plugin);
@@ -1187,13 +1164,13 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
   api->cls = plugin;
   api->estimate_size = &mysql_plugin_estimate_size;
   api->put = &mysql_plugin_put;
-  api->update = &mysql_plugin_update;
   api->get_key = &mysql_plugin_get_key;
   api->get_replication = &mysql_plugin_get_replication;
   api->get_expiration = &mysql_plugin_get_expiration;
   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
   api->get_keys = &mysql_plugin_get_keys;
   api->drop = &mysql_plugin_drop;
+  api->remove_key = &mysql_plugin_remove_key;
   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
                    _("Mysql database running\n"));
   return api;
@@ -1202,7 +1179,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
 
 /**
  * Exit point from the plugin.
- * @param cls our "struct Plugin*"
+ *
+ * @param cls our `struct Plugin *`
  * @return always NULL
  */
 void *