uncrustify as demanded.
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
index 5c7f75d5ed7b02b64b87a3ac018ff61aae190711..17b645585841e126dd94d4c30667e42c5581105f 100644 (file)
@@ -1,35 +1,32 @@
 /*
      This file is part of GNUnet
-     (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009-2017 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
 
 /**
  * @file datastore/plugin_datastore_postgres.c
  * @brief postgres-based datastore backend
  * @author Christian Grothoff
  */
-
 #include "platform.h"
 #include "gnunet_datastore_plugin.h"
-#include "gnunet_postgres_lib.h"
-#include <postgresql/libpq-fe.h>
+#include "gnunet_pq_lib.h"
 
-#define DEBUG_POSTGRES GNUNET_EXTRA_LOGGING
 
 /**
  * After how many ms "busy" should a DB operation fail for good?
@@ -47,8 +44,7 @@
 /**
  * Context for all functions in this plugin.
  */
-struct Plugin
-{
+struct Plugin {
   /**
    * Our execution environment.
    */
@@ -58,7 +54,6 @@ struct Plugin
    * Native Postgres database handle.
    */
   PGconn *dbh;
-
 };
 
 
@@ -66,150 +61,123 @@ struct Plugin
  * @brief Get a database handle
  *
  * @param plugin global context
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 static int
-init_connection (struct Plugin *plugin)
+init_connection(struct Plugin *plugin)
 {
-  PGresult *ret;
+  struct GNUNET_PQ_ExecuteStatement es[] = {
+    /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
+     * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
+     * we do math or inequality tests, so we can't handle the entire range of uint32_t.
+     * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
+     * PostgreSQL also recommends against using WITH OIDS.
+     */
+    GNUNET_PQ_make_execute("CREATE TABLE IF NOT EXISTS gn090 ("
+                           "  repl INTEGER NOT NULL DEFAULT 0,"
+                           "  type INTEGER NOT NULL DEFAULT 0,"
+                           "  prio INTEGER NOT NULL DEFAULT 0,"
+                           "  anonLevel INTEGER NOT NULL DEFAULT 0,"
+                           "  expire BIGINT NOT NULL DEFAULT 0,"
+                           "  rvalue BIGINT NOT NULL DEFAULT 0,"
+                           "  hash BYTEA NOT NULL DEFAULT '',"
+                           "  vhash BYTEA NOT NULL DEFAULT '',"
+                           "  value BYTEA NOT NULL DEFAULT '')"
+                           "WITH OIDS"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
+    GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
+    GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
+    GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
+    GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
+    GNUNET_PQ_EXECUTE_STATEMENT_END
+  };
 
-  plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
+#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
+  struct GNUNET_PQ_PreparedStatement ps[] = {
+    GNUNET_PQ_make_prepare("get",
+                           "SELECT " RESULT_COLUMNS " FROM gn090"
+                           " WHERE oid >= $1::bigint AND"
+                           " (rvalue >= $2 OR 0 = $3::smallint) AND"
+                           " (hash = $4 OR 0 = $5::smallint) AND"
+                           " (type = $6 OR 0 = $7::smallint)"
+                           " ORDER BY oid ASC LIMIT 1",
+                           7),
+    GNUNET_PQ_make_prepare("put",
+                           "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
+                           "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
+                           9),
+    GNUNET_PQ_make_prepare("update",
+                           "UPDATE gn090"
+                           " SET prio = prio + $1,"
+                           " repl = repl + $2,"
+                           " expire = GREATEST(expire, $3)"
+                           " WHERE hash = $4 AND vhash = $5",
+                           5),
+    GNUNET_PQ_make_prepare("decrepl",
+                           "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
+                           "WHERE oid = $1",
+                           1),
+    GNUNET_PQ_make_prepare("select_non_anonymous",
+                           "SELECT " RESULT_COLUMNS " FROM gn090 "
+                           "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
+                           "ORDER BY oid ASC LIMIT 1",
+                           2),
+    GNUNET_PQ_make_prepare("select_expiration_order",
+                           "(SELECT " RESULT_COLUMNS " FROM gn090 "
+                           "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
+                           "UNION "
+                           "(SELECT " RESULT_COLUMNS " FROM gn090 "
+                           "ORDER BY prio ASC LIMIT 1) "
+                           "ORDER BY expire ASC LIMIT 1",
+                           1),
+    GNUNET_PQ_make_prepare("select_replication_order",
+                           "SELECT " RESULT_COLUMNS " FROM gn090 "
+                           "ORDER BY repl DESC,RANDOM() LIMIT 1",
+                           0),
+    GNUNET_PQ_make_prepare("delrow",
+                           "DELETE FROM gn090 "
+                           "WHERE oid=$1",
+                           1),
+    GNUNET_PQ_make_prepare("remove",
+                           "DELETE FROM gn090"
+                           " WHERE hash = $1 AND"
+                           " value = $2",
+                           2),
+    GNUNET_PQ_make_prepare("get_keys",
+                           "SELECT hash FROM gn090",
+                           0),
+    GNUNET_PQ_make_prepare("estimate_size",
+                           "SELECT CASE WHEN NOT EXISTS"
+                           "  (SELECT 1 FROM gn090)"
+                           "  THEN 0"
+                           "  ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
+                           "END AS total",
+                           0),
+    GNUNET_PQ_PREPARED_STATEMENT_END
+  };
+#undef RESULT_COLUMNS
+
+  plugin->dbh = GNUNET_PQ_connect_with_cfg(plugin->env->cfg,
+                                           "datastore-postgres");
   if (NULL == plugin->dbh)
     return GNUNET_SYSERR;
-  ret =
-      PQexec (plugin->dbh,
-              "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
-              "  type INTEGER NOT NULL DEFAULT 0,"
-              "  prio INTEGER NOT NULL DEFAULT 0,"
-              "  anonLevel INTEGER NOT NULL DEFAULT 0,"
-              "  expire BIGINT NOT NULL DEFAULT 0,"
-              "  rvalue BIGINT NOT NULL DEFAULT 0,"
-              "  hash BYTEA NOT NULL DEFAULT '',"
-              "  vhash BYTEA NOT NULL DEFAULT '',"
-              "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
-  if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07",    /* duplicate table */
-                                                                                    PQresultErrorField
-                                                                                    (ret,
-                                                                                     PG_DIAG_SQLSTATE)))))
-  {
-    (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090");
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  if (PQresultStatus (ret) == PGRES_COMMAND_OK)
-  {
-    if ((GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh,
-                  "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh,
-                  "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
-        (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
+
+  if ((GNUNET_OK !=
+       GNUNET_PQ_exec_statements(plugin->dbh,
+                                 es)) ||
+      (GNUNET_OK !=
+       GNUNET_PQ_prepare_statements(plugin->dbh,
+                                    ps)))
     {
-      PQclear (ret);
-      PQfinish (plugin->dbh);
+      PQfinish(plugin->dbh);
       plugin->dbh = NULL;
       return GNUNET_SYSERR;
     }
-  }
-  PQclear (ret);
-  ret =
-      PQexec (plugin->dbh,
-              "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  PQclear (ret);
-  ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  PQclear (ret);
-  ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  PQclear (ret);
-  if ((GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
-                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1 AND vhash=$2 AND type=$3 "
-                   "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
-                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1 AND type=$2 "
-                   "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
-                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1 AND vhash=$2 "
-                   "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "get",
-                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "put",
-                   "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
-                   "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "update",
-                   "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
-                   "WHERE oid = $3", 3)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
-                   "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
-                   "WHERE oid = $1", 1)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
-                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
-                   1)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
-                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
-                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
-                   1)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
-                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
-      (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
   return GNUNET_OK;
 }
 
@@ -218,43 +186,46 @@ init_connection (struct Plugin *plugin)
  * Get an estimate of how much space the database is
  * currently using.
  *
- * @param cls our "struct Plugin*"
+ * @param cls our `struct Plugin *`
  * @return number of bytes used on disk
  */
-static unsigned long long
-postgres_plugin_estimate_size (void *cls)
+static void
+postgres_plugin_estimate_size(void *cls,
+                              unsigned long long *estimate)
 {
   struct Plugin *plugin = cls;
-  unsigned long long total;
-  PGresult *ret;
+  uint64_t total;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_end
+  };
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_uint64("total",
+                                 &total),
+    GNUNET_PQ_result_spec_end
+  };
+  enum GNUNET_DB_QueryStatus ret;
 
-  ret =
-      PQexecParams (plugin->dbh,
-                    "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
-                    NULL, NULL, NULL, NULL, 1);
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
-  {
-    return 0;
-  }
-  if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
-      (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
-  {
-    GNUNET_break (0);
-    PQclear (ret);
-    return 0;
-  }
-  total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
-  PQclear (ret);
-  return total;
+  if (NULL == estimate)
+    return;
+  ret = GNUNET_PQ_eval_prepared_singleton_select(plugin->dbh,
+                                                 "estimate_size",
+                                                 params,
+                                                 rs);
+  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
+    {
+      *estimate = 0LL;
+      return;
+    }
+  *estimate = total;
 }
 
 
 /**
  * Store an item in the datastore.
  *
- * @param cls closure with the 'struct Plugin' 
+ * @param cls closure with the `struct Plugin`
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in data
  * @param data content stored
  * @param type type of the content
@@ -262,300 +233,309 @@ postgres_plugin_estimate_size (void *cls)
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-static int
-postgres_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size,
-                     const void *data, enum GNUNET_BLOCK_Type type,
-                     uint32_t priority, uint32_t anonymity,
-                     uint32_t replication,
-                     struct GNUNET_TIME_Absolute expiration, char **msg)
+static void
+postgres_plugin_put(void *cls,
+                    const struct GNUNET_HashCode *key,
+                    bool absent,
+                    uint32_t size,
+                    const void *data,
+                    enum GNUNET_BLOCK_Type type,
+                    uint32_t priority,
+                    uint32_t anonymity,
+                    uint32_t replication,
+                    struct GNUNET_TIME_Absolute expiration,
+                    PluginPutCont cont,
+                    void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  GNUNET_HashCode vhash;
-  PGresult *ret;
-  uint32_t btype = htonl (type);
-  uint32_t bprio = htonl (priority);
-  uint32_t banon = htonl (anonymity);
-  uint32_t brepl = htonl (replication);
-  uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
-
-  const char *paramValues[] = {
-    (const char *) &brepl,
-    (const char *) &btype,
-    (const char *) &bprio,
-    (const char *) &banon,
-    (const char *) &bexpi,
-    (const char *) key,
-    (const char *) &vhash,
-    (const char *) data
-  };
-  int paramLengths[] = {
-    sizeof (brepl),
-    sizeof (btype),
-    sizeof (bprio),
-    sizeof (banon),
-    sizeof (bexpi),
-    sizeof (GNUNET_HashCode),
-    sizeof (GNUNET_HashCode),
-    size
-  };
-  const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
+  struct GNUNET_HashCode vhash;
+  enum GNUNET_DB_QueryStatus ret;
 
-  GNUNET_CRYPTO_hash (data, size, &vhash);
-  ret =
-      PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
-                      paramFormats, 1);
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
-    return GNUNET_SYSERR;
-  PQclear (ret);
-  plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
-#if DEBUG_POSTGRES
-  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
-                   "Stored %u bytes in database\n", (unsigned int) size);
-#endif
-  return GNUNET_OK;
+  GNUNET_CRYPTO_hash(data,
+                     size,
+                     &vhash);
+  if (!absent)
+    {
+      struct GNUNET_PQ_QueryParam params[] = {
+        GNUNET_PQ_query_param_uint32(&priority),
+        GNUNET_PQ_query_param_uint32(&replication),
+        GNUNET_PQ_query_param_absolute_time(&expiration),
+        GNUNET_PQ_query_param_auto_from_type(key),
+        GNUNET_PQ_query_param_auto_from_type(&vhash),
+        GNUNET_PQ_query_param_end
+      };
+      ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+                                               "update",
+                                               params);
+      if (0 > ret)
+        {
+          cont(cont_cls,
+               key,
+               size,
+               GNUNET_SYSERR,
+               _("Postgress exec failure"));
+          return;
+        }
+      bool affected = (0 != ret);
+      if (affected)
+        {
+          cont(cont_cls,
+               key,
+               size,
+               GNUNET_NO,
+               NULL);
+          return;
+        }
+    }
+
+  {
+    uint32_t utype = (uint32_t)type;
+    uint64_t rvalue = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
+                                               UINT64_MAX);
+    struct GNUNET_PQ_QueryParam params[] = {
+      GNUNET_PQ_query_param_uint32(&replication),
+      GNUNET_PQ_query_param_uint32(&utype),
+      GNUNET_PQ_query_param_uint32(&priority),
+      GNUNET_PQ_query_param_uint32(&anonymity),
+      GNUNET_PQ_query_param_absolute_time(&expiration),
+      GNUNET_PQ_query_param_uint64(&rvalue),
+      GNUNET_PQ_query_param_auto_from_type(key),
+      GNUNET_PQ_query_param_auto_from_type(&vhash),
+      GNUNET_PQ_query_param_fixed_size(data, size),
+      GNUNET_PQ_query_param_end
+    };
+
+    ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+                                             "put",
+                                             params);
+    if (0 > ret)
+      {
+        cont(cont_cls,
+             key,
+             size,
+             GNUNET_SYSERR,
+             "Postgress exec failure");
+        return;
+      }
+  }
+  plugin->env->duc(plugin->env->cls,
+                   size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+  GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+                  "datastore-postgres",
+                  "Stored %u bytes in database\n",
+                  (unsigned int)size);
+  cont(cont_cls,
+       key,
+       size,
+       GNUNET_OK,
+       NULL);
 }
 
 
 /**
- * Function invoked to process the result and call
- * the processor.
+ * Closure for #process_result.
+ */
+struct ProcessResultContext {
+  /**
+   * The plugin handle.
+   */
+  struct Plugin *plugin;
+
+  /**
+   * Function to call on each result.
+   */
+  PluginDatumProcessor proc;
+
+  /**
+   * Closure for @e proc.
+   */
+  void *proc_cls;
+};
+
+
+/**
+ * Function invoked to process the result and call the processor of @a
+ * cls.
  *
- * @param plugin global plugin data
- * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param cls our `struct ProcessResultContext`
  * @param res result from exec
- * @param filename filename for error messages
- * @param line line number for error messages
+ * @param num_results number of results in @a res
  */
 static void
-process_result (struct Plugin *plugin, PluginDatumProcessor proc,
-                void *proc_cls, PGresult * res, 
-               const char *filename, int line)
+process_result(void *cls,
+               PGresult *res,
+               unsigned int num_results)
 {
-  int iret;
-  enum GNUNET_BLOCK_Type type;
-  uint32_t anonymity;
-  uint32_t priority;
-  uint32_t size;
-  unsigned int rowid;
-  struct GNUNET_TIME_Absolute expiration_time;
-  GNUNET_HashCode key;
+  struct ProcessResultContext *prc = cls;
+  struct Plugin *plugin = prc->plugin;
 
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
-                                    filename, line))
-  {
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
-                     "Ending iteration (postgres error)\n");
-#endif
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-
-  if (0 == PQntuples (res))
-  {
-    /* no result */
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
-                     "Ending iteration (no more results)\n");
-#endif
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    PQclear (res);
-    return;
-  }
-  if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
-      (sizeof (uint32_t) != PQfsize (res, 0)) ||
-      (sizeof (uint32_t) != PQfsize (res, 6)))
-  {
-    GNUNET_break (0);
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    PQclear (res);
-    return;
-  }
-  rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
-  if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
-      (sizeof (uint32_t) != PQfsize (res, 1)) ||
-      (sizeof (uint32_t) != PQfsize (res, 2)) ||
-      (sizeof (uint64_t) != PQfsize (res, 3)) ||
-      (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)))
-  {
-    GNUNET_break (0);
-    PQclear (res);
-    GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-
-  type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
-  priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
-  anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
-  expiration_time.abs_value =
-      GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
-  memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
-  size = PQgetlength (res, 0, 5);
-#if DEBUG_POSTGRES
-  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
-                   "Found result of size %u bytes and type %u in database\n",
-                   (unsigned int) size, (unsigned int) type);
-#endif
-  iret =
-      proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
-            (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
-            rowid);
-  PQclear (res);
-  if (iret == GNUNET_NO)
-  {
-#if DEBUG_POSTGRES
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Processor asked for item %u to be removed.\n", rowid);
-#endif
-    if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
+  if (0 == num_results)
     {
-#if DEBUG_POSTGRES
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
-                       "Deleting %u bytes from database\n",
-                       (unsigned int) size);
-#endif
-      plugin->env->duc (plugin->env->cls,
-                        -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
-#if DEBUG_POSTGRES
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
-                       "Deleted %u bytes from database\n", (unsigned int) size);
-#endif
+      /* no result */
+      GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+                      "datastore-postgres",
+                      "Ending iteration (no more results)\n");
+      prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+                GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
     }
-  }
+  if (1 != num_results)
+    {
+      GNUNET_break(0);
+      prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+                GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
+    }
+  /* Technically we don't need the loop here, but nicer in case
+     we ever relax the condition above. */
+  for (unsigned int i = 0; i < num_results; i++)
+    {
+      int iret;
+      uint32_t rowid;
+      uint32_t utype;
+      uint32_t anonymity;
+      uint32_t replication;
+      uint32_t priority;
+      size_t size;
+      void *data;
+      struct GNUNET_TIME_Absolute expiration_time;
+      struct GNUNET_HashCode key;
+      struct GNUNET_PQ_ResultSpec rs[] = {
+        GNUNET_PQ_result_spec_uint32("repl", &replication),
+        GNUNET_PQ_result_spec_uint32("type", &utype),
+        GNUNET_PQ_result_spec_uint32("prio", &priority),
+        GNUNET_PQ_result_spec_uint32("anonLevel", &anonymity),
+        GNUNET_PQ_result_spec_absolute_time("expire", &expiration_time),
+        GNUNET_PQ_result_spec_auto_from_type("hash", &key),
+        GNUNET_PQ_result_spec_variable_size("value", &data, &size),
+        GNUNET_PQ_result_spec_uint32("oid", &rowid),
+        GNUNET_PQ_result_spec_end
+      };
+
+      if (GNUNET_OK !=
+          GNUNET_PQ_extract_result(res,
+                                   rs,
+                                   i))
+        {
+          GNUNET_break(0);
+          prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
+          return;
+        }
+
+      GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+                      "datastore-postgres",
+                      "Found result of size %u bytes and type %u in database\n",
+                      (unsigned int)size,
+                      (unsigned int)utype);
+      iret = prc->proc(prc->proc_cls,
+                       &key,
+                       size,
+                       data,
+                       (enum GNUNET_BLOCK_Type)utype,
+                       priority,
+                       anonymity,
+                       replication,
+                       expiration_time,
+                       rowid);
+      if (iret == GNUNET_NO)
+        {
+          struct GNUNET_PQ_QueryParam param[] = {
+            GNUNET_PQ_query_param_uint32(&rowid),
+            GNUNET_PQ_query_param_end
+          };
+
+          GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
+                     "Processor asked for item %u to be removed.\n",
+                     (unsigned int)rowid);
+          if (0 <
+              GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+                                                 "delrow",
+                                                 param))
+            {
+              GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+                              "datastore-postgres",
+                              "Deleting %u bytes from database\n",
+                              (unsigned int)size);
+              plugin->env->duc(plugin->env->cls,
+                               -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+              GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+                              "datastore-postgres",
+                              "Deleted %u bytes from database\n",
+                              (unsigned int)size);
+            }
+        }
+      GNUNET_PQ_cleanup_result(rs);
+    } /* for (i) */
 }
 
 
 /**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
  *
- * @param cls closure with the 'struct Plugin'
- * @param offset offset of the result (modulo num-results);
- *        specific ordering does not matter for the offset
+ * @param cls closure with the `struct Plugin`
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
  * @param key maybe NULL (to match all entries)
- * @param vhash hash of the value, maybe NULL (to
- *        match all values that have the right key).
- *        Note that for DBlocks there is no difference
- *        betwen key and vhash, but for other blocks
- *        there may be!
  * @param type entries of which type are relevant?
  *     Use 0 for any type.
  * @param proc function to call on the matching value;
- *        will be called once with a NULL if no value matches
- * @param proc_cls closure for iter
+ *        will be called with NULL if nothing matches
+ * @param proc_cls closure for @a proc
  */
 static void
-postgres_plugin_get_key (void *cls, uint64_t offset,
-                         const GNUNET_HashCode * key,
-                         const GNUNET_HashCode * vhash,
-                         enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
-                         void *proc_cls)
+postgres_plugin_get_key(void *cls,
+                        uint64_t next_uid,
+                        bool random,
+                        const struct GNUNET_HashCode *key,
+                        enum GNUNET_BLOCK_Type type,
+                        PluginDatumProcessor proc,
+                        void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  const int paramFormats[] = { 1, 1, 1, 1, 1 };
-  int paramLengths[4];
-  const char *paramValues[4];
-  int nparams;
-  const char *pname;
-  PGresult *ret;
-  uint64_t total;
-  uint64_t blimit_off;
-  uint32_t btype;
-
-  GNUNET_assert (key != NULL);
-  paramValues[0] = (const char *) key;
-  paramLengths[0] = sizeof (GNUNET_HashCode);
-  btype = htonl (type);
-  if (type != 0)
-  {
-    if (vhash != NULL)
-    {
-      paramValues[1] = (const char *) vhash;
-      paramLengths[1] = sizeof (GNUNET_HashCode);
-      paramValues[2] = (const char *) &btype;
-      paramLengths[2] = sizeof (btype);
-      paramValues[3] = (const char *) &blimit_off;
-      paramLengths[3] = sizeof (blimit_off);
-      nparams = 4;
-      pname = "getvt";
-      ret =
-          PQexecParams (plugin->dbh,
-                        "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
-                        3, NULL, paramValues, paramLengths, paramFormats, 1);
-    }
-    else
+  uint32_t utype = type;
+  uint16_t use_rvalue = random;
+  uint16_t use_key = NULL != key;
+  uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
+  uint64_t rvalue;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_uint64(&next_uid),
+    GNUNET_PQ_query_param_uint64(&rvalue),
+    GNUNET_PQ_query_param_uint16(&use_rvalue),
+    GNUNET_PQ_query_param_auto_from_type(key),
+    GNUNET_PQ_query_param_uint16(&use_key),
+    GNUNET_PQ_query_param_uint32(&utype),
+    GNUNET_PQ_query_param_uint16(&use_type),
+    GNUNET_PQ_query_param_end
+  };
+  struct ProcessResultContext prc;
+  enum GNUNET_DB_QueryStatus res;
+
+  if (random)
     {
-      paramValues[1] = (const char *) &btype;
-      paramLengths[1] = sizeof (btype);
-      paramValues[2] = (const char *) &blimit_off;
-      paramLengths[2] = sizeof (blimit_off);
-      nparams = 3;
-      pname = "gett";
-      ret =
-          PQexecParams (plugin->dbh,
-                        "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
-                        2, NULL, paramValues, paramLengths, paramFormats, 1);
+      rvalue = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
+                                        UINT64_MAX);
+      next_uid = 0;
     }
-  }
   else
-  {
-    if (vhash != NULL)
     {
-      paramValues[1] = (const char *) vhash;
-      paramLengths[1] = sizeof (GNUNET_HashCode);
-      paramValues[2] = (const char *) &blimit_off;
-      paramLengths[2] = sizeof (blimit_off);
-      nparams = 3;
-      pname = "getv";
-      ret =
-          PQexecParams (plugin->dbh,
-                        "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
-                        2, NULL, paramValues, paramLengths, paramFormats, 1);
+      rvalue = 0;
     }
-    else
-    {
-      paramValues[1] = (const char *) &blimit_off;
-      paramLengths[1] = sizeof (blimit_off);
-      nparams = 2;
-      pname = "get";
-      ret =
-          PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
-                        1, NULL, paramValues, paramLengths, paramFormats, 1);
-    }
-  }
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
-  {
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-  if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
-      (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
-  {
-    GNUNET_break (0);
-    PQclear (ret);
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-  total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
-  PQclear (ret);
-  if (total == 0)
-  {
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-  blimit_off = GNUNET_htonll (offset % total);
-  ret =
-      PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
-                      paramFormats, 1);
-  process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
+  prc.plugin = plugin;
+  prc.proc = proc;
+  prc.proc_cls = proc_cls;
+
+  res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+                                             "get",
+                                             params,
+                                             &process_result,
+                                             &prc);
+  if (0 > res)
+    proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+         GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
@@ -563,43 +543,49 @@ postgres_plugin_get_key (void *cls, uint64_t offset,
  * Select a subset of the items in the datastore and call
  * the given iterator for each of them.
  *
- * @param cls our "struct Plugin*"
- * @param offset offset of the result (modulo num-results);
- *        specific ordering does not matter for the offset
+ * @param cls our `struct Plugin *`
+ * @param next_uid return the result with lowest uid >= next_uid
  * @param type entries of which type should be considered?
- *        Use 0 for any type.
+ *        Must not be zero (ANY).
  * @param proc function to call on the matching value;
- *        will be called with NULL if no value matches
- * @param proc_cls closure for proc
+ *        will be called with NULL if no value matches
+ * @param proc_cls closure for @a proc
  */
 static void
-postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
-                                    enum GNUNET_BLOCK_Type type,
-                                    PluginDatumProcessor proc, void *proc_cls)
+postgres_plugin_get_zero_anonymity(void *cls,
+                                   uint64_t next_uid,
+                                   enum GNUNET_BLOCK_Type type,
+                                   PluginDatumProcessor proc,
+                                   void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  uint32_t btype;
-  uint64_t boff;
-  const int paramFormats[] = { 1, 1 };
-  int paramLengths[] = { sizeof (btype), sizeof (boff) };
-  const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
-  PGresult *ret;
-
-  btype = htonl ((uint32_t) type);
-  boff = GNUNET_htonll (offset);
-  ret =
-      PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
-                      paramLengths, paramFormats, 1);
-  process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
+  uint32_t utype = type;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_uint32(&utype),
+    GNUNET_PQ_query_param_uint64(&next_uid),
+    GNUNET_PQ_query_param_end
+  };
+  struct ProcessResultContext prc;
+  enum GNUNET_DB_QueryStatus res;
+
+  prc.plugin = plugin;
+  prc.proc = proc;
+  prc.proc_cls = proc_cls;
+  res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+                                             "select_non_anonymous",
+                                             params,
+                                             &process_result,
+                                             &prc);
+  if (0 > res)
+    proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+         GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
 /**
- * Context for 'repl_iter' function.
+ * Context for #repl_iter() function.
  */
-struct ReplCtx
-{
-
+struct ReplCtx {
   /**
    * Plugin handle.
    */
@@ -611,7 +597,7 @@ struct ReplCtx
   PluginDatumProcessor proc;
 
   /**
-   * Closure for proc.
+   * Closure for @e proc.
    */
   void *proc_cls;
 };
@@ -622,270 +608,363 @@ struct ReplCtx
  * Decrements the replication counter and calls the original
  * iterator.
  *
- * @param cls closure with the 'struct ReplCtx*'
+ * @param cls closure with the `struct ReplCtx *`
  * @param key key for the content
- * @param size number of bytes in data
+ * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
  * @param expiration expiration time for the content
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration,
+ *         #GNUNET_OK to continue
  *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ *         #GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-repl_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
-           const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
-           uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
-           uint64_t uid)
+repl_proc(void *cls,
+          const struct GNUNET_HashCode *key,
+          uint32_t size,
+          const void *data,
+          enum GNUNET_BLOCK_Type type,
+          uint32_t priority,
+          uint32_t anonymity,
+          uint32_t replication,
+          struct GNUNET_TIME_Absolute expiration,
+          uint64_t uid)
 {
   struct ReplCtx *rc = cls;
   struct Plugin *plugin = rc->plugin;
   int ret;
-  PGresult *qret;
-  uint32_t boid;
-
-  ret =
-      rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
-                expiration, uid);
-  if (NULL != key)
-  {
-    boid = htonl ((uint32_t) uid);
-    const char *paramValues[] = {
-      (const char *) &boid,
-    };
-    int paramLengths[] = {
-      sizeof (boid),
-    };
-    const int paramFormats[] = { 1 };
-    qret =
-        PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
-                        paramFormats, 1);
-    if (GNUNET_OK !=
-        GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
-                      "decrepl"))
-      return GNUNET_SYSERR;
-    PQclear (qret);
-  }
+  uint32_t oid = (uint32_t)uid;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_uint32(&oid),
+    GNUNET_PQ_query_param_end
+  };
+  enum GNUNET_DB_QueryStatus qret;
+
+  ret = rc->proc(rc->proc_cls,
+                 key,
+                 size,
+                 data,
+                 type,
+                 priority,
+                 anonymity,
+                 replication,
+                 expiration,
+                 uid);
+  if (NULL == key)
+    return ret;
+  qret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+                                            "decrepl",
+                                            params);
+  if (0 > qret)
+    return GNUNET_SYSERR;
   return ret;
 }
 
 
 /**
- * Get a random item for replication.  Returns a single, not expired, random item
- * from those with the highest replication counters.  The item's
- * replication counter is decremented by one IF it was positive before.
- * Call 'proc' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for replication.  Returns a single, not expired,
+ * random item from those with the highest replication counters.  The
+ * item's replication counter is decremented by one IF it was positive
+ * before.  Call @a proc with all values ZERO or NULL if the datastore
+ * is empty.
  *
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
  * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
-                                 void *proc_cls)
+postgres_plugin_get_replication(void *cls,
+                                PluginDatumProcessor proc,
+                                void *proc_cls)
 {
   struct Plugin *plugin = cls;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_end
+  };
   struct ReplCtx rc;
-  PGresult *ret;
+  struct ProcessResultContext prc;
+  enum GNUNET_DB_QueryStatus res;
 
   rc.plugin = plugin;
   rc.proc = proc;
   rc.proc_cls = proc_cls;
-  ret =
-      PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
-                      NULL, 1);
-  process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
+  prc.plugin = plugin;
+  prc.proc = &repl_proc;
+  prc.proc_cls = &rc;
+  res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+                                             "select_replication_order",
+                                             params,
+                                             &process_result,
+                                             &prc);
+  if (0 > res)
+    proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+         GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
 /**
- * Get a random item for expiration.
- * Call 'proc' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for expiration.  Call @a proc with all values
+ * ZERO or NULL if the datastore is empty.
  *
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
  * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
-                                void *proc_cls)
+postgres_plugin_get_expiration(void *cls,
+                               PluginDatumProcessor proc,
+                               void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  uint64_t btime;
-  const int paramFormats[] = { 1 };
-  int paramLengths[] = { sizeof (btime) };
-  const char *paramValues[] = { (const char *) &btime };
-  PGresult *ret;
-
-  btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
-  ret =
-      PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
-                      paramLengths, paramFormats, 1);
-  process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_absolute_time(&now),
+    GNUNET_PQ_query_param_end
+  };
+  struct ProcessResultContext prc;
+
+  now = GNUNET_TIME_absolute_get();
+  prc.plugin = plugin;
+  prc.proc = proc;
+  prc.proc_cls = proc_cls;
+  (void)GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+                                             "select_expiration_order",
+                                             params,
+                                             &process_result,
+                                             &prc);
 }
 
 
 /**
- * Update the priority for a particular key in the datastore.  If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept.  For the
- * anonymity level, the lower value is to be used.  The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
+ * Closure for #process_keys.
+ */
+struct ProcessKeysContext {
+  /**
+   * Function to call for each key.
+   */
+  PluginKeyProcessor proc;
+
+  /**
+   * Closure for @e proc.
+   */
+  void *proc_cls;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.
  *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- *     change?  If priority + delta < 0 the
- *     priority should be set to 0 (never go
- *     negative).
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cls closure with a `struct ProcessKeysContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
  */
-static int
-postgres_plugin_update (void *cls, uint64_t uid, int delta,
-                        struct GNUNET_TIME_Absolute expire, char **msg)
+static void
+process_keys(void *cls,
+             PGresult *result,
+             unsigned int num_results)
 {
-  struct Plugin *plugin = cls;
-  PGresult *ret;
-  int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
-  uint32_t boid = htonl ((uint32_t) uid);
-  uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
-
-  const char *paramValues[] = {
-    (const char *) &bdelta,
-    (const char *) &bexpire,
-    (const char *) &boid,
-  };
-  int paramLengths[] = {
-    sizeof (bdelta),
-    sizeof (bexpire),
-    sizeof (boid),
-  };
-  const int paramFormats[] = { 1, 1, 1 };
+  struct ProcessKeysContext *pkc = cls;
 
-  ret =
-      PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
-                      paramFormats, 1);
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
-    return GNUNET_SYSERR;
-  PQclear (ret);
-  return GNUNET_OK;
+  for (unsigned i = 0; i < num_results; i++)
+    {
+      struct GNUNET_HashCode key;
+      struct GNUNET_PQ_ResultSpec rs[] = {
+        GNUNET_PQ_result_spec_auto_from_type("hash",
+                                             &key),
+        GNUNET_PQ_result_spec_end
+      };
+
+      if (GNUNET_OK !=
+          GNUNET_PQ_extract_result(result,
+                                   rs,
+                                   i))
+        {
+          GNUNET_break(0);
+          continue;
+        }
+      pkc->proc(pkc->proc_cls,
+                &key,
+                1);
+      GNUNET_PQ_cleanup_result(rs);
+    }
 }
 
 
-
 /**
  * Get all of the keys in the datastore.
  *
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin *`
  * @param proc function to call on each key
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-postgres_plugin_get_keys (void *cls,
-                         PluginKeyProcessor proc,
-                         void *proc_cls)
+postgres_plugin_get_keys(void *cls,
+                         PluginKeyProcessor proc,
+                         void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  int ret;
-  int i;
-  GNUNET_HashCode key;
-  PGresult * res;
-
-  res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
-  ret = PQntuples (res);
-  for (i=0;i<ret;i++)
-  {
-    if (sizeof (GNUNET_HashCode) != PQgetlength (res, i, 0))
-    {
-      memcpy (&key, PQgetvalue (res, i, 0), sizeof (GNUNET_HashCode));
-      proc (proc_cls, &key, 1);    
-    }
-  }
-  PQclear (res);
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_end
+  };
+  struct ProcessKeysContext pkc;
+
+  pkc.proc = proc;
+  pkc.proc_cls = proc_cls;
+  (void)GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+                                             "get_keys",
+                                             params,
+                                             &process_keys,
+                                             &pkc);
+  proc(proc_cls,
+       NULL,
+       0);
 }
 
 
-
 /**
  * Drop database.
  *
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin *`
  */
 static void
-postgres_plugin_drop (void *cls)
+postgres_plugin_drop(void *cls)
 {
   struct Plugin *plugin = cls;
-  
-  if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
-    GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
+  struct GNUNET_PQ_ExecuteStatement es[] = {
+    GNUNET_PQ_make_execute("DROP TABLE gn090"),
+    GNUNET_PQ_EXECUTE_STATEMENT_END
+  };
+
+  if (GNUNET_OK !=
+      GNUNET_PQ_exec_statements(plugin->dbh,
+                                es))
+    GNUNET_log_from(GNUNET_ERROR_TYPE_WARNING,
+                    "postgres",
+                    _("Failed to drop table from database.\n"));
+}
+
+
+/**
+ * Remove a particular key in the datastore.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure for @a cont
+ */
+static void
+postgres_plugin_remove_key(void *cls,
+                           const struct GNUNET_HashCode *key,
+                           uint32_t size,
+                           const void *data,
+                           PluginRemoveCont cont,
+                           void *cont_cls)
+{
+  struct Plugin *plugin = cls;
+  enum GNUNET_DB_QueryStatus ret;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_auto_from_type(key),
+    GNUNET_PQ_query_param_fixed_size(data, size),
+    GNUNET_PQ_query_param_end
+  };
+
+  ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+                                           "remove",
+                                           params);
+  if (0 > ret)
+    {
+      cont(cont_cls,
+           key,
+           size,
+           GNUNET_SYSERR,
+           _("Postgress exec failure"));
+      return;
+    }
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
+    {
+      cont(cont_cls,
+           key,
+           size,
+           GNUNET_NO,
+           NULL);
+      return;
+    }
+  plugin->env->duc(plugin->env->cls,
+                   -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+  GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+                  "datastore-postgres",
+                  "Deleted %u bytes from database\n",
+                  (unsigned int)size);
+  cont(cont_cls,
+       key,
+       size,
+       GNUNET_OK,
+       NULL);
 }
 
 
 /**
  * Entry point for the plugin.
  *
- * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
- * @return our "struct Plugin*"
+ * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
+ * @return our `struct Plugin *`
  */
 void *
-libgnunet_plugin_datastore_postgres_init (void *cls)
+libgnunet_plugin_datastore_postgres_init(void *cls)
 {
   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
   struct GNUNET_DATASTORE_PluginFunctions *api;
   struct Plugin *plugin;
 
-  plugin = GNUNET_malloc (sizeof (struct Plugin));
+  plugin = GNUNET_new(struct Plugin);
   plugin->env = env;
-  if (GNUNET_OK != init_connection (plugin))
-  {
-    GNUNET_free (plugin);
-    return NULL;
-  }
-  api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
+  if (GNUNET_OK != init_connection(plugin))
+    {
+      GNUNET_free(plugin);
+      return NULL;
+    }
+  api = GNUNET_new(struct GNUNET_DATASTORE_PluginFunctions);
   api->cls = plugin;
   api->estimate_size = &postgres_plugin_estimate_size;
   api->put = &postgres_plugin_put;
-  api->update = &postgres_plugin_update;
   api->get_key = &postgres_plugin_get_key;
   api->get_replication = &postgres_plugin_get_replication;
   api->get_expiration = &postgres_plugin_get_expiration;
   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
   api->get_keys = &postgres_plugin_get_keys;
   api->drop = &postgres_plugin_drop;
-  GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
-                   _("Postgres database running\n"));
+  api->remove_key = &postgres_plugin_remove_key;
+  GNUNET_log_from(GNUNET_ERROR_TYPE_INFO,
+                  "datastore-postgres",
+                  _("Postgres database running\n"));
   return api;
 }
 
 
 /**
  * Exit point from the plugin.
- * @param cls our "struct Plugin*"
+ *
+ * @param cls our `struct Plugin *`
  * @return always NULL
  */
 void *
-libgnunet_plugin_datastore_postgres_done (void *cls)
+libgnunet_plugin_datastore_postgres_done(void *cls)
 {
   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
   struct Plugin *plugin = api->cls;
 
-  PQfinish (plugin->dbh);
-  GNUNET_free (plugin);
-  GNUNET_free (api);
+  PQfinish(plugin->dbh);
+  GNUNET_free(plugin);
+  GNUNET_free(api);
   return NULL;
 }