* @brief postgres-based datastore backend
* @author Christian Grothoff
*/
-
#include "platform.h"
#include "gnunet_datastore_plugin.h"
-#include "gnunet_postgres_lib.h"
#include "gnunet_pq_lib.h"
GNUNET_PQ_make_prepare ("get_keys",
"SELECT hash FROM gn090",
0),
+ GNUNET_PQ_make_prepare ("estimate_size",
+ "SELECT SUM(LENGTH(value))+256*COUNT(*) AS total FROM gn090",
+ 0),
GNUNET_PQ_PREPARED_STATEMENT_END
};
#undef RESULT_COLUMNS
* @return number of bytes used on disk
*/
static void
-postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
+postgres_plugin_estimate_size (void *cls,
+ unsigned long long *estimate)
{
struct Plugin *plugin = cls;
- unsigned long long total;
- PGresult *ret;
+ uint64_t total;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("total",
+ &total),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_PQ_QueryStatus ret;
if (NULL == estimate)
return;
- ret =
- PQexecParams (plugin->dbh,
- "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
- NULL, NULL, NULL, NULL, 1);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh,
- ret,
- PGRES_TUPLES_OK,
- "PQexecParams",
- "get_size"))
+ ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
+ "estimate_size",
+ params,
+ rs);
+ if (GNUNET_PQ_STATUS_SUCCESS_ONE_RESULT != ret)
{
- *estimate = 0;
+ *estimate = 0LL;
return;
}
- if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
- {
- GNUNET_break (0);
- PQclear (ret);
- *estimate = 0;
- return;
- }
- if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
- {
- GNUNET_break (0 == PQgetlength (ret, 0, 0));
- PQclear (ret);
- *estimate = 0;
- return;
- }
- total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
- PQclear (ret);
*estimate = total;
}
/**
- * Function invoked to process the result and call the processor.
+ * Closure for #process_result.
+ */
+struct ProcessResultContext
+{
+
+ /**
+ * The plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call on each result.
+ */
+ PluginDatumProcessor proc;
+
+ /**
+ * Closure for @e proc.
+ */
+ void *proc_cls;
+
+};
+
+
+/**
+ * Function invoked to process the result and call the processor of @a
+ * cls.
*
- * @param plugin global plugin data
- * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param cls our `struct ProcessResultContext`
* @param res result from exec
- * @param filename filename for error messages
- * @param line line number for error messages
+ * @param num_results number of results in @a res
*/
static void
-process_result (struct Plugin *plugin,
- PluginDatumProcessor proc,
- void *proc_cls,
- PGresult * res,
- const char *filename, int line)
+process_result (void *cls,
+ PGresult *res,
+ unsigned int num_results)
{
- int iret;
- uint32_t rowid;
- uint32_t utype;
- uint32_t anonymity;
- uint32_t replication;
- uint32_t priority;
- size_t size;
- void *data;
- struct GNUNET_TIME_Absolute expiration_time;
- struct GNUNET_HashCode key;
- struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint32 ("repl", &replication),
- GNUNET_PQ_result_spec_uint32 ("type", &utype),
- GNUNET_PQ_result_spec_uint32 ("prio", &priority),
- GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
- GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
- GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
- GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
- GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
- GNUNET_PQ_result_spec_end
- };
+ struct ProcessResultContext *prc = cls;
+ struct Plugin *plugin = prc->plugin;
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result_ (plugin->dbh,
- res,
- PGRES_TUPLES_OK,
- "PQexecPrepared",
- "select",
- filename, line))
- {
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "datastore-postgres",
- "Ending iteration (postgres error)\n");
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
-
- if (0 == PQntuples (res))
+ if (0 == num_results)
{
/* no result */
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"datastore-postgres",
"Ending iteration (no more results)\n");
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- PQclear (res);
+ prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- if (1 != PQntuples (res))
+ if (1 != num_results)
{
GNUNET_break (0);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- PQclear (res);
+ prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- if (GNUNET_OK !=
- GNUNET_PQ_extract_result (res,
- rs,
- 0))
+ /* Technically we don't need the loop here, but nicer in case
+ we ever relax the condition above. */
+ for (unsigned int i=0;i<num_results;i++)
{
- GNUNET_break (0);
- PQclear (res);
- GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
- "delrow",
- rowid);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
+ int iret;
+ uint32_t rowid;
+ uint32_t utype;
+ uint32_t anonymity;
+ uint32_t replication;
+ uint32_t priority;
+ size_t size;
+ void *data;
+ struct GNUNET_TIME_Absolute expiration_time;
+ struct GNUNET_HashCode key;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32 ("repl", &replication),
+ GNUNET_PQ_result_spec_uint32 ("type", &utype),
+ GNUNET_PQ_result_spec_uint32 ("prio", &priority),
+ GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
+ GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
+ GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
+ GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
+ GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
+ GNUNET_PQ_result_spec_end
+ };
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "datastore-postgres",
- "Found result of size %u bytes and type %u in database\n",
- (unsigned int) size,
- (unsigned int) utype);
- iret = proc (proc_cls,
- &key,
- size,
- data,
- (enum GNUNET_BLOCK_Type) utype,
- priority,
- anonymity,
- replication,
- expiration_time,
- rowid);
- PQclear (res);
- if (iret == GNUNET_NO)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processor asked for item %u to be removed.\n",
- (unsigned int) rowid);
- if (GNUNET_OK ==
- GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
- "delrow",
- rowid))
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result (res,
+ rs,
+ i))
{
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "datastore-postgres",
- "Deleting %u bytes from database\n",
- (unsigned int) size);
- plugin->env->duc (plugin->env->cls,
- - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "datastore-postgres",
- "Deleted %u bytes from database\n",
- (unsigned int) size);
+ GNUNET_break (0);
+ prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
}
- }
+
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Found result of size %u bytes and type %u in database\n",
+ (unsigned int) size,
+ (unsigned int) utype);
+ iret = prc->proc (prc->proc_cls,
+ &key,
+ size,
+ data,
+ (enum GNUNET_BLOCK_Type) utype,
+ priority,
+ anonymity,
+ replication,
+ expiration_time,
+ rowid);
+ if (iret == GNUNET_NO)
+ {
+ struct GNUNET_PQ_QueryParam param[] = {
+ GNUNET_PQ_query_param_uint32 (&rowid),
+ GNUNET_PQ_query_param_end
+ };
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processor asked for item %u to be removed.\n",
+ (unsigned int) rowid);
+ if (0 <
+ GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+ "delrow",
+ param))
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Deleting %u bytes from database\n",
+ (unsigned int) size);
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "datastore-postgres",
+ "Deleted %u bytes from database\n",
+ (unsigned int) size);
+ }
+ }
+ GNUNET_PQ_cleanup_result (rs);
+ } /* for (i) */
}
/**
* Get one of the results for a particular key in the datastore.
*
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
* @param next_uid return the result with lowest uid >= next_uid
* @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
GNUNET_PQ_query_param_uint16 (&use_type),
GNUNET_PQ_query_param_end
};
- PGresult *ret;
+ struct ProcessResultContext prc;
+ enum GNUNET_PQ_QueryStatus res;
if (random)
{
next_uid = 0;
}
else
+ {
rvalue = 0;
-
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "get",
- params);
- process_result (plugin,
- proc,
- proc_cls,
- ret,
- __FILE__, __LINE__);
+ }
+ prc.plugin = plugin;
+ prc.proc = proc;
+ prc.proc_cls = proc_cls;
+
+ res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ "get",
+ params,
+ &process_result,
+ &prc);
+ if (0 > res)
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
GNUNET_PQ_query_param_uint64 (&next_uid),
GNUNET_PQ_query_param_end
};
- PGresult *ret;
-
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "select_non_anonymous",
- params);
-
- process_result (plugin,
- proc, proc_cls,
- ret,
- __FILE__, __LINE__);
+ struct ProcessResultContext prc;
+ enum GNUNET_PQ_QueryStatus res;
+
+ prc.plugin = plugin;
+ prc.proc = proc;
+ prc.proc_cls = proc_cls;
+ res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ "select_non_anonymous",
+ params,
+ &process_result,
+ &prc);
+ if (0 > res)
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
GNUNET_PQ_query_param_uint32 (&oid),
GNUNET_PQ_query_param_end
};
- PGresult *qret;
+ enum GNUNET_PQ_QueryStatus qret;
ret = rc->proc (rc->proc_cls,
key,
uid);
if (NULL == key)
return ret;
- qret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "decrepl",
- params);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh,
- qret,
- PGRES_COMMAND_OK,
- "PQexecPrepared",
- "decrepl"))
+ qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+ "decrepl",
+ params);
+ if (0 > qret)
return GNUNET_SYSERR;
- PQclear (qret);
return ret;
}
void *proc_cls)
{
struct Plugin *plugin = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
struct ReplCtx rc;
- PGresult *ret;
+ struct ProcessResultContext prc;
+ enum GNUNET_PQ_QueryStatus res;
rc.plugin = plugin;
rc.proc = proc;
rc.proc_cls = proc_cls;
- ret = PQexecPrepared (plugin->dbh,
- "select_replication_order", 0, NULL, NULL,
- NULL, 1);
- process_result (plugin,
- &repl_proc,
- &rc,
- ret,
- __FILE__, __LINE__);
+ prc.plugin = plugin;
+ prc.proc = &repl_proc;
+ prc.proc_cls = &rc;
+ res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ "select_replication_order",
+ params,
+ &process_result,
+ &prc);
+ if (0 > res)
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_end
};
- PGresult *ret;
+ struct ProcessResultContext prc;
now = GNUNET_TIME_absolute_get ();
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "select_expiration_order",
- params);
- process_result (plugin,
- proc, proc_cls,
- ret,
- __FILE__, __LINE__);
+ prc.plugin = plugin;
+ prc.proc = proc;
+ prc.proc_cls = proc_cls;
+ (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ "select_expiration_order",
+ params,
+ &process_result,
+ &prc);
+}
+
+
+/**
+ * Closure for #process_keys.
+ */
+struct ProcessKeysContext
+{
+
+ /**
+ * Function to call for each key.
+ */
+ PluginKeyProcessor proc;
+
+ /**
+ * Closure for @e proc.
+ */
+ void *proc_cls;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.
+ *
+ * @param cls closure with a `struct ProcessKeysContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+process_keys (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct ProcessKeysContext *pkc = cls;
+
+ for (unsigned i=0;i<num_results;i++)
+ {
+ struct GNUNET_HashCode key;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_auto_from_type ("hash",
+ &key),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result (result,
+ rs,
+ i))
+ {
+ GNUNET_break (0);
+ continue;
+ }
+ pkc->proc (pkc->proc_cls,
+ &key,
+ 1);
+ GNUNET_PQ_cleanup_result (rs);
+ }
}
void *proc_cls)
{
struct Plugin *plugin = cls;
- int ret;
- int i;
- struct GNUNET_HashCode key;
- PGresult * res;
-
- res = PQexecPrepared (plugin->dbh,
- "get_keys",
- 0, NULL, NULL, NULL, 1);
- ret = PQntuples (res);
- for (i=0;i<ret;i++)
- {
- if (sizeof (struct GNUNET_HashCode) !=
- PQgetlength (res, i, 0))
- {
- GNUNET_memcpy (&key,
- PQgetvalue (res, i, 0),
- sizeof (struct GNUNET_HashCode));
- proc (proc_cls, &key, 1);
- }
- }
- PQclear (res);
- proc (proc_cls, NULL, 0);
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
+ struct ProcessKeysContext pkc;
+
+ pkc.proc = proc;
+ pkc.proc_cls = proc_cls;
+ (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+ "get_keys",
+ params,
+ &process_keys,
+ &pkc);
+ proc (proc_cls,
+ NULL,
+ 0);
}
postgres_plugin_drop (void *cls)
{
struct Plugin *plugin = cls;
+ struct GNUNET_PQ_ExecuteStatement es[] = {
+ GNUNET_PQ_make_execute ("DROP TABLE gn090"),
+ GNUNET_PQ_EXECUTE_STATEMENT_END
+ };
if (GNUNET_OK !=
- GNUNET_POSTGRES_exec (plugin->dbh,
- "DROP TABLE gn090"))
+ GNUNET_PQ_exec_statements (plugin->dbh,
+ es))
GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
"postgres",
_("Failed to drop table from database.\n"));