commented out wrong message type
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
index 2896f4968e9253951aa0ce7069b3170e091762a0..c76e7e6b12d2337ce2e7dcdad624e41494309476 100644 (file)
@@ -81,13 +81,8 @@ struct Plugin
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Database filename.
+   * MySQL context.
    */
-  char *fn;
-
-  /**
-    *Handle to talk to Mysql
-    */
   struct GNUNET_MYSQL_Context *mc;
 
   /**
@@ -95,18 +90,11 @@ struct Plugin
    */
   enum Transactions transaction;
 
-  struct GNUNET_MYSQL_StatementHandle *transaction_begin;
-
-  struct GNUNET_MYSQL_StatementHandle *transaction_commit;
-
-  struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
-
   /**
    * Precompiled SQL for channel_key_store()
    */
   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
 
-
   /**
    * Precompiled SQL for slave_key_store()
    */
@@ -246,7 +234,7 @@ mysql_trace (void *cls, const char *sql)
  * @param dbh handle to the database
  * @param sql SQL statement, UTF-8 encoded
  * @param stmt set to the prepared statement
- * @return 0 on success
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
 static int
 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
@@ -256,14 +244,19 @@ mysql_prepare (struct GNUNET_MYSQL_Context *mc,
   *stmt = GNUNET_MYSQL_statement_prepare (mc,
                                           sql);
 
-  LOG(GNUNET_ERROR_TYPE_DEBUG,
-       "Prepared `%s' / %p\n", sql, stmt);
-  if(NULL == *stmt)
-    LOG(GNUNET_ERROR_TYPE_ERROR,
-   _("Error preparing SQL query: %s\n  %s\n"),
-   mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
-
-  return 0;
+  if (NULL == *stmt)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         _("Error preparing SQL query: %s\n  %s\n"),
+         mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
+         sql);
+    return GNUNET_SYSERR;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Prepared `%s' / %p\n",
+       sql,
+       stmt);
+  return GNUNET_OK;
 }
 
 
@@ -273,73 +266,58 @@ mysql_prepare (struct GNUNET_MYSQL_Context *mc,
  * as needed as well).
  *
  * @param plugin the plugin context (state for this module)
- * @return GNUNET_OK on success
+ * @return #GNUNET_OK on success
  */
 static int
 database_setup (struct Plugin *plugin)
 {
-  char *filename;
-
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
-                                               "FILENAME", &filename))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
-                              "psycstore-mysql", "FILENAME");
-    return GNUNET_SYSERR;
-  }
-
-  if (GNUNET_OK != GNUNET_DISK_file_test (filename))
-  {
-    if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
-    {
-      GNUNET_break (0);
-      GNUNET_free (filename);
-      return GNUNET_SYSERR;
-    }
-  }
-  /* filename should be UTF-8-encoded. If it isn't, it's a bug */
-  plugin->fn = filename;
-
   /* Open database and precompile statements */
-  plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
+  plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
+                                            "psycstore-mysql");
 
   if (NULL == plugin->mc)
   {
-    LOG(GNUNET_ERROR_TYPE_ERROR,
-   _("Unable to initialize Mysql.\n"));
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         _("Unable to initialize Mysql.\n"));
     return GNUNET_SYSERR;
   }
 
-  /* Create tables */
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS channels (\n"
-                              " id INT AUTO_INCREMENT,\n"
-                              " pub_key BLOB,\n"
-                              " max_state_message_id INT,\n"
-                              " state_hash_message_id INT,\n"
-                              " PRIMARY KEY(id),\n"
-                              " UNIQUE KEY(pub_key(5))\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS slaves (\n"
-                              " id INT AUTO_INCREMENT,\n"
-                              " pub_key BLOB,\n"
-                              " PRIMARY KEY(id),\n"
-                              " UNIQUE KEY(pub_key(5))\n"
-                              ");");
+#define STMT_RUN(sql) \
+  if (GNUNET_OK != \
+      GNUNET_MYSQL_statement_run (plugin->mc, \
+                                  sql)) \
+  { \
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
+                _("Failed to run SQL statement `%s'\n"), \
+                sql); \
+    return GNUNET_SYSERR; \
+  }
 
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS membership (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
-                              "  did_join INT NOT NULL,\n"
-                              "  announced_at BIGINT UNSIGNED NOT NULL,\n"
-                              "  effective_since BIGINT UNSIGNED NOT NULL,\n"
-                              "  group_generation BIGINT UNSIGNED NOT NULL\n"
-                              ");");
+  /* Create tables */
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " max_state_message_id BIGINT UNSIGNED,\n"
+            " state_hash_message_id BIGINT UNSIGNED,\n"
+            " PRIMARY KEY(id),\n"
+            " UNIQUE KEY(pub_key(32))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " PRIMARY KEY(id),\n"
+            " UNIQUE KEY(pub_key(32))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
+            "  did_join TINYINT NOT NULL,\n"
+            "  announced_at BIGINT UNSIGNED NOT NULL,\n"
+            "  effective_since BIGINT UNSIGNED NOT NULL,\n"
+            "  group_generation BIGINT UNSIGNED NOT NULL\n"
+            ");");
 
 /*** FIX because IF NOT EXISTS doesn't work ***/
   GNUNET_MYSQL_statement_run (plugin->mc,
@@ -347,246 +325,211 @@ database_setup (struct Plugin *plugin)
                               "ON membership (channel_id, slave_id);");
 
   /** @todo messages table: add method_name column */
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS messages (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
-                              "  signature BLOB,\n"
-                              "  purpose BLOB,\n"
-                              "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
-                              "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
+            "  signature BLOB,\n"
+            "  purpose BLOB,\n"
+            "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
+            "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
-                              "  group_generation BIGINT UNSIGNED NOT NULL,\n"
-                              "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
-                              "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
-                              "  data BLOB,\n"
-                              "  PRIMARY KEY (channel_id, fragment_id),\n"
-                              "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS state (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  name TEXT NOT NULL,\n"
-                              "  value_current BLOB,\n"
-                              "  value_signed BLOB,\n"
-                              "  PRIMARY KEY (channel_id, name(5))\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS state_sync (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  name TEXT NOT NULL,\n"
-                              "  value BLOB,\n"
-                              "  PRIMARY KEY (channel_id, name(5))\n"
-                              ");");
+            "  group_generation BIGINT UNSIGNED NOT NULL,\n"
+            "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
+            "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
+            "  data BLOB,\n"
+            "  PRIMARY KEY (channel_id, fragment_id),\n"
+            "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  name TEXT NOT NULL,\n"
+            "  value_current BLOB,\n"
+            "  value_signed BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  name TEXT NOT NULL,\n"
+            "  value BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
+            ");");
+#undef STMT_RUN
 
   /* Prepare statements */
-  mysql_prepare (plugin->mc,
-                "BEGIN",
-                &plugin->transaction_begin);
-
-  mysql_prepare (plugin->mc,
-                "COMMIT",
-                &plugin->transaction_commit);
-
-  mysql_prepare (plugin->mc,
-                "ROLLBACK;",
-                &plugin->transaction_rollback);
-
-  mysql_prepare (plugin->mc,
-                "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
-                &plugin->insert_channel_key);
-
-  mysql_prepare (plugin->mc,
-                "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
-                &plugin->insert_slave_key);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO membership\n"
-                " (channel_id, slave_id, did_join, announced_at,\n"
-                "  effective_since, group_generation)\n"
-                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
-                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
-                "        ?, ?, ?, ?);",
-                &plugin->insert_membership);
-
-  mysql_prepare (plugin->mc,
-                "SELECT did_join FROM membership\n"
-               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-               "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
-               "      AND effective_since <= ? AND did_join = 1\n"
-               "ORDER BY announced_at DESC LIMIT 1;",
-               &plugin->select_membership);
-
-  mysql_prepare (plugin->mc,
-                "INSERT IGNORE INTO messages\n"
-               " (channel_id, hop_counter, signature, purpose,\n"
-               "  fragment_id, fragment_offset, message_id,\n"
-               "  group_generation, multicast_flags, psycstore_flags, data)\n"
-               "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
-               "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
-                &plugin->insert_fragment);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE messages\n"
-                "SET psycstore_flags = psycstore_flags | ?\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id = ? AND fragment_offset = 0;",
-                &plugin->update_message_flags);
-
-  mysql_prepare (plugin->mc,
-                  "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                  "       fragment_offset, message_id, group_generation,\n"
-                  "       multicast_flags, psycstore_flags, data\n"
-                  "FROM messages\n"
-                  "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                  "      AND ? <= fragment_id AND fragment_id <= ?;",
-                &plugin->select_fragments);
+#define PREP(stmt,handle)                                    \
+  if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
+  { \
+    GNUNET_break (0); \
+    return GNUNET_SYSERR; \
+  }
+  PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
+        &plugin->insert_channel_key);
+  PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
+        &plugin->insert_slave_key);
+  PREP ("INSERT INTO membership\n"
+        " (channel_id, slave_id, did_join, announced_at,\n"
+        "  effective_since, group_generation)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
+        "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
+        "        ?, ?, ?, ?);",
+        &plugin->insert_membership);
+  PREP ("SELECT did_join FROM membership\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
+        "      AND effective_since <= ? AND did_join = 1\n"
+        "ORDER BY announced_at DESC LIMIT 1;",
+        &plugin->select_membership);
+
+  PREP ("INSERT IGNORE INTO messages\n"
+        " (channel_id, hop_counter, signature, purpose,\n"
+        "  fragment_id, fragment_offset, message_id,\n"
+        "  group_generation, multicast_flags, psycstore_flags, data)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
+        "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
+        &plugin->insert_fragment);
+
+  PREP ("UPDATE messages\n"
+        "SET psycstore_flags = psycstore_flags | ?\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id = ? AND fragment_offset = 0;",
+        &plugin->update_message_flags);
+
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
+        &plugin->select_fragments);
 
   /** @todo select_messages: add method_prefix filter */
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "       multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND ? <= message_id AND message_id <= ?"
-                "LIMIT ?;",
-                &plugin->select_messages);
-
-  mysql_prepare (plugin->mc,
-                "SELECT * FROM\n"
-                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "        fragment_offset, message_id, group_generation,\n"
-                "        multicast_flags, psycstore_flags, data\n"
-                " FROM messages\n"
-                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                " ORDER BY fragment_id DESC\n"
-                " LIMIT ?)\n"
-                "ORDER BY fragment_id;",
-                &plugin->select_latest_fragments);
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND ? <= message_id AND message_id <= ?\n"
+        "LIMIT ?;",
+        &plugin->select_messages);
+
+  PREP ("SELECT * FROM\n"
+        "(SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "        fragment_offset, message_id, group_generation,\n"
+        "        multicast_flags, psycstore_flags, data\n"
+        " FROM messages\n"
+        " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        " ORDER BY fragment_id DESC\n"
+        " LIMIT ?)\n"
+        "ORDER BY fragment_id;",
+        &plugin->select_latest_fragments);
 
   /** @todo select_latest_messages: add method_prefix filter */
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "        multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id IN\n"
-                "      (SELECT message_id\n"
-                "       FROM messages\n"
-                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "       GROUP BY message_id\n"
-                "       ORDER BY message_id\n"
-                "       DESC LIMIT ?)\n"
-                "ORDER BY fragment_id;",
-                &plugin->select_latest_messages);
-
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "       multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id = ? AND fragment_offset = ?;",
-                &plugin->select_message_fragment);
-
-  mysql_prepare (plugin->mc,
-                "SELECT fragment_id, message_id, group_generation\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "ORDER BY fragment_id DESC LIMIT 1;",
-                &plugin->select_counters_message);
-
-  mysql_prepare (plugin->mc,
-                "SELECT max_state_message_id\n"
-                "FROM channels\n"
-                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
-                &plugin->select_counters_state);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE channels\n"
-                "SET max_state_message_id = ?\n"
-                "WHERE pub_key = ?;",
-                &plugin->update_max_state_message_id);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE channels\n"
-                "SET state_hash_message_id = ?\n"
-                "WHERE pub_key = ?;",
-                &plugin->update_state_hash_message_id);
-
-  mysql_prepare (plugin->mc,
-                "REPLACE INTO state\n"
-                "  (channel_id, name, value_current, value_signed)\n"
-                "SELECT new.channel_id, new.name,\n"
-                "       new.value_current, old.value_signed\n"
-                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "             AS channel_id,\n"
-                "             ? AS name, ? AS value_current) AS new\n"
-                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
-                "           FROM state) AS old\n"
-                "ON new.channel_id = old.channel_id AND new.name = old.name;",
-                &plugin->insert_state_current);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
-                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
-                &plugin->delete_state_empty);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE state\n"
-                "SET value_signed = value_current\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->update_state_signed);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->delete_state);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO state_sync (channel_id, name, value)\n"
-                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
-                &plugin->insert_state_sync);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO state\n"
-                " (channel_id, name, value_current, value_signed)\n"
-                "SELECT channel_id, name, value, value\n"
-                "FROM state_sync\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->insert_state_from_sync);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state_sync\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->delete_state_sync);
-
-  mysql_prepare (plugin->mc,
-                "SELECT value_current\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND name = ?;",
-                &plugin->select_state_one);
-
-  mysql_prepare (plugin->mc,
-                "SELECT name, value_current\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
-                &plugin->select_state_prefix);
-
-  mysql_prepare (plugin->mc,
-                "SELECT name, value_signed\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
-                "      AND value_signed IS NOT NULL;",
-                &plugin->select_state_signed);
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "        multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id IN\n"
+        "      (SELECT message_id\n"
+        "       FROM messages\n"
+        "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "       GROUP BY message_id\n"
+        "       ORDER BY message_id\n"
+        "       DESC LIMIT ?)\n"
+        "ORDER BY fragment_id;",
+        &plugin->select_latest_messages);
+
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id = ? AND fragment_offset = ?;",
+        &plugin->select_message_fragment);
+
+  PREP ("SELECT fragment_id, message_id, group_generation\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "ORDER BY fragment_id DESC LIMIT 1;",
+        &plugin->select_counters_message);
+
+  PREP ("SELECT max_state_message_id\n"
+        "FROM channels\n"
+        "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
+        &plugin->select_counters_state);
+
+  PREP ("UPDATE channels\n"
+        "SET max_state_message_id = ?\n"
+        "WHERE pub_key = ?;",
+        &plugin->update_max_state_message_id);
+
+  PREP ("UPDATE channels\n"
+        "SET state_hash_message_id = ?\n"
+        "WHERE pub_key = ?;",
+        &plugin->update_state_hash_message_id);
+
+  PREP ("REPLACE INTO state\n"
+        "  (channel_id, name, value_current, value_signed)\n"
+        "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
+        "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
+        "             (SELECT ?) AS name,\n"
+        "             (SELECT ?) AS value_current\n"
+        "     ) AS new\n"
+        "LEFT JOIN (SELECT channel_id, name, value_signed\n"
+        "           FROM state) AS old\n"
+        "ON new.channel_id = old.channel_id AND new.name = old.name;",
+        &plugin->insert_state_current);
+
+  PREP ("DELETE FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND (value_current IS NULL OR length(value_current) = 0)\n"
+        "      AND (value_signed IS NULL OR length(value_signed) = 0);",
+        &plugin->delete_state_empty);
+
+  PREP ("UPDATE state\n"
+        "SET value_signed = value_current\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->update_state_signed);
+
+  PREP ("DELETE FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->delete_state);
+
+  PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
+        &plugin->insert_state_sync);
+
+  PREP ("INSERT INTO state\n"
+        " (channel_id, name, value_current, value_signed)\n"
+        "SELECT channel_id, name, value, value\n"
+        "FROM state_sync\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->insert_state_from_sync);
+
+  PREP ("DELETE FROM state_sync\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->delete_state_sync);
+
+  PREP ("SELECT value_current\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND name = ?;",
+        &plugin->select_state_one);
+
+  PREP ("SELECT name, value_current\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND (name = ? OR substr(name, 1, ?) = ?);",
+        &plugin->select_state_prefix);
+
+  PREP ("SELECT name, value_signed\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
+        "      AND value_signed IS NOT NULL;",
+        &plugin->select_state_signed);
+#undef PREP
 
   return GNUNET_OK;
 }
@@ -601,9 +544,6 @@ static void
 database_shutdown (struct Plugin *plugin)
 {
   GNUNET_MYSQL_context_destroy (plugin->mc);
-
-  GNUNET_free_non_null (plugin->fn);
-
 }
 
 
@@ -648,23 +588,9 @@ exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
 static int
 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exexc_prepared", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
     return GNUNET_SYSERR;
   }
 
@@ -679,23 +605,9 @@ transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 static int
 transaction_commit (struct Plugin *plugin)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
     return GNUNET_SYSERR;
   }
 
@@ -710,23 +622,9 @@ transaction_commit (struct Plugin *plugin)
 static int
 transaction_rollback (struct Plugin *plugin)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql exec_prepared", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
     return GNUNET_SYSERR;
   }
 
@@ -897,8 +795,7 @@ membership_test (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  switch(GNUNET_MY_extract_result (stmt,
-                                results_select))
+  switch (GNUNET_MY_extract_result (stmt, results_select))
   {
     case GNUNET_NO:
       ret = GNUNET_NO;
@@ -1008,7 +905,7 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
@@ -1017,7 +914,7 @@ message_add_flags (void *cls,
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint64 (&psycstore_flags),
+    GNUNET_MY_query_param_uint32 (&psycstore_flags),
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&message_id),
     GNUNET_MY_query_param_end
@@ -1049,7 +946,8 @@ message_add_flags (void *cls,
 static int
 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
               GNUNET_PSYCSTORE_FragmentCallback cb,
-              void *cb_cls)
+              void *cb_cls,
+              uint64_t *returned_fragments)
 {
 
   uint32_t hop_counter;
@@ -1057,7 +955,6 @@ fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
   void *purpose = NULL;
   size_t signature_size;
   size_t purpose_size;
-
   uint64_t fragment_id;
   uint64_t fragment_offset;
   uint64_t message_id;
@@ -1068,10 +965,7 @@ fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
   int ret = GNUNET_SYSERR;
   int sql_ret;
   struct GNUNET_MULTICAST_MessageHeader *mp;
-
   uint64_t msg_flags;
-
-
   struct GNUNET_MY_ResultSpec results[] = {
     GNUNET_MY_result_spec_uint32 (&hop_counter),
     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
@@ -1087,27 +981,28 @@ fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
     GNUNET_MY_result_spec_end
   };
 
-  sql_ret = GNUNET_MY_extract_result (stmt,
-                                results);
-  switch (sql_ret)
+  do
   {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
+    {
     case GNUNET_NO:
-      if (ret != GNUNET_OK)
+      if (ret != GNUNET_YES)
         ret = GNUNET_NO;
       break;
 
-    case GNUNET_OK:
+    case GNUNET_YES:
       mp = GNUNET_malloc (sizeof (*mp) + buf_size);
 
       mp->header.size = htons (sizeof (*mp) + buf_size);
       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
       mp->hop_counter = htonl (hop_counter);
       GNUNET_memcpy (&mp->signature,
-                    signature,
-                    signature_size);
+                     signature,
+                     signature_size);
       GNUNET_memcpy (&mp->purpose,
-                    purpose,
-                    purpose_size);
+                     purpose,
+                     purpose_size);
       mp->fragment_id = GNUNET_htonll (fragment_id);
       mp->fragment_offset = GNUNET_htonll (fragment_offset);
       mp->message_id = GNUNET_htonll (message_id);
@@ -1115,59 +1010,61 @@ fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
       mp->flags = htonl(msg_flags);
 
       GNUNET_memcpy (&mp[1],
-                    buf,
-                    buf_size);
+                     buf,
+                     buf_size);
       ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
-
+      if (NULL != returned_fragments)
+        (*returned_fragments)++;
       GNUNET_MY_cleanup_result (results);
       break;
 
     default:
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                    "mysql extract_result", stmt);
+      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                 "mysql extract_result", stmt);
+    }
   }
+  while (GNUNET_YES == sql_ret);
+
+  // for debugging
+  if (GNUNET_NO == ret)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
+               "Empty result set\n");
 
   return ret;
 }
 
 
 static int
-fragment_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
+fragment_select (struct Plugin *plugin,
+                 struct GNUNET_MYSQL_StatementHandle *stmt,
                  struct GNUNET_MY_QueryParam *params,
                  uint64_t *returned_fragments,
-                 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
+                 GNUNET_PSYCSTORE_FragmentCallback cb,
+                 void *cb_cls)
 {
   int ret = GNUNET_SYSERR;
   int sql_ret;
 
-  // FIXME
-  if (NULL == plugin->mc || NULL == stmt || NULL == params)
-  {
-    fprintf(stderr, "%p %p %p\n", plugin->mc, stmt, params);
-    return GNUNET_SYSERR;
-  }
-
   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
   switch (sql_ret)
   {
     case GNUNET_NO:
-      if (ret != GNUNET_OK)
+      if (ret != GNUNET_YES)
         ret = GNUNET_NO;
       break;
 
     case GNUNET_YES:
-       ret = fragment_row (stmt, cb, cb_cls);
-       (*returned_fragments)++;
+      ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
       break;
 
     default:
       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                  "mysql exec_prepared", stmt);
   }
-
   return ret;
 }
 
+
 /**
  * Retrieve a message fragment range by fragment ID.
  *
@@ -1187,8 +1084,6 @@ fragment_get (void *cls,
   struct Plugin *plugin = cls;
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
   int ret = GNUNET_SYSERR;
-  *returned_fragments = 0;
-
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&first_fragment_id),
@@ -1196,6 +1091,7 @@ fragment_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
+  *returned_fragments = 0;
   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -1268,11 +1164,11 @@ message_get (void *cls,
              void *cb_cls)
 {
   struct Plugin *plugin = cls;
-
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
+  int ret;
 
-  int ret = GNUNET_SYSERR;
-  *returned_fragments = 0;
+  if (0 == fragment_limit)
+    fragment_limit = UINT64_MAX;
 
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
@@ -1282,6 +1178,7 @@ message_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
+  *returned_fragments = 0;
   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -1373,7 +1270,7 @@ message_get_fragment (void *cls,
       break;
 
     case GNUNET_OK:
-      ret = fragment_row (stmt, cb, cb_cls);
+      ret = fragment_row (stmt, cb, cb_cls, NULL);
       break;
 
     default:
@@ -1430,8 +1327,7 @@ counters_message_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
@@ -1485,8 +1381,7 @@ counters_state_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
@@ -1521,7 +1416,7 @@ state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_string (name),
-    GNUNET_MY_query_param_auto_from_type (value),
+    GNUNET_MY_query_param_fixed_size(value, value_size),
     GNUNET_MY_query_param_end
   };
 
@@ -1639,8 +1534,8 @@ state_modify_op (void *cls,
   switch (op)
   {
   case GNUNET_PSYC_OP_ASSIGN:
-    return state_assign (plugin, plugin->insert_state_current, channel_key,
-                         name, value, value_size);
+    return state_assign (plugin, plugin->insert_state_current,
+                         channel_key, name, value, value_size);
 
   default: /** @todo implement more state operations */
     GNUNET_break (0);
@@ -1695,8 +1590,8 @@ state_sync_assign (void *cls,
                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
-  return state_assign (cls, plugin->insert_state_sync, channel_key,
-                       name, value, value_size);
+  return state_assign (cls, plugin->insert_state_sync,
+                       channel_key, name, value, value_size);
 }
 
 
@@ -1812,10 +1707,11 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
     case GNUNET_NO:
       ret = GNUNET_NO;
       break;
+
     case GNUNET_YES:
-      ret = cb (cb_cls, name, value_current,
-                value_size);
+      ret = cb (cb_cls, name, value_current, value_size);
       break;
+
     default:
       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "mysql extract_result", stmt);
@@ -1868,30 +1764,29 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
     GNUNET_MY_result_spec_string (&name2),
     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
     GNUNET_MY_result_spec_end
-  };
+  };;
 
   int sql_ret;
 
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
+
   do
   {
-    if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
-    {
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
-      break;
-    }
     sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
       case GNUNET_NO:
-        if (ret != GNUNET_OK)
+        if (ret != GNUNET_YES)
           ret = GNUNET_NO;
         break;
 
       case GNUNET_YES:
-        ret = cb (cb_cls, (const char *) name2,
-                  value_current,
-                  value_size);
+        ret = cb (cb_cls, (const char *) name2, value_current, value_size);
 
         if (ret != GNUNET_YES)
           sql_ret = GNUNET_NO;
@@ -1899,7 +1794,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
 
       default:
         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql extract_result", stmt);
+                  "mysql extract_result", stmt);
     }
   }
   while (sql_ret == GNUNET_YES);
@@ -1949,29 +1844,30 @@ state_get_signed (void *cls,
     GNUNET_MY_result_spec_end
   };
 
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
+
   do
   {
-    if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
-    {
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
-      break;
-    }
     sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
       case GNUNET_NO:
-        if (ret != GNUNET_OK)
+        if (ret != GNUNET_YES)
           ret = GNUNET_NO;
         break;
+
       case GNUNET_YES:
-        ret = cb (cb_cls, (const char *) name,
-                  value_signed,
-                  value_size);
+        ret = cb (cb_cls, (const char *) name, value_signed, value_size);
 
         if (ret != GNUNET_YES)
             sql_ret = GNUNET_NO;
         break;
+
       default:
          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql extract_result", stmt);