/*
This file is part of GNUnet
- (C) 2006, 2009, 2010 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2006, 2009, 2010, 2012, 2015, 2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
*/
#include "platform.h"
#include "gnunet_util_lib.h"
+#include "gnunet_pq_lib.h"
#include "gnunet_datacache_plugin.h"
-#include <postgresql/libpq-fe.h>
-#define DEBUG_POSTGRES GNUNET_NO
+#define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
/**
* Per-entry overhead estimate
*/
-#define OVERHEAD (sizeof(GNUNET_HashCode) + 24)
+#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
/**
* Context for all functions in this plugin.
*/
PGconn *dbh;
+ /**
+ * Number of key-value pairs in the database.
+ */
+ unsigned int num_items;
};
/**
- * Check if the result obtained from Postgres has
- * the desired status code. If not, log an error, clear the
- * result and return GNUNET_SYSERR.
- *
- * @return GNUNET_OK if the result is acceptable
+ * @brief Get a database handle
+ *
+ * @param plugin global context
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
*/
static int
-check_result (struct Plugin *plugin, PGresult * ret, int expected_status,
- const char *command, const char *args, int line)
+init_connection (struct Plugin *plugin)
{
- if (ret == NULL)
+ struct GNUNET_PQ_ExecuteStatement es[] = {
+ GNUNET_PQ_make_execute ("CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
+ " type INTEGER NOT NULL,"
+ " discard_time BIGINT NOT NULL,"
+ " key BYTEA NOT NULL,"
+ " value BYTEA NOT NULL,"
+ " path BYTEA DEFAULT NULL)"
+ "WITH OIDS"),
+ GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)"),
+ GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)"),
+ GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL"),
+ GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN"),
+ GNUNET_PQ_EXECUTE_STATEMENT_END
+ };
+ struct GNUNET_PQ_PreparedStatement ps[] = {
+ GNUNET_PQ_make_prepare ("getkt",
+ "SELECT discard_time,type,value,path FROM gn090dc "
+ "WHERE key=$1 AND type=$2",
+ 2),
+ GNUNET_PQ_make_prepare ("getk",
+ "SELECT discard_time,type,value,path FROM gn090dc "
+ "WHERE key=$1",
+ 1),
+ GNUNET_PQ_make_prepare ("getm",
+ "SELECT length(value) AS len,oid,key FROM gn090dc "
+ "ORDER BY discard_time ASC LIMIT 1",
+ 0),
+ GNUNET_PQ_make_prepare ("get_random",
+ "SELECT discard_time,type,value,path,key FROM gn090dc "
+ "ORDER BY key ASC LIMIT 1 OFFSET $1",
+ 1),
+ GNUNET_PQ_make_prepare ("get_closest",
+ "SELECT discard_time,type,value,path,key FROM gn090dc "
+ "WHERE key>=$1 ORDER BY key ASC LIMIT $2",
+ 1),
+ GNUNET_PQ_make_prepare ("delrow",
+ "DELETE FROM gn090dc WHERE oid=$1",
+ 1),
+ GNUNET_PQ_make_prepare ("put",
+ "INSERT INTO gn090dc (type, discard_time, key, value, path) "
+ "VALUES ($1, $2, $3, $4, $5)",
+ 5),
+ GNUNET_PQ_PREPARED_STATEMENT_END
+ };
+
+ plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
+ "datacache-postgres");
+ if (NULL == plugin->dbh)
+ return GNUNET_SYSERR;
+ if (GNUNET_OK !=
+ GNUNET_PQ_exec_statements (plugin->dbh,
+ es))
{
- GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "datastore-postgres",
- "Postgres failed to allocate result for `%s:%s' at %d\n",
- command, args, line);
+ PQfinish (plugin->dbh);
+ plugin->dbh = NULL;
return GNUNET_SYSERR;
}
- if (PQresultStatus (ret) != expected_status)
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_prepare_statements (plugin->dbh,
+ ps))
{
- GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "datastore-postgres",
- _("`%s:%s' failed at %s:%d with error: %s"), command, args,
- __FILE__, line, PQerrorMessage (plugin->dbh));
- PQclear (ret);
+ PQfinish (plugin->dbh);
+ plugin->dbh = NULL;
return GNUNET_SYSERR;
}
return GNUNET_OK;
/**
- * Run simple SQL statement (without results).
+ * Store an item in the datastore.
+ *
+ * @param cls closure (our `struct Plugin`)
+ * @param key key to store @a data under
+ * @param data_size number of bytes in @a data
+ * @param data data to store
+ * @param type type of the value
+ * @param discard_time when to discard the value in any case
+ * @param path_info_len number of entries in @a path_info
+ * @param path_info a path through the network
+ * @return 0 if duplicate, -1 on error, number of bytes used otherwise
*/
-static int
-pq_exec (struct Plugin *plugin, const char *sql, int line)
+static ssize_t
+postgres_plugin_put (void *cls,
+ const struct GNUNET_HashCode *key,
+ size_t data_size,
+ const char *data,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute discard_time,
+ unsigned int path_info_len,
+ const struct GNUNET_PeerIdentity *path_info)
{
- PGresult *ret;
-
- ret = PQexec (plugin->dbh, sql);
- if (GNUNET_OK !=
- check_result (plugin, ret, PGRES_COMMAND_OK, "PQexec", sql, line))
- return GNUNET_SYSERR;
- PQclear (ret);
- return GNUNET_OK;
+ struct Plugin *plugin = cls;
+ uint32_t type32 = (uint32_t) type;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32 (&type32),
+ GNUNET_PQ_query_param_absolute_time (&discard_time),
+ GNUNET_PQ_query_param_auto_from_type (key),
+ GNUNET_PQ_query_param_fixed_size (data, data_size),
+ GNUNET_PQ_query_param_fixed_size (path_info,
+ path_info_len * sizeof (struct GNUNET_PeerIdentity)),
+ GNUNET_PQ_query_param_end
+ };
+ enum GNUNET_DB_QueryStatus ret;
+
+ ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+ "put",
+ params);
+ if (0 > ret)
+ return -1;
+ plugin->num_items++;
+ return data_size + OVERHEAD;
}
/**
- * Prepare SQL statement.
+ * Closure for #handle_results.
*/
-static int
-pq_prepare (struct Plugin *plugin, const char *name, const char *sql,
- int nparms, int line)
+struct HandleResultContext
{
- PGresult *ret;
- ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
- if (GNUNET_OK !=
- check_result (plugin, ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
- return GNUNET_SYSERR;
- PQclear (ret);
- return GNUNET_OK;
-}
+ /**
+ * Function to call on each result, may be NULL.
+ */
+ GNUNET_DATACACHE_Iterator iter;
+
+ /**
+ * Closure for @e iter.
+ */
+ void *iter_cls;
+
+ /**
+ * Key used.
+ */
+ const struct GNUNET_HashCode *key;
+};
/**
- * @brief Get a database handle
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results. Parse the result
+ * and call the callback given in @a cls
+ *
+ * @param cls closure of type `struct HandleResultContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
*/
-static int
-init_connection (struct Plugin *plugin)
+static void
+handle_results (void *cls,
+ PGresult *result,
+ unsigned int num_results)
{
- char *conninfo;
- PGresult *ret;
+ struct HandleResultContext *hrc = cls;
- /* Open database and precompile statements */
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
- "datacache-postgres", "CONFIG",
- &conninfo))
- conninfo = NULL;
- plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
- GNUNET_free_non_null (conninfo);
- if (NULL == plugin->dbh)
- {
- /* FIXME: warn about out-of-memory? */
- return GNUNET_SYSERR;
- }
- if (PQstatus (plugin->dbh) != CONNECTION_OK)
- {
- GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "datacache-postgres",
- _("Unable to initialize Postgres: %s"),
- PQerrorMessage (plugin->dbh));
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- ret =
- PQexec (plugin->dbh,
- "CREATE TEMPORARY TABLE gn090dc ("
- " type INTEGER NOT NULL DEFAULT 0,"
- " discard_time BIGINT NOT NULL DEFAULT 0,"
- " key BYTEA NOT NULL DEFAULT '',"
- " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
- if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07", /* duplicate table */
- PQresultErrorField
- (ret,
- PG_DIAG_SQLSTATE)))))
+ for (unsigned int i=0;i<num_results;i++)
{
- (void) check_result (plugin, ret, PGRES_COMMAND_OK, "CREATE TABLE",
- "gn090dc", __LINE__);
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- if (PQresultStatus (ret) == PGRES_COMMAND_OK)
- {
- if ((GNUNET_OK !=
- pq_exec (plugin, "CREATE INDEX idx_key ON gn090dc (key)", __LINE__)) ||
- (GNUNET_OK !=
- pq_exec (plugin, "CREATE INDEX idx_dt ON gn090dc (discard_time)",
- __LINE__)))
+ struct GNUNET_TIME_Absolute expiration_time;
+ uint32_t type;
+ void *data;
+ size_t data_size;
+ struct GNUNET_PeerIdentity *path;
+ size_t path_len;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_absolute_time ("discard_time",
+ &expiration_time),
+ GNUNET_PQ_result_spec_uint32 ("type",
+ &type),
+ GNUNET_PQ_result_spec_variable_size ("value",
+ &data,
+ &data_size),
+ GNUNET_PQ_result_spec_variable_size ("path",
+ (void **) &path,
+ &path_len),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_YES !=
+ GNUNET_PQ_extract_result (result,
+ rs,
+ i))
{
- PQclear (ret);
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
+ GNUNET_break (0);
+ return;
}
+ if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_break (0);
+ path_len = 0;
+ }
+ path_len %= sizeof (struct GNUNET_PeerIdentity);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Found result of size %u bytes and type %u in database\n",
+ (unsigned int) data_size,
+ (unsigned int) type);
+ if ( (NULL != hrc->iter) &&
+ (GNUNET_SYSERR ==
+ hrc->iter (hrc->iter_cls,
+ hrc->key,
+ data_size,
+ data,
+ (enum GNUNET_BLOCK_Type) type,
+ expiration_time,
+ path_len,
+ path)) )
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ending iteration (client error)\n");
+ GNUNET_PQ_cleanup_result (rs);
+ return;
+ }
+ GNUNET_PQ_cleanup_result (rs);
}
- PQclear (ret);
-#if 1
- ret =
- PQexec (plugin->dbh,
- "ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL");
- if (GNUNET_OK !=
- check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090dc",
- __LINE__))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- PQclear (ret);
- ret = PQexec (plugin->dbh, "ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN");
- if (GNUNET_OK !=
- check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090dc",
- __LINE__))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- PQclear (ret);
-#endif
- if ((GNUNET_OK !=
- pq_prepare (plugin, "getkt",
- "SELECT discard_time,type,value FROM gn090dc "
- "WHERE key=$1 AND type=$2 ", 2, __LINE__)) ||
- (GNUNET_OK !=
- pq_prepare (plugin, "getk",
- "SELECT discard_time,type,value FROM gn090dc "
- "WHERE key=$1", 1, __LINE__)) ||
- (GNUNET_OK !=
- pq_prepare (plugin, "getm",
- "SELECT length(value),oid,key FROM gn090dc "
- "ORDER BY discard_time ASC LIMIT 1", 0, __LINE__)) ||
- (GNUNET_OK !=
- pq_prepare (plugin, "delrow", "DELETE FROM gn090dc WHERE oid=$1", 1,
- __LINE__)) ||
- (GNUNET_OK !=
- pq_prepare (plugin, "put",
- "INSERT INTO gn090dc (type, discard_time, key, value) "
- "VALUES ($1, $2, $3, $4)", 4, __LINE__)))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
}
/**
- * Delete the row identified by the given rowid (qid
- * in postgres).
+ * Iterate over the results for a particular key
+ * in the datastore.
*
- * @return GNUNET_OK on success
+ * @param cls closure (our `struct Plugin`)
+ * @param key key to look for
+ * @param type entries of which type are relevant?
+ * @param iter maybe NULL (to just count)
+ * @param iter_cls closure for @a iter
+ * @return the number of results found
*/
-static int
-delete_by_rowid (struct Plugin *plugin, uint32_t rowid)
+static unsigned int
+postgres_plugin_get (void *cls,
+ const struct GNUNET_HashCode *key,
+ enum GNUNET_BLOCK_Type type,
+ GNUNET_DATACACHE_Iterator iter,
+ void *iter_cls)
{
- uint32_t brow = htonl (rowid);
- const char *paramValues[] = { (const char *) &brow };
- int paramLengths[] = { sizeof (brow) };
- const int paramFormats[] = { 1 };
- PGresult *ret;
-
- ret =
- PQexecPrepared (plugin->dbh, "delrow", 1, paramValues, paramLengths,
- paramFormats, 1);
- if (GNUNET_OK !=
- check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
- __LINE__))
- {
- return GNUNET_SYSERR;
- }
- PQclear (ret);
- return GNUNET_OK;
+ struct Plugin *plugin = cls;
+ uint32_t type32 = (uint32_t) type;
+ struct GNUNET_PQ_QueryParam paramk[] = {
+ GNUNET_PQ_query_param_auto_from_type (key),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_QueryParam paramkt[] = {
+ GNUNET_PQ_query_param_auto_from_type (key),
+ GNUNET_PQ_query_param_uint32 (&type32),
+ GNUNET_PQ_query_param_end
+ };
+ enum GNUNET_DB_QueryStatus res;
+ struct HandleResultContext hr_ctx;
+
+ hr_ctx.iter = iter;
+ hr_ctx.iter_cls = iter_cls;
+ hr_ctx.key = key;
+ res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ (0 == type) ? "getk" : "getkt",
+ (0 == type) ? paramk : paramkt,
+ &handle_results,
+ &hr_ctx);
+ if (res < 0)
+ return 0;
+ return res;
}
/**
- * Store an item in the datastore.
+ * Delete the entry with the lowest expiration value
+ * from the datacache right now.
*
- * @param cls closure (our "struct Plugin")
- * @param key key to store data under
- * @param size number of bytes in data
- * @param data data to store
- * @param type type of the value
- * @param discard_time when to discard the value in any case
- * @return 0 on error, number of bytes used otherwise
+ * @param cls closure (our `struct Plugin`)
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
*/
-static size_t
-postgres_plugin_put (void *cls, const GNUNET_HashCode * key, size_t size,
- const char *data, enum GNUNET_BLOCK_Type type,
- struct GNUNET_TIME_Absolute discard_time)
+static int
+postgres_plugin_del (void *cls)
{
struct Plugin *plugin = cls;
- PGresult *ret;
- uint32_t btype = htonl (type);
- uint64_t bexpi = GNUNET_TIME_absolute_hton (discard_time).abs_value__;
-
- const char *paramValues[] = {
- (const char *) &btype,
- (const char *) &bexpi,
- (const char *) key,
- (const char *) data
+ struct GNUNET_PQ_QueryParam pempty[] = {
+ GNUNET_PQ_query_param_end
};
- int paramLengths[] = {
- sizeof (btype),
- sizeof (bexpi),
- sizeof (GNUNET_HashCode),
- size
+ uint32_t size;
+ uint32_t oid;
+ struct GNUNET_HashCode key;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32 ("len",
+ &size),
+ GNUNET_PQ_result_spec_uint32 ("oid",
+ &oid),
+ GNUNET_PQ_result_spec_auto_from_type ("key",
+ &key),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus res;
+ struct GNUNET_PQ_QueryParam dparam[] = {
+ GNUNET_PQ_query_param_uint32 (&oid),
+ GNUNET_PQ_query_param_end
};
- const int paramFormats[] = { 1, 1, 1, 1 };
- ret =
- PQexecPrepared (plugin->dbh, "put", 4, paramValues, paramLengths,
- paramFormats, 1);
- if (GNUNET_OK !=
- check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put",
- __LINE__))
+ res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
+ "getm",
+ pempty,
+ rs);
+ if (0 > res)
+ return GNUNET_SYSERR;
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
+ {
+ /* no result */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ending iteration (no more results)\n");
+ return 0;
+ }
+ res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+ "delrow",
+ dparam);
+ if (0 > res)
+ {
+ GNUNET_PQ_cleanup_result (rs);
return GNUNET_SYSERR;
- PQclear (ret);
- return size + OVERHEAD;
+ }
+ plugin->num_items--;
+ plugin->env->delete_notify (plugin->env->cls,
+ &key,
+ size + OVERHEAD);
+ GNUNET_PQ_cleanup_result (rs);
+ return GNUNET_OK;
}
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Obtain a random key-value pair from the datacache.
*
- * @param cls closure (our "struct Plugin")
- * @param key
- * @param type entries of which type are relevant?
+ * @param cls closure (our `struct Plugin`)
* @param iter maybe NULL (to just count)
- * @param iter_cls closure for iter
- * @return the number of results found
+ * @param iter_cls closure for @a iter
+ * @return the number of results found, zero (datacache empty) or one
*/
static unsigned int
-postgres_plugin_get (void *cls, const GNUNET_HashCode * key,
- enum GNUNET_BLOCK_Type type,
- GNUNET_DATACACHE_Iterator iter, void *iter_cls)
+postgres_plugin_get_random (void *cls,
+ GNUNET_DATACACHE_Iterator iter,
+ void *iter_cls)
{
struct Plugin *plugin = cls;
- uint32_t btype = htonl (type);
-
- const char *paramValues[] = {
- (const char *) key,
- (const char *) &btype,
+ uint32_t off;
+ struct GNUNET_TIME_Absolute expiration_time;
+ size_t data_size;
+ void *data;
+ size_t path_len;
+ struct GNUNET_PeerIdentity *path;
+ struct GNUNET_HashCode key;
+ uint32_t type;
+ enum GNUNET_DB_QueryStatus res;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32 (&off),
+ GNUNET_PQ_query_param_end
};
- int paramLengths[] = {
- sizeof (GNUNET_HashCode),
- sizeof (btype),
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_absolute_time ("discard_time",
+ &expiration_time),
+ GNUNET_PQ_result_spec_uint32 ("type",
+ &type),
+ GNUNET_PQ_result_spec_variable_size ("value",
+ &data,
+ &data_size),
+ GNUNET_PQ_result_spec_variable_size ("path",
+ (void **) &path,
+ &path_len),
+ GNUNET_PQ_result_spec_auto_from_type ("key",
+ &key),
+ GNUNET_PQ_result_spec_end
};
- const int paramFormats[] = { 1, 1 };
- struct GNUNET_TIME_Absolute expiration_time;
- uint32_t size;
- unsigned int cnt;
- unsigned int i;
- PGresult *res;
-
- cnt = 0;
- res =
- PQexecPrepared (plugin->dbh, (type == 0) ? "getk" : "getkt",
- (type == 0) ? 1 : 2, paramValues, paramLengths,
- paramFormats, 1);
- if (GNUNET_OK !=
- check_result (plugin, res, PGRES_TUPLES_OK, "PQexecPrepared",
- (type == 0) ? "getk" : "getkt", __LINE__))
- {
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
- "Ending iteration (postgres error)\n");
-#endif
- return 0;
- }
- if (0 == (cnt = PQntuples (res)))
+ if (0 == plugin->num_items)
+ return 0;
+ if (NULL == iter)
+ return 1;
+ off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+ plugin->num_items);
+ res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
+ "get_random",
+ params,
+ rs);
+ if (0 > res)
{
- /* no result */
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
- "Ending iteration (no more results)\n");
-#endif
- PQclear (res);
+ GNUNET_break (0);
return 0;
}
- if (iter == NULL)
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
{
- PQclear (res);
- return cnt;
+ GNUNET_break (0);
+ return 0;
}
- if ((3 != PQnfields (res)) || (sizeof (uint64_t) != PQfsize (res, 0)) ||
- (sizeof (uint32_t) != PQfsize (res, 1)))
+ if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_break (0);
- PQclear (res);
- return 0;
+ path_len = 0;
}
- for (i = 0; i < cnt; i++)
+ path_len %= sizeof (struct GNUNET_PeerIdentity);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Found random value with key %s of size %u bytes and type %u in database\n",
+ GNUNET_h2s (&key),
+ (unsigned int) data_size,
+ (unsigned int) type);
+ (void) iter (iter_cls,
+ &key,
+ data_size,
+ data,
+ (enum GNUNET_BLOCK_Type) type,
+ expiration_time,
+ path_len,
+ path);
+ GNUNET_PQ_cleanup_result (rs);
+ return 1;
+}
+
+
+/**
+ * Closure for #extract_result_cb.
+ */
+struct ExtractResultContext
+{
+ /**
+ * Function to call for each result found.
+ */
+ GNUNET_DATACACHE_Iterator iter;
+
+ /**
+ * Closure for @e iter.
+ */
+ void *iter_cls;
+
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results. Calls the `iter`
+ * from @a cls for each result.
+ *
+ * @param cls closure with the `struct ExtractResultContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+extract_result_cb (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct ExtractResultContext *erc = cls;
+
+ if (NULL == erc->iter)
+ return;
+ for (unsigned int i=0;i<num_results;i++)
{
- expiration_time.abs_value =
- GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
- type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
- size = PQgetlength (res, i, 2);
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
- "Found result of size %u bytes and type %u in database\n",
- (unsigned int) size, (unsigned int) type);
-#endif
+ struct GNUNET_TIME_Absolute expiration_time;
+ uint32_t type;
+ void *data;
+ size_t data_size;
+ struct GNUNET_PeerIdentity *path;
+ size_t path_len;
+ struct GNUNET_HashCode key;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_absolute_time ("",
+ &expiration_time),
+ GNUNET_PQ_result_spec_uint32 ("type",
+ &type),
+ GNUNET_PQ_result_spec_variable_size ("value",
+ &data,
+ &data_size),
+ GNUNET_PQ_result_spec_variable_size ("path",
+ (void **) &path,
+ &path_len),
+ GNUNET_PQ_result_spec_auto_from_type ("key",
+ &key),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_YES !=
+ GNUNET_PQ_extract_result (result,
+ rs,
+ i))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_break (0);
+ path_len = 0;
+ }
+ path_len %= sizeof (struct GNUNET_PeerIdentity);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Found result of size %u bytes and type %u in database\n",
+ (unsigned int) data_size,
+ (unsigned int) type);
if (GNUNET_SYSERR ==
- iter (iter_cls, expiration_time, key, size, PQgetvalue (res, i, 2),
- (enum GNUNET_BLOCK_Type) type))
+ erc->iter (erc->iter_cls,
+ &key,
+ data_size,
+ data,
+ (enum GNUNET_BLOCK_Type) type,
+ expiration_time,
+ path_len,
+ path))
{
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
- "Ending iteration (client error)\n");
-#endif
- PQclear (res);
- return cnt;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ending iteration (client error)\n");
+ GNUNET_PQ_cleanup_result (rs);
+ break;
}
+ GNUNET_PQ_cleanup_result (rs);
}
- PQclear (res);
- return cnt;
}
/**
- * Delete the entry with the lowest expiration value
- * from the datacache right now.
- *
- * @param cls closure (our "struct Plugin")
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * Iterate over the results that are "close" to a particular key in
+ * the datacache. "close" is defined as numerically larger than @a
+ * key (when interpreted as a circular address space), with small
+ * distance.
+ *
+ * @param cls closure (internal context for the plugin)
+ * @param key area of the keyspace to look into
+ * @param num_results number of results that should be returned to @a iter
+ * @param iter maybe NULL (to just count)
+ * @param iter_cls closure for @a iter
+ * @return the number of results found
*/
-static int
-postgres_plugin_del (void *cls)
+static unsigned int
+postgres_plugin_get_closest (void *cls,
+ const struct GNUNET_HashCode *key,
+ unsigned int num_results,
+ GNUNET_DATACACHE_Iterator iter,
+ void *iter_cls)
{
struct Plugin *plugin = cls;
- uint32_t size;
- uint32_t oid;
- GNUNET_HashCode key;
- PGresult *res;
-
- res = PQexecPrepared (plugin->dbh, "getm", 0, NULL, NULL, NULL, 1);
- if (GNUNET_OK !=
- check_result (plugin, res, PGRES_TUPLES_OK, "PQexecPrepared", "getm",
- __LINE__))
+ uint32_t num_results32 = (uint32_t) num_results;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (key),
+ GNUNET_PQ_query_param_uint32 (&num_results32),
+ GNUNET_PQ_query_param_end
+ };
+ enum GNUNET_DB_QueryStatus res;
+ struct ExtractResultContext erc;
+
+ erc.iter = iter;
+ erc.iter_cls = iter_cls;
+ res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ "get_closest",
+ params,
+ &extract_result_cb,
+ &erc);
+ if (0 > res)
{
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
- "Ending iteration (postgres error)\n");
-#endif
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ending iteration (postgres error)\n");
return 0;
}
- if (0 == PQntuples (res))
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
{
/* no result */
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datacache-postgres",
- "Ending iteration (no more results)\n");
-#endif
- PQclear (res);
- return GNUNET_SYSERR;
- }
- if ((3 != PQnfields (res)) || (sizeof (size) != PQfsize (res, 0)) ||
- (sizeof (oid) != PQfsize (res, 1)) ||
- (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 2)))
- {
- GNUNET_break (0);
- PQclear (res);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ending iteration (no more results)\n");
return 0;
}
- size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
- oid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
- memcpy (&key, PQgetvalue (res, 0, 2), sizeof (GNUNET_HashCode));
- PQclear (res);
- if (GNUNET_OK != delete_by_rowid (plugin, oid))
- return GNUNET_SYSERR;
- plugin->env->delete_notify (plugin->env->cls, &key, size + OVERHEAD);
- return GNUNET_OK;
+ return res;
}
/**
* Entry point for the plugin.
*
- * @param cls closure (the "struct GNUNET_DATACACHE_PluginEnvironmnet")
- * @return the plugin's closure (our "struct Plugin")
+ * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
+ * @return the plugin's closure (our `struct Plugin`)
*/
void *
libgnunet_plugin_datacache_postgres_init (void *cls)
struct GNUNET_DATACACHE_PluginFunctions *api;
struct Plugin *plugin;
- plugin = GNUNET_malloc (sizeof (struct Plugin));
+ plugin = GNUNET_new (struct Plugin);
plugin->env = env;
if (GNUNET_OK != init_connection (plugin))
return NULL;
}
- api = GNUNET_malloc (sizeof (struct GNUNET_DATACACHE_PluginFunctions));
+ api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
api->cls = plugin;
api->get = &postgres_plugin_get;
api->put = &postgres_plugin_put;
api->del = &postgres_plugin_del;
- GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datacache-postgres",
- _("Postgres datacache running\n"));
+ api->get_random = &postgres_plugin_get_random;
+ api->get_closest = &postgres_plugin_get_closest;
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Postgres datacache running\n");
return api;
}
/**
* Exit point from the plugin.
*
- * @param cls closure (our "struct Plugin")
+ * @param cls closure (our `struct Plugin`)
* @return NULL
*/
void *