fix RPS service: Provide closure for view insertion
[oweals/gnunet.git] / src / datacache / plugin_datacache_sqlite.c
index 5567077d3fd8738ca4ff39f20e477d62b885d481..dc4236a8b8fd4ba2254a81f716410b32fb4e3f6e 100644 (file)
@@ -2,20 +2,18 @@
      This file is part of GNUnet
      Copyright (C) 2006, 2009, 2015 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 /**
@@ -26,6 +24,7 @@
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_datacache_plugin.h"
+#include "gnunet_sq_lib.h"
 #include <sqlite3.h>
 
 #define LOG(kind,...) GNUNET_log_from (kind, "datacache-sqlite", __VA_ARGS__)
@@ -37,7 +36,7 @@
  * How much overhead do we assume per entry in the
  * datacache?
  */
-#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 32)
+#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 36)
 
 /**
  * Context for all functions in this plugin.
@@ -59,6 +58,46 @@ struct Plugin
    */
   char *fn;
 
+  /**
+   * Prepared statement for #sqlite_plugin_put.
+   */
+  sqlite3_stmt *insert_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_get.
+   */
+  sqlite3_stmt *get_count_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_get.
+   */
+  sqlite3_stmt *get_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_del.
+   */
+  sqlite3_stmt *del_select_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_del.
+   */
+  sqlite3_stmt *del_expired_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_del.
+   */
+  sqlite3_stmt *del_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_get_random.
+   */
+  sqlite3_stmt *get_random_stmt;
+
+  /**
+   * Prepared statement for #sqlite_plugin_get_closest.
+   */
+  sqlite3_stmt *get_closest_stmt;
+
   /**
    * Number of key-value pairs in the database.
    */
@@ -102,7 +141,8 @@ sq_prepare (sqlite3 *dbh,
   char *dummy;
 
   return sqlite3_prepare (dbh,
-                          zSql, strlen (zSql),
+                          zSql,
+                          strlen (zSql),
                           ppStmt,
                           (const char **) &dummy);
 }
@@ -113,6 +153,7 @@ sq_prepare (sqlite3 *dbh,
  *
  * @param cls closure (our `struct Plugin`)
  * @param key key to store @a data under
+ * @param xor_distance how close is @a key to our PID?
  * @param size number of bytes in @a data
  * @param data data to store
  * @param type type of the value
@@ -124,6 +165,7 @@ sq_prepare (sqlite3 *dbh,
 static ssize_t
 sqlite_plugin_put (void *cls,
                   const struct GNUNET_HashCode *key,
+                   uint32_t xor_distance,
                   size_t size,
                    const char *data,
                   enum GNUNET_BLOCK_Type type,
@@ -132,60 +174,48 @@ sqlite_plugin_put (void *cls,
                   const struct GNUNET_PeerIdentity *path_info)
 {
   struct Plugin *plugin = cls;
-  sqlite3_stmt *stmt;
-  int64_t dval;
+  uint32_t type32 = type;
+  struct GNUNET_SQ_QueryParam params[] = {
+    GNUNET_SQ_query_param_uint32 (&type32),
+    GNUNET_SQ_query_param_absolute_time (&discard_time),
+    GNUNET_SQ_query_param_auto_from_type (key),
+    GNUNET_SQ_query_param_uint32 (&xor_distance),
+    GNUNET_SQ_query_param_fixed_size (data, size),
+    GNUNET_SQ_query_param_fixed_size (path_info,
+                                      path_info_len * sizeof (struct GNUNET_PeerIdentity)),
+    GNUNET_SQ_query_param_end
+  };
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Processing PUT of %u bytes with key `%4s' and expiration %s\n",
+       "Processing PUT of %u bytes with key `%s' and expiration %s\n",
        (unsigned int) size,
        GNUNET_h2s (key),
-       GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (discard_time), GNUNET_YES));
-  dval = (int64_t) discard_time.abs_value_us;
-  if (dval < 0)
-    dval = INT64_MAX;
-  if (sq_prepare
-      (plugin->dbh,
-       "INSERT INTO ds090 (type, expire, key, value, path) VALUES (?, ?, ?, ?, ?)",
-       &stmt) != SQLITE_OK)
-  {
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sq_prepare");
-    return -1;
-  }
-  if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, type)) ||
-      (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, dval)) ||
-      (SQLITE_OK !=
-       sqlite3_bind_blob (stmt, 3,
-                         key, sizeof (struct GNUNET_HashCode),
-                          SQLITE_TRANSIENT)) ||
-      (SQLITE_OK != sqlite3_bind_blob (stmt, 4,
-                                      data, size,
-                                      SQLITE_TRANSIENT)) ||
-      (SQLITE_OK != sqlite3_bind_blob (stmt, 5,
-                                      path_info,
-                                      path_info_len * sizeof (struct GNUNET_PeerIdentity),
-                                      SQLITE_TRANSIENT)))
+       GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (discard_time),
+                                               GNUNET_YES));
+  if (GNUNET_OK !=
+      GNUNET_SQ_bind (plugin->insert_stmt,
+                      params))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_bind_xxx");
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->insert_stmt);
     return -1;
   }
-  if (SQLITE_DONE != sqlite3_step (stmt))
+  if (SQLITE_DONE !=
+      sqlite3_step (plugin->insert_stmt))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_step");
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->insert_stmt);
     return -1;
   }
   plugin->num_items++;
-  if (SQLITE_OK != sqlite3_finalize (stmt))
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sqlite3_finalize");
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->insert_stmt);
   return size + OVERHEAD;
 }
 
@@ -209,120 +239,119 @@ sqlite_plugin_get (void *cls,
                    void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  sqlite3_stmt *stmt;
+  uint32_t type32 = type;
   struct GNUNET_TIME_Absolute now;
   struct GNUNET_TIME_Absolute exp;
-  unsigned int size;
-  const char *dat;
+  size_t size;
+  void *dat;
   unsigned int cnt;
-  unsigned int off;
+  uint32_t off;
   unsigned int total;
-  unsigned int psize;
-  char scratch[256];
-  int64_t ntime;
-  const struct GNUNET_PeerIdentity *path;
+  size_t psize;
+  struct GNUNET_PeerIdentity *path;
+  struct GNUNET_SQ_QueryParam params_count[] = {
+    GNUNET_SQ_query_param_auto_from_type (key),
+    GNUNET_SQ_query_param_uint32 (&type32),
+    GNUNET_SQ_query_param_absolute_time (&now),
+    GNUNET_SQ_query_param_end
+  };
+  struct GNUNET_SQ_QueryParam params_select[] = {
+    GNUNET_SQ_query_param_auto_from_type (key),
+    GNUNET_SQ_query_param_uint32 (&type32),
+    GNUNET_SQ_query_param_absolute_time (&now),
+    GNUNET_SQ_query_param_uint32 (&off),
+    GNUNET_SQ_query_param_end
+  };
+  struct GNUNET_SQ_ResultSpec rs[] = {
+    GNUNET_SQ_result_spec_variable_size (&dat,
+                                         &size),
+    GNUNET_SQ_result_spec_absolute_time (&exp),
+    GNUNET_SQ_result_spec_variable_size ((void **) &path,
+                                         &psize),
+    GNUNET_SQ_result_spec_end
+  };
 
   now = GNUNET_TIME_absolute_get ();
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Processing GET for key `%4s'\n",
+       "Processing GET for key `%s'\n",
        GNUNET_h2s (key));
-  if (sq_prepare
-      (plugin->dbh,
-       "SELECT count(*) FROM ds090 WHERE key=? AND type=? AND expire >= ?",
-       &stmt) != SQLITE_OK)
-  {
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sq_prepare");
-    return 0;
-  }
-  ntime = (int64_t) now.abs_value_us;
-  GNUNET_assert (ntime >= 0);
-  if ((SQLITE_OK !=
-       sqlite3_bind_blob (stmt, 1, key, sizeof (struct GNUNET_HashCode),
-                          SQLITE_TRANSIENT)) ||
-      (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
-      (SQLITE_OK != sqlite3_bind_int64 (stmt, 3, now.abs_value_us)))
+
+  if (GNUNET_OK !=
+      GNUNET_SQ_bind (plugin->get_count_stmt,
+                      params_count))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_bind_xxx");
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->get_count_stmt);
     return 0;
   }
-
-  if (SQLITE_ROW != sqlite3_step (stmt))
+  if (SQLITE_ROW !=
+      sqlite3_step (plugin->get_count_stmt))
   {
     LOG_SQLITE (plugin->dbh, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite_step");
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->get_count_stmt);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "No content found when processing GET for key `%4s'\n",
+         "No content found when processing GET for key `%s'\n",
          GNUNET_h2s (key));
     return 0;
   }
-  total = sqlite3_column_int (stmt, 0);
-  sqlite3_finalize (stmt);
-  if ((0 == total) || (NULL == iter))
+  total = sqlite3_column_int (plugin->get_count_stmt,
+                              0);
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->get_count_stmt);
+  if ( (0 == total) ||
+       (NULL == iter) )
   {
     if (0 == total)
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "No content found when processing GET for key `%4s'\n",
+           "No content found when processing GET for key `%s'\n",
            GNUNET_h2s (key));
     return total;
   }
 
   cnt = 0;
-  off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
+  off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                  total);
   while (cnt < total)
   {
     off = (off + 1) % total;
-    GNUNET_snprintf (scratch, sizeof (scratch),
-                     "SELECT value,expire,path FROM ds090 WHERE key=? AND type=? AND expire >= ? LIMIT 1 OFFSET %u",
-                     off);
-    if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
-    {
-      LOG_SQLITE (plugin->dbh,
-                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                  "sq_prepare");
-      return cnt;
-    }
-    if ((SQLITE_OK !=
-         sqlite3_bind_blob (stmt, 1,
-                            key,
-                            sizeof (struct GNUNET_HashCode),
-                            SQLITE_TRANSIENT)) ||
-        (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
-        (SQLITE_OK != sqlite3_bind_int64 (stmt, 3, now.abs_value_us)))
+    if (GNUNET_OK !=
+        GNUNET_SQ_bind (plugin->get_stmt,
+                        params_select))
     {
       LOG_SQLITE (plugin->dbh,
                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                   "sqlite3_bind_xxx");
-      sqlite3_finalize (stmt);
+      GNUNET_SQ_reset (plugin->dbh,
+                       plugin->get_stmt);
       return cnt;
     }
-    if (sqlite3_step (stmt) != SQLITE_ROW)
+    if (SQLITE_ROW !=
+        sqlite3_step (plugin->get_stmt))
+      break;
+    if (GNUNET_OK !=
+        GNUNET_SQ_extract_result (plugin->get_stmt,
+                                  rs))
+    {
+      GNUNET_break (0);
+      GNUNET_SQ_reset (plugin->dbh,
+                       plugin->get_stmt);
       break;
-    size = sqlite3_column_bytes (stmt, 0);
-    dat = sqlite3_column_blob (stmt, 0);
-    exp.abs_value_us = sqlite3_column_int64 (stmt, 1);
-    psize = sqlite3_column_bytes (stmt, 2);
+    }
     if (0 != psize % sizeof (struct GNUNET_PeerIdentity))
     {
       GNUNET_break (0);
       psize = 0;
+      path = NULL;
     }
     psize /= sizeof (struct GNUNET_PeerIdentity);
-    if (0 != psize)
-      path = sqlite3_column_blob (stmt, 2);
-    else
-      path = NULL;
-    ntime = (int64_t) exp.abs_value_us;
-    if (ntime == INT64_MAX)
-      exp = GNUNET_TIME_UNIT_FOREVER_ABS;
     cnt++;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Found %u-byte result when processing GET for key `%4s'\n",
+         "Found %u-byte result when processing GET for key `%s'\n",
          (unsigned int) size,
          GNUNET_h2s (key));
     if (GNUNET_OK != iter (iter_cls,
@@ -334,11 +363,17 @@ sqlite_plugin_get (void *cls,
                            psize,
                            path))
     {
-      sqlite3_finalize (stmt);
+      GNUNET_SQ_cleanup_result (rs);
+      GNUNET_SQ_reset (plugin->dbh,
+                       plugin->get_stmt);
       break;
     }
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_cleanup_result (rs);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->get_stmt);
   }
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->get_stmt);
   return cnt;
 }
 
@@ -354,79 +389,99 @@ static int
 sqlite_plugin_del (void *cls)
 {
   struct Plugin *plugin = cls;
-  unsigned long long rowid;
-  unsigned int dsize;
-  sqlite3_stmt *stmt;
-  sqlite3_stmt *dstmt;
+  uint64_t rowid;
+  void *data;
+  size_t dsize;
   struct GNUNET_HashCode hc;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_SQ_ResultSpec rs[] = {
+    GNUNET_SQ_result_spec_uint64 (&rowid),
+    GNUNET_SQ_result_spec_auto_from_type (&hc),
+    GNUNET_SQ_result_spec_variable_size ((void **) &data,
+                                         &dsize),
+    GNUNET_SQ_result_spec_end
+  };
+  struct GNUNET_SQ_QueryParam params[] = {
+    GNUNET_SQ_query_param_uint64 (&rowid),
+    GNUNET_SQ_query_param_end
+  };
+  struct GNUNET_SQ_QueryParam time_params[] = {
+    GNUNET_SQ_query_param_absolute_time (&now),
+    GNUNET_SQ_query_param_end
+  };
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Processing DEL\n");
-  stmt = NULL;
-  dstmt = NULL;
-  if (SQLITE_OK !=
-      sq_prepare (plugin->dbh,
-                  "SELECT _ROWID_,key,value FROM ds090 ORDER BY expire ASC LIMIT 1",
-                  &stmt))
-  {
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sq_prepare");
-    if (stmt != NULL)
-      (void) sqlite3_finalize (stmt);
-    return GNUNET_SYSERR;
-  }
-  if (SQLITE_ROW != sqlite3_step (stmt))
+  now = GNUNET_TIME_absolute_get ();
+  if (GNUNET_OK !=
+      GNUNET_SQ_bind (plugin->del_expired_stmt,
+                      time_params))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sqlite3_step");
-    (void) sqlite3_finalize (stmt);
+                "sqlite3_bind");
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->del_expired_stmt);
     return GNUNET_SYSERR;
   }
-  rowid = sqlite3_column_int64 (stmt, 0);
-  GNUNET_assert (sqlite3_column_bytes (stmt, 1) == sizeof (struct GNUNET_HashCode));
-  GNUNET_memcpy (&hc, sqlite3_column_blob (stmt, 1), sizeof (struct GNUNET_HashCode));
-  dsize = sqlite3_column_bytes (stmt, 2);
-  if (SQLITE_OK != sqlite3_finalize (stmt))
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sqlite3_step");
-  if (SQLITE_OK !=
-      sq_prepare (plugin->dbh,
-                  "DELETE FROM ds090 WHERE _ROWID_=?", &dstmt))
+  if ( (SQLITE_ROW !=
+        sqlite3_step (plugin->del_expired_stmt)) ||
+       (GNUNET_OK !=
+        GNUNET_SQ_extract_result (plugin->del_expired_stmt,
+                                  rs)) )
   {
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sq_prepare");
-    if (stmt != NULL)
-      (void) sqlite3_finalize (stmt);
-    return GNUNET_SYSERR;
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->del_expired_stmt);
+    if (SQLITE_ROW !=
+        sqlite3_step (plugin->del_select_stmt))
+    {
+      LOG_SQLITE (plugin->dbh,
+                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                  "sqlite3_step");
+      GNUNET_SQ_reset (plugin->dbh,
+                       plugin->del_select_stmt);
+      return GNUNET_SYSERR;
+    }
+    if (GNUNET_OK !=
+        GNUNET_SQ_extract_result (plugin->del_select_stmt,
+                                  rs))
+    {
+      GNUNET_SQ_reset (plugin->dbh,
+                       plugin->del_select_stmt);
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
   }
-  if (SQLITE_OK != sqlite3_bind_int64 (dstmt, 1, rowid))
+  GNUNET_SQ_cleanup_result (rs);
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->del_select_stmt);
+  if (GNUNET_OK !=
+      GNUNET_SQ_bind (plugin->del_stmt,
+                      params))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_bind");
-    (void) sqlite3_finalize (dstmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->del_stmt);
     return GNUNET_SYSERR;
   }
-  if (SQLITE_DONE != sqlite3_step (dstmt))
+  if (SQLITE_DONE !=
+      sqlite3_step (plugin->del_stmt))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_step");
-    (void) sqlite3_finalize (dstmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->del_stmt);
     return GNUNET_SYSERR;
   }
   plugin->num_items--;
   plugin->env->delete_notify (plugin->env->cls,
                               &hc,
                               dsize + OVERHEAD);
-  if (SQLITE_OK != sqlite3_finalize (dstmt))
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sqlite3_finalize");
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->del_stmt);
   return GNUNET_OK;
 }
 
@@ -445,17 +500,28 @@ sqlite_plugin_get_random (void *cls,
                           void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  sqlite3_stmt *stmt;
   struct GNUNET_TIME_Absolute exp;
-  unsigned int size;
-  const char *dat;
-  unsigned int off;
-  unsigned int psize;
-  unsigned int type;
-  char scratch[256];
-  int64_t ntime;
-  const struct GNUNET_PeerIdentity *path;
-  const struct GNUNET_HashCode *key;
+  size_t size;
+  void *dat;
+  uint32_t off;
+  size_t psize;
+  uint32_t type;
+  struct GNUNET_PeerIdentity *path;
+  struct GNUNET_HashCode key;
+  struct GNUNET_SQ_QueryParam params[] = {
+    GNUNET_SQ_query_param_uint32 (&off),
+    GNUNET_SQ_query_param_end
+  };
+  struct GNUNET_SQ_ResultSpec rs[] = {
+    GNUNET_SQ_result_spec_variable_size (&dat,
+                                         &size),
+    GNUNET_SQ_result_spec_absolute_time (&exp),
+    GNUNET_SQ_result_spec_variable_size ((void **) &path,
+                                         &psize),
+    GNUNET_SQ_result_spec_auto_from_type (&key),
+    GNUNET_SQ_result_spec_uint32 (&type),
+    GNUNET_SQ_result_spec_end
+  };
 
   if (0 == plugin->num_items)
     return 0;
@@ -463,60 +529,51 @@ sqlite_plugin_get_random (void *cls,
     return 1;
   off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
                                   plugin->num_items);
-  GNUNET_snprintf (scratch,
-                   sizeof (scratch),
-                   "SELECT value,expire,path,key,type FROM ds090 ORDER BY key LIMIT 1 OFFSET %u",
-                   off);
-  if (SQLITE_OK !=
-      sq_prepare (plugin->dbh, scratch, &stmt))
+  if (GNUNET_OK !=
+      GNUNET_SQ_bind (plugin->get_random_stmt,
+                      params))
   {
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sq_prepare");
     return 0;
   }
-  if (SQLITE_ROW != sqlite3_step (stmt))
+  if (SQLITE_ROW !=
+      sqlite3_step (plugin->get_random_stmt))
+  {
+    GNUNET_break (0);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->get_random_stmt);
+    return 0;
+  }
+  if (GNUNET_OK !=
+      GNUNET_SQ_extract_result (plugin->get_random_stmt,
+                                rs))
   {
     GNUNET_break (0);
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->get_random_stmt);
     return 0;
   }
-  size = sqlite3_column_bytes (stmt, 0);
-  dat = sqlite3_column_blob (stmt, 0);
-  exp.abs_value_us = sqlite3_column_int64 (stmt, 1);
-  psize = sqlite3_column_bytes (stmt, 2);
   if (0 != psize % sizeof (struct GNUNET_PeerIdentity))
   {
     GNUNET_break (0);
     psize = 0;
+    path = NULL;
   }
   psize /= sizeof (struct GNUNET_PeerIdentity);
-  if (0 != psize)
-    path = sqlite3_column_blob (stmt, 2);
-  else
-    path = NULL;
-
-  GNUNET_assert (sizeof (struct GNUNET_HashCode) ==
-                 sqlite3_column_bytes (stmt, 3));
-  key = sqlite3_column_blob (stmt, 3);
-  type = sqlite3_column_int (stmt, 4);
-
-  ntime = (int64_t) exp.abs_value_us;
-  if (ntime == INT64_MAX)
-    exp = GNUNET_TIME_UNIT_FOREVER_ABS;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Found %u-byte result with key %s when processing GET-RANDOM\n",
        (unsigned int) size,
-       GNUNET_h2s (key));
+       GNUNET_h2s (&key));
   (void) iter (iter_cls,
-               key,
+               &key,
                size,
                dat,
-               type,
+               (enum GNUNET_BLOCK_Type) type,
                exp,
                psize,
                path);
-  sqlite3_finalize (stmt);
+  GNUNET_SQ_cleanup_result (rs);
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->get_random_stmt);
   return 1;
 }
 
@@ -542,83 +599,73 @@ sqlite_plugin_get_closest (void *cls,
                            void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  sqlite3_stmt *stmt;
+  uint32_t num_results32 = num_results;
   struct GNUNET_TIME_Absolute now;
   struct GNUNET_TIME_Absolute exp;
-  unsigned int size;
-  const char *dat;
+  size_t size;
+  void *dat;
   unsigned int cnt;
-  unsigned int psize;
-  unsigned int type;
-  int64_t ntime;
-  const struct GNUNET_PeerIdentity *path;
+  size_t psize;
+  uint32_t type;
+  struct GNUNET_HashCode hc;
+  struct GNUNET_PeerIdentity *path;
+  struct GNUNET_SQ_QueryParam params[] = {
+    GNUNET_SQ_query_param_auto_from_type (key),
+    GNUNET_SQ_query_param_absolute_time (&now),
+    GNUNET_SQ_query_param_uint32 (&num_results32),
+    GNUNET_SQ_query_param_end
+  };
+  struct GNUNET_SQ_ResultSpec rs[] = {
+    GNUNET_SQ_result_spec_variable_size (&dat,
+                                         &size),
+    GNUNET_SQ_result_spec_absolute_time (&exp),
+    GNUNET_SQ_result_spec_variable_size ((void **) &path,
+                                         &psize),
+    GNUNET_SQ_result_spec_uint32 (&type),
+    GNUNET_SQ_result_spec_auto_from_type (&hc),
+    GNUNET_SQ_result_spec_end
+  };
 
   now = GNUNET_TIME_absolute_get ();
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Processing GET_CLOSEST for key `%4s'\n",
+       "Processing GET_CLOSEST for key `%s'\n",
        GNUNET_h2s (key));
-  if (SQLITE_OK !=
-      sq_prepare (plugin->dbh,
-                  "SELECT value,expire,path,type,key FROM ds090 WHERE key>=? AND expire >= ? ORDER BY KEY ASC LIMIT ?",
-                  &stmt))
-  {
-    LOG_SQLITE (plugin->dbh,
-                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "sq_prepare");
-    return 0;
-  }
-  ntime = (int64_t) now.abs_value_us;
-  GNUNET_assert (ntime >= 0);
-  if ((SQLITE_OK !=
-       sqlite3_bind_blob (stmt,
-                          1,
-                          key,
-                          sizeof (struct GNUNET_HashCode),
-                          SQLITE_TRANSIENT)) ||
-      (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, now.abs_value_us)) ||
-      (SQLITE_OK != sqlite3_bind_int (stmt, 3, num_results)) )
+  if (GNUNET_OK !=
+      GNUNET_SQ_bind (plugin->get_closest_stmt,
+                      params))
   {
     LOG_SQLITE (plugin->dbh,
                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_bind_xxx");
-    sqlite3_finalize (stmt);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->get_closest_stmt);
     return 0;
   }
   cnt = 0;
-  while (SQLITE_ROW == sqlite3_step (stmt))
+  while (SQLITE_ROW ==
+         sqlite3_step (plugin->get_closest_stmt))
   {
-    if (sizeof (struct GNUNET_HashCode) !=
-        sqlite3_column_bytes (stmt, 4))
+    if (GNUNET_OK !=
+        GNUNET_SQ_extract_result (plugin->get_closest_stmt,
+                                  rs))
     {
       GNUNET_break (0);
       break;
     }
-    size = sqlite3_column_bytes (stmt, 0);
-    dat = sqlite3_column_blob (stmt, 0);
-    exp.abs_value_us = sqlite3_column_int64 (stmt, 1);
-    psize = sqlite3_column_bytes (stmt, 2);
-    type = sqlite3_column_int (stmt, 3);
-    key = sqlite3_column_blob (stmt, 4);
     if (0 != psize % sizeof (struct GNUNET_PeerIdentity))
     {
       GNUNET_break (0);
       psize = 0;
+      path = NULL;
     }
     psize /= sizeof (struct GNUNET_PeerIdentity);
-    if (0 != psize)
-      path = sqlite3_column_blob (stmt, 2);
-    else
-      path = NULL;
-    ntime = (int64_t) exp.abs_value_us;
-    if (ntime == INT64_MAX)
-      exp = GNUNET_TIME_UNIT_FOREVER_ABS;
     cnt++;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Found %u-byte result at %s when processing GET_CLOSE\n",
          (unsigned int) size,
-         GNUNET_h2s (key));
+         GNUNET_h2s (&hc));
     if (GNUNET_OK != iter (iter_cls,
-                           key,
+                           &hc,
                            size,
                            dat,
                            type,
@@ -626,11 +673,13 @@ sqlite_plugin_get_closest (void *cls,
                            psize,
                            path))
     {
-      sqlite3_finalize (stmt);
+      GNUNET_SQ_cleanup_result (rs);
       break;
     }
+    GNUNET_SQ_cleanup_result (rs);
   }
-  sqlite3_finalize (stmt);
+  GNUNET_SQ_reset (plugin->dbh,
+                   plugin->get_closest_stmt);
   return cnt;
 }
 
@@ -692,17 +741,72 @@ libgnunet_plugin_datacache_sqlite_init (void *cls)
     SQLITE3_EXEC (dbh, "PRAGMA sqlite_temp_store=3");
 
   SQLITE3_EXEC (dbh,
-                "CREATE TABLE ds090 (" "  type INTEGER NOT NULL DEFAULT 0,"
-                "  expire INTEGER NOT NULL DEFAULT 0,"
+                "CREATE TABLE ds091 ("
+                "  type INTEGER NOT NULL DEFAULT 0,"
+                "  expire INTEGER NOT NULL,"
                 "  key BLOB NOT NULL DEFAULT '',"
-                "  value BLOB NOT NULL DEFAULT '',"
+                "  prox INTEGER NOT NULL,"
+                "  value BLOB NOT NULL,"
                "  path BLOB DEFAULT '')");
-  SQLITE3_EXEC (dbh, "CREATE INDEX idx_hashidx ON ds090 (key,type,expire)");
-  SQLITE3_EXEC (dbh, "CREATE INDEX idx_expire ON ds090 (expire)");
+  SQLITE3_EXEC (dbh, "CREATE INDEX idx_hashidx ON ds091 (key,type,expire)");
+  SQLITE3_EXEC (dbh, "CREATE INDEX idx_prox_expire ON ds091 (prox,expire)");
+  SQLITE3_EXEC (dbh, "CREATE INDEX idx_expire_only ON ds091 (expire)");
   plugin = GNUNET_new (struct Plugin);
   plugin->env = env;
   plugin->dbh = dbh;
   plugin->fn = fn_utf8;
+
+  if ( (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "INSERT INTO ds091 (type, expire, key, prox, value, path) "
+                    "VALUES (?, ?, ?, ?, ?, ?)",
+                    &plugin->insert_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "SELECT count(*) FROM ds091 "
+                    "WHERE key=? AND type=? AND expire >= ?",
+                    &plugin->get_count_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "SELECT value,expire,path FROM ds091"
+                    " WHERE key=? AND type=? AND expire >= ? LIMIT 1 OFFSET ?",
+                    &plugin->get_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "SELECT _ROWID_,key,value FROM ds091"
+                    " WHERE expire < ?"
+                    " ORDER BY expire ASC LIMIT 1",
+                    &plugin->del_expired_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "SELECT _ROWID_,key,value FROM ds091"
+                    " ORDER BY prox ASC, expire ASC LIMIT 1",
+                    &plugin->del_select_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "DELETE FROM ds091 WHERE _ROWID_=?",
+                    &plugin->del_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "SELECT value,expire,path,key,type FROM ds091 "
+                    "ORDER BY key LIMIT 1 OFFSET ?",
+                    &plugin->get_random_stmt)) ||
+       (SQLITE_OK !=
+        sq_prepare (plugin->dbh,
+                    "SELECT value,expire,path,type,key FROM ds091 "
+                    "WHERE key>=? AND expire >= ? ORDER BY KEY ASC LIMIT ?",
+                    &plugin->get_closest_stmt))
+       )
+  {
+    LOG_SQLITE (plugin->dbh,
+                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                "sq_prepare");
+    GNUNET_break (SQLITE_OK ==
+                  sqlite3_close (plugin->dbh));
+    GNUNET_free (plugin);
+    return NULL;
+  }
+
   api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
   api->cls = plugin;
   api->get = &sqlite_plugin_get;
@@ -741,6 +845,14 @@ libgnunet_plugin_datacache_sqlite_done (void *cls)
                        plugin->fn);
   GNUNET_free_non_null (plugin->fn);
 #endif
+  sqlite3_finalize (plugin->insert_stmt);
+  sqlite3_finalize (plugin->get_count_stmt);
+  sqlite3_finalize (plugin->get_stmt);
+  sqlite3_finalize (plugin->del_select_stmt);
+  sqlite3_finalize (plugin->del_expired_stmt);
+  sqlite3_finalize (plugin->del_stmt);
+  sqlite3_finalize (plugin->get_random_stmt);
+  sqlite3_finalize (plugin->get_closest_stmt);
   result = sqlite3_close (plugin->dbh);
 #if SQLITE_VERSION_NUMBER >= 3007000
   if (SQLITE_BUSY == result)