From 61ef51d43a9069b5a2d680883b5d47c1fb237d82 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 4 Jun 2017 00:52:25 +0200 Subject: [PATCH] finish datastore pq refactoring --- src/datastore/Makefile.am | 1 - src/datastore/plugin_datastore_postgres.c | 483 +++++++++++++--------- 2 files changed, 277 insertions(+), 207 deletions(-) diff --git a/src/datastore/Makefile.am b/src/datastore/Makefile.am index 9b8cf365f..240abbc67 100644 --- a/src/datastore/Makefile.am +++ b/src/datastore/Makefile.am @@ -148,7 +148,6 @@ libgnunet_plugin_datastore_postgres_la_SOURCES = \ plugin_datastore_postgres.c libgnunet_plugin_datastore_postgres_la_LIBADD = \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ - $(top_builddir)/src/postgres/libgnunetpostgres.la \ $(top_builddir)/src/pq/libgnunetpq.la \ $(top_builddir)/src/util/libgnunetutil.la $(XLIBS) -lpq libgnunet_plugin_datastore_postgres_la_LDFLAGS = \ diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index 7496aeacc..9380a56c0 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c @@ -23,10 +23,8 @@ * @brief postgres-based datastore backend * @author Christian Grothoff */ - #include "platform.h" #include "gnunet_datastore_plugin.h" -#include "gnunet_postgres_lib.h" #include "gnunet_pq_lib.h" @@ -152,6 +150,9 @@ init_connection (struct Plugin *plugin) GNUNET_PQ_make_prepare ("get_keys", "SELECT hash FROM gn090", 0), + GNUNET_PQ_make_prepare ("estimate_size", + "SELECT SUM(LENGTH(value))+256*COUNT(*) AS total FROM gn090", + 0), GNUNET_PQ_PREPARED_STATEMENT_END }; #undef RESULT_COLUMNS @@ -184,44 +185,32 @@ init_connection (struct Plugin *plugin) * @return number of bytes used on disk */ static void -postgres_plugin_estimate_size (void *cls, unsigned long long *estimate) +postgres_plugin_estimate_size (void *cls, + unsigned long long *estimate) { struct Plugin *plugin = cls; - unsigned long long total; - PGresult *ret; + uint64_t total; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("total", + &total), + GNUNET_PQ_result_spec_end + }; + enum GNUNET_PQ_QueryStatus ret; if (NULL == estimate) return; - ret = - PQexecParams (plugin->dbh, - "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0, - NULL, NULL, NULL, NULL, 1); - if (GNUNET_OK != - GNUNET_POSTGRES_check_result (plugin->dbh, - ret, - PGRES_TUPLES_OK, - "PQexecParams", - "get_size")) + ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, + "estimate_size", + params, + rs); + if (GNUNET_PQ_STATUS_SUCCESS_ONE_RESULT != ret) { - *estimate = 0; + *estimate = 0LL; return; } - if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ) - { - GNUNET_break (0); - PQclear (ret); - *estimate = 0; - return; - } - if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)) - { - GNUNET_break (0 == PQgetlength (ret, 0, 0)); - PQclear (ret); - *estimate = 0; - return; - } - total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); - PQclear (ret); *estimate = total; } @@ -342,135 +331,150 @@ postgres_plugin_put (void *cls, /** - * Function invoked to process the result and call the processor. + * Closure for #process_result. + */ +struct ProcessResultContext +{ + + /** + * The plugin handle. + */ + struct Plugin *plugin; + + /** + * Function to call on each result. + */ + PluginDatumProcessor proc; + + /** + * Closure for @e proc. + */ + void *proc_cls; + +}; + + +/** + * Function invoked to process the result and call the processor of @a + * cls. * - * @param plugin global plugin data - * @param proc function to call the value (once only). - * @param proc_cls closure for proc + * @param cls our `struct ProcessResultContext` * @param res result from exec - * @param filename filename for error messages - * @param line line number for error messages + * @param num_results number of results in @a res */ static void -process_result (struct Plugin *plugin, - PluginDatumProcessor proc, - void *proc_cls, - PGresult * res, - const char *filename, int line) +process_result (void *cls, + PGresult *res, + unsigned int num_results) { - int iret; - uint32_t rowid; - uint32_t utype; - uint32_t anonymity; - uint32_t replication; - uint32_t priority; - size_t size; - void *data; - struct GNUNET_TIME_Absolute expiration_time; - struct GNUNET_HashCode key; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_uint32 ("repl", &replication), - GNUNET_PQ_result_spec_uint32 ("type", &utype), - GNUNET_PQ_result_spec_uint32 ("prio", &priority), - GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), - GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), - GNUNET_PQ_result_spec_auto_from_type ("hash", &key), - GNUNET_PQ_result_spec_variable_size ("value", &data, &size), - GNUNET_PQ_result_spec_uint32 ("oid", &rowid), - GNUNET_PQ_result_spec_end - }; + struct ProcessResultContext *prc = cls; + struct Plugin *plugin = prc->plugin; - if (GNUNET_OK != - GNUNET_POSTGRES_check_result_ (plugin->dbh, - res, - PGRES_TUPLES_OK, - "PQexecPrepared", - "select", - filename, line)) - { - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "datastore-postgres", - "Ending iteration (postgres error)\n"); - proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; - } - - if (0 == PQntuples (res)) + if (0 == num_results) { /* no result */ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", "Ending iteration (no more results)\n"); - proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - PQclear (res); + prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - if (1 != PQntuples (res)) + if (1 != num_results) { GNUNET_break (0); - proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - PQclear (res); + prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - if (GNUNET_OK != - GNUNET_PQ_extract_result (res, - rs, - 0)) + /* Technically we don't need the loop here, but nicer in case + we ever relax the condition above. */ + for (unsigned int i=0;idbh, - "delrow", - rowid); - proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; - } + int iret; + uint32_t rowid; + uint32_t utype; + uint32_t anonymity; + uint32_t replication; + uint32_t priority; + size_t size; + void *data; + struct GNUNET_TIME_Absolute expiration_time; + struct GNUNET_HashCode key; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint32 ("repl", &replication), + GNUNET_PQ_result_spec_uint32 ("type", &utype), + GNUNET_PQ_result_spec_uint32 ("prio", &priority), + GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), + GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), + GNUNET_PQ_result_spec_auto_from_type ("hash", &key), + GNUNET_PQ_result_spec_variable_size ("value", &data, &size), + GNUNET_PQ_result_spec_uint32 ("oid", &rowid), + GNUNET_PQ_result_spec_end + }; - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "datastore-postgres", - "Found result of size %u bytes and type %u in database\n", - (unsigned int) size, - (unsigned int) utype); - iret = proc (proc_cls, - &key, - size, - data, - (enum GNUNET_BLOCK_Type) utype, - priority, - anonymity, - replication, - expiration_time, - rowid); - PQclear (res); - if (iret == GNUNET_NO) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processor asked for item %u to be removed.\n", - (unsigned int) rowid); - if (GNUNET_OK == - GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, - "delrow", - rowid)) + if (GNUNET_OK != + GNUNET_PQ_extract_result (res, + rs, + i)) { - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "datastore-postgres", - "Deleting %u bytes from database\n", - (unsigned int) size); - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "datastore-postgres", - "Deleted %u bytes from database\n", - (unsigned int) size); + GNUNET_break (0); + prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; } - } + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "datastore-postgres", + "Found result of size %u bytes and type %u in database\n", + (unsigned int) size, + (unsigned int) utype); + iret = prc->proc (prc->proc_cls, + &key, + size, + data, + (enum GNUNET_BLOCK_Type) utype, + priority, + anonymity, + replication, + expiration_time, + rowid); + if (iret == GNUNET_NO) + { + struct GNUNET_PQ_QueryParam param[] = { + GNUNET_PQ_query_param_uint32 (&rowid), + GNUNET_PQ_query_param_end + }; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Processor asked for item %u to be removed.\n", + (unsigned int) rowid); + if (0 < + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, + "delrow", + param)) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "datastore-postgres", + "Deleting %u bytes from database\n", + (unsigned int) size); + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "datastore-postgres", + "Deleted %u bytes from database\n", + (unsigned int) size); + } + } + GNUNET_PQ_cleanup_result (rs); + } /* for (i) */ } /** * Get one of the results for a particular key in the datastore. * - * @param cls closure with the 'struct Plugin' + * @param cls closure with the `struct Plugin` * @param next_uid return the result with lowest uid >= next_uid * @param random if true, return a random result instead of using next_uid * @param key maybe NULL (to match all entries) @@ -505,7 +509,8 @@ postgres_plugin_get_key (void *cls, GNUNET_PQ_query_param_uint16 (&use_type), GNUNET_PQ_query_param_end }; - PGresult *ret; + struct ProcessResultContext prc; + enum GNUNET_PQ_QueryStatus res; if (random) { @@ -514,16 +519,21 @@ postgres_plugin_get_key (void *cls, next_uid = 0; } else + { rvalue = 0; - - ret = GNUNET_PQ_exec_prepared (plugin->dbh, - "get", - params); - process_result (plugin, - proc, - proc_cls, - ret, - __FILE__, __LINE__); + } + prc.plugin = plugin; + prc.proc = proc; + prc.proc_cls = proc_cls; + + res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + "get", + params, + &process_result, + &prc); + if (0 > res) + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); } @@ -553,16 +563,20 @@ postgres_plugin_get_zero_anonymity (void *cls, GNUNET_PQ_query_param_uint64 (&next_uid), GNUNET_PQ_query_param_end }; - PGresult *ret; - - ret = GNUNET_PQ_exec_prepared (plugin->dbh, - "select_non_anonymous", - params); - - process_result (plugin, - proc, proc_cls, - ret, - __FILE__, __LINE__); + struct ProcessResultContext prc; + enum GNUNET_PQ_QueryStatus res; + + prc.plugin = plugin; + prc.proc = proc; + prc.proc_cls = proc_cls; + res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + "select_non_anonymous", + params, + &process_result, + &prc); + if (0 > res) + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); } @@ -630,7 +644,7 @@ repl_proc (void *cls, GNUNET_PQ_query_param_uint32 (&oid), GNUNET_PQ_query_param_end }; - PGresult *qret; + enum GNUNET_PQ_QueryStatus qret; ret = rc->proc (rc->proc_cls, key, @@ -644,17 +658,11 @@ repl_proc (void *cls, uid); if (NULL == key) return ret; - qret = GNUNET_PQ_exec_prepared (plugin->dbh, - "decrepl", - params); - if (GNUNET_OK != - GNUNET_POSTGRES_check_result (plugin->dbh, - qret, - PGRES_COMMAND_OK, - "PQexecPrepared", - "decrepl")) + qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, + "decrepl", + params); + if (0 > qret) return GNUNET_SYSERR; - PQclear (qret); return ret; } @@ -676,20 +684,27 @@ postgres_plugin_get_replication (void *cls, void *proc_cls) { struct Plugin *plugin = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_end + }; struct ReplCtx rc; - PGresult *ret; + struct ProcessResultContext prc; + enum GNUNET_PQ_QueryStatus res; rc.plugin = plugin; rc.proc = proc; rc.proc_cls = proc_cls; - ret = PQexecPrepared (plugin->dbh, - "select_replication_order", 0, NULL, NULL, - NULL, 1); - process_result (plugin, - &repl_proc, - &rc, - ret, - __FILE__, __LINE__); + prc.plugin = plugin; + prc.proc = &repl_proc; + prc.proc_cls = &rc; + res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + "select_replication_order", + params, + &process_result, + &prc); + if (0 > res) + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); } @@ -712,16 +727,75 @@ postgres_plugin_get_expiration (void *cls, GNUNET_PQ_query_param_absolute_time (&now), GNUNET_PQ_query_param_end }; - PGresult *ret; + struct ProcessResultContext prc; now = GNUNET_TIME_absolute_get (); - ret = GNUNET_PQ_exec_prepared (plugin->dbh, - "select_expiration_order", - params); - process_result (plugin, - proc, proc_cls, - ret, - __FILE__, __LINE__); + prc.plugin = plugin; + prc.proc = proc; + prc.proc_cls = proc_cls; + (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + "select_expiration_order", + params, + &process_result, + &prc); +} + + +/** + * Closure for #process_keys. + */ +struct ProcessKeysContext +{ + + /** + * Function to call for each key. + */ + PluginKeyProcessor proc; + + /** + * Closure for @e proc. + */ + void *proc_cls; +}; + + +/** + * Function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure with a `struct ProcessKeysContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +process_keys (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct ProcessKeysContext *pkc = cls; + + for (unsigned i=0;iproc (pkc->proc_cls, + &key, + 1); + GNUNET_PQ_cleanup_result (rs); + } } @@ -738,28 +812,21 @@ postgres_plugin_get_keys (void *cls, void *proc_cls) { struct Plugin *plugin = cls; - int ret; - int i; - struct GNUNET_HashCode key; - PGresult * res; - - res = PQexecPrepared (plugin->dbh, - "get_keys", - 0, NULL, NULL, NULL, 1); - ret = PQntuples (res); - for (i=0;idbh, + "get_keys", + params, + &process_keys, + &pkc); + proc (proc_cls, + NULL, + 0); } @@ -772,10 +839,14 @@ static void postgres_plugin_drop (void *cls) { struct Plugin *plugin = cls; + struct GNUNET_PQ_ExecuteStatement es[] = { + GNUNET_PQ_make_execute ("DROP TABLE gn090"), + GNUNET_PQ_EXECUTE_STATEMENT_END + }; if (GNUNET_OK != - GNUNET_POSTGRES_exec (plugin->dbh, - "DROP TABLE gn090")) + GNUNET_PQ_exec_statements (plugin->dbh, + es)) GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n")); -- 2.25.1