also limit ma DHT puts
[oweals/gnunet.git] / src / datacache / plugin_datacache_postgres.c
index c5730078a0ea353067cb3db85a1221d1165c6da6..2c233c4c21cdd82e93f7aed8b3b4bf9ae4fd3a5d 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2006, 2009, 2010 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2006, 2009, 2010, 2012, 2015, 2017 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
+#include "gnunet_pq_lib.h"
 #include "gnunet_datacache_plugin.h"
-#include <postgresql/libpq-fe.h>
 
-#define DEBUG_POSTGRES GNUNET_NO
+#define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
 
 /**
  * Per-entry overhead estimate
  */
-#define OVERHEAD (sizeof(GNUNET_HashCode) + 24)
+#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
 
 /**
  * Context for all functions in this plugin.
@@ -50,35 +50,86 @@ struct Plugin
    */
   PGconn *dbh;
 
+  /**
+   * Number of key-value pairs in the database.
+   */
+  unsigned int num_items;
 };
 
 
 /**
- * Check if the result obtained from Postgres has
- * the desired status code.  If not, log an error, clear the
- * result and return GNUNET_SYSERR.
- * 
- * @return GNUNET_OK if the result is acceptable
+ * @brief Get a database handle
+ *
+ * @param plugin global context
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 static int
-check_result (struct Plugin *plugin, PGresult * ret, int expected_status,
-              const char *command, const char *args, int line)
+init_connection (struct Plugin *plugin)
 {
-  if (ret == NULL)
+  struct GNUNET_PQ_ExecuteStatement es[] = {
+    GNUNET_PQ_make_execute ("CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
+                            "  type INTEGER NOT NULL,"
+                            "  discard_time BIGINT NOT NULL,"
+                            "  key BYTEA NOT NULL,"
+                            "  value BYTEA NOT NULL,"
+                            "  path BYTEA DEFAULT NULL)"
+                            "WITH OIDS"),
+    GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)"),
+    GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)"),
+    GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL"),
+    GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN"),
+    GNUNET_PQ_EXECUTE_STATEMENT_END
+  };
+  struct GNUNET_PQ_PreparedStatement ps[] = {
+    GNUNET_PQ_make_prepare ("getkt",
+                            "SELECT discard_time,type,value,path FROM gn090dc "
+                            "WHERE key=$1 AND type=$2",
+                            2),
+    GNUNET_PQ_make_prepare ("getk",
+                            "SELECT discard_time,type,value,path FROM gn090dc "
+                            "WHERE key=$1",
+                            1),
+    GNUNET_PQ_make_prepare ("getm",
+                            "SELECT length(value) AS len,oid,key FROM gn090dc "
+                            "ORDER BY discard_time ASC LIMIT 1",
+                            0),
+    GNUNET_PQ_make_prepare ("get_random",
+                            "SELECT discard_time,type,value,path,key FROM gn090dc "
+                            "ORDER BY key ASC LIMIT 1 OFFSET $1",
+                            1),
+    GNUNET_PQ_make_prepare ("get_closest",
+                            "SELECT discard_time,type,value,path,key FROM gn090dc "
+                            "WHERE key>=$1 ORDER BY key ASC LIMIT $2",
+                            1),
+    GNUNET_PQ_make_prepare ("delrow",
+                            "DELETE FROM gn090dc WHERE oid=$1",
+                            1),
+    GNUNET_PQ_make_prepare ("put",
+                            "INSERT INTO gn090dc (type, discard_time, key, value, path) "
+                            "VALUES ($1, $2, $3, $4, $5)",
+                            5),
+    GNUNET_PQ_PREPARED_STATEMENT_END
+  };
+
+  plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
+                                            "datacache-postgres");
+  if (NULL == plugin->dbh)
+    return GNUNET_SYSERR;
+  if (GNUNET_OK !=
+      GNUNET_PQ_exec_statements (plugin->dbh,
+                                 es))
   {
-    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                     "datastore-postgres",
-                     "Postgres failed to allocate result for `%s:%s' at %d\n",
-                     command, args, line);
+    PQfinish (plugin->dbh);
+    plugin->dbh = NULL;
     return GNUNET_SYSERR;
   }
-  if (PQresultStatus (ret) != expected_status)
+
+  if (GNUNET_OK !=
+      GNUNET_PQ_prepare_statements (plugin->dbh,
+                                    ps))
   {
-    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                     "datastore-postgres",
-                     _("`%s:%s' failed at %s:%d with error: %s"), command, args,
-                     __FILE__, line, PQerrorMessage (plugin->dbh));
-    PQclear (ret);
+    PQfinish (plugin->dbh);
+    plugin->dbh = NULL;
     return GNUNET_SYSERR;
   }
   return GNUNET_OK;
@@ -86,394 +137,504 @@ check_result (struct Plugin *plugin, PGresult * ret, int expected_status,
 
 
 /**
- * Run simple SQL statement (without results).
+ * Store an item in the datastore.
+ *
+ * @param cls closure (our `struct Plugin`)
+ * @param key key to store @a data under
+ * @param data_size number of bytes in @a data
+ * @param data data to store
+ * @param type type of the value
+ * @param discard_time when to discard the value in any case
+ * @param path_info_len number of entries in @a path_info
+ * @param path_info a path through the network
+ * @return 0 if duplicate, -1 on error, number of bytes used otherwise
  */
-static int
-pq_exec (struct Plugin *plugin, const char *sql, int line)
+static ssize_t
+postgres_plugin_put (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     size_t data_size,
+                     const char *data,
+                     enum GNUNET_BLOCK_Type type,
+                     struct GNUNET_TIME_Absolute discard_time,
+                    unsigned int path_info_len,
+                    const struct GNUNET_PeerIdentity *path_info)
 {
-  PGresult *ret;
-
-  ret = PQexec (plugin->dbh, sql);
-  if (GNUNET_OK !=
-      check_result (plugin, ret, PGRES_COMMAND_OK, "PQexec", sql, line))
-    return GNUNET_SYSERR;
-  PQclear (ret);
-  return GNUNET_OK;
+  struct Plugin *plugin = cls;
+  uint32_t type32 = (uint32_t) type;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_uint32 (&type32),
+    GNUNET_PQ_query_param_absolute_time (&discard_time),
+    GNUNET_PQ_query_param_auto_from_type (key),
+    GNUNET_PQ_query_param_fixed_size (data, data_size),
+    GNUNET_PQ_query_param_fixed_size (path_info,
+                                      path_info_len * sizeof (struct GNUNET_PeerIdentity)),
+    GNUNET_PQ_query_param_end
+  };
+  enum GNUNET_DB_QueryStatus ret;
+
+  ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+                                            "put",
+                                            params);
+  if (0 > ret)
+    return -1;
+  plugin->num_items++;
+  return data_size + OVERHEAD;
 }
 
 
 /**
- * Prepare SQL statement.
+ * Closure for #handle_results.
  */
-static int
-pq_prepare (struct Plugin *plugin, const char *name, const char *sql,
-            int nparms, int line)
+struct HandleResultContext
 {
-  PGresult *ret;
 
-  ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
-  if (GNUNET_OK !=
-      check_result (plugin, ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
-    return GNUNET_SYSERR;
-  PQclear (ret);
-  return GNUNET_OK;
-}
+  /**
+   * Function to call on each result, may be NULL.
+   */
+  GNUNET_DATACACHE_Iterator iter;
+
+  /**
+   * Closure for @e iter.
+   */
+  void *iter_cls;
+
+  /**
+   * Key used.
+   */
+  const struct GNUNET_HashCode *key;
+};
 
 
 /**
- * @brief Get a database handle
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.  Parse the result
+ * and call the callback given in @a cls
+ *
+ * @param cls closure of type `struct HandleResultContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
  */
-static int
-init_connection (struct Plugin *plugin)
+static void
+handle_results (void *cls,
+                PGresult *result,
+                unsigned int num_results)
 {
-  char *conninfo;
-  PGresult *ret;
+  struct HandleResultContext *hrc = cls;
 
-  /* Open database and precompile statements */
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
-                                             "datacache-postgres", "CONFIG",
-                                             &conninfo))
-    conninfo = NULL;
-  plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
-  GNUNET_free_non_null (conninfo);
-  if (NULL == plugin->dbh)
-  {
-    /* FIXME: warn about out-of-memory? */
-    return GNUNET_SYSERR;
-  }
-  if (PQstatus (plugin->dbh) != CONNECTION_OK)
-  {
-    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "datacache-postgres",
-                     _("Unable to initialize Postgres: %s"),
-                     PQerrorMessage (plugin->dbh));
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  ret =
-      PQexec (plugin->dbh,
-              "CREATE TEMPORARY TABLE gn090dc ("
-              "  type INTEGER NOT NULL DEFAULT 0,"
-              "  discard_time BIGINT NOT NULL DEFAULT 0,"
-              "  key BYTEA NOT NULL DEFAULT '',"
-              "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
-  if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07",    /* duplicate table */
-                                                                                    PQresultErrorField
-                                                                                    (ret,
-                                                                                     PG_DIAG_SQLSTATE)))))
+  for (unsigned int i=0;i<num_results;i++)
   {
-    (void) check_result (plugin, ret, PGRES_COMMAND_OK, "CREATE TABLE",
-                         "gn090dc", __LINE__);
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  if (PQresultStatus (ret) == PGRES_COMMAND_OK)
-  {
-    if ((GNUNET_OK !=
-         pq_exec (plugin, "CREATE INDEX idx_key ON gn090dc (key)", __LINE__)) ||
-        (GNUNET_OK !=
-         pq_exec (plugin, "CREATE INDEX idx_dt ON gn090dc (discard_time)",
-                  __LINE__)))
+    struct GNUNET_TIME_Absolute expiration_time;
+    uint32_t type;
+    void *data;
+    size_t data_size;
+    struct GNUNET_PeerIdentity *path;
+    size_t path_len;
+    struct GNUNET_PQ_ResultSpec rs[] = {
+      GNUNET_PQ_result_spec_absolute_time ("discard_time",
+                                           &expiration_time),
+      GNUNET_PQ_result_spec_uint32 ("type",
+                                    &type),
+      GNUNET_PQ_result_spec_variable_size ("value",
+                                           &data,
+                                           &data_size),
+      GNUNET_PQ_result_spec_variable_size ("path",
+                                           (void **) &path,
+                                           &path_len),
+      GNUNET_PQ_result_spec_end
+    };
+
+    if (GNUNET_YES !=
+        GNUNET_PQ_extract_result (result,
+                                  rs,
+                                  i))
     {
-      PQclear (ret);
-      PQfinish (plugin->dbh);
-      plugin->dbh = NULL;
-      return GNUNET_SYSERR;
+      GNUNET_break (0);
+      return;
     }
+    if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_break (0);
+      path_len = 0;
+    }
+    path_len %= sizeof (struct GNUNET_PeerIdentity);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Found result of size %u bytes and type %u in database\n",
+        (unsigned int) data_size,
+         (unsigned int) type);
+    if ( (NULL != hrc->iter) &&
+         (GNUNET_SYSERR ==
+          hrc->iter (hrc->iter_cls,
+                     hrc->key,
+                     data_size,
+                     data,
+                     (enum GNUNET_BLOCK_Type) type,
+                     expiration_time,
+                     path_len,
+                     path)) )
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          "Ending iteration (client error)\n");
+      GNUNET_PQ_cleanup_result (rs);
+      return;
+    }
+    GNUNET_PQ_cleanup_result (rs);
   }
-  PQclear (ret);
-#if 1
-  ret =
-      PQexec (plugin->dbh,
-              "ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL");
-  if (GNUNET_OK !=
-      check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090dc",
-                    __LINE__))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  PQclear (ret);
-  ret = PQexec (plugin->dbh, "ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN");
-  if (GNUNET_OK !=
-      check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090dc",
-                    __LINE__))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  PQclear (ret);
-#endif
-  if ((GNUNET_OK !=
-       pq_prepare (plugin, "getkt",
-                   "SELECT discard_time,type,value FROM gn090dc "
-                   "WHERE key=$1 AND type=$2 ", 2, __LINE__)) ||
-      (GNUNET_OK !=
-       pq_prepare (plugin, "getk",
-                   "SELECT discard_time,type,value FROM gn090dc "
-                   "WHERE key=$1", 1, __LINE__)) ||
-      (GNUNET_OK !=
-       pq_prepare (plugin, "getm",
-                   "SELECT length(value),oid,key FROM gn090dc "
-                   "ORDER BY discard_time ASC LIMIT 1", 0, __LINE__)) ||
-      (GNUNET_OK !=
-       pq_prepare (plugin, "delrow", "DELETE FROM gn090dc WHERE oid=$1", 1,
-                   __LINE__)) ||
-      (GNUNET_OK !=
-       pq_prepare (plugin, "put",
-                   "INSERT INTO gn090dc (type, discard_time, key, value) "
-                   "VALUES ($1, $2, $3, $4)", 4, __LINE__)))
-  {
-    PQfinish (plugin->dbh);
-    plugin->dbh = NULL;
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_OK;
 }
 
 
 /**
- * Delete the row identified by the given rowid (qid
- * in postgres).
+ * Iterate over the results for a particular key
+ * in the datastore.
  *
- * @return GNUNET_OK on success
+ * @param cls closure (our `struct Plugin`)
+ * @param key key to look for
+ * @param type entries of which type are relevant?
+ * @param iter maybe NULL (to just count)
+ * @param iter_cls closure for @a iter
+ * @return the number of results found
  */
-static int
-delete_by_rowid (struct Plugin *plugin, uint32_t rowid)
+static unsigned int
+postgres_plugin_get (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     enum GNUNET_BLOCK_Type type,
+                     GNUNET_DATACACHE_Iterator iter,
+                     void *iter_cls)
 {
-  uint32_t brow = htonl (rowid);
-  const char *paramValues[] = { (const char *) &brow };
-  int paramLengths[] = { sizeof (brow) };
-  const int paramFormats[] = { 1 };
-  PGresult *ret;
-
-  ret =
-      PQexecPrepared (plugin->dbh, "delrow", 1, paramValues, paramLengths,
-                      paramFormats, 1);
-  if (GNUNET_OK !=
-      check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
-                    __LINE__))
-  {
-    return GNUNET_SYSERR;
-  }
-  PQclear (ret);
-  return GNUNET_OK;
+  struct Plugin *plugin = cls;
+  uint32_t type32 = (uint32_t) type;
+  struct GNUNET_PQ_QueryParam paramk[] = {
+    GNUNET_PQ_query_param_auto_from_type (key),
+    GNUNET_PQ_query_param_end
+  };
+  struct GNUNET_PQ_QueryParam paramkt[] = {
+    GNUNET_PQ_query_param_auto_from_type (key),
+    GNUNET_PQ_query_param_uint32 (&type32),
+    GNUNET_PQ_query_param_end
+  };
+  enum GNUNET_DB_QueryStatus res;
+  struct HandleResultContext hr_ctx;
+
+  hr_ctx.iter = iter;
+  hr_ctx.iter_cls = iter_cls;
+  hr_ctx.key = key;
+  res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                              (0 == type) ? "getk" : "getkt",
+                                              (0 == type) ? paramk : paramkt,
+                                              &handle_results,
+                                              &hr_ctx);
+  if (res < 0)
+    return 0;
+  return res;
 }
 
 
 /**
- * Store an item in the datastore.
+ * Delete the entry with the lowest expiration value
+ * from the datacache right now.
  *
- * @param cls closure (our "struct Plugin")
- * @param key key to store data under
- * @param size number of bytes in data
- * @param data data to store
- * @param type type of the value
- * @param discard_time when to discard the value in any case
- * @return 0 on error, number of bytes used otherwise
+ * @param cls closure (our `struct Plugin`)
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
-static size_t
-postgres_plugin_put (void *cls, const GNUNET_HashCode * key, size_t size,
-                     const char *data, enum GNUNET_BLOCK_Type type,
-                     struct GNUNET_TIME_Absolute discard_time)
+static int
+postgres_plugin_del (void *cls)
 {
   struct Plugin *plugin = cls;
-  PGresult *ret;
-  uint32_t btype = htonl (type);
-  uint64_t bexpi = GNUNET_TIME_absolute_hton (discard_time).abs_value__;
-
-  const char *paramValues[] = {
-    (const char *) &btype,
-    (const char *) &bexpi,
-    (const char *) key,
-    (const char *) data
+  struct GNUNET_PQ_QueryParam pempty[] = {
+    GNUNET_PQ_query_param_end
   };
-  int paramLengths[] = {
-    sizeof (btype),
-    sizeof (bexpi),
-    sizeof (GNUNET_HashCode),
-    size
+  uint32_t size;
+  uint32_t oid;
+  struct GNUNET_HashCode key;
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_uint32 ("len",
+                                  &size),
+    GNUNET_PQ_result_spec_uint32 ("oid",
+                                  &oid),
+    GNUNET_PQ_result_spec_auto_from_type ("key",
+                                          &key),
+    GNUNET_PQ_result_spec_end
+  };
+  enum GNUNET_DB_QueryStatus res;
+  struct GNUNET_PQ_QueryParam dparam[] = {
+    GNUNET_PQ_query_param_uint32 (&oid),
+    GNUNET_PQ_query_param_end
   };
-  const int paramFormats[] = { 1, 1, 1, 1 };
 
-  ret =
-      PQexecPrepared (plugin->dbh, "put", 4, paramValues, paramLengths,
-                      paramFormats, 1);
-  if (GNUNET_OK !=
-      check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put",
-                    __LINE__))
+  res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
+                                                  "getm",
+                                                  pempty,
+                                                  rs);
+  if (0 > res)
+    return GNUNET_SYSERR;
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
+  {
+    /* no result */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Ending iteration (no more results)\n");
+    return 0;
+  }
+  res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+                                            "delrow",
+                                            dparam);
+  if (0 > res)
+  {
+    GNUNET_PQ_cleanup_result (rs);
     return GNUNET_SYSERR;
-  PQclear (ret);
-  return size + OVERHEAD;
+  }
+  plugin->num_items--;
+  plugin->env->delete_notify (plugin->env->cls,
+                              &key,
+                              size + OVERHEAD);
+  GNUNET_PQ_cleanup_result (rs);
+  return GNUNET_OK;
 }
 
 
 /**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Obtain a random key-value pair from the datacache.
  *
- * @param cls closure (our "struct Plugin")
- * @param key
- * @param type entries of which type are relevant?
+ * @param cls closure (our `struct Plugin`)
  * @param iter maybe NULL (to just count)
- * @param iter_cls closure for iter
- * @return the number of results found
+ * @param iter_cls closure for @a iter
+ * @return the number of results found, zero (datacache empty) or one
  */
 static unsigned int
-postgres_plugin_get (void *cls, const GNUNET_HashCode * key,
-                     enum GNUNET_BLOCK_Type type,
-                     GNUNET_DATACACHE_Iterator iter, void *iter_cls)
+postgres_plugin_get_random (void *cls,
+                            GNUNET_DATACACHE_Iterator iter,
+                            void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  uint32_t btype = htonl (type);
-
-  const char *paramValues[] = {
-    (const char *) key,
-    (const char *) &btype,
+  uint32_t off;
+  struct GNUNET_TIME_Absolute expiration_time;
+  size_t data_size;
+  void *data;
+  size_t path_len;
+  struct GNUNET_PeerIdentity *path;
+  struct GNUNET_HashCode key;
+  uint32_t type;
+  enum GNUNET_DB_QueryStatus res;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_uint32 (&off),
+    GNUNET_PQ_query_param_end
   };
-  int paramLengths[] = {
-    sizeof (GNUNET_HashCode),
-    sizeof (btype),
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_absolute_time ("discard_time",
+                                         &expiration_time),
+    GNUNET_PQ_result_spec_uint32 ("type",
+                                  &type),
+    GNUNET_PQ_result_spec_variable_size ("value",
+                                         &data,
+                                         &data_size),
+    GNUNET_PQ_result_spec_variable_size ("path",
+                                         (void **) &path,
+                                         &path_len),
+    GNUNET_PQ_result_spec_auto_from_type ("key",
+                                          &key),
+    GNUNET_PQ_result_spec_end
   };
-  const int paramFormats[] = { 1, 1 };
-  struct GNUNET_TIME_Absolute expiration_time;
-  uint32_t size;
-  unsigned int cnt;
-  unsigned int i;
-  PGresult *res;
-
-  cnt = 0;
-  res =
-      PQexecPrepared (plugin->dbh, (type == 0) ? "getk" : "getkt",
-                      (type == 0) ? 1 : 2, paramValues, paramLengths,
-                      paramFormats, 1);
-  if (GNUNET_OK !=
-      check_result (plugin, res, PGRES_TUPLES_OK, "PQexecPrepared",
-                    (type == 0) ? "getk" : "getkt", __LINE__))
-  {
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
-                     "Ending iteration (postgres error)\n");
-#endif
-    return 0;
-  }
 
-  if (0 == (cnt = PQntuples (res)))
+  if (0 == plugin->num_items)
+    return 0;
+  if (NULL == iter)
+    return 1;
+  off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+                                  plugin->num_items);
+  res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
+                                                  "get_random",
+                                                  params,
+                                                  rs);
+  if (0 > res)
   {
-    /* no result */
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
-                     "Ending iteration (no more results)\n");
-#endif
-    PQclear (res);
+    GNUNET_break (0);
     return 0;
   }
-  if (iter == NULL)
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
   {
-    PQclear (res);
-    return cnt;
+    GNUNET_break (0);
+    return 0;
   }
-  if ((3 != PQnfields (res)) || (sizeof (uint64_t) != PQfsize (res, 0)) ||
-      (sizeof (uint32_t) != PQfsize (res, 1)))
+  if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
   {
     GNUNET_break (0);
-    PQclear (res);
-    return 0;
+    path_len = 0;
   }
-  for (i = 0; i < cnt; i++)
+  path_len %= sizeof (struct GNUNET_PeerIdentity);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Found random value with key %s of size %u bytes and type %u in database\n",
+       GNUNET_h2s (&key),
+       (unsigned int) data_size,
+       (unsigned int) type);
+  (void) iter (iter_cls,
+               &key,
+               data_size,
+               data,
+               (enum GNUNET_BLOCK_Type) type,
+               expiration_time,
+               path_len,
+               path);
+  GNUNET_PQ_cleanup_result (rs);
+  return 1;
+}
+
+
+/**
+ * Closure for #extract_result_cb.
+ */
+struct ExtractResultContext
+{
+  /**
+   * Function to call for each result found.
+   */
+  GNUNET_DATACACHE_Iterator iter;
+
+  /**
+   * Closure for @e iter.
+   */
+  void *iter_cls;
+
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.  Calls the `iter`
+ * from @a cls for each result.
+ *
+ * @param cls closure with the `struct ExtractResultContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+extract_result_cb (void *cls,
+                   PGresult *result,
+                   unsigned int num_results)
+{
+  struct ExtractResultContext *erc = cls;
+
+  if (NULL == erc->iter)
+    return;
+  for (unsigned int i=0;i<num_results;i++)
   {
-    expiration_time.abs_value =
-        GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
-    type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
-    size = PQgetlength (res, i, 2);
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
-                     "Found result of size %u bytes and type %u in database\n",
-                     (unsigned int) size, (unsigned int) type);
-#endif
+    struct GNUNET_TIME_Absolute expiration_time;
+    uint32_t type;
+    void *data;
+    size_t data_size;
+    struct GNUNET_PeerIdentity *path;
+    size_t path_len;
+    struct GNUNET_HashCode key;
+    struct GNUNET_PQ_ResultSpec rs[] = {
+      GNUNET_PQ_result_spec_absolute_time ("",
+                                           &expiration_time),
+      GNUNET_PQ_result_spec_uint32 ("type",
+                                    &type),
+      GNUNET_PQ_result_spec_variable_size ("value",
+                                           &data,
+                                           &data_size),
+      GNUNET_PQ_result_spec_variable_size ("path",
+                                           (void **) &path,
+                                           &path_len),
+      GNUNET_PQ_result_spec_auto_from_type ("key",
+                                            &key),
+      GNUNET_PQ_result_spec_end
+    };
+
+    if (GNUNET_YES !=
+        GNUNET_PQ_extract_result (result,
+                                  rs,
+                                  i))
+    {
+      GNUNET_break (0);
+      return;
+    }
+    if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_break (0);
+      path_len = 0;
+    }
+    path_len %= sizeof (struct GNUNET_PeerIdentity);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Found result of size %u bytes and type %u in database\n",
+        (unsigned int) data_size,
+         (unsigned int) type);
     if (GNUNET_SYSERR ==
-        iter (iter_cls, expiration_time, key, size, PQgetvalue (res, i, 2),
-              (enum GNUNET_BLOCK_Type) type))
+        erc->iter (erc->iter_cls,
+                   &key,
+                   data_size,
+                   data,
+                   (enum GNUNET_BLOCK_Type) type,
+                   expiration_time,
+                   path_len,
+                   path))
     {
-#if DEBUG_POSTGRES
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
-                       "Ending iteration (client error)\n");
-#endif
-      PQclear (res);
-      return cnt;
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          "Ending iteration (client error)\n");
+      GNUNET_PQ_cleanup_result (rs);
+      break;
     }
+    GNUNET_PQ_cleanup_result (rs);
   }
-  PQclear (res);
-  return cnt;
 }
 
 
 /**
- * Delete the entry with the lowest expiration value
- * from the datacache right now.
- * 
- * @param cls closure (our "struct Plugin")
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * Iterate over the results that are "close" to a particular key in
+ * the datacache.  "close" is defined as numerically larger than @a
+ * key (when interpreted as a circular address space), with small
+ * distance.
+ *
+ * @param cls closure (internal context for the plugin)
+ * @param key area of the keyspace to look into
+ * @param num_results number of results that should be returned to @a iter
+ * @param iter maybe NULL (to just count)
+ * @param iter_cls closure for @a iter
+ * @return the number of results found
  */
-static int
-postgres_plugin_del (void *cls)
+static unsigned int
+postgres_plugin_get_closest (void *cls,
+                             const struct GNUNET_HashCode *key,
+                             unsigned int num_results,
+                             GNUNET_DATACACHE_Iterator iter,
+                             void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  uint32_t size;
-  uint32_t oid;
-  GNUNET_HashCode key;
-  PGresult *res;
-
-  res = PQexecPrepared (plugin->dbh, "getm", 0, NULL, NULL, NULL, 1);
-  if (GNUNET_OK !=
-      check_result (plugin, res, PGRES_TUPLES_OK, "PQexecPrepared", "getm",
-                    __LINE__))
+  uint32_t num_results32 = (uint32_t) num_results;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_auto_from_type (key),
+    GNUNET_PQ_query_param_uint32 (&num_results32),
+    GNUNET_PQ_query_param_end
+  };
+  enum GNUNET_DB_QueryStatus res;
+  struct ExtractResultContext erc;
+
+  erc.iter = iter;
+  erc.iter_cls = iter_cls;
+  res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                              "get_closest",
+                                              params,
+                                              &extract_result_cb,
+                                              &erc);
+  if (0 > res)
   {
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
-                     "Ending iteration (postgres error)\n");
-#endif
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Ending iteration (postgres error)\n");
     return 0;
   }
-  if (0 == PQntuples (res))
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
   {
     /* no result */
-#if DEBUG_POSTGRES
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
-                     "Ending iteration (no more results)\n");
-#endif
-    PQclear (res);
-    return GNUNET_SYSERR;
-  }
-  if ((3 != PQnfields (res)) || (sizeof (size) != PQfsize (res, 0)) ||
-      (sizeof (oid) != PQfsize (res, 1)) ||
-      (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 2)))
-  {
-    GNUNET_break (0);
-    PQclear (res);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Ending iteration (no more results)\n");
     return 0;
   }
-  size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
-  oid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
-  memcpy (&key, PQgetvalue (res, 0, 2), sizeof (GNUNET_HashCode));
-  PQclear (res);
-  if (GNUNET_OK != delete_by_rowid (plugin, oid))
-    return GNUNET_SYSERR;
-  plugin->env->delete_notify (plugin->env->cls, &key, size + OVERHEAD);
-  return GNUNET_OK;
+  return res;
 }
 
 
 /**
  * Entry point for the plugin.
  *
- * @param cls closure (the "struct GNUNET_DATACACHE_PluginEnvironmnet")
- * @return the plugin's closure (our "struct Plugin")
+ * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
+ * @return the plugin's closure (our `struct Plugin`)
  */
 void *
 libgnunet_plugin_datacache_postgres_init (void *cls)
@@ -482,7 +643,7 @@ libgnunet_plugin_datacache_postgres_init (void *cls)
   struct GNUNET_DATACACHE_PluginFunctions *api;
   struct Plugin *plugin;
 
-  plugin = GNUNET_malloc (sizeof (struct Plugin));
+  plugin = GNUNET_new (struct Plugin);
   plugin->env = env;
 
   if (GNUNET_OK != init_connection (plugin))
@@ -491,13 +652,15 @@ libgnunet_plugin_datacache_postgres_init (void *cls)
     return NULL;
   }
 
-  api = GNUNET_malloc (sizeof (struct GNUNET_DATACACHE_PluginFunctions));
+  api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
   api->cls = plugin;
   api->get = &postgres_plugin_get;
   api->put = &postgres_plugin_put;
   api->del = &postgres_plugin_del;
-  GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datacache-postgres",
-                   _("Postgres datacache running\n"));
+  api->get_random = &postgres_plugin_get_random;
+  api->get_closest = &postgres_plugin_get_closest;
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Postgres datacache running\n");
   return api;
 }
 
@@ -505,7 +668,7 @@ libgnunet_plugin_datacache_postgres_init (void *cls)
 /**
  * Exit point from the plugin.
  *
- * @param cls closure (our "struct Plugin")
+ * @param cls closure (our `struct Plugin`)
  * @return NULL
  */
 void *