finish datastore pq refactoring
authorChristian Grothoff <christian@grothoff.org>
Sat, 3 Jun 2017 22:52:25 +0000 (00:52 +0200)
committerChristian Grothoff <christian@grothoff.org>
Sat, 3 Jun 2017 22:52:25 +0000 (00:52 +0200)
src/datastore/Makefile.am
src/datastore/plugin_datastore_postgres.c

index 9b8cf365fd633f0d6646cd382ce87abed7e3d620..240abbc6742d968dcd1077343eeb8dc563690d93 100644 (file)
@@ -148,7 +148,6 @@ libgnunet_plugin_datastore_postgres_la_SOURCES = \
   plugin_datastore_postgres.c
 libgnunet_plugin_datastore_postgres_la_LIBADD = \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
-  $(top_builddir)/src/postgres/libgnunetpostgres.la \
   $(top_builddir)/src/pq/libgnunetpq.la \
   $(top_builddir)/src/util/libgnunetutil.la $(XLIBS) -lpq
 libgnunet_plugin_datastore_postgres_la_LDFLAGS = \
index 7496aeacc3b1e8f145ce8d0c8f61d244a941e1ac..9380a56c0f778be84d214fb17a016b1423c59bff 100644 (file)
  * @brief postgres-based datastore backend
  * @author Christian Grothoff
  */
-
 #include "platform.h"
 #include "gnunet_datastore_plugin.h"
-#include "gnunet_postgres_lib.h"
 #include "gnunet_pq_lib.h"
 
 
@@ -152,6 +150,9 @@ init_connection (struct Plugin *plugin)
     GNUNET_PQ_make_prepare ("get_keys",
                             "SELECT hash FROM gn090",
                             0),
+    GNUNET_PQ_make_prepare ("estimate_size",
+                            "SELECT SUM(LENGTH(value))+256*COUNT(*) AS total FROM gn090",
+                            0),
     GNUNET_PQ_PREPARED_STATEMENT_END
   };
 #undef RESULT_COLUMNS
@@ -184,44 +185,32 @@ init_connection (struct Plugin *plugin)
  * @return number of bytes used on disk
  */
 static void
-postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
+postgres_plugin_estimate_size (void *cls,
+                               unsigned long long *estimate)
 {
   struct Plugin *plugin = cls;
-  unsigned long long total;
-  PGresult *ret;
+  uint64_t total;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_end
+  };
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_uint64 ("total",
+                                  &total),
+    GNUNET_PQ_result_spec_end
+  };
+  enum GNUNET_PQ_QueryStatus ret;
 
   if (NULL == estimate)
     return;
-  ret =
-      PQexecParams (plugin->dbh,
-                    "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
-                    NULL, NULL, NULL, NULL, 1);
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh,
-                                   ret,
-                                   PGRES_TUPLES_OK,
-                                   "PQexecParams",
-                                   "get_size"))
+  ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
+                                                  "estimate_size",
+                                                  params,
+                                                  rs);
+  if (GNUNET_PQ_STATUS_SUCCESS_ONE_RESULT != ret)
   {
-    *estimate = 0;
+    *estimate = 0LL;
     return;
   }
-  if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
-  {
-    GNUNET_break (0);
-    PQclear (ret);
-    *estimate = 0;
-    return;
-  }
-  if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
-  {
-    GNUNET_break (0 == PQgetlength (ret, 0, 0));
-    PQclear (ret);
-    *estimate = 0;
-    return;
-  }
-  total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
-  PQclear (ret);
   *estimate = total;
 }
 
@@ -342,135 +331,150 @@ postgres_plugin_put (void *cls,
 
 
 /**
- * Function invoked to process the result and call the processor.
+ * Closure for #process_result.
+ */
+struct ProcessResultContext
+{
+
+  /**
+   * The plugin handle.
+   */
+  struct Plugin *plugin;
+
+  /**
+   * Function to call on each result.
+   */
+  PluginDatumProcessor proc;
+
+  /**
+   * Closure for @e proc.
+   */
+  void *proc_cls;
+
+};
+
+
+/**
+ * Function invoked to process the result and call the processor of @a
+ * cls.
  *
- * @param plugin global plugin data
- * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param cls our `struct ProcessResultContext`
  * @param res result from exec
- * @param filename filename for error messages
- * @param line line number for error messages
+ * @param num_results number of results in @a res
  */
 static void
-process_result (struct Plugin *plugin,
-               PluginDatumProcessor proc,
-                void *proc_cls,
-               PGresult * res,
-               const char *filename, int line)
+process_result (void *cls,
+               PGresult *res,
+               unsigned int num_results)
 {
-  int iret;
-  uint32_t rowid;
-  uint32_t utype;
-  uint32_t anonymity;
-  uint32_t replication;
-  uint32_t priority;
-  size_t size;
-  void *data;
-  struct GNUNET_TIME_Absolute expiration_time;
-  struct GNUNET_HashCode key;
-  struct GNUNET_PQ_ResultSpec rs[] = {
-    GNUNET_PQ_result_spec_uint32 ("repl", &replication),
-    GNUNET_PQ_result_spec_uint32 ("type", &utype),
-    GNUNET_PQ_result_spec_uint32 ("prio", &priority),
-    GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
-    GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
-    GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
-    GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
-    GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
-    GNUNET_PQ_result_spec_end
-  };
+  struct ProcessResultContext *prc = cls;
+  struct Plugin *plugin = prc->plugin;
 
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result_ (plugin->dbh,
-                                    res,
-                                    PGRES_TUPLES_OK,
-                                    "PQexecPrepared",
-                                    "select",
-                                    filename, line))
-  {
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                    "datastore-postgres",
-                     "Ending iteration (postgres error)\n");
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
-
-  if (0 == PQntuples (res))
+  if (0 == num_results)
   {
     /* no result */
     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                     "datastore-postgres",
                      "Ending iteration (no more results)\n");
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    PQclear (res);
+    prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+               GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
   }
-  if (1 != PQntuples (res))
+  if (1 != num_results)
   {
     GNUNET_break (0);
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    PQclear (res);
+    prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+               GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
   }
-  if (GNUNET_OK !=
-      GNUNET_PQ_extract_result (res,
-                               rs,
-                               0))
+  /* Technically we don't need the loop here, but nicer in case
+     we ever relax the condition above. */
+  for (unsigned int i=0;i<num_results;i++)
   {
-    GNUNET_break (0);
-    PQclear (res);
-    GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
-                                    "delrow",
-                                    rowid);
-    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-    return;
-  }
+    int iret;
+    uint32_t rowid;
+    uint32_t utype;
+    uint32_t anonymity;
+    uint32_t replication;
+    uint32_t priority;
+    size_t size;
+    void *data;
+    struct GNUNET_TIME_Absolute expiration_time;
+    struct GNUNET_HashCode key;
+    struct GNUNET_PQ_ResultSpec rs[] = {
+      GNUNET_PQ_result_spec_uint32 ("repl", &replication),
+      GNUNET_PQ_result_spec_uint32 ("type", &utype),
+      GNUNET_PQ_result_spec_uint32 ("prio", &priority),
+      GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
+      GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
+      GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
+      GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
+      GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
+      GNUNET_PQ_result_spec_end
+    };
 
-  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                  "datastore-postgres",
-                   "Found result of size %u bytes and type %u in database\n",
-                   (unsigned int) size,
-                  (unsigned int) utype);
-  iret = proc (proc_cls,
-               &key,
-               size,
-               data,
-               (enum GNUNET_BLOCK_Type) utype,
-               priority,
-               anonymity,
-               replication,
-               expiration_time,
-               rowid);
-  PQclear (res);
-  if (iret == GNUNET_NO)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Processor asked for item %u to be removed.\n",
-               (unsigned int) rowid);
-    if (GNUNET_OK ==
-       GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
-                                        "delrow",
-                                        rowid))
+    if (GNUNET_OK !=
+        GNUNET_PQ_extract_result (res,
+                                  rs,
+                                  i))
     {
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "datastore-postgres",
-                       "Deleting %u bytes from database\n",
-                       (unsigned int) size);
-      plugin->env->duc (plugin->env->cls,
-                        - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "datastore-postgres",
-                       "Deleted %u bytes from database\n",
-                      (unsigned int) size);
+      GNUNET_break (0);
+      prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
     }
-  }
+
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+                     "datastore-postgres",
+                     "Found result of size %u bytes and type %u in database\n",
+                     (unsigned int) size,
+                     (unsigned int) utype);
+    iret = prc->proc (prc->proc_cls,
+                      &key,
+                      size,
+                      data,
+                      (enum GNUNET_BLOCK_Type) utype,
+                      priority,
+                      anonymity,
+                      replication,
+                      expiration_time,
+                      rowid);
+    if (iret == GNUNET_NO)
+    {
+      struct GNUNET_PQ_QueryParam param[] = {
+        GNUNET_PQ_query_param_uint32 (&rowid),
+        GNUNET_PQ_query_param_end
+      };
+
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Processor asked for item %u to be removed.\n",
+                  (unsigned int) rowid);
+      if (0 <
+          GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+                                              "delrow",
+                                              param))
+      {
+        GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+                         "datastore-postgres",
+                         "Deleting %u bytes from database\n",
+                         (unsigned int) size);
+        plugin->env->duc (plugin->env->cls,
+                          - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+        GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+                         "datastore-postgres",
+                         "Deleted %u bytes from database\n",
+                         (unsigned int) size);
+      }
+    }
+    GNUNET_PQ_cleanup_result (rs);
+  } /* for (i) */
 }
 
 
 /**
  * Get one of the results for a particular key in the datastore.
  *
- * @param cls closure with the 'struct Plugin'
+ * @param cls closure with the `struct Plugin`
  * @param next_uid return the result with lowest uid >= next_uid
  * @param random if true, return a random result instead of using next_uid
  * @param key maybe NULL (to match all entries)
@@ -505,7 +509,8 @@ postgres_plugin_get_key (void *cls,
     GNUNET_PQ_query_param_uint16 (&use_type),
     GNUNET_PQ_query_param_end
   };
-  PGresult *ret;
+  struct ProcessResultContext prc;
+  enum GNUNET_PQ_QueryStatus res;
 
   if (random)
   {
@@ -514,16 +519,21 @@ postgres_plugin_get_key (void *cls,
     next_uid = 0;
   }
   else
+  {
     rvalue = 0;
-
-  ret = GNUNET_PQ_exec_prepared (plugin->dbh,
-                                 "get",
-                                 params);
-  process_result (plugin,
-                 proc,
-                 proc_cls,
-                 ret,
-                 __FILE__, __LINE__);
+  }
+  prc.plugin = plugin;
+  prc.proc = proc;
+  prc.proc_cls = proc_cls;
+
+  res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                              "get",
+                                              params,
+                                              &process_result,
+                                              &prc);
+  if (0 > res)
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+          GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
@@ -553,16 +563,20 @@ postgres_plugin_get_zero_anonymity (void *cls,
     GNUNET_PQ_query_param_uint64 (&next_uid),
     GNUNET_PQ_query_param_end
   };
-  PGresult *ret;
-
-  ret = GNUNET_PQ_exec_prepared (plugin->dbh,
-                                "select_non_anonymous",
-                                params);
-
-  process_result (plugin,
-                 proc, proc_cls,
-                 ret,
-                 __FILE__, __LINE__);
+  struct ProcessResultContext prc;
+  enum GNUNET_PQ_QueryStatus res;
+
+  prc.plugin = plugin;
+  prc.proc = proc;
+  prc.proc_cls = proc_cls;
+  res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                              "select_non_anonymous",
+                                              params,
+                                              &process_result,
+                                              &prc);
+  if (0 > res)
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+          GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
@@ -630,7 +644,7 @@ repl_proc (void *cls,
     GNUNET_PQ_query_param_uint32 (&oid),
     GNUNET_PQ_query_param_end
   };
-  PGresult *qret;
+  enum GNUNET_PQ_QueryStatus qret;
 
   ret = rc->proc (rc->proc_cls,
                   key,
@@ -644,17 +658,11 @@ repl_proc (void *cls,
                   uid);
   if (NULL == key)
     return ret;
-  qret = GNUNET_PQ_exec_prepared (plugin->dbh,
-                                 "decrepl",
-                                 params);
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh,
-                                   qret,
-                                   PGRES_COMMAND_OK,
-                                   "PQexecPrepared",
-                                   "decrepl"))
+  qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
+                                             "decrepl",
+                                             params);
+  if (0 > qret)
     return GNUNET_SYSERR;
-  PQclear (qret);
   return ret;
 }
 
@@ -676,20 +684,27 @@ postgres_plugin_get_replication (void *cls,
                                  void *proc_cls)
 {
   struct Plugin *plugin = cls;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_end
+  };
   struct ReplCtx rc;
-  PGresult *ret;
+  struct ProcessResultContext prc;
+  enum GNUNET_PQ_QueryStatus res;
 
   rc.plugin = plugin;
   rc.proc = proc;
   rc.proc_cls = proc_cls;
-  ret = PQexecPrepared (plugin->dbh,
-                       "select_replication_order", 0, NULL, NULL,
-                       NULL, 1);
-  process_result (plugin,
-                 &repl_proc,
-                 &rc,
-                 ret,
-                 __FILE__, __LINE__);
+  prc.plugin = plugin;
+  prc.proc = &repl_proc;
+  prc.proc_cls = &rc;
+  res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                              "select_replication_order",
+                                              params,
+                                              &process_result,
+                                              &prc);
+  if (0 > res)
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
+          GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
@@ -712,16 +727,75 @@ postgres_plugin_get_expiration (void *cls,
     GNUNET_PQ_query_param_absolute_time (&now),
     GNUNET_PQ_query_param_end
   };
-  PGresult *ret;
+  struct ProcessResultContext prc;
 
   now = GNUNET_TIME_absolute_get ();
-  ret = GNUNET_PQ_exec_prepared (plugin->dbh,
-                                "select_expiration_order",
-                                params);
-  process_result (plugin,
-                 proc, proc_cls,
-                 ret,
-                 __FILE__, __LINE__);
+  prc.plugin = plugin;
+  prc.proc = proc;
+  prc.proc_cls = proc_cls;
+  (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                               "select_expiration_order",
+                                               params,
+                                               &process_result,
+                                               &prc);
+}
+
+
+/**
+ * Closure for #process_keys.
+ */
+struct ProcessKeysContext
+{
+
+  /**
+   * Function to call for each key.
+   */
+  PluginKeyProcessor proc;
+
+  /**
+   * Closure for @e proc.
+   */
+  void *proc_cls;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.
+ *
+ * @param cls closure with a `struct ProcessKeysContext`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+process_keys (void *cls,
+              PGresult *result,
+              unsigned int num_results)
+{
+  struct ProcessKeysContext *pkc = cls;
+
+  for (unsigned i=0;i<num_results;i++)
+  {
+    struct GNUNET_HashCode key;
+    struct GNUNET_PQ_ResultSpec rs[] = {
+      GNUNET_PQ_result_spec_auto_from_type ("hash",
+                                            &key),
+      GNUNET_PQ_result_spec_end
+    };
+
+    if (GNUNET_OK !=
+        GNUNET_PQ_extract_result (result,
+                                  rs,
+                                  i))
+    {
+      GNUNET_break (0);
+      continue;
+    }
+    pkc->proc (pkc->proc_cls,
+               &key,
+               1);
+    GNUNET_PQ_cleanup_result (rs);
+  }
 }
 
 
@@ -738,28 +812,21 @@ postgres_plugin_get_keys (void *cls,
                          void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  int ret;
-  int i;
-  struct GNUNET_HashCode key;
-  PGresult * res;
-
-  res = PQexecPrepared (plugin->dbh,
-                       "get_keys",
-                       0, NULL, NULL, NULL, 1);
-  ret = PQntuples (res);
-  for (i=0;i<ret;i++)
-  {
-    if (sizeof (struct GNUNET_HashCode) !=
-       PQgetlength (res, i, 0))
-    {
-      GNUNET_memcpy (&key,
-             PQgetvalue (res, i, 0),
-             sizeof (struct GNUNET_HashCode));
-      proc (proc_cls, &key, 1);
-    }
-  }
-  PQclear (res);
-  proc (proc_cls, NULL, 0);
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_end
+  };
+  struct ProcessKeysContext pkc;
+
+  pkc.proc = proc;
+  pkc.proc_cls = proc_cls;
+  (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
+                                               "get_keys",
+                                               params,
+                                               &process_keys,
+                                               &pkc);
+  proc (proc_cls,
+        NULL,
+        0);
 }
 
 
@@ -772,10 +839,14 @@ static void
 postgres_plugin_drop (void *cls)
 {
   struct Plugin *plugin = cls;
+  struct GNUNET_PQ_ExecuteStatement es[] = {
+    GNUNET_PQ_make_execute ("DROP TABLE gn090"),
+    GNUNET_PQ_EXECUTE_STATEMENT_END
+  };
 
   if (GNUNET_OK !=
-      GNUNET_POSTGRES_exec (plugin->dbh,
-                            "DROP TABLE gn090"))
+      GNUNET_PQ_exec_statements (plugin->dbh,
+                                 es))
     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
                     "postgres",
                     _("Failed to drop table from database.\n"));