/*
This file is part of GNUnet
- (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2009-2017 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
+ Affero General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
/**
* @file datastore/plugin_datastore_postgres.c
* @brief postgres-based datastore backend
* @author Christian Grothoff
*/
-
#include "platform.h"
#include "gnunet_datastore_plugin.h"
-#include "gnunet_postgres_lib.h"
-#include <postgresql/libpq-fe.h>
+#include "gnunet_pq_lib.h"
-#define DEBUG_POSTGRES GNUNET_EXTRA_LOGGING
/**
* After how many ms "busy" should a DB operation fail for good?
/**
* Context for all functions in this plugin.
*/
-struct Plugin
-{
+struct Plugin {
/**
* Our execution environment.
*/
* Native Postgres database handle.
*/
PGconn *dbh;
-
};
* @brief Get a database handle
*
* @param plugin global context
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
*/
static int
-init_connection (struct Plugin *plugin)
+init_connection(struct Plugin *plugin)
{
- PGresult *ret;
+ struct GNUNET_PQ_ExecuteStatement es[] = {
+ /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
+ * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
+ * we do math or inequality tests, so we can't handle the entire range of uint32_t.
+ * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
+ * PostgreSQL also recommends against using WITH OIDS.
+ */
+ GNUNET_PQ_make_execute("CREATE TABLE IF NOT EXISTS gn090 ("
+ " repl INTEGER NOT NULL DEFAULT 0,"
+ " type INTEGER NOT NULL DEFAULT 0,"
+ " prio INTEGER NOT NULL DEFAULT 0,"
+ " anonLevel INTEGER NOT NULL DEFAULT 0,"
+ " expire BIGINT NOT NULL DEFAULT 0,"
+ " rvalue BIGINT NOT NULL DEFAULT 0,"
+ " hash BYTEA NOT NULL DEFAULT '',"
+ " vhash BYTEA NOT NULL DEFAULT '',"
+ " value BYTEA NOT NULL DEFAULT '')"
+ "WITH OIDS"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
+ GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
+ GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
+ GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
+ GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
+ GNUNET_PQ_EXECUTE_STATEMENT_END
+ };
- plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
+#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
+ struct GNUNET_PQ_PreparedStatement ps[] = {
+ GNUNET_PQ_make_prepare("get",
+ "SELECT " RESULT_COLUMNS " FROM gn090"
+ " WHERE oid >= $1::bigint AND"
+ " (rvalue >= $2 OR 0 = $3::smallint) AND"
+ " (hash = $4 OR 0 = $5::smallint) AND"
+ " (type = $6 OR 0 = $7::smallint)"
+ " ORDER BY oid ASC LIMIT 1",
+ 7),
+ GNUNET_PQ_make_prepare("put",
+ "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
+ "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
+ 9),
+ GNUNET_PQ_make_prepare("update",
+ "UPDATE gn090"
+ " SET prio = prio + $1,"
+ " repl = repl + $2,"
+ " expire = GREATEST(expire, $3)"
+ " WHERE hash = $4 AND vhash = $5",
+ 5),
+ GNUNET_PQ_make_prepare("decrepl",
+ "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
+ "WHERE oid = $1",
+ 1),
+ GNUNET_PQ_make_prepare("select_non_anonymous",
+ "SELECT " RESULT_COLUMNS " FROM gn090 "
+ "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
+ "ORDER BY oid ASC LIMIT 1",
+ 2),
+ GNUNET_PQ_make_prepare("select_expiration_order",
+ "(SELECT " RESULT_COLUMNS " FROM gn090 "
+ "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
+ "UNION "
+ "(SELECT " RESULT_COLUMNS " FROM gn090 "
+ "ORDER BY prio ASC LIMIT 1) "
+ "ORDER BY expire ASC LIMIT 1",
+ 1),
+ GNUNET_PQ_make_prepare("select_replication_order",
+ "SELECT " RESULT_COLUMNS " FROM gn090 "
+ "ORDER BY repl DESC,RANDOM() LIMIT 1",
+ 0),
+ GNUNET_PQ_make_prepare("delrow",
+ "DELETE FROM gn090 "
+ "WHERE oid=$1",
+ 1),
+ GNUNET_PQ_make_prepare("remove",
+ "DELETE FROM gn090"
+ " WHERE hash = $1 AND"
+ " value = $2",
+ 2),
+ GNUNET_PQ_make_prepare("get_keys",
+ "SELECT hash FROM gn090",
+ 0),
+ GNUNET_PQ_make_prepare("estimate_size",
+ "SELECT CASE WHEN NOT EXISTS"
+ " (SELECT 1 FROM gn090)"
+ " THEN 0"
+ " ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
+ "END AS total",
+ 0),
+ GNUNET_PQ_PREPARED_STATEMENT_END
+ };
+#undef RESULT_COLUMNS
+
+ plugin->dbh = GNUNET_PQ_connect_with_cfg(plugin->env->cfg,
+ "datastore-postgres");
if (NULL == plugin->dbh)
return GNUNET_SYSERR;
- ret =
- PQexec (plugin->dbh,
- "CREATE TABLE gn090 (" " repl INTEGER NOT NULL DEFAULT 0,"
- " type INTEGER NOT NULL DEFAULT 0,"
- " prio INTEGER NOT NULL DEFAULT 0,"
- " anonLevel INTEGER NOT NULL DEFAULT 0,"
- " expire BIGINT NOT NULL DEFAULT 0,"
- " rvalue BIGINT NOT NULL DEFAULT 0,"
- " hash BYTEA NOT NULL DEFAULT '',"
- " vhash BYTEA NOT NULL DEFAULT '',"
- " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
- if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07", /* duplicate table */
- PQresultErrorField
- (ret,
- PG_DIAG_SQLSTATE)))))
- {
- (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090");
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- if (PQresultStatus (ret) == PGRES_COMMAND_OK)
- {
- if ((GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh,
- "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh,
- "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
+
+ if ((GNUNET_OK !=
+ GNUNET_PQ_exec_statements(plugin->dbh,
+ es)) ||
+ (GNUNET_OK !=
+ GNUNET_PQ_prepare_statements(plugin->dbh,
+ ps)))
{
- PQclear (ret);
- PQfinish (plugin->dbh);
+ PQfinish(plugin->dbh);
plugin->dbh = NULL;
return GNUNET_SYSERR;
}
- }
- PQclear (ret);
- ret =
- PQexec (plugin->dbh,
- "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- PQclear (ret);
- ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- PQclear (ret);
- ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
- PQclear (ret);
- if ((GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND vhash=$2 AND type=$3 "
- "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND type=$2 "
- "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND vhash=$2 "
- "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "get",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "put",
- "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
- "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "update",
- "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
- "WHERE oid = $3", 3)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
- "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
- "WHERE oid = $1", 1)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
- 1)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
- "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
- "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
- 1)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
- {
- PQfinish (plugin->dbh);
- plugin->dbh = NULL;
- return GNUNET_SYSERR;
- }
return GNUNET_OK;
}
* Get an estimate of how much space the database is
* currently using.
*
- * @param cls our "struct Plugin*"
+ * @param cls our `struct Plugin *`
* @return number of bytes used on disk
*/
-static unsigned long long
-postgres_plugin_estimate_size (void *cls)
+static void
+postgres_plugin_estimate_size(void *cls,
+ unsigned long long *estimate)
{
struct Plugin *plugin = cls;
- unsigned long long total;
- PGresult *ret;
+ uint64_t total;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64("total",
+ &total),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus ret;
- ret =
- PQexecParams (plugin->dbh,
- "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
- NULL, NULL, NULL, NULL, 1);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
- {
- return 0;
- }
- if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
- (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
- {
- GNUNET_break (0);
- PQclear (ret);
- return 0;
- }
- total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
- PQclear (ret);
- return total;
+ if (NULL == estimate)
+ return;
+ ret = GNUNET_PQ_eval_prepared_singleton_select(plugin->dbh,
+ "estimate_size",
+ params,
+ rs);
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
+ {
+ *estimate = 0LL;
+ return;
+ }
+ *estimate = total;
}
/**
* Store an item in the datastore.
*
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in data
* @param data content stored
* @param type type of the content
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-static int
-postgres_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size,
- const void *data, enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, char **msg)
+static void
+postgres_plugin_put(void *cls,
+ const struct GNUNET_HashCode *key,
+ bool absent,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ PluginPutCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
- GNUNET_HashCode vhash;
- PGresult *ret;
- uint32_t btype = htonl (type);
- uint32_t bprio = htonl (priority);
- uint32_t banon = htonl (anonymity);
- uint32_t brepl = htonl (replication);
- uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
-
- const char *paramValues[] = {
- (const char *) &brepl,
- (const char *) &btype,
- (const char *) &bprio,
- (const char *) &banon,
- (const char *) &bexpi,
- (const char *) key,
- (const char *) &vhash,
- (const char *) data
- };
- int paramLengths[] = {
- sizeof (brepl),
- sizeof (btype),
- sizeof (bprio),
- sizeof (banon),
- sizeof (bexpi),
- sizeof (GNUNET_HashCode),
- sizeof (GNUNET_HashCode),
- size
- };
- const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
+ struct GNUNET_HashCode vhash;
+ enum GNUNET_DB_QueryStatus ret;
- GNUNET_CRYPTO_hash (data, size, &vhash);
- ret =
- PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
- paramFormats, 1);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
- return GNUNET_SYSERR;
- PQclear (ret);
- plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
- "Stored %u bytes in database\n", (unsigned int) size);
-#endif
- return GNUNET_OK;
+ GNUNET_CRYPTO_hash(data,
+ size,
+ &vhash);
+ if (!absent)
+ {
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32(&priority),
+ GNUNET_PQ_query_param_uint32(&replication),
+ GNUNET_PQ_query_param_absolute_time(&expiration),
+ GNUNET_PQ_query_param_auto_from_type(key),
+ GNUNET_PQ_query_param_auto_from_type(&vhash),
+ GNUNET_PQ_query_param_end
+ };
+ ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+ "update",
+ params);
+ if (0 > ret)
+ {
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ _("Postgress exec failure"));
+ return;
+ }
+ bool affected = (0 != ret);
+ if (affected)
+ {
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_NO,
+ NULL);
+ return;
+ }
+ }
+
+ {
+ uint32_t utype = (uint32_t)type;
+ uint64_t rvalue = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32(&replication),
+ GNUNET_PQ_query_param_uint32(&utype),
+ GNUNET_PQ_query_param_uint32(&priority),
+ GNUNET_PQ_query_param_uint32(&anonymity),
+ GNUNET_PQ_query_param_absolute_time(&expiration),
+ GNUNET_PQ_query_param_uint64(&rvalue),
+ GNUNET_PQ_query_param_auto_from_type(key),
+ GNUNET_PQ_query_param_auto_from_type(&vhash),
+ GNUNET_PQ_query_param_fixed_size(data, size),
+ GNUNET_PQ_query_param_end
+ };
+
+ ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+ "put",
+ params);
+ if (0 > ret)
+ {
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ "Postgress exec failure");
+ return;
+ }
+ }
+ plugin->env->duc(plugin->env->cls,
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+ GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Stored %u bytes in database\n",
+ (unsigned int)size);
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_OK,
+ NULL);
}
/**
- * Function invoked to process the result and call
- * the processor.
+ * Closure for #process_result.
+ */
+struct ProcessResultContext {
+ /**
+ * The plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call on each result.
+ */
+ PluginDatumProcessor proc;
+
+ /**
+ * Closure for @e proc.
+ */
+ void *proc_cls;
+};
+
+
+/**
+ * Function invoked to process the result and call the processor of @a
+ * cls.
*
- * @param plugin global plugin data
- * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param cls our `struct ProcessResultContext`
* @param res result from exec
- * @param filename filename for error messages
- * @param line line number for error messages
+ * @param num_results number of results in @a res
*/
static void
-process_result (struct Plugin *plugin, PluginDatumProcessor proc,
- void *proc_cls, PGresult * res,
- const char *filename, int line)
+process_result(void *cls,
+ PGresult *res,
+ unsigned int num_results)
{
- int iret;
- enum GNUNET_BLOCK_Type type;
- uint32_t anonymity;
- uint32_t priority;
- uint32_t size;
- unsigned int rowid;
- struct GNUNET_TIME_Absolute expiration_time;
- GNUNET_HashCode key;
+ struct ProcessResultContext *prc = cls;
+ struct Plugin *plugin = prc->plugin;
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
- filename, line))
- {
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
- "Ending iteration (postgres error)\n");
-#endif
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
-
- if (0 == PQntuples (res))
- {
- /* no result */
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
- "Ending iteration (no more results)\n");
-#endif
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- PQclear (res);
- return;
- }
- if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
- (sizeof (uint32_t) != PQfsize (res, 0)) ||
- (sizeof (uint32_t) != PQfsize (res, 6)))
- {
- GNUNET_break (0);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- PQclear (res);
- return;
- }
- rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
- if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
- (sizeof (uint32_t) != PQfsize (res, 1)) ||
- (sizeof (uint32_t) != PQfsize (res, 2)) ||
- (sizeof (uint64_t) != PQfsize (res, 3)) ||
- (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)))
- {
- GNUNET_break (0);
- PQclear (res);
- GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
-
- type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
- priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
- anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
- expiration_time.abs_value =
- GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
- memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
- size = PQgetlength (res, 0, 5);
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
- "Found result of size %u bytes and type %u in database\n",
- (unsigned int) size, (unsigned int) type);
-#endif
- iret =
- proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
- (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
- rowid);
- PQclear (res);
- if (iret == GNUNET_NO)
- {
-#if DEBUG_POSTGRES
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processor asked for item %u to be removed.\n", rowid);
-#endif
- if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
+ if (0 == num_results)
{
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
- "Deleting %u bytes from database\n",
- (unsigned int) size);
-#endif
- plugin->env->duc (plugin->env->cls,
- -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
- "Deleted %u bytes from database\n", (unsigned int) size);
-#endif
+ /* no result */
+ GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Ending iteration (no more results)\n");
+ prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
}
- }
+ if (1 != num_results)
+ {
+ GNUNET_break(0);
+ prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ /* Technically we don't need the loop here, but nicer in case
+ we ever relax the condition above. */
+ for (unsigned int i = 0; i < num_results; i++)
+ {
+ int iret;
+ uint32_t rowid;
+ uint32_t utype;
+ uint32_t anonymity;
+ uint32_t replication;
+ uint32_t priority;
+ size_t size;
+ void *data;
+ struct GNUNET_TIME_Absolute expiration_time;
+ struct GNUNET_HashCode key;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32("repl", &replication),
+ GNUNET_PQ_result_spec_uint32("type", &utype),
+ GNUNET_PQ_result_spec_uint32("prio", &priority),
+ GNUNET_PQ_result_spec_uint32("anonLevel", &anonymity),
+ GNUNET_PQ_result_spec_absolute_time("expire", &expiration_time),
+ GNUNET_PQ_result_spec_auto_from_type("hash", &key),
+ GNUNET_PQ_result_spec_variable_size("value", &data, &size),
+ GNUNET_PQ_result_spec_uint32("oid", &rowid),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result(res,
+ rs,
+ i))
+ {
+ GNUNET_break(0);
+ prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+
+ GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Found result of size %u bytes and type %u in database\n",
+ (unsigned int)size,
+ (unsigned int)utype);
+ iret = prc->proc(prc->proc_cls,
+ &key,
+ size,
+ data,
+ (enum GNUNET_BLOCK_Type)utype,
+ priority,
+ anonymity,
+ replication,
+ expiration_time,
+ rowid);
+ if (iret == GNUNET_NO)
+ {
+ struct GNUNET_PQ_QueryParam param[] = {
+ GNUNET_PQ_query_param_uint32(&rowid),
+ GNUNET_PQ_query_param_end
+ };
+
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
+ "Processor asked for item %u to be removed.\n",
+ (unsigned int)rowid);
+ if (0 <
+ GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+ "delrow",
+ param))
+ {
+ GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Deleting %u bytes from database\n",
+ (unsigned int)size);
+ plugin->env->duc(plugin->env->cls,
+ -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+ GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Deleted %u bytes from database\n",
+ (unsigned int)size);
+ }
+ }
+ GNUNET_PQ_cleanup_result(rs);
+ } /* for (i) */
}
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
*
- * @param cls closure with the 'struct Plugin'
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param cls closure with the `struct Plugin`
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
- * @param vhash hash of the value, maybe NULL (to
- * match all values that have the right key).
- * Note that for DBlocks there is no difference
- * betwen key and vhash, but for other blocks
- * there may be!
* @param type entries of which type are relevant?
* Use 0 for any type.
* @param proc function to call on the matching value;
- * will be called once with a NULL if no value matches
- * @param proc_cls closure for iter
+ * will be called with NULL if nothing matches
+ * @param proc_cls closure for @a proc
*/
static void
-postgres_plugin_get_key (void *cls, uint64_t offset,
- const GNUNET_HashCode * key,
- const GNUNET_HashCode * vhash,
- enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
- void *proc_cls)
+postgres_plugin_get_key(void *cls,
+ uint64_t next_uid,
+ bool random,
+ const struct GNUNET_HashCode *key,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- const int paramFormats[] = { 1, 1, 1, 1, 1 };
- int paramLengths[4];
- const char *paramValues[4];
- int nparams;
- const char *pname;
- PGresult *ret;
- uint64_t total;
- uint64_t blimit_off;
- uint32_t btype;
-
- GNUNET_assert (key != NULL);
- paramValues[0] = (const char *) key;
- paramLengths[0] = sizeof (GNUNET_HashCode);
- btype = htonl (type);
- if (type != 0)
- {
- if (vhash != NULL)
- {
- paramValues[1] = (const char *) vhash;
- paramLengths[1] = sizeof (GNUNET_HashCode);
- paramValues[2] = (const char *) &btype;
- paramLengths[2] = sizeof (btype);
- paramValues[3] = (const char *) &blimit_off;
- paramLengths[3] = sizeof (blimit_off);
- nparams = 4;
- pname = "getvt";
- ret =
- PQexecParams (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
- 3, NULL, paramValues, paramLengths, paramFormats, 1);
- }
- else
+ uint32_t utype = type;
+ uint16_t use_rvalue = random;
+ uint16_t use_key = NULL != key;
+ uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
+ uint64_t rvalue;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint64(&next_uid),
+ GNUNET_PQ_query_param_uint64(&rvalue),
+ GNUNET_PQ_query_param_uint16(&use_rvalue),
+ GNUNET_PQ_query_param_auto_from_type(key),
+ GNUNET_PQ_query_param_uint16(&use_key),
+ GNUNET_PQ_query_param_uint32(&utype),
+ GNUNET_PQ_query_param_uint16(&use_type),
+ GNUNET_PQ_query_param_end
+ };
+ struct ProcessResultContext prc;
+ enum GNUNET_DB_QueryStatus res;
+
+ if (random)
{
- paramValues[1] = (const char *) &btype;
- paramLengths[1] = sizeof (btype);
- paramValues[2] = (const char *) &blimit_off;
- paramLengths[2] = sizeof (blimit_off);
- nparams = 3;
- pname = "gett";
- ret =
- PQexecParams (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
- 2, NULL, paramValues, paramLengths, paramFormats, 1);
+ rvalue = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ next_uid = 0;
}
- }
else
- {
- if (vhash != NULL)
{
- paramValues[1] = (const char *) vhash;
- paramLengths[1] = sizeof (GNUNET_HashCode);
- paramValues[2] = (const char *) &blimit_off;
- paramLengths[2] = sizeof (blimit_off);
- nparams = 3;
- pname = "getv";
- ret =
- PQexecParams (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
- 2, NULL, paramValues, paramLengths, paramFormats, 1);
+ rvalue = 0;
}
- else
- {
- paramValues[1] = (const char *) &blimit_off;
- paramLengths[1] = sizeof (blimit_off);
- nparams = 2;
- pname = "get";
- ret =
- PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
- 1, NULL, paramValues, paramLengths, paramFormats, 1);
- }
- }
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
- {
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
- (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
- {
- GNUNET_break (0);
- PQclear (ret);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
- PQclear (ret);
- if (total == 0)
- {
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- blimit_off = GNUNET_htonll (offset % total);
- ret =
- PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
- paramFormats, 1);
- process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
+ prc.plugin = plugin;
+ prc.proc = proc;
+ prc.proc_cls = proc_cls;
+
+ res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+ "get",
+ params,
+ &process_result,
+ &prc);
+ if (0 > res)
+ proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*
- * @param cls our "struct Plugin*"
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param cls our `struct Plugin *`
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
- * Use 0 for any type.
+ * Must not be zero (ANY).
* @param proc function to call on the matching value;
- * will be called with a NULL if no value matches
- * @param proc_cls closure for proc
+ * will be called with NULL if no value matches
+ * @param proc_cls closure for @a proc
*/
static void
-postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
- enum GNUNET_BLOCK_Type type,
- PluginDatumProcessor proc, void *proc_cls)
+postgres_plugin_get_zero_anonymity(void *cls,
+ uint64_t next_uid,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- uint32_t btype;
- uint64_t boff;
- const int paramFormats[] = { 1, 1 };
- int paramLengths[] = { sizeof (btype), sizeof (boff) };
- const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
- PGresult *ret;
-
- btype = htonl ((uint32_t) type);
- boff = GNUNET_htonll (offset);
- ret =
- PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
- paramLengths, paramFormats, 1);
- process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
+ uint32_t utype = type;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32(&utype),
+ GNUNET_PQ_query_param_uint64(&next_uid),
+ GNUNET_PQ_query_param_end
+ };
+ struct ProcessResultContext prc;
+ enum GNUNET_DB_QueryStatus res;
+
+ prc.plugin = plugin;
+ prc.proc = proc;
+ prc.proc_cls = proc_cls;
+ res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+ "select_non_anonymous",
+ params,
+ &process_result,
+ &prc);
+ if (0 > res)
+ proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
/**
- * Context for 'repl_iter' function.
+ * Context for #repl_iter() function.
*/
-struct ReplCtx
-{
-
+struct ReplCtx {
/**
* Plugin handle.
*/
PluginDatumProcessor proc;
/**
- * Closure for proc.
+ * Closure for @e proc.
*/
void *proc_cls;
};
* Decrements the replication counter and calls the original
* iterator.
*
- * @param cls closure with the 'struct ReplCtx*'
+ * @param cls closure with the `struct ReplCtx *`
* @param key key for the content
- * @param size number of bytes in data
+ * @param size number of bytes in @a data
* @param data content stored
* @param type type of the content
* @param priority priority of the content
* @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
* @param expiration expiration time for the content
* @param uid unique identifier for the datum;
* maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration,
+ * #GNUNET_OK to continue
* (continue on call to "next", of course),
- * GNUNET_NO to delete the item and continue (if supported)
+ * #GNUNET_NO to delete the item and continue (if supported)
*/
static int
-repl_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
- const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
- uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
- uint64_t uid)
+repl_proc(void *cls,
+ const struct GNUNET_HashCode *key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
{
struct ReplCtx *rc = cls;
struct Plugin *plugin = rc->plugin;
int ret;
- PGresult *qret;
- uint32_t boid;
-
- ret =
- rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
- expiration, uid);
- if (NULL != key)
- {
- boid = htonl ((uint32_t) uid);
- const char *paramValues[] = {
- (const char *) &boid,
- };
- int paramLengths[] = {
- sizeof (boid),
- };
- const int paramFormats[] = { 1 };
- qret =
- PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
- paramFormats, 1);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
- "decrepl"))
- return GNUNET_SYSERR;
- PQclear (qret);
- }
+ uint32_t oid = (uint32_t)uid;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32(&oid),
+ GNUNET_PQ_query_param_end
+ };
+ enum GNUNET_DB_QueryStatus qret;
+
+ ret = rc->proc(rc->proc_cls,
+ key,
+ size,
+ data,
+ type,
+ priority,
+ anonymity,
+ replication,
+ expiration,
+ uid);
+ if (NULL == key)
+ return ret;
+ qret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+ "decrepl",
+ params);
+ if (0 > qret)
+ return GNUNET_SYSERR;
return ret;
}
/**
- * Get a random item for replication. Returns a single, not expired, random item
- * from those with the highest replication counters. The item's
- * replication counter is decremented by one IF it was positive before.
- * Call 'proc' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for replication. Returns a single, not expired,
+ * random item from those with the highest replication counters. The
+ * item's replication counter is decremented by one IF it was positive
+ * before. Call @a proc with all values ZERO or NULL if the datastore
+ * is empty.
*
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
* @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
*/
static void
-postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
- void *proc_cls)
+postgres_plugin_get_replication(void *cls,
+ PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
struct ReplCtx rc;
- PGresult *ret;
+ struct ProcessResultContext prc;
+ enum GNUNET_DB_QueryStatus res;
rc.plugin = plugin;
rc.proc = proc;
rc.proc_cls = proc_cls;
- ret =
- PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
- NULL, 1);
- process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
+ prc.plugin = plugin;
+ prc.proc = &repl_proc;
+ prc.proc_cls = &rc;
+ res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+ "select_replication_order",
+ params,
+ &process_result,
+ &prc);
+ if (0 > res)
+ proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
/**
- * Get a random item for expiration.
- * Call 'proc' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for expiration. Call @a proc with all values
+ * ZERO or NULL if the datastore is empty.
*
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
* @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
*/
static void
-postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
- void *proc_cls)
+postgres_plugin_get_expiration(void *cls,
+ PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- uint64_t btime;
- const int paramFormats[] = { 1 };
- int paramLengths[] = { sizeof (btime) };
- const char *paramValues[] = { (const char *) &btime };
- PGresult *ret;
-
- btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
- ret =
- PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
- paramLengths, paramFormats, 1);
- process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_absolute_time(&now),
+ GNUNET_PQ_query_param_end
+ };
+ struct ProcessResultContext prc;
+
+ now = GNUNET_TIME_absolute_get();
+ prc.plugin = plugin;
+ prc.proc = proc;
+ prc.proc_cls = proc_cls;
+ (void)GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+ "select_expiration_order",
+ params,
+ &process_result,
+ &prc);
}
/**
- * Update the priority for a particular key in the datastore. If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept. For the
- * anonymity level, the lower value is to be used. The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
+ * Closure for #process_keys.
+ */
+struct ProcessKeysContext {
+ /**
+ * Function to call for each key.
+ */
+ PluginKeyProcessor proc;
+
+ /**
+ * Closure for @e proc.
+ */
+ void *proc_cls;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.
*
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- * change? If priority + delta < 0 the
- * priority should be set to 0 (never go
- * negative).
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cls closure with a `struct ProcessKeysContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
*/
-static int
-postgres_plugin_update (void *cls, uint64_t uid, int delta,
- struct GNUNET_TIME_Absolute expire, char **msg)
+static void
+process_keys(void *cls,
+ PGresult *result,
+ unsigned int num_results)
{
- struct Plugin *plugin = cls;
- PGresult *ret;
- int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
- uint32_t boid = htonl ((uint32_t) uid);
- uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
-
- const char *paramValues[] = {
- (const char *) &bdelta,
- (const char *) &bexpire,
- (const char *) &boid,
- };
- int paramLengths[] = {
- sizeof (bdelta),
- sizeof (bexpire),
- sizeof (boid),
- };
- const int paramFormats[] = { 1, 1, 1 };
+ struct ProcessKeysContext *pkc = cls;
- ret =
- PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
- paramFormats, 1);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
- return GNUNET_SYSERR;
- PQclear (ret);
- return GNUNET_OK;
+ for (unsigned i = 0; i < num_results; i++)
+ {
+ struct GNUNET_HashCode key;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_auto_from_type("hash",
+ &key),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result(result,
+ rs,
+ i))
+ {
+ GNUNET_break(0);
+ continue;
+ }
+ pkc->proc(pkc->proc_cls,
+ &key,
+ 1);
+ GNUNET_PQ_cleanup_result(rs);
+ }
}
-
/**
* Get all of the keys in the datastore.
*
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin *`
* @param proc function to call on each key
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
*/
static void
-postgres_plugin_get_keys (void *cls,
- PluginKeyProcessor proc,
- void *proc_cls)
+postgres_plugin_get_keys(void *cls,
+ PluginKeyProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- int ret;
- int i;
- GNUNET_HashCode key;
- PGresult * res;
-
- res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
- ret = PQntuples (res);
- for (i=0;i<ret;i++)
- {
- if (sizeof (GNUNET_HashCode) != PQgetlength (res, i, 0))
- {
- memcpy (&key, PQgetvalue (res, i, 0), sizeof (GNUNET_HashCode));
- proc (proc_cls, &key, 1);
- }
- }
- PQclear (res);
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
+ struct ProcessKeysContext pkc;
+
+ pkc.proc = proc;
+ pkc.proc_cls = proc_cls;
+ (void)GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
+ "get_keys",
+ params,
+ &process_keys,
+ &pkc);
+ proc(proc_cls,
+ NULL,
+ 0);
}
-
/**
* Drop database.
*
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin *`
*/
static void
-postgres_plugin_drop (void *cls)
+postgres_plugin_drop(void *cls)
{
struct Plugin *plugin = cls;
-
- if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
- GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
+ struct GNUNET_PQ_ExecuteStatement es[] = {
+ GNUNET_PQ_make_execute("DROP TABLE gn090"),
+ GNUNET_PQ_EXECUTE_STATEMENT_END
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_exec_statements(plugin->dbh,
+ es))
+ GNUNET_log_from(GNUNET_ERROR_TYPE_WARNING,
+ "postgres",
+ _("Failed to drop table from database.\n"));
+}
+
+
+/**
+ * Remove a particular key in the datastore.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure for @a cont
+ */
+static void
+postgres_plugin_remove_key(void *cls,
+ const struct GNUNET_HashCode *key,
+ uint32_t size,
+ const void *data,
+ PluginRemoveCont cont,
+ void *cont_cls)
+{
+ struct Plugin *plugin = cls;
+ enum GNUNET_DB_QueryStatus ret;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type(key),
+ GNUNET_PQ_query_param_fixed_size(data, size),
+ GNUNET_PQ_query_param_end
+ };
+
+ ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
+ "remove",
+ params);
+ if (0 > ret)
+ {
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ _("Postgress exec failure"));
+ return;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
+ {
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_NO,
+ NULL);
+ return;
+ }
+ plugin->env->duc(plugin->env->cls,
+ -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+ GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Deleted %u bytes from database\n",
+ (unsigned int)size);
+ cont(cont_cls,
+ key,
+ size,
+ GNUNET_OK,
+ NULL);
}
/**
* Entry point for the plugin.
*
- * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
- * @return our "struct Plugin*"
+ * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
+ * @return our `struct Plugin *`
*/
void *
-libgnunet_plugin_datastore_postgres_init (void *cls)
+libgnunet_plugin_datastore_postgres_init(void *cls)
{
struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
struct GNUNET_DATASTORE_PluginFunctions *api;
struct Plugin *plugin;
- plugin = GNUNET_malloc (sizeof (struct Plugin));
+ plugin = GNUNET_new(struct Plugin);
plugin->env = env;
- if (GNUNET_OK != init_connection (plugin))
- {
- GNUNET_free (plugin);
- return NULL;
- }
- api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
+ if (GNUNET_OK != init_connection(plugin))
+ {
+ GNUNET_free(plugin);
+ return NULL;
+ }
+ api = GNUNET_new(struct GNUNET_DATASTORE_PluginFunctions);
api->cls = plugin;
api->estimate_size = &postgres_plugin_estimate_size;
api->put = &postgres_plugin_put;
- api->update = &postgres_plugin_update;
api->get_key = &postgres_plugin_get_key;
api->get_replication = &postgres_plugin_get_replication;
api->get_expiration = &postgres_plugin_get_expiration;
api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
api->get_keys = &postgres_plugin_get_keys;
api->drop = &postgres_plugin_drop;
- GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
- _("Postgres database running\n"));
+ api->remove_key = &postgres_plugin_remove_key;
+ GNUNET_log_from(GNUNET_ERROR_TYPE_INFO,
+ "datastore-postgres",
+ _("Postgres database running\n"));
return api;
}
/**
* Exit point from the plugin.
- * @param cls our "struct Plugin*"
+ *
+ * @param cls our `struct Plugin *`
* @return always NULL
*/
void *
-libgnunet_plugin_datastore_postgres_done (void *cls)
+libgnunet_plugin_datastore_postgres_done(void *cls)
{
struct GNUNET_DATASTORE_PluginFunctions *api = cls;
struct Plugin *plugin = api->cls;
- PQfinish (plugin->dbh);
- GNUNET_free (plugin);
- GNUNET_free (api);
+ PQfinish(plugin->dbh);
+ GNUNET_free(plugin);
+ GNUNET_free(api);
return NULL;
}