commented out wrong message type
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
index c766e1a6c8169ce3919ad4329176a2783ecee3de..c76e7e6b12d2337ce2e7dcdad624e41494309476 100644 (file)
 
 /**
  * @file psycstore/plugin_psycstore_mysql.c
- * @brief sqlite-based psycstore backend
+ * @brief mysql-based psycstore backend
  * @author Gabor X Toth
  * @author Christian Grothoff
  * @author Christophe Genevey
  */
 
-/*
- * FIXME: SQLite3 only supports signed 64-bit integers natively,
- *        thus it can only store 63 bits of the uint64_t's.
- */
-
 #include "platform.h"
 #include "gnunet_psycstore_plugin.h"
 #include "gnunet_psycstore_service.h"
@@ -38,7 +33,9 @@
 #include "gnunet_crypto_lib.h"
 #include "gnunet_psyc_util_lib.h"
 #include "psycstore.h"
-#include <sqlite3.h>
+#include "gnunet_my_lib.h"
+#include "gnunet_mysql_lib.h"
+#include <mysql/mysql.h>
 
 /**
  * After how many ms "busy" should a DB operation fail for good?  A
  * a failure of the command 'cmd' on file 'filename'
  * with the message given by strerror(errno).
  */
-#define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
+#define LOG_MYSQL(db, level, cmd, stmt)                                 \
+  do {                                                                  \
+    GNUNET_log_from (level, "psycstore-mysql",                          \
+                     _("`%s' failed at %s:%d with error: %s\n"),        \
+                     cmd, __FILE__, __LINE__,                           \
+                     mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
+  } while (0)
 
-#define LOG_MYSQL(db, level, cmd, stmt) do { GNUNET_log_from (level, "psycstore-mysql", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_stmt_error (stmt)); } while(0)
-
-#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
+#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
 
 enum Transactions {
   TRANSACTION_NONE = 0,
@@ -80,13 +81,8 @@ struct Plugin
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Database filename.
+   * MySQL context.
    */
-  char *fn;
-
-  /**
-    *Handle to talk to Mysql
-    */
   struct GNUNET_MYSQL_Context *mc;
 
   /**
@@ -94,146 +90,139 @@ struct Plugin
    */
   enum Transactions transaction;
 
-  GNUNET_MYSQL_StatementHandle *transaction_begin;
-
-  GNUNET_MYSQL_StatementHandle *transaction_commit;
-
-  GNUNET_MYSQL_StatementHandle *transaction_rollback;
-
   /**
    * Precompiled SQL for channel_key_store()
    */
-  GNUNET_MYSQL_StatementHandle *insert_channel_key;
-
+  struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
 
   /**
    * Precompiled SQL for slave_key_store()
    */
-  GNUNET_MYSQL_StatementHandle *insert_slave_key;
+  struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
 
   /**
    * Precompiled SQL for membership_store()
    */
-  GNUNET_MYSQL_StatementHandle *insert_membership;
+  struct GNUNET_MYSQL_StatementHandle *insert_membership;
 
   /**
    * Precompiled SQL for membership_test()
    */
-  GNUNET_MYSQL_StatementHandle *select_membership;
+  struct GNUNET_MYSQL_StatementHandle *select_membership;
 
   /**
    * Precompiled SQL for fragment_store()
    */
-  GNUNET_MYSQL_StatementHandle *insert_fragment;
+  struct GNUNET_MYSQL_StatementHandle *insert_fragment;
 
   /**
    * Precompiled SQL for message_add_flags()
    */
-  GNUNET_MYSQL_StatementHandle *update_message_flags;
+  struct GNUNET_MYSQL_StatementHandle *update_message_flags;
 
   /**
    * Precompiled SQL for fragment_get()
    */
-  GNUNET_MYSQL_StatementHandle *select_fragments;
+  struct GNUNET_MYSQL_StatementHandle *select_fragments;
 
   /**
    * Precompiled SQL for fragment_get()
    */
-  GNUNET_MYSQL_StatementHandle *select_latest_fragments;
+  struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
 
   /**
    * Precompiled SQL for message_get()
    */
-  GNUNET_MYSQL_StatementHandle *select_messages;
+  struct GNUNET_MYSQL_StatementHandle *select_messages;
 
   /**
    * Precompiled SQL for message_get()
    */
-  GNUNET_MYSQL_StatementHandle *select_latest_messages;
+  struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
 
   /**
    * Precompiled SQL for message_get_fragment()
    */
-  GNUNET_MYSQL_StatementHandle *select_message_fragment;
+  struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
 
   /**
    * Precompiled SQL for counters_get_message()
    */
-  GNUNET_MYSQL_StatementHandle *select_counters_message;
+  struct GNUNET_MYSQL_StatementHandle *select_counters_message;
 
   /**
    * Precompiled SQL for counters_get_state()
    */
-  GNUNET_MYSQL_StatementHandle *select_counters_state;
+  struct GNUNET_MYSQL_StatementHandle *select_counters_state;
 
   /**
    * Precompiled SQL for state_modify_end()
    */
-  GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
+  struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
 
   /**
    * Precompiled SQL for state_sync_end()
    */
-  GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
+  struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
 
   /**
    * Precompiled SQL for state_modify_op()
    */
-  GNUNET_MYSQL_StatementHandle *insert_state_current;
+  struct GNUNET_MYSQL_StatementHandle *insert_state_current;
 
   /**
    * Precompiled SQL for state_modify_end()
    */
-  GNUNET_MYSQL_StatementHandle *delete_state_empty;
+  struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
 
   /**
    * Precompiled SQL for state_set_signed()
    */
-  GNUNET_MYSQL_StatementHandle *update_state_signed;
+  struct GNUNET_MYSQL_StatementHandle *update_state_signed;
 
   /**
    * Precompiled SQL for state_sync()
    */
-  GNUNET_MYSQL_StatementHandle *insert_state_sync;
+  struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
 
   /**
    * Precompiled SQL for state_sync()
    */
-  GNUNET_MYSQL_StatementHandle *delete_state;
+  struct GNUNET_MYSQL_StatementHandle *delete_state;
 
   /**
    * Precompiled SQL for state_sync()
    */
-  GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
+  struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
 
   /**
    * Precompiled SQL for state_sync()
    */
-  GNUNET_MYSQL_StatementHandle *delete_state_sync;
+  struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
 
   /**
    * Precompiled SQL for state_get_signed()
    */
-  GNUNET_MYSQL_StatementHandle *select_state_signed;
+  struct GNUNET_MYSQL_StatementHandle *select_state_signed;
 
   /**
    * Precompiled SQL for state_get()
    */
-  GNUNET_MYSQL_StatementHandle *select_state_one;
+  struct GNUNET_MYSQL_StatementHandle *select_state_one;
 
   /**
    * Precompiled SQL for state_get_prefix()
    */
-  GNUNET_MYSQL_StatementHandle *select_state_prefix;
+  struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
 
 };
 
 #if DEBUG_PSYCSTORE
 
 static void
-sql_trace (void *cls, const char *sql)
+mysql_trace (void *cls, const char *sql)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
 }
 
 #endif
@@ -245,49 +234,29 @@ sql_trace (void *cls, const char *sql)
  * @param dbh handle to the database
  * @param sql SQL statement, UTF-8 encoded
  * @param stmt set to the prepared statement
- * @return 0 on success
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
 static int
 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
-              const char *sql, 
-              struct GNUNET_MYSQL_StatementHandle *stmt)
+              const char *sql,
+              struct GNUNET_MYSQL_StatementHandle **stmt)
 {
-  stmt = GNUNET_MYSQL_statement_prepare (mc,
+  *stmt = GNUNET_MYSQL_statement_prepare (mc,
                                           sql);
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Prepared `%s' / %p\n", sql, stmt);
-  if(NULL == stmt)
+  if (NULL == *stmt)
+  {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-   _("Error preparing SQL query: %s\n  %s\n"),
-   mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (stmt)), sql);
-
-  return 1;
-}
-
-
-/**
- * @brief Prepare a SQL statement
- *
- * @param dbh handle to the database
- * @param sql SQL statement, UTF-8 encoded
- * @return 0 on success
- */
-static int
-mysql_exec (struct GNUNET_MYSQL_Context *mc,
-            struct GNUNET_MYSQL_Statement *sh,
-            struct GNUNET_MY_QueryParam *qp)
-{
-  int result;
-
-  result = GNUNET_MY_exec_prepared (mc, sh, qp);
+         _("Error preparing SQL query: %s\n  %s\n"),
+         mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
+         sql);
+    return GNUNET_SYSERR;
+  }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Executed `%s' / %d\n", sql, result);
-  if (GNUNET_OK != result)
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-   _("Error executing SQL query: %s\n  %s\n"),
-   mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (sh)), sql);
-  return result;
+       "Prepared `%s' / %p\n",
+       sql,
+       stmt);
+  return GNUNET_OK;
 }
 
 
@@ -297,333 +266,270 @@ mysql_exec (struct GNUNET_MYSQL_Context *mc,
  * as needed as well).
  *
  * @param plugin the plugin context (state for this module)
- * @return GNUNET_OK on success
+ * @return #GNUNET_OK on success
  */
 static int
 database_setup (struct Plugin *plugin)
 {
-  char *filename;
-
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
-                                               "FILENAME", &filename))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
-                              "psycstore-sqlite", "FILENAME");
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK != GNUNET_DISK_file_test (filename))
-  {
-    if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
-    {
-      GNUNET_break (0);
-      GNUNET_free (filename);
-      return GNUNET_SYSERR;
-    }
-  }
-  /* filename should be UTF-8-encoded. If it isn't, it's a bug */
-  plugin->fn = filename;
-
   /* Open database and precompile statements */
-  plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
+  plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
+                                            "psycstore-mysql");
 
   if (NULL == plugin->mc)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-   _("Unable to initialize SQLite: %s.\n"),
-    sqlite3_errmsg (plugin->dbh));
-    return GNUNET_SYSERR; 
+         _("Unable to initialize Mysql.\n"));
+    return GNUNET_SYSERR;
   }
-  
-/*
-#if DEBUG_PSYCSTORE
-  sqlite3_trace (plugin->dbh, &sql_trace, NULL);
-#endif
-
-  sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
-  sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
-  sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
-  sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
-  sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
-#if ! DEBUG_PSYCSTORE
-  sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
-#endif
-  sql_exec (plugin->dbh, "PRAGMA page_size=4096");
 
-  sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
+#define STMT_RUN(sql) \
+  if (GNUNET_OK != \
+      GNUNET_MYSQL_statement_run (plugin->mc, \
+                                  sql)) \
+  { \
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
+                _("Failed to run SQL statement `%s'\n"), \
+                sql); \
+    return GNUNET_SYSERR; \
+  }
 
-*/
   /* Create tables */
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS channels (\n"
-                              "  id INT PRIMARY KEY,\n"
-                              "  pub_key BLOB UNIQUE,\n"
-                              "  max_state_message_id INT,\n" // last applied state message ID
-                              "  state_hash_message_id INT\n" // last message ID with a state hash
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS slaves (\n"
-                              "  id INT PRIMARY KEY,\n"
-                              "  pub_key BLOB UNIQUE\n"
-                              ");");
-
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " max_state_message_id BIGINT UNSIGNED,\n"
+            " state_hash_message_id BIGINT UNSIGNED,\n"
+            " PRIMARY KEY(id),\n"
+            " UNIQUE KEY(pub_key(32))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " PRIMARY KEY(id),\n"
+            " UNIQUE KEY(pub_key(32))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
+            "  did_join TINYINT NOT NULL,\n"
+            "  announced_at BIGINT UNSIGNED NOT NULL,\n"
+            "  effective_since BIGINT UNSIGNED NOT NULL,\n"
+            "  group_generation BIGINT UNSIGNED NOT NULL\n"
+            ");");
+
+/*** FIX because IF NOT EXISTS doesn't work ***/
   GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS membership (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
-                              "  did_join INT NOT NULL,\n"
-                              "  announced_at INT NOT NULL,\n"
-                              "  effective_since INT NOT NULL,\n"
-                              "  group_generation INT NOT NULL\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
+                              "CREATE INDEX idx_membership_channel_id_slave_id "
                               "ON membership (channel_id, slave_id);");
 
   /** @todo messages table: add method_name column */
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS messages (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  hop_counter INT NOT NULL,\n"
-                              "  signature BLOB,\n"
-                              "  purpose BLOB,\n"
-                              "  fragment_id INT NOT NULL,\n"
-                              "  fragment_offset INT NOT NULL,\n"
-                              "  message_id INT NOT NULL,\n"
-                              "  group_generation INT NOT NULL,\n"
-                              "  multicast_flags INT NOT NULL,\n"
-                              "  psycstore_flags INT NOT NULL,\n"
-                              "  data BLOB,\n"
-                              "  PRIMARY KEY (channel_id, fragment_id),\n"
-                              "  UNIQUE (channel_id, message_id, fragment_offset)\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS state (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  name TEXT NOT NULL,\n"
-                              "  value_current BLOB,\n"
-                              "  value_signed BLOB,\n"
-                              "  PRIMARY KEY (channel_id, name)\n"
-                              ");");
-
-  GNUNET_MYSQL_statement_run (plugin->mc,
-                              "CREATE TABLE IF NOT EXISTS state_sync (\n"
-                              "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-                              "  name TEXT NOT NULL,\n"
-                              "  value BLOB,\n"
-                              "  PRIMARY KEY (channel_id, name)\n"
-                              ");");
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
+            "  signature BLOB,\n"
+            "  purpose BLOB,\n"
+            "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
+            "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
+                              "  message_id BIGINT UNSIGNED NOT NULL,\n"
+            "  group_generation BIGINT UNSIGNED NOT NULL,\n"
+            "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
+            "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
+            "  data BLOB,\n"
+            "  PRIMARY KEY (channel_id, fragment_id),\n"
+            "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  name TEXT NOT NULL,\n"
+            "  value_current BLOB,\n"
+            "  value_signed BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
+            ");");
+
+  STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  name TEXT NOT NULL,\n"
+            "  value BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
+            ");");
+#undef STMT_RUN
 
   /* Prepare statements */
-  mysql_prepare (plugin->mc, 
-                "BEGIN", 
-                plugin->transaction_begin);
-
-  mysql_prepare (plugin->mc, 
-                "COMMIT", 
-                plugin->transaction_commit);
-
-  mysql_prepare (plugin->mc, 
-                "ROLLBACK;", 
-                plugin->transaction_rollback);
-
-  mysql_prepare (plugin->mc, 
-                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);", 
-                plugin->insert_channel_key);
-
-  mysql_prepare (plugin->mc, 
-                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);", 
-                plugin->insert_slave_key);
-  mysql_prepare (plugin->mc, 
-                "INSERT INTO membership\n"
-                " (channel_id, slave_id, did_join, announced_at,\n"
-                "  effective_since, group_generation)\n"
-                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
-                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
-                "        ?, ?, ?, ?);",
-                plugin->insert_membership);
-
-  mysql_prepare (plugin->mc,
-                "SELECT did_join FROM membership\n"
-               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-               "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
-               "      AND effective_since <= ? AND did_join = 1\n"
-               "ORDER BY announced_at DESC LIMIT 1;",
-               plugin->select_membership);
-
-  mysql_prepare (plugin->mc,
-                "INSERT OR IGNORE INTO messages\n"
-               " (channel_id, hop_counter, signature, purpose,\n"
-               "  fragment_id, fragment_offset, message_id,\n"
-               "  group_generation, multicast_flags, psycstore_flags, data)\n"
-               "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
-               "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
-                plugin->insert_fragment);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE messages\n"
-                "SET psycstore_flags = psycstore_flags | ?\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id = ? AND fragment_offset = 0;",
-                plugin->update_message_flags);
-
-  mysql_prepare (plugin->mc,
-                  "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                  "       fragment_offset, message_id, group_generation,\n"
-                  "       multicast_flags, psycstore_flags, data\n"
-                  "FROM messages\n"
-                  "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                  "      AND ? <= fragment_id AND fragment_id <= ?;",
-               plugin->select_fragments);
+#define PREP(stmt,handle)                                    \
+  if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
+  { \
+    GNUNET_break (0); \
+    return GNUNET_SYSERR; \
+  }
+  PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
+        &plugin->insert_channel_key);
+  PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
+        &plugin->insert_slave_key);
+  PREP ("INSERT INTO membership\n"
+        " (channel_id, slave_id, did_join, announced_at,\n"
+        "  effective_since, group_generation)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
+        "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
+        "        ?, ?, ?, ?);",
+        &plugin->insert_membership);
+  PREP ("SELECT did_join FROM membership\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
+        "      AND effective_since <= ? AND did_join = 1\n"
+        "ORDER BY announced_at DESC LIMIT 1;",
+        &plugin->select_membership);
+
+  PREP ("INSERT IGNORE INTO messages\n"
+        " (channel_id, hop_counter, signature, purpose,\n"
+        "  fragment_id, fragment_offset, message_id,\n"
+        "  group_generation, multicast_flags, psycstore_flags, data)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
+        "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
+        &plugin->insert_fragment);
+
+  PREP ("UPDATE messages\n"
+        "SET psycstore_flags = psycstore_flags | ?\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id = ? AND fragment_offset = 0;",
+        &plugin->update_message_flags);
+
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
+        &plugin->select_fragments);
 
   /** @todo select_messages: add method_prefix filter */
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "       multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND ? <= message_id AND message_id <= ?"
-                "LIMIT ?;",
-                plugin->select_messages);
-
-  mysql_prepare (plugin->mc,
-                "SELECT * FROM\n"
-                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "        fragment_offset, message_id, group_generation,\n"
-                "        multicast_flags, psycstore_flags, data\n"
-                " FROM messages\n"
-                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                " ORDER BY fragment_id DESC\n"
-                " LIMIT ?)\n"
-                "ORDER BY fragment_id;",
-                plugin->select_latest_fragments);
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND ? <= message_id AND message_id <= ?\n"
+        "LIMIT ?;",
+        &plugin->select_messages);
+
+  PREP ("SELECT * FROM\n"
+        "(SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "        fragment_offset, message_id, group_generation,\n"
+        "        multicast_flags, psycstore_flags, data\n"
+        " FROM messages\n"
+        " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        " ORDER BY fragment_id DESC\n"
+        " LIMIT ?)\n"
+        "ORDER BY fragment_id;",
+        &plugin->select_latest_fragments);
 
   /** @todo select_latest_messages: add method_prefix filter */
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "        multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id IN\n"
-                "      (SELECT message_id\n"
-                "       FROM messages\n"
-                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "       GROUP BY message_id\n"
-                "       ORDER BY message_id\n"
-                "       DESC LIMIT ?)\n"
-                "ORDER BY fragment_id;",
-               plugin->select_latest_messages);
-
-  mysql_prepare (plugin->mc,
-                "SELECT hop_counter, signature, purpose, fragment_id,\n"
-                "       fragment_offset, message_id, group_generation,\n"
-                "       multicast_flags, psycstore_flags, data\n"
-                "FROM messages\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND message_id = ? AND fragment_offset = ?;",
-                plugin->select_message_fragment);
-
-  mysql_prepare (plugin->mc,
-                "SELECT fragment_id, message_id, group_generation\n"
-               "FROM messages\n"
-               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-               "ORDER BY fragment_id DESC LIMIT 1;",
-               plugin->select_counters_message);
-
-  mysql_prepare (plugin->mc,
-                "SELECT max_state_message_id\n"
-                "FROM channels\n"
-                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
-               plugin->select_counters_state);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE channels\n"
-                "SET max_state_message_id = ?\n"
-                "WHERE pub_key = ?;",
-               plugin->update_max_state_message_id);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE channels\n"
-                "SET state_hash_message_id = ?\n"
-                "WHERE pub_key = ?;",
-                plugin->update_state_hash_message_id);
-
-  mysql_prepare (plugin->mc,
-                "INSERT OR REPLACE INTO state\n"
-                "  (channel_id, name, value_current, value_signed)\n"
-                "SELECT new.channel_id, new.name,\n"
-                "       new.value_current, old.value_signed\n"
-                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "             AS channel_id,\n"
-                "             ? AS name, ? AS value_current) AS new\n"
-                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
-                "           FROM state) AS old\n"
-                "ON new.channel_id = old.channel_id AND new.name = old.name;",
-                plugin->insert_state_current);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
-                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
-               plugin->delete_state_empty);
-
-  mysql_prepare (plugin->mc,
-                "UPDATE state\n"
-                "SET value_signed = value_current\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                plugin->update_state_signed);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                plugin->delete_state);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO state_sync (channel_id, name, value)\n"
-                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
-                plugin->insert_state_sync);
-
-  mysql_prepare (plugin->mc,
-                "INSERT INTO state\n"
-                " (channel_id, name, value_current, value_signed)\n"
-                "SELECT channel_id, name, value, value\n"
-                "FROM state_sync\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                plugin->insert_state_from_sync);
-
-  mysql_prepare (plugin->mc,
-                "DELETE FROM state_sync\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
-                plugin->delete_state_sync);
-
-  mysql_prepare (plugin->mc,
-                "SELECT value_current\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND name = ?;",
-                plugin->select_state_one);
-
-  mysql_prepare (plugin->mc,
-                "SELECT name, value_current\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-                "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
-                plugin->select_state_prefix);
-
-  mysql_prepare (plugin->mc,
-                "SELECT name, value_signed\n"
-                "FROM state\n"
-                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
-                "      AND value_signed IS NOT NULL;",
-                plugin->select_state_signed);
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "        multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id IN\n"
+        "      (SELECT message_id\n"
+        "       FROM messages\n"
+        "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "       GROUP BY message_id\n"
+        "       ORDER BY message_id\n"
+        "       DESC LIMIT ?)\n"
+        "ORDER BY fragment_id;",
+        &plugin->select_latest_messages);
+
+  PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
+        "       fragment_offset, message_id, group_generation,\n"
+        "       multicast_flags, psycstore_flags, data\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND message_id = ? AND fragment_offset = ?;",
+        &plugin->select_message_fragment);
+
+  PREP ("SELECT fragment_id, message_id, group_generation\n"
+        "FROM messages\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "ORDER BY fragment_id DESC LIMIT 1;",
+        &plugin->select_counters_message);
+
+  PREP ("SELECT max_state_message_id\n"
+        "FROM channels\n"
+        "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
+        &plugin->select_counters_state);
+
+  PREP ("UPDATE channels\n"
+        "SET max_state_message_id = ?\n"
+        "WHERE pub_key = ?;",
+        &plugin->update_max_state_message_id);
+
+  PREP ("UPDATE channels\n"
+        "SET state_hash_message_id = ?\n"
+        "WHERE pub_key = ?;",
+        &plugin->update_state_hash_message_id);
+
+  PREP ("REPLACE INTO state\n"
+        "  (channel_id, name, value_current, value_signed)\n"
+        "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
+        "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
+        "             (SELECT ?) AS name,\n"
+        "             (SELECT ?) AS value_current\n"
+        "     ) AS new\n"
+        "LEFT JOIN (SELECT channel_id, name, value_signed\n"
+        "           FROM state) AS old\n"
+        "ON new.channel_id = old.channel_id AND new.name = old.name;",
+        &plugin->insert_state_current);
+
+  PREP ("DELETE FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND (value_current IS NULL OR length(value_current) = 0)\n"
+        "      AND (value_signed IS NULL OR length(value_signed) = 0);",
+        &plugin->delete_state_empty);
+
+  PREP ("UPDATE state\n"
+        "SET value_signed = value_current\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->update_state_signed);
+
+  PREP ("DELETE FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->delete_state);
+
+  PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
+        "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
+        &plugin->insert_state_sync);
+
+  PREP ("INSERT INTO state\n"
+        " (channel_id, name, value_current, value_signed)\n"
+        "SELECT channel_id, name, value, value\n"
+        "FROM state_sync\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->insert_state_from_sync);
+
+  PREP ("DELETE FROM state_sync\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
+        &plugin->delete_state_sync);
+
+  PREP ("SELECT value_current\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND name = ?;",
+        &plugin->select_state_one);
+
+  PREP ("SELECT name, value_current\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+        "      AND (name = ? OR substr(name, 1, ?) = ?);",
+        &plugin->select_state_prefix);
+
+  PREP ("SELECT name, value_signed\n"
+        "FROM state\n"
+        "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
+        "      AND value_signed IS NOT NULL;",
+        &plugin->select_state_signed);
+#undef PREP
 
   return GNUNET_OK;
 }
@@ -637,19 +543,7 @@ database_setup (struct Plugin *plugin)
 static void
 database_shutdown (struct Plugin *plugin)
 {
-  int result;
-  sqlite3_stmt *stmt;
-  while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
-  {
-    result = sqlite3_finalize (stmt);
-    if (SQLITE_OK != result)
-      LOG (GNUNET_ERROR_TYPE_WARNING,
-           "Failed to close statement %p: %d\n", stmt, result);
-  }
-  if (SQLITE_OK != sqlite3_close (plugin->dbh))
-    LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
-
-  GNUNET_free_non_null (plugin->fn);
+  GNUNET_MYSQL_context_destroy (plugin->mc);
 }
 
 
@@ -666,34 +560,22 @@ static int
 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
 {
-  MYSQL_STMT * statement = NULL;
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if (NULL == statement)
-  {
-     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
-    return GNUNET_SYSERR;
-  }
-
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_end
   };
 
-  if(GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                          stmt,
-                                          results)
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_channel", stmt); 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_channel", stmt);
   }
 
-  if (0 != mysql_stmt_reset (statement))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return GNUNET_OK;
@@ -706,38 +588,12 @@ exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
 static int
 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 {
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
-  MYSQL_STMT * statement = NULL;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if (NULL == statement)
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
   {
-     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
     return GNUNET_SYSERR;
   }
 
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_extract_result (plugin->mc,
-                                            stmt,
-                                            results))
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
-    return GNUNET_SYSERR; 
-  }
-
-  if (0 != mysql_stmt_reset (statement))
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }
-
   plugin->transaction = transaction;
   return GNUNET_OK;
 }
@@ -749,36 +605,10 @@ transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 static int
 transaction_commit (struct Plugin *plugin)
 {
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
-  MYSQL_STMT *statement = NULL;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
-    return GNUNET_SYSERR; 
-  }
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
-                                            stmt,
-                                            results))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
-    return GNUNET_SYSERR; 
-  }
-
-  if (0 != mysql_stmt_reset (stmt))
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
+    return GNUNET_SYSERR;
   }
 
   plugin->transaction = TRANSACTION_NONE;
@@ -792,35 +622,10 @@ transaction_commit (struct Plugin *plugin)
 static int
 transaction_rollback (struct Plugin *plugin)
 {
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
-  MYSQL_STMT* statement = NULL;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
-    return GNUNET_SYSERR; 
-  }
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            results))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
-    return GNUNET_SYSERR;  
-  }
-
-  if (0 != mysql_stmt_reset (stmt))
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
+    return GNUNET_SYSERR;
   }
 
   plugin->transaction = TRANSACTION_NONE;
@@ -832,29 +637,24 @@ static int
 channel_key_store (struct Plugin *plugin,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
 {
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
-
-  MYSQL_STMT *statement = NULL;
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if(NULL == statement)
-  {
-   LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
-    return GNUNET_SYSERR;  
-  }
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
 
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            results))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
+
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -866,37 +666,25 @@ static int
 slave_key_store (struct Plugin *plugin,
                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
 {
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
-
-  MYSQL_STMT *statement = NULL;
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if(NULL == statement)
-  {
-   LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
-    return GNUNET_SYSERR;  
-  }
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
 
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (slave_key),
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
-                                            stmt,
-                                            results))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return GNUNET_OK;
@@ -912,7 +700,7 @@ slave_key_store (struct Plugin *plugin,
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-sqlite_membership_store (void *cls,
+mysql_membership_store (void *cls,
                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
                          int did_join,
@@ -921,15 +709,10 @@ sqlite_membership_store (void *cls,
                          uint64_t group_generation)
 {
   struct Plugin *plugin = cls;
-//  sqlite3_stmt *stmt = plugin->insert_membership;
-  
-  uint32_t idid_join = (uint32_t)did_join;
-  uint64_t iannounced_at = (uint64_t)announced_at;
-  uint64_t ieffective_since = (uint64_t)effective_since;
-  uint64_t igroup_generation = (uint64_t)group_generation;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
+  uint32_t idid_join = (uint32_t)did_join;
 
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
 
   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
 
@@ -949,26 +732,24 @@ sqlite_membership_store (void *cls,
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_auto_from_type (slave_key),
     GNUNET_MY_query_param_uint32 (&idid_join),
-    GNUNET_MY_query_param_uint64 (&iannounced_at),
-    GNUNET_MY_query_param_uint64 (&ieffective_since),
-    GNUNET_MY_query_param_uint64 (&igroup_generation),
+    GNUNET_MY_query_param_uint64 (&announced_at),
+    GNUNET_MY_query_param_uint64 (&effective_since),
+    GNUNET_MY_query_param_uint64 (&group_generation),
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
   return GNUNET_OK;
 }
@@ -988,20 +769,10 @@ membership_test (void *cls,
                  uint64_t message_id)
 {
   struct Plugin *plugin = cls;
-  //sqlite3_stmt *stmt = plugin->select_membership;
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
-  MYSQL_STMT *statement = NULL;
-  
-  uint32_t did_join = 0;
 
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
 
-  if(NULL == statement)
-  {
-   LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql statement invalide", statement);
-    return GNUNET_SYSERR;  
-  }
+  uint32_t did_join = 0;
 
   int ret = GNUNET_SYSERR;
 
@@ -1012,13 +783,11 @@ membership_test (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_MY_exec_prepared (plugin->mc,
-                              stmt,
-                              params_select))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql execute prepared", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                "mysql execute prepared", stmt);
+    return GNUNET_SYSERR;
   }
 
   struct GNUNET_MY_ResultSpec results_select[] = {
@@ -1026,28 +795,25 @@ membership_test (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_extract_result (stmt,
-                                results_select))
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql extract_result", statement);
-    return GNUNET_SYSERR; 
-  }
-
-  if(0 != did_join)
+  switch (GNUNET_MY_extract_result (stmt, results_select))
   {
-    ret = GNUNET_YES;
-  }
-  else
-  {
-    ret = GNUNET_NO;
+    case GNUNET_NO:
+      ret = GNUNET_NO;
+      break;
+    case GNUNET_OK:
+      ret = GNUNET_YES;
+      break;
+    default:
+      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                "mysql extract_result", stmt);
+      return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return ret;
@@ -1067,21 +833,24 @@ fragment_store (void *cls,
                 uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
-  
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
 
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
 
   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
 
   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
+
   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
   uint64_t message_id = GNUNET_ntohll (msg->message_id);
   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
 
+  uint64_t hop_counter = ntohl(msg->hop_counter);
+  uint64_t flags = ntohl(msg->flags);
+
   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
       message_id > INT64_MAX || group_generation > INT64_MAX)
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
+    LOG(GNUNET_ERROR_TYPE_ERROR,
          "Tried to store fragment with a field > INT64_MAX: "
          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
          message_id, group_generation);
@@ -1094,35 +863,34 @@ fragment_store (void *cls,
 
   struct GNUNET_MY_QueryParam params_insert[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
-    GNUNET_MY_query_param_uint64 (&msg->hop_counter),
-    GNUNET_MY_query_param_auto_from_type (msg->signature),
-    GNUNET_MY_query_param_auto_from_type (msg->purpose),
+    GNUNET_MY_query_param_uint64 (&hop_counter),
+    GNUNET_MY_query_param_auto_from_type (&msg->signature),
+    GNUNET_MY_query_param_auto_from_type (&msg->purpose),
     GNUNET_MY_query_param_uint64 (&fragment_id),
     GNUNET_MY_query_param_uint64 (&fragment_offset),
     GNUNET_MY_query_param_uint64 (&message_id),
     GNUNET_MY_query_param_uint64 (&group_generation),
-    GNUNET_MY_query_param_uint64 (&msg->flags),
-    GNUNET_MY_query_param_uint64 (&psycstore_flags),
-    GNUNET_MY_query_param_auto_from_type (msg[1]),
+    GNUNET_MY_query_param_uint64 (&flags),
+    GNUNET_MY_query_param_uint32 (&psycstore_flags),
+    GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
+                                                  - sizeof (*msg)),
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_MY_exec_prepared (plugin->mc,
-                              stmt,
-                              params_insert))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
-    return GNUNET_SYSERR;    
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql execute prepared", stmt);
+    return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
-  
+
   return GNUNET_OK;
 }
 
@@ -1137,105 +905,166 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
-  MYSQL_STMT *statement = NULL;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
+  int sql_ret;
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint64 (&psycstore_flags),
+    GNUNET_MY_query_param_uint32 (&psycstore_flags),
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&message_id),
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_update))
+  sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
+  switch (sql_ret)
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
-    return GNUNET_SYSERR;   
+    case GNUNET_OK:
+      ret = GNUNET_OK;
+      break;
+
+    default:
+       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql execute prepared", stmt);
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return ret;
 }
 
-/** Extract result from statement **/
-static int
-fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
-              void *cb_cls)
-{
-  int data_size = sqlite3_column_bytes (stmt, 9);
-  struct GNUNET_MULTICAST_MessageHeader *msg
-    = GNUNET_malloc (sizeof (*msg) + data_size);
-
-  msg->header.size = htons (sizeof (*msg) + data_size);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
-  msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
-  memcpy (&msg->signature,
-          sqlite3_column_blob (stmt, 1),
-          sqlite3_column_bytes (stmt, 1));
-  memcpy (&msg->purpose,
-          sqlite3_column_blob (stmt, 2),
-          sqlite3_column_bytes (stmt, 2));
-  msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
-  msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
-  msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
-  msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
-  msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
-  memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
-
-  return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
-}
-
 
 static int
-fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
-                 uint64_t *returned_fragments,
-                 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
+fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
+              GNUNET_PSYCSTORE_FragmentCallback cb,
+              void *cb_cls,
+              uint64_t *returned_fragments)
 {
+
+  uint32_t hop_counter;
+  void *signature = NULL;
+  void *purpose = NULL;
+  size_t signature_size;
+  size_t purpose_size;
+  uint64_t fragment_id;
+  uint64_t fragment_offset;
+  uint64_t message_id;
+  uint64_t group_generation;
+  uint64_t flags;
+  void *buf;
+  size_t buf_size;
   int ret = GNUNET_SYSERR;
   int sql_ret;
+  struct GNUNET_MULTICAST_MessageHeader *mp;
+  uint64_t msg_flags;
+  struct GNUNET_MY_ResultSpec results[] = {
+    GNUNET_MY_result_spec_uint32 (&hop_counter),
+    GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
+    GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
+    GNUNET_MY_result_spec_uint64 (&fragment_id),
+    GNUNET_MY_result_spec_uint64 (&fragment_offset),
+    GNUNET_MY_result_spec_uint64 (&message_id),
+    GNUNET_MY_result_spec_uint64 (&group_generation),
+    GNUNET_MY_result_spec_uint64 (&msg_flags),
+    GNUNET_MY_result_spec_uint64 (&flags),
+    GNUNET_MY_result_spec_variable_size (&buf,
+                                         &buf_size),
+    GNUNET_MY_result_spec_end
+  };
 
   do
   {
-    sql_ret = sqlite3_step (stmt);
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
-    case SQLITE_DONE:
-      if (ret != GNUNET_OK)
+    case GNUNET_NO:
+      if (ret != GNUNET_YES)
         ret = GNUNET_NO;
       break;
-    case SQLITE_ROW:
-      ret = fragment_row (stmt, cb, cb_cls);
-      (*returned_fragments)++;
-      if (ret != GNUNET_YES)
-        sql_ret = SQLITE_DONE;
+
+    case GNUNET_YES:
+      mp = GNUNET_malloc (sizeof (*mp) + buf_size);
+
+      mp->header.size = htons (sizeof (*mp) + buf_size);
+      mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+      mp->hop_counter = htonl (hop_counter);
+      GNUNET_memcpy (&mp->signature,
+                     signature,
+                     signature_size);
+      GNUNET_memcpy (&mp->purpose,
+                     purpose,
+                     purpose_size);
+      mp->fragment_id = GNUNET_htonll (fragment_id);
+      mp->fragment_offset = GNUNET_htonll (fragment_offset);
+      mp->message_id = GNUNET_htonll (message_id);
+      mp->group_generation = GNUNET_htonll (group_generation);
+      mp->flags = htonl(msg_flags);
+
+      GNUNET_memcpy (&mp[1],
+                     buf,
+                     buf_size);
+      ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
+      if (NULL != returned_fragments)
+        (*returned_fragments)++;
+      GNUNET_MY_cleanup_result (results);
       break;
+
     default:
-      LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                  "sqlite3_step");
+      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                 "mysql extract_result", stmt);
     }
   }
-  while (sql_ret == SQLITE_ROW);
+  while (GNUNET_YES == sql_ret);
 
+  // for debugging
+  if (GNUNET_NO == ret)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
+               "Empty result set\n");
+
+  return ret;
+}
+
+
+static int
+fragment_select (struct Plugin *plugin,
+                 struct GNUNET_MYSQL_StatementHandle *stmt,
+                 struct GNUNET_MY_QueryParam *params,
+                 uint64_t *returned_fragments,
+                 GNUNET_PSYCSTORE_FragmentCallback cb,
+                 void *cb_cls)
+{
+  int ret = GNUNET_SYSERR;
+  int sql_ret;
+
+  sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
+  switch (sql_ret)
+  {
+    case GNUNET_NO:
+      if (ret != GNUNET_YES)
+        ret = GNUNET_NO;
+      break;
+
+    case GNUNET_YES:
+      ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
+      break;
+
+    default:
+      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                 "mysql exec_prepared", stmt);
+  }
   return ret;
 }
 
+
 /**
  * Retrieve a message fragment range by fragment ID.
  *
@@ -1253,12 +1082,8 @@ fragment_get (void *cls,
               void *cb_cls)
 {
   struct Plugin *plugin = cls;
-
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
-
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
   int ret = GNUNET_SYSERR;
-  *returned_fragments = 0;
-
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&first_fragment_id),
@@ -1266,13 +1091,14 @@ fragment_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
+  *returned_fragments = 0;
+  ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return ret;
@@ -1296,7 +1122,7 @@ fragment_get_latest (void *cls,
 {
   struct Plugin *plugin = cls;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
 
   int ret = GNUNET_SYSERR;
   *returned_fragments = 0;
@@ -1307,14 +1133,14 @@ fragment_get_latest (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
+  ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1338,13 +1164,13 @@ message_get (void *cls,
              void *cb_cls)
 {
   struct Plugin *plugin = cls;
-//  sqlite3_stmt *stmt = plugin->select_messages;
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
+  int ret;
 
-  int ret = GNUNET_SYSERR;
-  *returned_fragments = 0;
+  if (0 == fragment_limit)
+    fragment_limit = UINT64_MAX;
 
-  struct GNUNET_MY_QueryParams params_select[] = {
+  struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&first_message_id),
     GNUNET_MY_query_param_uint64 (&last_message_id),
@@ -1352,14 +1178,15 @@ message_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
+  *returned_fragments = 0;
+  ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1382,7 +1209,7 @@ message_get_latest (void *cls,
 {
   struct Plugin *plugin = cls;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
 
   int ret = GNUNET_SYSERR;
   *returned_fragments = 0;
@@ -1394,14 +1221,14 @@ message_get_latest (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
+  ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1424,9 +1251,8 @@ message_get_fragment (void *cls,
                       void *cb_cls)
 {
   struct Plugin *plugin = cls;
-
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
-
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
+  int sql_ret;
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_select[] = {
@@ -1436,23 +1262,28 @@ message_get_fragment (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
+  sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
+  switch (sql_ret)
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
-    return GNUNET_SYSERR;    
-  }
+    case GNUNET_NO:
+      ret = GNUNET_NO;
+      break;
 
-  ret = fragment_row (stmt, cb, cb_cls);
+    case GNUNET_OK:
+      ret = fragment_row (stmt, cb, cb_cls, NULL);
+      break;
 
-  if (0 != mysql_stmt_reset (stmt))
+    default:
+      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql execute prepared", stmt);
+  }
+
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1472,16 +1303,8 @@ counters_message_get (void *cls,
                       uint64_t *max_group_generation)
 {
   struct Plugin *plugin = cls;
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
-  MYSQL_STMT *statement = NULL;
 
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql get statement", statement);
-    return GNUNET_SYSERR;
-  }  
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
 
   int ret = GNUNET_SYSERR;
 
@@ -1490,12 +1313,10 @@ counters_message_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql execute prepared", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -1506,22 +1327,21 @@ counters_message_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql extract_result", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql extract_result", stmt);
     return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1540,16 +1360,7 @@ counters_state_get (void *cls,
 {
   struct Plugin *plugin = cls;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
-  MYSQL_STMT *statement = NULL;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql get_stmt", statement);
-    return GNUNET_SYSERR; 
-  }
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
 
   int ret = GNUNET_SYSERR;
 
@@ -1558,12 +1369,10 @@ counters_state_get (void *cls,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql execute prepared", stmt);
     return GNUNET_SYSERR;
   }
 
@@ -1572,22 +1381,21 @@ counters_state_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql extract_result", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql extract_result", stmt);
     return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1599,69 +1407,46 @@ counters_state_get (void *cls,
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_assign (struct Plugin *plugin, GNUNET_MYSQL_StatementHandle *stmt,
+state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
               const char *name, const void *value, size_t value_size)
 {
   int ret = GNUNET_SYSERR;
 
-  MYSQL_STMT *statement = NULL;
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql get_stmt", statement);
-    return GNUNET_SYSERR;
-  }
-
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_string (name),
-    GNUNET_MY_query_param_auto_from_type (value_size),
+    GNUNET_MY_query_param_fixed_size(value, value_size),
     GNUNET_MY_query_param_end
   };
 
-  ret = GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params);
-
+  ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
   if (GNUNET_OK != ret)
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }    
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
 
 
 static int
-update_message_id (struct Plugin *plugin, GNUNET_MYSQL_StatementHandle *stmt,
+update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id)
 {
-  MYSQL_STMT *statement = NULL;
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql get_stmt", statement);
-    return GNUNET_SYSERR;
-  }
-
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_uint64 (&message_id),
-    GNUNET_MY_query_param_auto_from_type (channel_id),
+    GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_end
   };
 
@@ -1669,18 +1454,18 @@ update_message_id (struct Plugin *plugin, GNUNET_MYSQL_StatementHandle *stmt,
                                             stmt,
                                             params))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql execute prepared", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql execute prepared", stmt);
     return GNUNET_SYSERR;
   }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }   
-  
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
+
   return GNUNET_OK;
 }
 
@@ -1749,8 +1534,8 @@ state_modify_op (void *cls,
   switch (op)
   {
   case GNUNET_PSYC_OP_ASSIGN:
-    return state_assign (plugin, plugin->insert_state_current, channel_key,
-                         name, value, value_size);
+    return state_assign (plugin, plugin->insert_state_current,
+                         channel_key, name, value, value_size);
 
   default: /** @todo implement more state operations */
     GNUNET_break (0);
@@ -1805,8 +1590,8 @@ state_sync_assign (void *cls,
                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
-  return state_assign (cls, plugin->insert_state_sync, channel_key,
-                       name, value, value_size);
+  return state_assign (cls, plugin->insert_state_sync,
+                       channel_key, name, value, value_size);
 }
 
 
@@ -1891,8 +1676,9 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
 {
   struct Plugin *plugin = cls;
   int ret = GNUNET_SYSERR;
+  int sql_ret ;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
 
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
@@ -1900,22 +1686,44 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
     GNUNET_MY_query_param_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                                            stmt,
-                                            params_select))
-  {
+  void *value_current = NULL;
+  size_t value_size = 0;
+
+  struct GNUNET_MY_ResultSpec results[] = {
+    GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
+    GNUNET_MY_result_spec_end
+  };
 
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
   }
+  else
+  {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
+    {
+    case GNUNET_NO:
+      ret = GNUNET_NO;
+      break;
+
+    case GNUNET_YES:
+      ret = cb (cb_cls, name, value_current, value_size);
+      break;
 
-  ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
-                sqlite3_column_bytes (stmt, 0));
+    default:
+      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                "mysql extract_result", stmt);
+    }
+  }
 
-  if (0 != mysql_stmt_reset (stmt))
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
-  }   
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
+  }
 
   return ret;
 }
@@ -1936,16 +1744,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
   struct Plugin *plugin = cls;
   int ret = GNUNET_SYSERR;
 
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
-  MYSQL_STMT *statement = NULL;
-  statement = GNUNET_MYSQL_statement_get_stmt (stmt);
-
-  if (NULL == statement)
-  {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql get_stmt", statement);
-    return GNUNET_SYSERR;
-  }
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
 
   uint32_t name_len = (uint32_t) strlen (name);
 
@@ -1957,28 +1756,54 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
     GNUNET_MY_query_param_end
   };
 
-  int sql_ret;
+  char *name2 = "";
+  void *value_current = NULL;
+  size_t value_size = 0;
 
-  sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
-                                    stmt,
-                                    params_select);
+  struct GNUNET_MY_ResultSpec results[] = {
+    GNUNET_MY_result_spec_string (&name2),
+    GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
+    GNUNET_MY_result_spec_end
+  };;
+
+  int sql_ret;
 
-  if (GNUNET_OK != sql_ret)
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql exec_prepared", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
-  ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
-                  sqlite3_column_blob (stmt, 1),
-                  sqlite3_column_bytes (stmt, 1));
+  do
+  {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
+    {
+      case GNUNET_NO:
+        if (ret != GNUNET_YES)
+          ret = GNUNET_NO;
+        break;
 
-  if (0 != mysql_stmt_reset (stmt))
+      case GNUNET_YES:
+        ret = cb (cb_cls, (const char *) name2, value_current, value_size);
+
+        if (ret != GNUNET_YES)
+          sql_ret = GNUNET_NO;
+        break;
+
+      default:
+        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                  "mysql extract_result", stmt);
+    }
+  }
+  while (sql_ret == GNUNET_YES);
+
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return ret;
@@ -2000,8 +1825,7 @@ state_get_signed (void *cls,
   struct Plugin *plugin = cls;
   int ret = GNUNET_SYSERR;
 
-  //sqlite3_stmt *stmt = plugin->select_state_signed;
-  GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
+  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
 
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
@@ -2010,26 +1834,52 @@ state_get_signed (void *cls,
 
   int sql_ret;
 
-  sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
-                                      stmt,
-                                      params_select);
+  char *name = "";
+  void *value_signed = NULL;
+  size_t value_size = 0;
 
-  if (GNUNET_OK != sql_ret)
+  struct GNUNET_MY_ResultSpec results[] = {
+    GNUNET_MY_result_spec_string (&name),
+    GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
+    GNUNET_MY_result_spec_end
+  };
+
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_exec_prepared", statement);
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
     return GNUNET_SYSERR;
   }
 
-  ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
-                  sqlite3_column_blob (stmt, 1),
-                  sqlite3_column_bytes (stmt, 1));
+  do
+  {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
+    {
+      case GNUNET_NO:
+        if (ret != GNUNET_YES)
+          ret = GNUNET_NO;
+        break;
+
+      case GNUNET_YES:
+        ret = cb (cb_cls, (const char *) name, value_signed, value_size);
+
+        if (ret != GNUNET_YES)
+            sql_ret = GNUNET_NO;
+        break;
 
-  if (0 != mysql_stmt_reset (stmt))
+      default:
+         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql extract_result", stmt);
+    }
+  }
+  while (sql_ret == GNUNET_YES);
+
+  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
   {
-    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", statement);
-    return GNUNET_SYSERR; 
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql_stmt_reset", stmt);
+    return GNUNET_SYSERR;
   }
 
   return ret;
@@ -2043,7 +1893,7 @@ state_get_signed (void *cls,
  * @return NULL on error, otherwise the plugin context
  */
 void *
-libgnunet_plugin_psycstore_sqlite_init (void *cls)
+libgnunet_plugin_psycstore_mysql_init (void *cls)
 {
   static struct Plugin plugin;
   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
@@ -2060,7 +1910,7 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
   }
   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
   api->cls = &plugin;
-  api->membership_store = &sqlite_membership_store;
+  api->membership_store = &mysql_membership_store;
   api->membership_test = &membership_test;
   api->fragment_store = &fragment_store;
   api->message_add_flags = &message_add_flags;
@@ -2083,7 +1933,7 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
   api->state_get_prefix = &state_get_prefix;
   api->state_get_signed = &state_get_signed;
 
-  LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
+  LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
   return api;
 }
 
@@ -2095,7 +1945,7 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
  * @return Always NULL
  */
 void *
-libgnunet_plugin_psycstore_sqlite_done (void *cls)
+libgnunet_plugin_psycstore_mysql_done (void *cls)
 {
   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
   struct Plugin *plugin = api->cls;
@@ -2103,7 +1953,7 @@ libgnunet_plugin_psycstore_sqlite_done (void *cls)
   database_shutdown (plugin);
   plugin->cfg = NULL;
   GNUNET_free (api);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
   return NULL;
 }