psycstore: fix postgres
authorGabor X Toth <*@tg-x.net>
Wed, 12 Oct 2016 21:54:30 +0000 (21:54 +0000)
committerGabor X Toth <*@tg-x.net>
Wed, 12 Oct 2016 21:54:30 +0000 (21:54 +0000)
src/include/gnunet_psycstore_plugin.h
src/psycstore/plugin_psycstore_mysql.c
src/psycstore/plugin_psycstore_postgres.c
src/psycstore/plugin_psycstore_sqlite.c

index 4d19ce86d36e4a340421b36c6341b3af69ae3655..b8dd0cc98e60448f258cd58cc80ad52b94068736 100644 (file)
@@ -114,7 +114,7 @@ struct GNUNET_PSYCSTORE_PluginFunctions
   (*message_add_flags) (void *cls,
                         const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                         uint64_t message_id,
-                        uint64_t psycstore_flags);
+                        uint32_t psycstore_flags);
 
   /**
    * Retrieve a message fragment range by fragment ID.
index 71f2eb5b86647fd08b18af543fe72e73b6edd557..01a0282c8b6e8280e0b9021f884873d7e803377e 100644 (file)
@@ -296,18 +296,18 @@ database_setup (struct Plugin *plugin)
   /* Create tables */
   STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
-            " pub_key BLOB,\n"
+            " pub_key BLOB(23),\n"
             " max_state_message_id BIGINT UNSIGNED,\n"
             " state_hash_message_id BIGINT UNSIGNED,\n"
             " PRIMARY KEY(id),\n"
-            " UNIQUE KEY(pub_key(5))\n"
+            " UNIQUE KEY(pub_key(32))\n"
             ");");
 
   STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
-            " pub_key BLOB,\n"
+            " pub_key BLOB(32),\n"
             " PRIMARY KEY(id),\n"
-            " UNIQUE KEY(pub_key(5))\n"
+            " UNIQUE KEY(pub_key(32))\n"
             ");");
 
   STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
@@ -521,7 +521,7 @@ database_setup (struct Plugin *plugin)
   PREP ("SELECT name, value_current\n"
         "FROM state\n"
         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-        "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
+        "      AND (name = ? OR substr(name, 1, ?) = ?);",
         &plugin->select_state_prefix);
 
   PREP ("SELECT name, value_signed\n"
@@ -905,7 +905,7 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
@@ -914,7 +914,7 @@ message_add_flags (void *cls,
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint64 (&psycstore_flags),
+    GNUNET_MY_query_param_uint32 (&psycstore_flags),
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&message_id),
     GNUNET_MY_query_param_end
index 5bbb3c44751b5639a523b457ea90ddacf38d66cd..3439856b9434a2bbac2957bdf0ae21c2606aaac1 100644 (file)
@@ -100,7 +100,7 @@ database_setup (struct Plugin *plugin)
          GNUNET_POSTGRES_exec(plugin->dbh,
                               "CREATE TABLE IF NOT EXISTS channels (\n"
                               " id SERIAL,\n"
-                              " pub_key BYTEA,\n"
+                              " pub_key BYTEA(32),\n"
                               " max_state_message_id BIGINT,\n"
                               " state_hash_message_id BIGINT,\n"
                               " PRIMARY KEY(id)\n"
@@ -109,8 +109,7 @@ database_setup (struct Plugin *plugin)
       (GNUNET_OK !=
          GNUNET_POSTGRES_exec(plugin->dbh,
                               "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
-                              " ON channels (substring(pub_key from 1 for 5)\n"
-                              ")")) ||
+                              " ON channels (pub_key)")) ||
 
       (GNUNET_OK !=
          GNUNET_POSTGRES_exec(plugin->dbh,
@@ -122,15 +121,14 @@ database_setup (struct Plugin *plugin)
          GNUNET_POSTGRES_exec(plugin->dbh,
                               "CREATE TABLE IF NOT EXISTS slaves (\n"
                               " id SERIAL,\n"
-                              " pub_key BYTEA,\n"
+                              " pub_key BYTEA(32),\n"
                               " PRIMARY KEY(id)\n"
                               ")" "WITH OIDS")) ||
 
       (GNUNET_OK !=
          GNUNET_POSTGRES_exec(plugin->dbh,
                               "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
-                              " ON slaves (substring(pub_key from 1 for 5)\n"
-                              ")")) ||
+                              " ON slaves (pub_key)")) ||
 
       (GNUNET_OK !=
          GNUNET_POSTGRES_exec(plugin->dbh,
@@ -143,7 +141,7 @@ database_setup (struct Plugin *plugin)
                               "CREATE TABLE IF NOT EXISTS membership (\n"
                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
                               "  slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
-                              "  did_join BIGINT NOT NULL,\n"
+                              "  did_join INT NOT NULL,\n"
                               "  announced_at BIGINT NOT NULL,\n"
                               "  effective_since BIGINT NOT NULL,\n"
                               "  group_generation BIGINT NOT NULL\n"
@@ -159,7 +157,7 @@ database_setup (struct Plugin *plugin)
          GNUNET_POSTGRES_exec(plugin->dbh,
                               "CREATE TABLE IF NOT EXISTS messages (\n"
                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
-                              "  hop_counter BIGINT NOT NULL,\n"
+                              "  hop_counter INT NOT NULL,\n"
                               "  signature BYTEA,\n"
                               "  purpose BYTEA,\n"
                               "  fragment_id BIGINT NOT NULL,\n"
@@ -179,29 +177,17 @@ database_setup (struct Plugin *plugin)
                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
                               "  name TEXT NOT NULL,\n"
                               "  value_current BYTEA,\n"
-                              "  value_signed BYTEA\n"
+                              "  value_signed BYTEA,\n"
+                              "  PRIMARY KEY (channel_id, name)\n"
                               ")" "WITH OIDS")) ||
-
-      (GNUNET_OK !=
-         GNUNET_POSTGRES_exec(plugin->dbh,
-                              "CREATE UNIQUE INDEX IF NOT EXISTS state_uniq_idx \n"
-                              " ON state (channel_id, substring(name from 1 for 5)\n"
-                              ")")) ||
-
       (GNUNET_OK !=
          GNUNET_POSTGRES_exec(plugin->dbh,
                               "CREATE TABLE IF NOT EXISTS state_sync (\n"
                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
                               "  name TEXT NOT NULL,\n"
                               "  value BYTEA,\n"
-                              "  PRIMARY KEY (channel_id)\n"
-                              ")" "WITH OIDS")) ||
-
-      (GNUNET_OK !=
-         GNUNET_POSTGRES_exec(plugin->dbh,
-                              "CREATE UNIQUE INDEX IF NOT EXISTS state_sync_name_idx \n"
-                              " ON state_sync (substring(name from 1 for 5)\n"
-                              ")")))
+                              "  PRIMARY KEY (channel_id, name)\n"
+                              ")" "WITH OIDS")))
   {
     PQfinish (plugin->dbh);
     plugin->dbh = NULL;
@@ -370,7 +356,7 @@ database_setup (struct Plugin *plugin)
                            "LEFT JOIN (SELECT channel_id, name, value_signed\n"
                            "           FROM state) AS old\n"
                            "ON new.channel_id = old.channel_id AND new.name = old.name\n"
-                           "ON CONFLICT ( channel_id, substring(name from 1 for 5) )\n"
+                           "ON CONFLICT (channel_id, name)\n"
                            "   DO UPDATE SET value_current = EXCLUDED.value_current,\n"
                            "                 value_signed = EXCLUDED.value_signed", 3)) ||
 
@@ -422,7 +408,7 @@ database_setup (struct Plugin *plugin)
                            "SELECT name, value_current\n"
                            "FROM state\n"
                            "WHERE channel_id = get_chan_id($1)\n"
-                           "      AND (name = $2 OR substr(name, 1, $3) = $4 || '_')", 4)) ||
+                           "      AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) ||
 
       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
                            "select_state_signed",
@@ -764,7 +750,7 @@ fragment_store (void *cls,
   uint64_t message_id = GNUNET_ntohll (msg->message_id);
   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
 
-  uint64_t hop_counter = ntohl(msg->hop_counter);
+  uint32_t hop_counter = ntohl(msg->hop_counter);
   uint32_t flags = ntohl(msg->flags);
 
   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
@@ -783,7 +769,7 @@ fragment_store (void *cls,
 
   struct GNUNET_PQ_QueryParam params_insert[] = {
     GNUNET_PQ_query_param_auto_from_type (channel_key),
-    GNUNET_PQ_query_param_uint64 (&hop_counter),
+    GNUNET_PQ_query_param_uint32 (&hop_counter),
     GNUNET_PQ_query_param_auto_from_type (&msg->signature),
     GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
     GNUNET_PQ_query_param_uint64 (&fragment_id),
@@ -819,15 +805,13 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   PGresult *res;
   struct Plugin *plugin = cls;
 
-  int ret = GNUNET_SYSERR;
-
   struct GNUNET_PQ_QueryParam params_update[] = {
-    GNUNET_PQ_query_param_uint64 (&psycstore_flags),
+    GNUNET_PQ_query_param_uint32 (&psycstore_flags),
     GNUNET_PQ_query_param_auto_from_type (channel_key),
     GNUNET_PQ_query_param_uint64 (&message_id),
     GNUNET_PQ_query_param_end
@@ -838,10 +822,10 @@ message_add_flags (void *cls,
                                                  res,
                                                  PGRES_COMMAND_OK,
                                                  "PQexecPrepared","update_message_flags"))
-    return ret;
+    return GNUNET_SYSERR;
 
   PQclear (res);
-  return ret;
+  return GNUNET_OK;
 }
 
 
@@ -850,7 +834,8 @@ fragment_row (struct Plugin *plugin,
               const char *stmt,
               PGresult *res,
               GNUNET_PSYCSTORE_FragmentCallback cb,
-              void *cb_cls)
+              void *cb_cls,
+              uint64_t *returned_fragments)
 {
   uint32_t hop_counter;
   void *signature = NULL;
@@ -862,14 +847,13 @@ fragment_row (struct Plugin *plugin,
   uint64_t fragment_offset;
   uint64_t message_id;
   uint64_t group_generation;
-  uint64_t flags;
+  uint32_t flags;
   void *buf;
   size_t buf_size;
   int ret = GNUNET_SYSERR;
   struct GNUNET_MULTICAST_MessageHeader *mp;
 
-  uint64_t msg_flags;
-  unsigned int cnt;
+  uint32_t msg_flags;
 
   struct GNUNET_PQ_ResultSpec results[] = {
     GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
@@ -879,8 +863,8 @@ fragment_row (struct Plugin *plugin,
     GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
     GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
     GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
-    GNUNET_PQ_result_spec_uint64 ("multicast_flags", &msg_flags),
-    GNUNET_PQ_result_spec_uint64 ("psycstore_flags", &flags),
+    GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
+    GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
     GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
     GNUNET_PQ_result_spec_end
   };
@@ -895,15 +879,12 @@ fragment_row (struct Plugin *plugin,
     return GNUNET_SYSERR;
   }
 
-  cnt = PQntuples (res);
-  if (cnt == 0)
+  int nrows = PQntuples (res);
+  for (int row = 0; row < nrows; row++)
   {
-    ret = GNUNET_NO;
-  }
-  else
-  {
-    if (GNUNET_OK != GNUNET_PQ_extract_result(res, results, 0)) {
-      return GNUNET_SYSERR;
+    if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
+    {
+      break;
     }
 
     mp = GNUNET_malloc (sizeof (*mp) + buf_size);
@@ -928,6 +909,8 @@ fragment_row (struct Plugin *plugin,
                    buf_size);
     GNUNET_PQ_cleanup_result(results);
     ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
+    if (NULL != returned_fragments)
+      (*returned_fragments)++;
   }
 
   return ret;
@@ -956,8 +939,7 @@ fragment_select (struct Plugin *plugin,
       ret = GNUNET_NO;
     else
     {
-      ret = fragment_row (plugin, stmt, res, cb, cb_cls);
-      (*returned_fragments)++;
+      ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
     }
     PQclear (res);
   }
@@ -1044,6 +1026,9 @@ message_get (void *cls,
   struct Plugin *plugin = cls;
   *returned_fragments = 0;
 
+  if (0 == fragment_limit)
+    fragment_limit = INT64_MAX;
+
   struct GNUNET_PQ_QueryParam params_select[] = {
     GNUNET_PQ_query_param_auto_from_type (channel_key),
     GNUNET_PQ_query_param_uint64 (&first_message_id),
@@ -1122,7 +1107,7 @@ message_get_fragment (void *cls,
     if (PQntuples (res) == 0)
       ret = GNUNET_NO;
     else
-      ret = fragment_row (plugin, stmt, res, cb, cb_cls);
+      ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
 
     PQclear (res);
   }
@@ -1164,14 +1149,13 @@ counters_message_get (void *cls,
   }
 
   struct GNUNET_PQ_ResultSpec results_select[] = {
-    GNUNET_PQ_result_spec_uint64 ("max_fragment_id", max_fragment_id),
-    GNUNET_PQ_result_spec_uint64 ("max_message_id", max_message_id),
-    GNUNET_PQ_result_spec_uint64 ("max_group_generation", max_group_generation),
+    GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
+    GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
+    GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
     GNUNET_PQ_result_spec_end
   };
 
-  if (GNUNET_OK != GNUNET_PQ_extract_result (res,
-                                  results_select, 0))
+  if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0))
   {
     PQclear (res);
     return GNUNET_SYSERR;
@@ -1221,8 +1205,7 @@ counters_state_get (void *cls,
     GNUNET_PQ_result_spec_end
   };
 
-  ret = GNUNET_PQ_extract_result (res,
-                                  results_select, 0);
+  ret = GNUNET_PQ_extract_result (res, results_select, 0);
 
   if (GNUNET_OK != ret)
   {
@@ -1540,8 +1523,7 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
     ret = GNUNET_NO;
   }
 
-  ret = GNUNET_PQ_extract_result (res,
-                                  results, 0);
+  ret = GNUNET_PQ_extract_result (res, results, 0);
 
   if (GNUNET_OK != ret)
   {
@@ -1573,7 +1555,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
 {
   PGresult *res;
   struct Plugin *plugin = cls;
-  int ret = GNUNET_SYSERR;
+  int ret = GNUNET_NO;
 
   const char *stmt = "select_state_prefix";
 
@@ -1606,18 +1588,11 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
     return GNUNET_SYSERR;
   }
 
-  do
+  int nrows = PQntuples (res);
+  for (int row = 0; row < nrows; row++)
   {
-    if (PQntuples (res) == 0)
+    if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
     {
-      PQclear (res);
-      ret = GNUNET_NO;
-      break;
-    }
-
-    if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0))
-    {
-      PQclear (res);
       break;
     }
 
@@ -1626,7 +1601,6 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
               value_size);
     GNUNET_PQ_cleanup_result(results);
   }
-  while (ret == GNUNET_YES);
 
   PQclear (res);
 
@@ -1648,7 +1622,7 @@ state_get_signed (void *cls,
 {
   PGresult *res;
   struct Plugin *plugin = cls;
-  int ret = GNUNET_SYSERR;
+  int ret = GNUNET_NO;
 
   const char *stmt = "select_state_signed";
 
@@ -1676,18 +1650,11 @@ state_get_signed (void *cls,
     return GNUNET_SYSERR;
   }
 
-  do
+  int nrows = PQntuples (res);
+  for (int row = 0; row < nrows; row++)
   {
-    if (PQntuples (res) == 0)
+    if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
     {
-      PQclear (res);
-      ret = GNUNET_NO;
-      break;
-    }
-
-    if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0))
-    {
-      PQclear (res);
       break;
     }
 
@@ -1695,9 +1662,8 @@ state_get_signed (void *cls,
               value_signed,
               value_size);
 
-    GNUNET_PQ_cleanup_result(results);
+    GNUNET_PQ_cleanup_result (results);
   }
-  while (ret == GNUNET_YES);
 
   PQclear (res);
 
index e6f79597179a422369b91f6ebab9305fddc79f94..4d21696ce52d12f6b8fe00cfd9279c25a97e71de 100644 (file)
@@ -599,7 +599,7 @@ database_setup (struct Plugin *plugin)
                "SELECT name, value_current\n"
                "FROM state\n"
                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-               "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
+               "      AND (name = ? OR substr(name, 1, ?) = ?);",
                &plugin->select_state_prefix);
 
   sql_prepare (plugin->dbh,
@@ -998,7 +998,7 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
   sqlite3_stmt *stmt = plugin->update_message_flags;
@@ -1773,7 +1773,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
                                       sizeof (*channel_key), SQLITE_STATIC)
       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
-      || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len + 1)
+      || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len)
       || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC))
   {
     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,