removing dead/legacy server/connection logic, except for in tcp/wlan/bt plugins ...
[oweals/gnunet.git] / src / datacache / plugin_datacache_postgres.c
index 6d7e2802112d5ae73648bf209644275eae15674c..13c2c26a27f4000e54c040cfe467decf9d8097c8 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2006, 2009, 2010, 2012 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2006, 2009, 2010, 2012, 2015 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
@@ -27,7 +27,6 @@
 #include "gnunet_util_lib.h"
 #include "gnunet_postgres_lib.h"
 #include "gnunet_datacache_plugin.h"
-#include <postgresql/libpq-fe.h>
 
 #define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
 
@@ -51,6 +50,10 @@ struct Plugin
    */
   PGconn *dbh;
 
+  /**
+   * Number of key-value pairs in the database.
+   */
+  unsigned int num_items;
 };
 
 
@@ -58,7 +61,7 @@ struct Plugin
  * @brief Get a database handle
  *
  * @param plugin global context
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 static int
 init_connection (struct Plugin *plugin)
@@ -71,7 +74,7 @@ init_connection (struct Plugin *plugin)
     return GNUNET_SYSERR;
   ret =
       PQexec (plugin->dbh,
-              "CREATE TEMPORARY TABLE gn090dc ("
+              "CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
               "  type INTEGER NOT NULL DEFAULT 0,"
               "  discard_time BIGINT NOT NULL DEFAULT 0,"
               "  key BYTEA NOT NULL DEFAULT '',"
@@ -86,7 +89,8 @@ init_connection (struct Plugin *plugin)
                       PG_DIAG_SQLSTATE)))))
   {
     (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret,
-                                        PGRES_COMMAND_OK, "CREATE TABLE",
+                                        PGRES_COMMAND_OK,
+                                         "CREATE TABLE",
                                         "gn090dc");
     PQfinish (plugin->dbh);
     plugin->dbh = NULL;
@@ -95,9 +99,11 @@ init_connection (struct Plugin *plugin)
   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
   {
     if ((GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_key ON gn090dc (key)")) ||
+         GNUNET_POSTGRES_exec (plugin->dbh,
+                               "CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)")) ||
         (GNUNET_OK !=
-         GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_dt ON gn090dc (discard_time)")))
+         GNUNET_POSTGRES_exec (plugin->dbh,
+                               "CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)")))
     {
       PQclear (ret);
       PQfinish (plugin->dbh);
@@ -110,16 +116,25 @@ init_connection (struct Plugin *plugin)
       PQexec (plugin->dbh,
               "ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL");
   if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090dc"))
+      GNUNET_POSTGRES_check_result (plugin->dbh,
+                                    ret,
+                                    PGRES_COMMAND_OK,
+                                    "ALTER TABLE",
+                                    "gn090dc"))
   {
     PQfinish (plugin->dbh);
     plugin->dbh = NULL;
     return GNUNET_SYSERR;
   }
   PQclear (ret);
-  ret = PQexec (plugin->dbh, "ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN");
+  ret = PQexec (plugin->dbh,
+                "ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN");
   if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090dc"))
+      GNUNET_POSTGRES_check_result (plugin->dbh,
+                                    ret,
+                                    PGRES_COMMAND_OK,
+                                    "ALTER TABLE",
+                                    "gn090dc"))
   {
     PQfinish (plugin->dbh);
     plugin->dbh = NULL;
@@ -127,23 +142,39 @@ init_connection (struct Plugin *plugin)
   }
   PQclear (ret);
   if ((GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "getkt",
-                   "SELECT discard_time,type,value,path FROM gn090dc "
-                   "WHERE key=$1 AND type=$2 ", 2)) ||
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "getkt",
+                                "SELECT discard_time,type,value,path FROM gn090dc "
+                                "WHERE key=$1 AND type=$2 ", 2)) ||
+      (GNUNET_OK !=
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "getk",
+                                "SELECT discard_time,type,value,path FROM gn090dc "
+                                "WHERE key=$1", 1)) ||
       (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "getk",
-                   "SELECT discard_time,type,value,path FROM gn090dc "
-                   "WHERE key=$1", 1)) ||
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "getm",
+                                "SELECT length(value),oid,key FROM gn090dc "
+                                "ORDER BY discard_time ASC LIMIT 1", 0)) ||
       (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "getm",
-                   "SELECT length(value),oid,key FROM gn090dc "
-                   "ORDER BY discard_time ASC LIMIT 1", 0)) ||
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "get_random",
+                                "SELECT discard_time,type,value,path,key FROM gn090dc "
+                                "ORDER BY key ASC LIMIT 1 OFFSET $1", 1)) ||
       (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090dc WHERE oid=$1", 1)) ||
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "get_closest",
+                                "SELECT discard_time,type,value,path,key FROM gn090dc "
+                                "WHERE key>=$1 ORDER BY key ASC LIMIT $2", 1)) ||
       (GNUNET_OK !=
-       GNUNET_POSTGRES_prepare (plugin->dbh, "put",
-                   "INSERT INTO gn090dc (type, discard_time, key, value, path) "
-                   "VALUES ($1, $2, $3, $4, $5)", 5)))
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "delrow",
+                                "DELETE FROM gn090dc WHERE oid=$1", 1)) ||
+      (GNUNET_OK !=
+       GNUNET_POSTGRES_prepare (plugin->dbh,
+                                "put",
+                                "INSERT INTO gn090dc (type, discard_time, key, value, path) "
+                                "VALUES ($1, $2, $3, $4, $5)", 5)))
   {
     PQfinish (plugin->dbh);
     plugin->dbh = NULL;
@@ -156,19 +187,22 @@ init_connection (struct Plugin *plugin)
 /**
  * Store an item in the datastore.
  *
- * @param cls closure (our "struct Plugin")
- * @param key key to store data under
- * @param size number of bytes in data
+ * @param cls closure (our `struct Plugin`)
+ * @param key key to store @a data under
+ * @param size number of bytes in @a data
  * @param data data to store
  * @param type type of the value
  * @param discard_time when to discard the value in any case
- * @param path_info_len number of entries in 'path_info'
+ * @param path_info_len number of entries in @a path_info
  * @param path_info a path through the network
  * @return 0 if duplicate, -1 on error, number of bytes used otherwise
  */
 static ssize_t
-postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, size_t size,
-                     const char *data, enum GNUNET_BLOCK_Type type,
+postgres_plugin_put (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     size_t size,
+                     const char *data,
+                     enum GNUNET_BLOCK_Type type,
                      struct GNUNET_TIME_Absolute discard_time,
                     unsigned int path_info_len,
                     const struct GNUNET_PeerIdentity *path_info)
@@ -201,6 +235,7 @@ postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, size_t size,
       GNUNET_POSTGRES_check_result (plugin->dbh, ret,
                                    PGRES_COMMAND_OK, "PQexecPrepared", "put"))
     return -1;
+  plugin->num_items++;
   PQclear (ret);
   return size + OVERHEAD;
 }
@@ -210,28 +245,30 @@ postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, size_t size,
  * Iterate over the results for a particular key
  * in the datastore.
  *
- * @param cls closure (our "struct Plugin")
- * @param key
+ * @param cls closure (our `struct Plugin`)
+ * @param key key to look for
  * @param type entries of which type are relevant?
  * @param iter maybe NULL (to just count)
- * @param iter_cls closure for iter
+ * @param iter_cls closure for @a iter
  * @return the number of results found
  */
 static unsigned int
-postgres_plugin_get (void *cls, const struct GNUNET_HashCode * key,
+postgres_plugin_get (void *cls,
+                     const struct GNUNET_HashCode *key,
                      enum GNUNET_BLOCK_Type type,
-                     GNUNET_DATACACHE_Iterator iter, void *iter_cls)
+                     GNUNET_DATACACHE_Iterator iter,
+                     void *iter_cls)
 {
   struct Plugin *plugin = cls;
   uint32_t btype = htonl (type);
 
   const char *paramValues[] = {
     (const char *) key,
-    (const char *) &btype,
+    (const char *) &btype
   };
   int paramLengths[] = {
     sizeof (struct GNUNET_HashCode),
-    sizeof (btype),
+    sizeof (btype)
   };
   const int paramFormats[] = { 1, 1 };
   struct GNUNET_TIME_Absolute expiration_time;
@@ -247,7 +284,10 @@ postgres_plugin_get (void *cls, const struct GNUNET_HashCode * key,
                       (type == 0) ? 1 : 2, paramValues, paramLengths,
                       paramFormats, 1);
   if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared",
+      GNUNET_POSTGRES_check_result (plugin->dbh,
+                                    res,
+                                    PGRES_TUPLES_OK,
+                                    "PQexecPrepared",
                                    (type == 0) ? "getk" : "getkt"))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -268,8 +308,9 @@ postgres_plugin_get (void *cls, const struct GNUNET_HashCode * key,
     PQclear (res);
     return cnt;
   }
-  if ((4 != PQnfields (res)) || (sizeof (uint64_t) != PQfsize (res, 0)) ||
-      (sizeof (uint32_t) != PQfsize (res, 1)))
+  if ( (4 != PQnfields (res)) ||
+       (sizeof (uint64_t) != PQfsize (res, 0)) ||
+       (sizeof (uint32_t) != PQfsize (res, 1)))
   {
     GNUNET_break (0);
     PQclear (res);
@@ -314,8 +355,8 @@ postgres_plugin_get (void *cls, const struct GNUNET_HashCode * key,
  * Delete the entry with the lowest expiration value
  * from the datacache right now.
  *
- * @param cls closure (our "struct Plugin")
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @param cls closure (our `struct Plugin`)
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 static int
 postgres_plugin_del (void *cls)
@@ -326,9 +367,15 @@ postgres_plugin_del (void *cls)
   struct GNUNET_HashCode key;
   PGresult *res;
 
-  res = PQexecPrepared (plugin->dbh, "getm", 0, NULL, NULL, NULL, 1);
+  res = PQexecPrepared (plugin->dbh,
+                        "getm",
+                        0, NULL, NULL, NULL, 1);
   if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "getm"))
+      GNUNET_POSTGRES_check_result (plugin->dbh,
+                                    res,
+                                    PGRES_TUPLES_OK,
+                                    "PQexecPrepared",
+                                    "getm"))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Ending iteration (postgres error)\n");
@@ -352,20 +399,246 @@ postgres_plugin_del (void *cls)
   }
   size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
   oid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
-  memcpy (&key, PQgetvalue (res, 0, 2), sizeof (struct GNUNET_HashCode));
+  GNUNET_memcpy (&key, PQgetvalue (res, 0, 2), sizeof (struct GNUNET_HashCode));
   PQclear (res);
-  if (GNUNET_OK != GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", oid))
+  if (GNUNET_OK !=
+      GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
+                                       "delrow",
+                                       oid))
     return GNUNET_SYSERR;
-  plugin->env->delete_notify (plugin->env->cls, &key, size + OVERHEAD);
+  plugin->num_items--;
+  plugin->env->delete_notify (plugin->env->cls,
+                              &key,
+                              size + OVERHEAD);
   return GNUNET_OK;
 }
 
 
+/**
+ * Obtain a random key-value pair from the datacache.
+ *
+ * @param cls closure (our `struct Plugin`)
+ * @param iter maybe NULL (to just count)
+ * @param iter_cls closure for @a iter
+ * @return the number of results found, zero (datacache empty) or one
+ */
+static unsigned int
+postgres_plugin_get_random (void *cls,
+                            GNUNET_DATACACHE_Iterator iter,
+                            void *iter_cls)
+{
+  struct Plugin *plugin = cls;
+  unsigned int off;
+  uint32_t off_be;
+  struct GNUNET_TIME_Absolute expiration_time;
+  uint32_t size;
+  unsigned int path_len;
+  const struct GNUNET_PeerIdentity *path;
+  const struct GNUNET_HashCode *key;
+  unsigned int type;
+  PGresult *res;
+  const char *paramValues[] = {
+    (const char *) &off_be
+  };
+  int paramLengths[] = {
+    sizeof (off_be)
+  };
+  const int paramFormats[] = { 1 };
+
+  if (0 == plugin->num_items)
+    return 0;
+  if (NULL == iter)
+    return 1;
+  off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+                                  plugin->num_items);
+  off_be = htonl (off);
+  res =
+    PQexecPrepared (plugin->dbh, "get_random",
+                    1, paramValues, paramLengths, paramFormats,
+                    1);
+  if (GNUNET_OK !=
+      GNUNET_POSTGRES_check_result (plugin->dbh,
+                                    res,
+                                    PGRES_TUPLES_OK,
+                                    "PQexecPrepared",
+                                   "get_random"))
+  {
+    GNUNET_break (0);
+    return 0;
+  }
+  if (0 == PQntuples (res))
+  {
+    GNUNET_break (0);
+    return 0;
+  }
+  if ( (5 != PQnfields (res)) ||
+       (sizeof (uint64_t) != PQfsize (res, 0)) ||
+       (sizeof (uint32_t) != PQfsize (res, 1)) ||
+       (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
+  {
+    GNUNET_break (0);
+    PQclear (res);
+    return 0;
+  }
+  expiration_time.abs_value_us =
+    GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 0));
+  type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
+  size = PQgetlength (res, 0, 2);
+  path_len = PQgetlength (res, 0, 3);
+  if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
+  {
+    GNUNET_break (0);
+    path_len = 0;
+  }
+  path_len %= sizeof (struct GNUNET_PeerIdentity);
+  path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, 0, 3);
+  key = (const struct GNUNET_HashCode *) PQgetvalue (res, 0, 4);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Found random value with key %s of size %u bytes and type %u in database\n",
+       GNUNET_h2s (key),
+       (unsigned int) size,
+       (unsigned int) type);
+  (void) iter (iter_cls,
+               key,
+               size,
+               PQgetvalue (res, 0, 2),
+               (enum GNUNET_BLOCK_Type) type,
+               expiration_time,
+               path_len,
+               path);
+  PQclear (res);
+  return 1;
+}
+
+
+/**
+ * Iterate over the results that are "close" to a particular key in
+ * the datacache.  "close" is defined as numerically larger than @a
+ * key (when interpreted as a circular address space), with small
+ * distance.
+ *
+ * @param cls closure (internal context for the plugin)
+ * @param key area of the keyspace to look into
+ * @param num_results number of results that should be returned to @a iter
+ * @param iter maybe NULL (to just count)
+ * @param iter_cls closure for @a iter
+ * @return the number of results found
+ */
+static unsigned int
+postgres_plugin_get_closest (void *cls,
+                             const struct GNUNET_HashCode *key,
+                             unsigned int num_results,
+                             GNUNET_DATACACHE_Iterator iter,
+                             void *iter_cls)
+{
+  struct Plugin *plugin = cls;
+  uint32_t nbo_limit = htonl (num_results);
+  const char *paramValues[] = {
+    (const char *) key,
+    (const char *) &nbo_limit,
+  };
+  int paramLengths[] = {
+    sizeof (struct GNUNET_HashCode),
+    sizeof (nbo_limit)
+
+  };
+  const int paramFormats[] = { 1, 1 };
+  struct GNUNET_TIME_Absolute expiration_time;
+  uint32_t size;
+  unsigned int type;
+  unsigned int cnt;
+  unsigned int i;
+  unsigned int path_len;
+  const struct GNUNET_PeerIdentity *path;
+  PGresult *res;
+
+  res =
+      PQexecPrepared (plugin->dbh,
+                      "get_closest",
+                      2,
+                      paramValues,
+                      paramLengths,
+                      paramFormats,
+                      1);
+  if (GNUNET_OK !=
+      GNUNET_POSTGRES_check_result (plugin->dbh,
+                                    res,
+                                    PGRES_TUPLES_OK,
+                                    "PQexecPrepared",
+                                   "get_closest"))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Ending iteration (postgres error)\n");
+    return 0;
+  }
+
+  if (0 == (cnt = PQntuples (res)))
+  {
+    /* no result */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Ending iteration (no more results)\n");
+    PQclear (res);
+    return 0;
+  }
+  if (NULL == iter)
+  {
+    PQclear (res);
+    return cnt;
+  }
+  if ( (5 != PQnfields (res)) ||
+       (sizeof (uint64_t) != PQfsize (res, 0)) ||
+       (sizeof (uint32_t) != PQfsize (res, 1)) ||
+       (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
+  {
+    GNUNET_break (0);
+    PQclear (res);
+    return 0;
+  }
+  for (i = 0; i < cnt; i++)
+  {
+    expiration_time.abs_value_us =
+        GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
+    type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
+    size = PQgetlength (res, i, 2);
+    path_len = PQgetlength (res, i, 3);
+    if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_break (0);
+      path_len = 0;
+    }
+    path_len %= sizeof (struct GNUNET_PeerIdentity);
+    path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, i, 3);
+    key = (const struct GNUNET_HashCode *) PQgetvalue (res, i, 4);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Found result of size %u bytes and type %u in database\n",
+        (unsigned int) size,
+         (unsigned int) type);
+    if (GNUNET_SYSERR ==
+        iter (iter_cls,
+              key,
+              size,
+              PQgetvalue (res, i, 2),
+              (enum GNUNET_BLOCK_Type) type,
+             expiration_time,
+             path_len,
+             path))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          "Ending iteration (client error)\n");
+      PQclear (res);
+      return cnt;
+    }
+  }
+  PQclear (res);
+  return cnt;
+}
+
+
 /**
  * Entry point for the plugin.
  *
- * @param cls closure (the "struct GNUNET_DATACACHE_PluginEnvironmnet")
- * @return the plugin's closure (our "struct Plugin")
+ * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
+ * @return the plugin's closure (our `struct Plugin`)
  */
 void *
 libgnunet_plugin_datacache_postgres_init (void *cls)
@@ -374,7 +647,7 @@ libgnunet_plugin_datacache_postgres_init (void *cls)
   struct GNUNET_DATACACHE_PluginFunctions *api;
   struct Plugin *plugin;
 
-  plugin = GNUNET_malloc (sizeof (struct Plugin));
+  plugin = GNUNET_new (struct Plugin);
   plugin->env = env;
 
   if (GNUNET_OK != init_connection (plugin))
@@ -383,13 +656,15 @@ libgnunet_plugin_datacache_postgres_init (void *cls)
     return NULL;
   }
 
-  api = GNUNET_malloc (sizeof (struct GNUNET_DATACACHE_PluginFunctions));
+  api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
   api->cls = plugin;
   api->get = &postgres_plugin_get;
   api->put = &postgres_plugin_put;
   api->del = &postgres_plugin_del;
+  api->get_random = &postgres_plugin_get_random;
+  api->get_closest = &postgres_plugin_get_closest;
   LOG (GNUNET_ERROR_TYPE_INFO,
-       _("Postgres datacache running\n"));
+       "Postgres datacache running\n");
   return api;
 }
 
@@ -397,7 +672,7 @@ libgnunet_plugin_datacache_postgres_init (void *cls)
 /**
  * Exit point from the plugin.
  *
- * @param cls closure (our "struct Plugin")
+ * @param cls closure (our `struct Plugin`)
  * @return NULL
  */
 void *