commented out wrong message type
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
index 4c28b00b061b972138e3cd59842502fd5b89d81c..c76e7e6b12d2337ce2e7dcdad624e41494309476 100644 (file)
  * a failure of the command 'cmd' on file 'filename'
  * with the message given by strerror(errno).
  */
-#define LOG_MYSQL(db, level, cmd, stmt) do { GNUNET_log_from (level, "psycstore-mysql", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); } while(0)
+#define LOG_MYSQL(db, level, cmd, stmt)                                 \
+  do {                                                                  \
+    GNUNET_log_from (level, "psycstore-mysql",                          \
+                     _("`%s' failed at %s:%d with error: %s\n"),        \
+                     cmd, __FILE__, __LINE__,                           \
+                     mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
+  } while (0)
 
 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
 
@@ -75,13 +81,8 @@ struct Plugin
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Database filename.
+   * MySQL context.
    */
-  char *fn;
-
-  /**
-    *Handle to talk to Mysql
-    */
   struct GNUNET_MYSQL_Context *mc;
 
   /**
@@ -89,18 +90,11 @@ struct Plugin
    */
   enum Transactions transaction;
 
-  struct GNUNET_MYSQL_StatementHandle *transaction_begin;
-
-  struct GNUNET_MYSQL_StatementHandle *transaction_commit;
-
-  struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
-
   /**
    * Precompiled SQL for channel_key_store()
    */
   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
 
-
   /**
    * Precompiled SQL for slave_key_store()
    */
@@ -240,7 +234,7 @@ mysql_trace (void *cls, const char *sql)
  * @param dbh handle to the database
  * @param sql SQL statement, UTF-8 encoded
  * @param stmt set to the prepared statement
- * @return 0 on success
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
 static int
 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
@@ -250,14 +244,19 @@ mysql_prepare (struct GNUNET_MYSQL_Context *mc,
   *stmt = GNUNET_MYSQL_statement_prepare (mc,
                                           sql);
 
-  LOG(GNUNET_ERROR_TYPE_DEBUG,
-       "Prepared `%s' / %p\n", sql, stmt);
-  if(NULL == *stmt)
-    LOG(GNUNET_ERROR_TYPE_ERROR,
-   _("Error preparing SQL query: %s\n  %s\n"),
-   mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
-
-  return 0;
+  if (NULL == *stmt)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         _("Error preparing SQL query: %s\n  %s\n"),
+         mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
+         sql);
+    return GNUNET_SYSERR;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Prepared `%s' / %p\n",
+       sql,
+       stmt);
+  return GNUNET_OK;
 }
 
 
@@ -267,73 +266,58 @@ mysql_prepare (struct GNUNET_MYSQL_Context *mc,
  * as needed as well).
  *
  * @param plugin the plugin context (state for this module)
- * @return GNUNET_OK on success
+ * @return #GNUNET_OK on success
  */
 static int
 database_setup (struct Plugin *plugin)
 {
-  char *filename;
-
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
-                                               "FILENAME", &filename))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
-                              "psycstore-mysql", "FILENAME");
-    return GNUNET_SYSERR;
-  }
-  
-  if (GNUNET_OK != GNUNET_DISK_file_test (filename))
-  {
-    if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
-    {
-      GNUNET_break (0);
-      GNUNET_free (filename);
-      return GNUNET_SYSERR;
-    }
-  }
-  /* filename should be UTF-8-encoded. If it isn't, it's a bug */
-  plugin->fn = filename;
-
   /* Open database and precompile statements */
-  plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
+  plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
+                                            "psycstore-mysql");
 
   if (NULL == plugin->mc)
   {
-    LOG(GNUNET_ERROR_TYPE_ERROR,
-   _("Unable to initialize Mysql.\n"));
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         _("Unable to initialize Mysql.\n"));
     return GNUNET_SYSERR;
   }
 
-  /* Create tables */
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS channels (\n"
-                              " id INT AUTO_INCREMENT,\n"
-                              " pub_key BLOB,\n"
-                              " max_state_message_id INT,\n"
-                              " state_hash_message_id INT,\n"
-                              " PRIMARY KEY(id),\n"
-                              " UNIQUE KEY(pub_key(5))\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS slaves (\n"
-                              " id INT AUTO_INCREMENT,\n"
-                              " pub_key BLOB,\n"
-                              " PRIMARY KEY(id),\n"
-                              " UNIQUE KEY(pub_key(5))\n"
-                              ");");
+#define STMT_RUN(sql) \
+  if (GNUNET_OK != \
+      GNUNET_MYSQL_statement_run (plugin->mc, \
+                                  sql)) \
+  { \
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
+                _("Failed to run SQL statement `%s'\n"), \
+                sql); \
+    return GNUNET_SYSERR; \
+  }
 
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS membership (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
-                              "  did_join INT NOT NULL,\n"
-                              "  announced_at INT NOT NULL,\n"
-                              "  effective_since INT NOT NULL,\n"
-                              "  group_generation INT NOT NULL\n"
-                              ");");
+  /* Create tables */
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " max_state_message_id BIGINT UNSIGNED,\n"
+            " state_hash_message_id BIGINT UNSIGNED,\n"
+            " PRIMARY KEY(id),\n"
+            " UNIQUE KEY(pub_key(32))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " PRIMARY KEY(id),\n"
+            " UNIQUE KEY(pub_key(32))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
+            "  did_join TINYINT NOT NULL,\n"
+            "  announced_at BIGINT UNSIGNED NOT NULL,\n"
+            "  effective_since BIGINT UNSIGNED NOT NULL,\n"
+            "  group_generation BIGINT UNSIGNED NOT NULL\n"
+            ");");
 
 /*** FIX because IF NOT EXISTS doesn't work ***/
   GNUNET_MYSQL_statement_run (plugin->mc,
@@ -341,246 +325,211 @@ database_setup (struct Plugin *plugin)
                               "ON membership (channel_id, slave_id);");
 
   /** @todo messages table: add method_name column */
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS messages (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  hop_counter INT NOT NULL,\n"
-                              "  signature BLOB,\n"
-                              "  purpose BLOB,\n"
-                              "  fragment_id INT NOT NULL,\n"
-                              "  fragment_offset INT NOT NULL,\n"
-                              "  message_id INT NOT NULL,\n"
-                              "  group_generation INT NOT NULL,\n"
-                              "  multicast_flags INT NOT NULL,\n"
-                              "  psycstore_flags INT NOT NULL,\n"
-                              "  data BLOB,\n"
-                              "  PRIMARY KEY (channel_id, fragment_id),\n"
-                              "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS state (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  name TEXT NOT NULL,\n"
-                              "  value_current BLOB,\n"
-                              "  value_signed BLOB,\n"
-                              "  PRIMARY KEY (channel_id, name(5))\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS state_sync (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  name TEXT NOT NULL,\n"
-                              "  value BLOB,\n"
-                              "  PRIMARY KEY (channel_id, name(5))\n"
-                              ");");
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
+            "  signature BLOB,\n"
+            "  purpose BLOB,\n"
+            "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
+            "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
+                              "  message_id BIGINT UNSIGNED NOT NULL,\n"
+            "  group_generation BIGINT UNSIGNED NOT NULL,\n"
+            "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
+            "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
+            "  data BLOB,\n"
+            "  PRIMARY KEY (channel_id, fragment_id),\n"
+            "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  name TEXT NOT NULL,\n"
+            "  value_current BLOB,\n"
+            "  value_signed BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  name TEXT NOT NULL,\n"
+            "  value BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
+            ");");
+#undef STMT_RUN
 
   /* Prepare statements */
-  mysql_prepare (plugin->mc,
-                "BEGIN",
-                &plugin->transaction_begin);
-
-  mysql_prepare (plugin->mc,
-                "COMMIT",
-                &plugin->transaction_commit);
-
-  mysql_prepare (plugin->mc,
-                "ROLLBACK;",
-                &plugin->transaction_rollback);
-
-  mysql_prepare (plugin->mc,
-                "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
-                &plugin->insert_channel_key);
-
-  mysql_prepare (plugin->mc,
-                "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
-                &plugin->insert_slave_key);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO membership\n"
-                " (channel_id, slave_id, did_join, announced_at,\n"
-                "  effective_since, group_generation)\n"
-                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
-                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
-                "        ?, ?, ?, ?);",
-                &plugin->insert_membership);
-
-  mysql_prepare (plugin->mc,
-                "SELECT did_join FROM membership\n"
-               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-               "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
-               "      AND effective_since <= ? AND did_join = 1\n"
-               "ORDER BY announced_at DESC LIMIT 1;",
-               &plugin->select_membership);
-
-  mysql_prepare (plugin->mc,
-                "INSERT IGNORE INTO messages\n"
-               " (channel_id, hop_counter, signature, purpose,\n"
-               "  fragment_id, fragment_offset, message_id,\n"
-               "  group_generation, multicast_flags, psycstore_flags, data)\n"
-               "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
-               "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
-                &plugin->insert_fragment);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE messages\n"
-                "SET psycstore_flags = psycstore_flags | ?\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id = ? AND fragment_offset = 0;",
-                &plugin->update_message_flags);
-
-  mysql_prepare (plugin->mc,
-                  "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                  "       fragment_offset, message_id, group_generation,\n"
-                  "       multicast_flags, psycstore_flags, data\n"
-                  "FROM messages\n"
-                  "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                  "      AND ? <= fragment_id AND fragment_id <= ?;",
-                &plugin->select_fragments);
+#define PREP(stmt,handle)                                    \
+  if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
+  { \
+    GNUNET_break (0); \
+    return GNUNET_SYSERR; \
+  }
+  PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
+        &plugin->insert_channel_key);
+  PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
+        &plugin->insert_slave_key);
+  PREP ("INSERT INTO membership\n"
+        " (channel_id, slave_id, did_join, announced_at,\n"
+        "  effective_since, group_generation)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
+        "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
+        "        ?, ?, ?, ?);",
+        &plugin->insert_membership);
+  PREP ("SELECT did_join FROM membership\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
+        "      AND effective_since <= ? AND did_join = 1\n"
+        "ORDER BY announced_at DESC LIMIT 1;",
+        &plugin->select_membership);
+
+  PREP ("INSERT IGNORE INTO messages\n"
+        " (channel_id, hop_counter, signature, purpose,\n"
+        "  fragment_id, fragment_offset, message_id,\n"
+        "  group_generation, multicast_flags, psycstore_flags, data)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
+        "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
+        &plugin->insert_fragment);
+
+  PREP ("UPDATE messages\n"
+        "SET psycstore_flags = psycstore_flags | ?\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id = ? AND fragment_offset = 0;",
+        &plugin->update_message_flags);
+
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
+        &plugin->select_fragments);
 
   /** @todo select_messages: add method_prefix filter */
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "       multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND ? <= message_id AND message_id <= ?"
-                "LIMIT ?;",
-                &plugin->select_messages);
-
-  mysql_prepare (plugin->mc,
-                "SELECT * FROM\n"
-                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "        fragment_offset, message_id, group_generation,\n"
-                "        multicast_flags, psycstore_flags, data\n"
-                " FROM messages\n"
-                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                " ORDER BY fragment_id DESC\n"
-                " LIMIT ?)\n"
-                "ORDER BY fragment_id;",
-                &plugin->select_latest_fragments);
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND ? <= message_id AND message_id <= ?\n"
+        "LIMIT ?;",
+        &plugin->select_messages);
+
+  PREP ("SELECT * FROM\n"
+        "(SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "        fragment_offset, message_id, group_generation,\n"
+        "        multicast_flags, psycstore_flags, data\n"
+        " FROM messages\n"
+        " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        " ORDER BY fragment_id DESC\n"
+        " LIMIT ?)\n"
+        "ORDER BY fragment_id;",
+        &plugin->select_latest_fragments);
 
   /** @todo select_latest_messages: add method_prefix filter */
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "        multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id IN\n"
-                "      (SELECT message_id\n"
-                "       FROM messages\n"
-                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "       GROUP BY message_id\n"
-                "       ORDER BY message_id\n"
-                "       DESC LIMIT ?)\n"
-                "ORDER BY fragment_id;",
-                &plugin->select_latest_messages);
-
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "       multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id = ? AND fragment_offset = ?;",
-                &plugin->select_message_fragment);
-
-  mysql_prepare (plugin->mc,
-                "SELECT fragment_id, message_id, group_generation\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "ORDER BY fragment_id DESC LIMIT 1;",
-                &plugin->select_counters_message);
-
-  mysql_prepare (plugin->mc,
-                "SELECT max_state_message_id\n"
-                "FROM channels\n"
-                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
-                &plugin->select_counters_state);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE channels\n"
-                "SET max_state_message_id = ?\n"
-                "WHERE pub_key = ?;",
-                &plugin->update_max_state_message_id);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE channels\n"
-                "SET state_hash_message_id = ?\n"
-                "WHERE pub_key = ?;",
-                &plugin->update_state_hash_message_id);
-
-  mysql_prepare (plugin->mc,
-                "REPLACE INTO state\n"
-                "  (channel_id, name, value_current, value_signed)\n"
-                "SELECT new.channel_id, new.name,\n"
-                "       new.value_current, old.value_signed\n"
-                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "             AS channel_id,\n"
-                "             ? AS name, ? AS value_current) AS new\n"
-                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
-                "           FROM state) AS old\n"
-                "ON new.channel_id = old.channel_id AND new.name = old.name;",
-                &plugin->insert_state_current);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
-                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
-                &plugin->delete_state_empty);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE state\n"
-                "SET value_signed = value_current\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->update_state_signed);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->delete_state);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO state_sync (channel_id, name, value)\n"
-                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
-                &plugin->insert_state_sync);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO state\n"
-                " (channel_id, name, value_current, value_signed)\n"
-                "SELECT channel_id, name, value, value\n"
-                "FROM state_sync\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->insert_state_from_sync);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state_sync\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                &plugin->delete_state_sync);
-
-  mysql_prepare (plugin->mc,
-                "SELECT value_current\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND name = ?;",
-                &plugin->select_state_one);
-
-  mysql_prepare (plugin->mc,
-                "SELECT name, value_current\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
-                &plugin->select_state_prefix);
-
-  mysql_prepare (plugin->mc,
-                "SELECT name, value_signed\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
-                "      AND value_signed IS NOT NULL;",
-                &plugin->select_state_signed);
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "        multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id IN\n"
+        "      (SELECT message_id\n"
+        "       FROM messages\n"
+        "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "       GROUP BY message_id\n"
+        "       ORDER BY message_id\n"
+        "       DESC LIMIT ?)\n"
+        "ORDER BY fragment_id;",
+        &plugin->select_latest_messages);
+
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id = ? AND fragment_offset = ?;",
+        &plugin->select_message_fragment);
+
+  PREP ("SELECT fragment_id, message_id, group_generation\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "ORDER BY fragment_id DESC LIMIT 1;",
+        &plugin->select_counters_message);
+
+  PREP ("SELECT max_state_message_id\n"
+        "FROM channels\n"
+        "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
+        &plugin->select_counters_state);
+
+  PREP ("UPDATE channels\n"
+        "SET max_state_message_id = ?\n"
+        "WHERE pub_key = ?;",
+        &plugin->update_max_state_message_id);
+
+  PREP ("UPDATE channels\n"
+        "SET state_hash_message_id = ?\n"
+        "WHERE pub_key = ?;",
+        &plugin->update_state_hash_message_id);
+
+  PREP ("REPLACE INTO state\n"
+        "  (channel_id, name, value_current, value_signed)\n"
+        "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
+        "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
+        "             (SELECT ?) AS name,\n"
+        "             (SELECT ?) AS value_current\n"
+        "     ) AS new\n"
+        "LEFT JOIN (SELECT channel_id, name, value_signed\n"
+        "           FROM state) AS old\n"
+        "ON new.channel_id = old.channel_id AND new.name = old.name;",
+        &plugin->insert_state_current);
+
+  PREP ("DELETE FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND (value_current IS NULL OR length(value_current) = 0)\n"
+        "      AND (value_signed IS NULL OR length(value_signed) = 0);",
+        &plugin->delete_state_empty);
+
+  PREP ("UPDATE state\n"
+        "SET value_signed = value_current\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->update_state_signed);
+
+  PREP ("DELETE FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->delete_state);
+
+  PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
+        &plugin->insert_state_sync);
+
+  PREP ("INSERT INTO state\n"
+        " (channel_id, name, value_current, value_signed)\n"
+        "SELECT channel_id, name, value, value\n"
+        "FROM state_sync\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->insert_state_from_sync);
+
+  PREP ("DELETE FROM state_sync\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->delete_state_sync);
+
+  PREP ("SELECT value_current\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND name = ?;",
+        &plugin->select_state_one);
+
+  PREP ("SELECT name, value_current\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND (name = ? OR substr(name, 1, ?) = ?);",
+        &plugin->select_state_prefix);
+
+  PREP ("SELECT name, value_signed\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
+        "      AND value_signed IS NOT NULL;",
+        &plugin->select_state_signed);
+#undef PREP
 
   return GNUNET_OK;
 }
@@ -595,9 +544,6 @@ static void
 database_shutdown (struct Plugin *plugin)
 {
   GNUNET_MYSQL_context_destroy (plugin->mc);
-
-  GNUNET_free_non_null (plugin->fn);
-
 }
 
 
@@ -619,12 +565,10 @@ exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
     GNUNET_MY_query_param_end
   };
 
-  if(GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                          stmt,
-                                          params))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_channel", stmt);
+              "mysql exec_channel", stmt);
   }
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -644,25 +588,9 @@ exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
 static int
 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
     return GNUNET_SYSERR;
   }
 
@@ -677,25 +605,9 @@ transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 static int
 transaction_commit (struct Plugin *plugin)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
-                                            stmt,
-                                            params))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
     return GNUNET_SYSERR;
   }
 
@@ -710,25 +622,9 @@ transaction_commit (struct Plugin *plugin)
 static int
 transaction_rollback (struct Plugin *plugin)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
     return GNUNET_SYSERR;
   }
 
@@ -748,12 +644,10 @@ channel_key_store (struct Plugin *plugin,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -779,12 +673,10 @@ slave_key_store (struct Plugin *plugin,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
-                                            stmt,
-                                            params))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -846,12 +738,10 @@ mysql_membership_store (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -893,9 +783,7 @@ membership_test (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                              stmt,
-                              params_select))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "mysql execute prepared", stmt);
@@ -907,8 +795,7 @@ membership_test (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  switch(GNUNET_MY_extract_result (stmt,
-                                results_select))
+  switch (GNUNET_MY_extract_result (stmt, results_select))
   {
     case GNUNET_NO:
       ret = GNUNET_NO;
@@ -952,6 +839,7 @@ fragment_store (void *cls,
   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
 
   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
+
   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
   uint64_t message_id = GNUNET_ntohll (msg->message_id);
   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
@@ -978,8 +866,6 @@ fragment_store (void *cls,
     GNUNET_MY_query_param_uint64 (&hop_counter),
     GNUNET_MY_query_param_auto_from_type (&msg->signature),
     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
-    //GNUNET_MY_query_param_fixed_size (&msg->signature, sizeof (msg->signature)),
-    //GNUNET_MY_query_param_fixed_size (&msg->purpose, sizeof (msg->purpose)),
     GNUNET_MY_query_param_uint64 (&fragment_id),
     GNUNET_MY_query_param_uint64 (&fragment_offset),
     GNUNET_MY_query_param_uint64 (&message_id),
@@ -991,9 +877,7 @@ fragment_store (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                              stmt,
-                              params_insert))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql execute prepared", stmt);
@@ -1021,28 +905,31 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
-
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
 
+  int sql_ret;
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint64 (&psycstore_flags),
+    GNUNET_MY_query_param_uint32 (&psycstore_flags),
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&message_id),
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_update))
+  sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
+  switch (sql_ret)
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    case GNUNET_OK:
+      ret = GNUNET_OK;
+      break;
+
+    default:
+       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql execute prepared", stmt);
-    return GNUNET_SYSERR;
   }
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -1059,30 +946,26 @@ message_add_flags (void *cls,
 static int
 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
               GNUNET_PSYCSTORE_FragmentCallback cb,
-              void *cb_cls)
+              void *cb_cls,
+              uint64_t *returned_fragments)
 {
 
+  uint32_t hop_counter;
+  void *signature = NULL;
+  void *purpose = NULL;
+  size_t signature_size;
+  size_t purpose_size;
   uint64_t fragment_id;
   uint64_t fragment_offset;
   uint64_t message_id;
   uint64_t group_generation;
+  uint64_t flags;
   void *buf;
   size_t buf_size;
   int ret = GNUNET_SYSERR;
   int sql_ret;
-  uint64_t flags;
-  struct GNUNET_MULTICAST_MessageHeader msg;
   struct GNUNET_MULTICAST_MessageHeader *mp;
-
-
-  uint32_t hop_counter;
-  void *signature = NULL;
-  void *purpose = NULL;
-  size_t signature_size;
-  size_t purpose_size;
-  uint32_t msg_flags;
-
-
+  uint64_t msg_flags;
   struct GNUNET_MY_ResultSpec results[] = {
     GNUNET_MY_result_spec_uint32 (&hop_counter),
     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
@@ -1091,87 +974,97 @@ fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
     GNUNET_MY_result_spec_uint64 (&fragment_offset),
     GNUNET_MY_result_spec_uint64 (&message_id),
     GNUNET_MY_result_spec_uint64 (&group_generation),
-    GNUNET_MY_result_spec_uint32 (&msg_flags),
+    GNUNET_MY_result_spec_uint64 (&msg_flags),
     GNUNET_MY_result_spec_uint64 (&flags),
     GNUNET_MY_result_spec_variable_size (&buf,
                                          &buf_size),
     GNUNET_MY_result_spec_end
   };
 
-  sql_ret = GNUNET_MY_extract_result (stmt,
-                                results);
-  switch (sql_ret)
+  do
   {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
+    {
     case GNUNET_NO:
-      if (ret != GNUNET_OK)
-          ret = GNUNET_NO;
+      if (ret != GNUNET_YES)
+        ret = GNUNET_NO;
       break;
-    case GNUNET_OK:
 
-      mp = GNUNET_malloc (sizeof (msg) + buf_size);
-      *mp = msg;  
-      mp->hop_counter = hop_counter;
+    case GNUNET_YES:
+      mp = GNUNET_malloc (sizeof (*mp) + buf_size);
+
+      mp->header.size = htons (sizeof (*mp) + buf_size);
+      mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+      mp->hop_counter = htonl (hop_counter);
       GNUNET_memcpy (&mp->signature,
-                    signature,
-                    signature_size);
+                     signature,
+                     signature_size);
       GNUNET_memcpy (&mp->purpose,
-                    purpose,
-                    purpose_size);
+                     purpose,
+                     purpose_size);
       mp->fragment_id = GNUNET_htonll (fragment_id);
       mp->fragment_offset = GNUNET_htonll (fragment_offset);
       mp->message_id = GNUNET_htonll (message_id);
       mp->group_generation = GNUNET_htonll (group_generation);
-      mp->flags = msg_flags;
+      mp->flags = htonl(msg_flags);
 
       GNUNET_memcpy (&mp[1],
-                    buf,
-                    buf_size);
-      ret = cb (cb_cls,
-                mp,
-                (enum GNUNET_PSYCSTORE_MessageFlags) flags);
-      if (ret != GNUNET_YES)
-        sql_ret = GNUNET_NO;
-      GNUNET_free (mp);
+                     buf,
+                     buf_size);
+      ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
+      if (NULL != returned_fragments)
+        (*returned_fragments)++;
+      GNUNET_MY_cleanup_result (results);
       break;
+
     default:
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                    "mysql extract_result", stmt);
+      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                 "mysql extract_result", stmt);
+    }
   }
+  while (GNUNET_YES == sql_ret);
+
+  // for debugging
+  if (GNUNET_NO == ret)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
+               "Empty result set\n");
 
-//  GNUNET_MY_cleanup_result (results);
   return ret;
 }
 
 
 static int
-fragment_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
+fragment_select (struct Plugin *plugin,
+                 struct GNUNET_MYSQL_StatementHandle *stmt,
                  struct GNUNET_MY_QueryParam *params,
                  uint64_t *returned_fragments,
-                 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
+                 GNUNET_PSYCSTORE_FragmentCallback cb,
+                 void *cb_cls)
 {
   int ret = GNUNET_SYSERR;
   int sql_ret;
 
-  struct GNUNET_MULTICAST_MessageHeader *msg
-    = GNUNET_malloc (sizeof (*msg) + 0);
-
-  sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
-                          stmt,
-                          params);
-  switch(sql_ret)
+  sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
+  switch (sql_ret)
   {
+    case GNUNET_NO:
+      if (ret != GNUNET_YES)
+        ret = GNUNET_NO;
+      break;
+
     case GNUNET_YES:
-       ret = fragment_row (stmt, cb, cb_cls);
+      ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
       break;
+
     default:
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);   
+      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                 "mysql exec_prepared", stmt);
   }
-  
   return ret;
 }
 
+
 /**
  * Retrieve a message fragment range by fragment ID.
  *
@@ -1189,12 +1082,8 @@ fragment_get (void *cls,
               void *cb_cls)
 {
   struct Plugin *plugin = cls;
-
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
-
   int ret = GNUNET_SYSERR;
-  *returned_fragments = 0;
-
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&first_fragment_id),
@@ -1202,6 +1091,7 @@ fragment_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
+  *returned_fragments = 0;
   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -1274,11 +1164,11 @@ message_get (void *cls,
              void *cb_cls)
 {
   struct Plugin *plugin = cls;
-
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
+  int ret;
 
-  int ret = GNUNET_SYSERR;
-  *returned_fragments = 0;
+  if (0 == fragment_limit)
+    fragment_limit = UINT64_MAX;
 
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
@@ -1288,6 +1178,7 @@ message_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
+  *returned_fragments = 0;
   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -1360,9 +1251,8 @@ message_get_fragment (void *cls,
                       void *cb_cls)
 {
   struct Plugin *plugin = cls;
-
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
-
+  int sql_ret;
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_select[] = {
@@ -1372,17 +1262,22 @@ message_get_fragment (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
+  sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
+  switch (sql_ret)
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    case GNUNET_NO:
+      ret = GNUNET_NO;
+      break;
+
+    case GNUNET_OK:
+      ret = fragment_row (stmt, cb, cb_cls, NULL);
+      break;
+
+    default:
+      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql execute prepared", stmt);
-    return GNUNET_SYSERR;
   }
 
-  ret = fragment_row (stmt, cb, cb_cls);
-
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
@@ -1418,9 +1313,7 @@ counters_message_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql execute prepared", stmt);
@@ -1434,8 +1327,7 @@ counters_message_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
@@ -1477,9 +1369,7 @@ counters_state_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql execute prepared", stmt);
@@ -1491,8 +1381,7 @@ counters_state_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
@@ -1527,18 +1416,15 @@ state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_string (name),
-    GNUNET_MY_query_param_auto_from_type (value),
+    GNUNET_MY_query_param_fixed_size(value, value_size),
     GNUNET_MY_query_param_end
   };
 
-  ret = GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params);
-
+  ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
   if (GNUNET_OK != ret)
   {
     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", stmt);
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -1648,8 +1534,8 @@ state_modify_op (void *cls,
   switch (op)
   {
   case GNUNET_PSYC_OP_ASSIGN:
-    return state_assign (plugin, plugin->insert_state_current, channel_key,
-                         name, value, value_size);
+    return state_assign (plugin, plugin->insert_state_current,
+                         channel_key, name, value, value_size);
 
   default: /** @todo implement more state operations */
     GNUNET_break (0);
@@ -1704,8 +1590,8 @@ state_sync_assign (void *cls,
                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
-  return state_assign (cls, plugin->insert_state_sync, channel_key,
-                       name, value, value_size);
+  return state_assign (cls, plugin->insert_state_sync,
+                       channel_key, name, value, value_size);
 }
 
 
@@ -1808,26 +1694,28 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
     GNUNET_MY_result_spec_end
   };
 
-  GNUNET_MY_exec_prepared (plugin->mc,
-                          stmt,
-                          params_select);
-
-
-  sql_ret = GNUNET_MY_extract_result (stmt,
-                                      results);
-
-  switch (sql_ret)
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+  }
+  else
   {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
+    {
     case GNUNET_NO:
       ret = GNUNET_NO;
       break;
+
     case GNUNET_YES:
-      ret = cb (cb_cls, name, value_current,
-                value_size);
+      ret = cb (cb_cls, name, value_current, value_size);
       break;
+
     default:
       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql extract_result", stmt);
+                "mysql extract_result", stmt);
+    }
   }
 
   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
@@ -1876,34 +1764,37 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
     GNUNET_MY_result_spec_string (&name2),
     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
     GNUNET_MY_result_spec_end
-  };
+  };;
 
   int sql_ret;
 
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
+
   do
   {
-    GNUNET_MY_exec_prepared (plugin->mc,
-                            stmt,
-                            params_select);
-    sql_ret = GNUNET_MY_extract_result (stmt,
-                                        results);
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
       case GNUNET_NO:
-        if (ret != GNUNET_OK)
+        if (ret != GNUNET_YES)
           ret = GNUNET_NO;
         break;
+
       case GNUNET_YES:
-        ret = cb (cb_cls, (const char *) name2,
-                  value_current,
-                  value_size);
+        ret = cb (cb_cls, (const char *) name2, value_current, value_size);
 
         if (ret != GNUNET_YES)
           sql_ret = GNUNET_NO;
         break;
+
       default:
         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql extract_result", stmt);
+                  "mysql extract_result", stmt);
     }
   }
   while (sql_ret == GNUNET_YES);
@@ -1953,28 +1844,30 @@ state_get_signed (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  do
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
-    GNUNET_MY_exec_prepared (plugin->mc,
-                             stmt,
-                             params_select);
-    sql_ret = GNUNET_MY_extract_result (stmt,
-                                        results);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
 
+  do
+  {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
       case GNUNET_NO:
-        if (ret != GNUNET_OK)
+        if (ret != GNUNET_YES)
           ret = GNUNET_NO;
         break;
+
       case GNUNET_YES:
-        ret = cb (cb_cls, (const char *) name,
-                  value_signed,
-                  value_size);
+        ret = cb (cb_cls, (const char *) name, value_signed, value_size);
 
         if (ret != GNUNET_YES)
             sql_ret = GNUNET_NO;
         break;
+
       default:
          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql extract_result", stmt);