psycstore/mysql: check return values
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_mysql.c
23  * @brief mysql-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  */
28
29 #include "platform.h"
30 #include "gnunet_psycstore_plugin.h"
31 #include "gnunet_psycstore_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_crypto_lib.h"
34 #include "gnunet_psyc_util_lib.h"
35 #include "psycstore.h"
36 #include "gnunet_my_lib.h"
37 #include "gnunet_mysql_lib.h"
38 #include <mysql/mysql.h>
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 /**
55  * Log an error message at log-level 'level' that indicates
56  * a failure of the command 'cmd' on file 'filename'
57  * with the message given by strerror(errno).
58  */
59 #define LOG_MYSQL(db, level, cmd, stmt)                                 \
60   do {                                                                  \
61     GNUNET_log_from (level, "psycstore-mysql",                          \
62                      _("`%s' failed at %s:%d with error: %s\n"),        \
63                      cmd, __FILE__, __LINE__,                           \
64                      mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
65   } while (0)
66
67 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
68
69 enum Transactions {
70   TRANSACTION_NONE = 0,
71   TRANSACTION_STATE_MODIFY,
72   TRANSACTION_STATE_SYNC,
73 };
74
75 /**
76  * Context for all functions in this plugin.
77  */
78 struct Plugin
79 {
80
81   const struct GNUNET_CONFIGURATION_Handle *cfg;
82
83   /**
84    * Database filename.
85    */
86   char *fn;
87
88   /**
89     *Handle to talk to Mysql
90     */
91   struct GNUNET_MYSQL_Context *mc;
92
93   /**
94    * Current transaction.
95    */
96   enum Transactions transaction;
97
98   struct GNUNET_MYSQL_StatementHandle *transaction_begin;
99
100   struct GNUNET_MYSQL_StatementHandle *transaction_commit;
101
102   struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
103
104   /**
105    * Precompiled SQL for channel_key_store()
106    */
107   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
108
109
110   /**
111    * Precompiled SQL for slave_key_store()
112    */
113   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
114
115   /**
116    * Precompiled SQL for membership_store()
117    */
118   struct GNUNET_MYSQL_StatementHandle *insert_membership;
119
120   /**
121    * Precompiled SQL for membership_test()
122    */
123   struct GNUNET_MYSQL_StatementHandle *select_membership;
124
125   /**
126    * Precompiled SQL for fragment_store()
127    */
128   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
129
130   /**
131    * Precompiled SQL for message_add_flags()
132    */
133   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
134
135   /**
136    * Precompiled SQL for fragment_get()
137    */
138   struct GNUNET_MYSQL_StatementHandle *select_fragments;
139
140   /**
141    * Precompiled SQL for fragment_get()
142    */
143   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
144
145   /**
146    * Precompiled SQL for message_get()
147    */
148   struct GNUNET_MYSQL_StatementHandle *select_messages;
149
150   /**
151    * Precompiled SQL for message_get()
152    */
153   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
154
155   /**
156    * Precompiled SQL for message_get_fragment()
157    */
158   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
159
160   /**
161    * Precompiled SQL for counters_get_message()
162    */
163   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
164
165   /**
166    * Precompiled SQL for counters_get_state()
167    */
168   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
169
170   /**
171    * Precompiled SQL for state_modify_end()
172    */
173   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
174
175   /**
176    * Precompiled SQL for state_sync_end()
177    */
178   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
179
180   /**
181    * Precompiled SQL for state_modify_op()
182    */
183   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
184
185   /**
186    * Precompiled SQL for state_modify_end()
187    */
188   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
189
190   /**
191    * Precompiled SQL for state_set_signed()
192    */
193   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
194
195   /**
196    * Precompiled SQL for state_sync()
197    */
198   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
199
200   /**
201    * Precompiled SQL for state_sync()
202    */
203   struct GNUNET_MYSQL_StatementHandle *delete_state;
204
205   /**
206    * Precompiled SQL for state_sync()
207    */
208   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
209
210   /**
211    * Precompiled SQL for state_sync()
212    */
213   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
214
215   /**
216    * Precompiled SQL for state_get_signed()
217    */
218   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
219
220   /**
221    * Precompiled SQL for state_get()
222    */
223   struct GNUNET_MYSQL_StatementHandle *select_state_one;
224
225   /**
226    * Precompiled SQL for state_get_prefix()
227    */
228   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
229
230 };
231
232 #if DEBUG_PSYCSTORE
233
234 static void
235 mysql_trace (void *cls, const char *sql)
236 {
237   LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
238 }
239
240 #endif
241
242
243 /**
244  * @brief Prepare a SQL statement
245  *
246  * @param dbh handle to the database
247  * @param sql SQL statement, UTF-8 encoded
248  * @param stmt set to the prepared statement
249  * @return 0 on success
250  */
251 static int
252 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
253               const char *sql,
254               struct GNUNET_MYSQL_StatementHandle **stmt)
255 {
256   *stmt = GNUNET_MYSQL_statement_prepare (mc,
257                                           sql);
258
259   LOG(GNUNET_ERROR_TYPE_DEBUG,
260        "Prepared `%s' / %p\n", sql, stmt);
261   if(NULL == *stmt)
262     LOG(GNUNET_ERROR_TYPE_ERROR,
263    _("Error preparing SQL query: %s\n  %s\n"),
264    mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
265
266   return 0;
267 }
268
269
270 /**
271  * Initialize the database connections and associated
272  * data structures (create tables and indices
273  * as needed as well).
274  *
275  * @param plugin the plugin context (state for this module)
276  * @return GNUNET_OK on success
277  */
278 static int
279 database_setup (struct Plugin *plugin)
280 {
281   char *filename;
282
283   if (GNUNET_OK !=
284       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
285                                                "FILENAME", &filename))
286   {
287     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
288                                "psycstore-mysql", "FILENAME");
289     return GNUNET_SYSERR;
290   }
291
292   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
293   {
294     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
295     {
296       GNUNET_break (0);
297       GNUNET_free (filename);
298       return GNUNET_SYSERR;
299     }
300   }
301   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
302   plugin->fn = filename;
303
304   /* Open database and precompile statements */
305   plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
306
307   if (NULL == plugin->mc)
308   {
309     LOG(GNUNET_ERROR_TYPE_ERROR,
310    _("Unable to initialize Mysql.\n"));
311     return GNUNET_SYSERR;
312   }
313
314   /* Create tables */
315
316   GNUNET_MYSQL_statement_run (plugin->mc,
317                               "CREATE TABLE IF NOT EXISTS channels (\n"
318                               " id INT AUTO_INCREMENT,\n"
319                               " pub_key BLOB,\n"
320                               " max_state_message_id INT,\n"
321                               " state_hash_message_id INT,\n"
322                               " PRIMARY KEY(id),\n"
323                               " UNIQUE KEY(pub_key(5))\n"
324                               ");");
325
326   GNUNET_MYSQL_statement_run (plugin->mc,
327                               "CREATE TABLE IF NOT EXISTS slaves (\n"
328                               " id INT AUTO_INCREMENT,\n"
329                               " pub_key BLOB,\n"
330                               " PRIMARY KEY(id),\n"
331                               " UNIQUE KEY(pub_key(5))\n"
332                               ");");
333
334   GNUNET_MYSQL_statement_run (plugin->mc,
335                               "CREATE TABLE IF NOT EXISTS membership (\n"
336                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
337                               "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
338                               "  did_join INT NOT NULL,\n"
339                               "  announced_at BIGINT UNSIGNED NOT NULL,\n"
340                               "  effective_since BIGINT UNSIGNED NOT NULL,\n"
341                               "  group_generation BIGINT UNSIGNED NOT NULL\n"
342                               ");");
343
344 /*** FIX because IF NOT EXISTS doesn't work ***/
345   GNUNET_MYSQL_statement_run (plugin->mc,
346                               "CREATE INDEX idx_membership_channel_id_slave_id "
347                               "ON membership (channel_id, slave_id);");
348
349   /** @todo messages table: add method_name column */
350   GNUNET_MYSQL_statement_run (plugin->mc,
351                               "CREATE TABLE IF NOT EXISTS messages (\n"
352                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
353                               "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
354                               "  signature BLOB,\n"
355                               "  purpose BLOB,\n"
356                               "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
357                               "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
358                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
359                               "  group_generation BIGINT UNSIGNED NOT NULL,\n"
360                               "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
361                               "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
362                               "  data BLOB,\n"
363                               "  PRIMARY KEY (channel_id, fragment_id),\n"
364                               "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
365                               ");");
366
367   GNUNET_MYSQL_statement_run (plugin->mc,
368                               "CREATE TABLE IF NOT EXISTS state (\n"
369                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
370                               "  name TEXT NOT NULL,\n"
371                               "  value_current BLOB,\n"
372                               "  value_signed BLOB,\n"
373                               "  PRIMARY KEY (channel_id, name(5))\n"
374                               ");");
375
376   GNUNET_MYSQL_statement_run (plugin->mc,
377                               "CREATE TABLE IF NOT EXISTS state_sync (\n"
378                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
379                               "  name TEXT NOT NULL,\n"
380                               "  value BLOB,\n"
381                               "  PRIMARY KEY (channel_id, name(5))\n"
382                               ");");
383
384   /* Prepare statements */
385   mysql_prepare (plugin->mc,
386                 "BEGIN",
387                 &plugin->transaction_begin);
388
389   mysql_prepare (plugin->mc,
390                 "COMMIT",
391                 &plugin->transaction_commit);
392
393   mysql_prepare (plugin->mc,
394                 "ROLLBACK;",
395                 &plugin->transaction_rollback);
396
397   mysql_prepare (plugin->mc,
398                 "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
399                 &plugin->insert_channel_key);
400
401   mysql_prepare (plugin->mc,
402                 "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
403                 &plugin->insert_slave_key);
404
405   mysql_prepare (plugin->mc,
406                 "INSERT INTO membership\n"
407                 " (channel_id, slave_id, did_join, announced_at,\n"
408                 "  effective_since, group_generation)\n"
409                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
410                 "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
411                 "        ?, ?, ?, ?);",
412                 &plugin->insert_membership);
413
414   mysql_prepare (plugin->mc,
415                 "SELECT did_join FROM membership\n"
416                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
417                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
418                "      AND effective_since <= ? AND did_join = 1\n"
419                "ORDER BY announced_at DESC LIMIT 1;",
420                &plugin->select_membership);
421
422   mysql_prepare (plugin->mc,
423                 "INSERT IGNORE INTO messages\n"
424                " (channel_id, hop_counter, signature, purpose,\n"
425                "  fragment_id, fragment_offset, message_id,\n"
426                "  group_generation, multicast_flags, psycstore_flags, data)\n"
427                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
428                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
429                 &plugin->insert_fragment);
430
431   mysql_prepare (plugin->mc,
432                 "UPDATE messages\n"
433                 "SET psycstore_flags = psycstore_flags | ?\n"
434                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
435                 "      AND message_id = ? AND fragment_offset = 0;",
436                 &plugin->update_message_flags);
437
438   mysql_prepare (plugin->mc,
439                   "SELECT hop_counter, signature, purpose, fragment_id,\n"
440                   "       fragment_offset, message_id, group_generation,\n"
441                   "       multicast_flags, psycstore_flags, data\n"
442                   "FROM messages\n"
443                   "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
444                   "      AND ? <= fragment_id AND fragment_id <= ?;",
445                 &plugin->select_fragments);
446
447   /** @todo select_messages: add method_prefix filter */
448   mysql_prepare (plugin->mc,
449                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
450                 "       fragment_offset, message_id, group_generation,\n"
451                 "       multicast_flags, psycstore_flags, data\n"
452                 "FROM messages\n"
453                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
454                 "      AND ? <= message_id AND message_id <= ?"
455                 "LIMIT ?;",
456                 &plugin->select_messages);
457
458   mysql_prepare (plugin->mc,
459                 "SELECT * FROM\n"
460                 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
461                 "        fragment_offset, message_id, group_generation,\n"
462                 "        multicast_flags, psycstore_flags, data\n"
463                 " FROM messages\n"
464                 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
465                 " ORDER BY fragment_id DESC\n"
466                 " LIMIT ?)\n"
467                 "ORDER BY fragment_id;",
468                 &plugin->select_latest_fragments);
469
470   /** @todo select_latest_messages: add method_prefix filter */
471   mysql_prepare (plugin->mc,
472                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
473                 "       fragment_offset, message_id, group_generation,\n"
474                 "        multicast_flags, psycstore_flags, data\n"
475                 "FROM messages\n"
476                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477                 "      AND message_id IN\n"
478                 "      (SELECT message_id\n"
479                 "       FROM messages\n"
480                 "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
481                 "       GROUP BY message_id\n"
482                 "       ORDER BY message_id\n"
483                 "       DESC LIMIT ?)\n"
484                 "ORDER BY fragment_id;",
485                 &plugin->select_latest_messages);
486
487   mysql_prepare (plugin->mc,
488                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
489                 "       fragment_offset, message_id, group_generation,\n"
490                 "       multicast_flags, psycstore_flags, data\n"
491                 "FROM messages\n"
492                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
493                 "      AND message_id = ? AND fragment_offset = ?;",
494                 &plugin->select_message_fragment);
495
496   mysql_prepare (plugin->mc,
497                 "SELECT fragment_id, message_id, group_generation\n"
498                 "FROM messages\n"
499                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
500                 "ORDER BY fragment_id DESC LIMIT 1;",
501                 &plugin->select_counters_message);
502
503   mysql_prepare (plugin->mc,
504                 "SELECT max_state_message_id\n"
505                 "FROM channels\n"
506                 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
507                 &plugin->select_counters_state);
508
509   mysql_prepare (plugin->mc,
510                 "UPDATE channels\n"
511                 "SET max_state_message_id = ?\n"
512                 "WHERE pub_key = ?;",
513                 &plugin->update_max_state_message_id);
514
515   mysql_prepare (plugin->mc,
516                 "UPDATE channels\n"
517                 "SET state_hash_message_id = ?\n"
518                 "WHERE pub_key = ?;",
519                 &plugin->update_state_hash_message_id);
520
521   mysql_prepare (plugin->mc,
522                 "REPLACE INTO state\n"
523                 "  (channel_id, name, value_current, value_signed)\n"
524                 "SELECT new.channel_id, new.name,\n"
525                 "       new.value_current, old.value_signed\n"
526                 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
527                 "             AS channel_id,\n"
528                 "             ? AS name, ? AS value_current) AS new\n"
529                 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
530                 "           FROM state) AS old\n"
531                 "ON new.channel_id = old.channel_id AND new.name = old.name;",
532                 &plugin->insert_state_current);
533
534   mysql_prepare (plugin->mc,
535                 "DELETE FROM state\n"
536                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
537                 "      AND (value_current IS NULL OR length(value_current) = 0)\n"
538                 "      AND (value_signed IS NULL OR length(value_signed) = 0);",
539                 &plugin->delete_state_empty);
540
541   mysql_prepare (plugin->mc,
542                 "UPDATE state\n"
543                 "SET value_signed = value_current\n"
544                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
545                 &plugin->update_state_signed);
546
547   mysql_prepare (plugin->mc,
548                 "DELETE FROM state\n"
549                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
550                 &plugin->delete_state);
551
552   mysql_prepare (plugin->mc,
553                 "INSERT INTO state_sync (channel_id, name, value)\n"
554                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
555                 &plugin->insert_state_sync);
556
557   mysql_prepare (plugin->mc,
558                 "INSERT INTO state\n"
559                 " (channel_id, name, value_current, value_signed)\n"
560                 "SELECT channel_id, name, value, value\n"
561                 "FROM state_sync\n"
562                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
563                 &plugin->insert_state_from_sync);
564
565   mysql_prepare (plugin->mc,
566                 "DELETE FROM state_sync\n"
567                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
568                 &plugin->delete_state_sync);
569
570   mysql_prepare (plugin->mc,
571                 "SELECT value_current\n"
572                 "FROM state\n"
573                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
574                 "      AND name = ?;",
575                 &plugin->select_state_one);
576
577   mysql_prepare (plugin->mc,
578                 "SELECT name, value_current\n"
579                 "FROM state\n"
580                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
581                 "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
582                 &plugin->select_state_prefix);
583
584   mysql_prepare (plugin->mc,
585                 "SELECT name, value_signed\n"
586                 "FROM state\n"
587                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
588                 "      AND value_signed IS NOT NULL;",
589                 &plugin->select_state_signed);
590
591   return GNUNET_OK;
592 }
593
594
595 /**
596  * Shutdown database connection and associate data
597  * structures.
598  * @param plugin the plugin context (state for this module)
599  */
600 static void
601 database_shutdown (struct Plugin *plugin)
602 {
603   GNUNET_MYSQL_context_destroy (plugin->mc);
604
605   GNUNET_free_non_null (plugin->fn);
606
607 }
608
609
610 /**
611  * Execute a prepared statement with a @a channel_key argument.
612  *
613  * @param plugin Plugin handle.
614  * @param stmt Statement to execute.
615  * @param channel_key Public key of the channel.
616  *
617  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
618  */
619 static int
620 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
621               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
622 {
623   struct GNUNET_MY_QueryParam params[] = {
624     GNUNET_MY_query_param_auto_from_type (channel_key),
625     GNUNET_MY_query_param_end
626   };
627
628   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
629   {
630     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
631               "mysql exec_channel", stmt);
632   }
633
634   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
635   {
636     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
637               "mysql_stmt_reset", stmt);
638     return GNUNET_SYSERR;
639   }
640
641   return GNUNET_OK;
642 }
643
644
645 /**
646  * Begin a transaction.
647  */
648 static int
649 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
650 {
651   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
652
653   struct GNUNET_MY_QueryParam params[] = {
654     GNUNET_MY_query_param_end
655   };
656
657   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
658   {
659     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
660                 "mysql exexc_prepared", stmt);
661     return GNUNET_SYSERR;
662   }
663
664   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
665   {
666     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
667               "mysql_stmt_reset", stmt);
668     return GNUNET_SYSERR;
669   }
670
671   plugin->transaction = transaction;
672   return GNUNET_OK;
673 }
674
675
676 /**
677  * Commit current transaction.
678  */
679 static int
680 transaction_commit (struct Plugin *plugin)
681 {
682   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
683
684   struct GNUNET_MY_QueryParam params[] = {
685     GNUNET_MY_query_param_end
686   };
687
688   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
689   {
690     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
691                 "mysql exec_prepared", stmt);
692     return GNUNET_SYSERR;
693   }
694
695   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
696   {
697     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
698               "mysql_stmt_reset", stmt);
699     return GNUNET_SYSERR;
700   }
701
702   plugin->transaction = TRANSACTION_NONE;
703   return GNUNET_OK;
704 }
705
706
707 /**
708  * Roll back current transaction.
709  */
710 static int
711 transaction_rollback (struct Plugin *plugin)
712 {
713   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
714
715   struct GNUNET_MY_QueryParam params[] = {
716     GNUNET_MY_query_param_end
717   };
718
719   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
720   {
721     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
722               "mysql exec_prepared", stmt);
723     return GNUNET_SYSERR;
724   }
725
726   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
727   {
728     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
729               "mysql_stmt_reset", stmt);
730     return GNUNET_SYSERR;
731   }
732
733   plugin->transaction = TRANSACTION_NONE;
734   return GNUNET_OK;
735 }
736
737
738 static int
739 channel_key_store (struct Plugin *plugin,
740                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
741 {
742   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
743
744   struct GNUNET_MY_QueryParam params[] = {
745     GNUNET_MY_query_param_auto_from_type (channel_key),
746     GNUNET_MY_query_param_end
747   };
748
749   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
750   {
751     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
752               "mysql exec_prepared", stmt);
753     return GNUNET_SYSERR;
754   }
755
756   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
757   {
758     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
759               "mysql_stmt_reset", stmt);
760     return GNUNET_SYSERR;
761   }
762
763   return GNUNET_OK;
764 }
765
766
767 static int
768 slave_key_store (struct Plugin *plugin,
769                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
770 {
771   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
772
773   struct GNUNET_MY_QueryParam params[] = {
774     GNUNET_MY_query_param_auto_from_type (slave_key),
775     GNUNET_MY_query_param_end
776   };
777
778   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
779   {
780     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
781               "mysql exec_prepared", stmt);
782     return GNUNET_SYSERR;
783   }
784
785   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
786   {
787     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
788               "mysql_stmt_reset", stmt);
789     return GNUNET_SYSERR;
790   }
791
792   return GNUNET_OK;
793 }
794
795
796 /**
797  * Store join/leave events for a PSYC channel in order to be able to answer
798  * membership test queries later.
799  *
800  * @see GNUNET_PSYCSTORE_membership_store()
801  *
802  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
803  */
804 static int
805 mysql_membership_store (void *cls,
806                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
807                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
808                          int did_join,
809                          uint64_t announced_at,
810                          uint64_t effective_since,
811                          uint64_t group_generation)
812 {
813   struct Plugin *plugin = cls;
814
815   uint32_t idid_join = (uint32_t)did_join;
816
817   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
818
819   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
820
821   if (announced_at > INT64_MAX ||
822       effective_since > INT64_MAX ||
823       group_generation > INT64_MAX)
824   {
825     GNUNET_break (0);
826     return GNUNET_SYSERR;
827   }
828
829   if (GNUNET_OK != channel_key_store (plugin, channel_key)
830       || GNUNET_OK != slave_key_store (plugin, slave_key))
831     return GNUNET_SYSERR;
832
833   struct GNUNET_MY_QueryParam params[] = {
834     GNUNET_MY_query_param_auto_from_type (channel_key),
835     GNUNET_MY_query_param_auto_from_type (slave_key),
836     GNUNET_MY_query_param_uint32 (&idid_join),
837     GNUNET_MY_query_param_uint64 (&announced_at),
838     GNUNET_MY_query_param_uint64 (&effective_since),
839     GNUNET_MY_query_param_uint64 (&group_generation),
840     GNUNET_MY_query_param_end
841   };
842
843   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
844   {
845     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
846               "mysql exec_prepared", stmt);
847     return GNUNET_SYSERR;
848   }
849
850   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
851   {
852     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
853               "mysql_stmt_reset", stmt);
854     return GNUNET_SYSERR;
855   }
856   return GNUNET_OK;
857 }
858
859 /**
860  * Test if a member was admitted to the channel at the given message ID.
861  *
862  * @see GNUNET_PSYCSTORE_membership_test()
863  *
864  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
865  *         #GNUNET_SYSERR if there was en error.
866  */
867 static int
868 membership_test (void *cls,
869                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
870                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
871                  uint64_t message_id)
872 {
873   struct Plugin *plugin = cls;
874
875   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
876
877   uint32_t did_join = 0;
878
879   int ret = GNUNET_SYSERR;
880
881   struct GNUNET_MY_QueryParam params_select[] = {
882     GNUNET_MY_query_param_auto_from_type (channel_key),
883     GNUNET_MY_query_param_auto_from_type (slave_key),
884     GNUNET_MY_query_param_uint64 (&message_id),
885     GNUNET_MY_query_param_end
886   };
887
888   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
889   {
890     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
891                 "mysql execute prepared", stmt);
892     return GNUNET_SYSERR;
893   }
894
895   struct GNUNET_MY_ResultSpec results_select[] = {
896     GNUNET_MY_result_spec_uint32 (&did_join),
897     GNUNET_MY_result_spec_end
898   };
899
900   switch(GNUNET_MY_extract_result (stmt,
901                                 results_select))
902   {
903     case GNUNET_NO:
904       ret = GNUNET_NO;
905       break;
906     case GNUNET_OK:
907       ret = GNUNET_YES;
908       break;
909     default:
910       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
911                 "mysql extract_result", stmt);
912       return GNUNET_SYSERR;
913   }
914
915   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
916   {
917     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
918               "mysql_stmt_reset", stmt);
919     return GNUNET_SYSERR;
920   }
921
922   return ret;
923 }
924
925 /**
926  * Store a message fragment sent to a channel.
927  *
928  * @see GNUNET_PSYCSTORE_fragment_store()
929  *
930  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
931  */
932 static int
933 fragment_store (void *cls,
934                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
935                 const struct GNUNET_MULTICAST_MessageHeader *msg,
936                 uint32_t psycstore_flags)
937 {
938   struct Plugin *plugin = cls;
939
940   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
941
942   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
943
944   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
945
946   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
947   uint64_t message_id = GNUNET_ntohll (msg->message_id);
948   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
949
950   uint64_t hop_counter = ntohl(msg->hop_counter);
951   uint64_t flags = ntohl(msg->flags);
952
953   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
954       message_id > INT64_MAX || group_generation > INT64_MAX)
955   {
956     LOG(GNUNET_ERROR_TYPE_ERROR,
957          "Tried to store fragment with a field > INT64_MAX: "
958          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
959          message_id, group_generation);
960     GNUNET_break (0);
961     return GNUNET_SYSERR;
962   }
963
964   if (GNUNET_OK != channel_key_store (plugin, channel_key))
965     return GNUNET_SYSERR;
966
967   struct GNUNET_MY_QueryParam params_insert[] = {
968     GNUNET_MY_query_param_auto_from_type (channel_key),
969     GNUNET_MY_query_param_uint64 (&hop_counter),
970     GNUNET_MY_query_param_auto_from_type (&msg->signature),
971     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
972     GNUNET_MY_query_param_uint64 (&fragment_id),
973     GNUNET_MY_query_param_uint64 (&fragment_offset),
974     GNUNET_MY_query_param_uint64 (&message_id),
975     GNUNET_MY_query_param_uint64 (&group_generation),
976     GNUNET_MY_query_param_uint64 (&flags),
977     GNUNET_MY_query_param_uint32 (&psycstore_flags),
978     GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
979                                                   - sizeof (*msg)),
980     GNUNET_MY_query_param_end
981   };
982
983   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
984   {
985     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
986               "mysql execute prepared", stmt);
987     return GNUNET_SYSERR;
988   }
989
990   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
991   {
992     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
993               "mysql_stmt_reset", stmt);
994     return GNUNET_SYSERR;
995   }
996
997   return GNUNET_OK;
998 }
999
1000 /**
1001  * Set additional flags for a given message.
1002  *
1003  * They are OR'd with any existing flags set.
1004  *
1005  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1006  */
1007 static int
1008 message_add_flags (void *cls,
1009                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1010                    uint64_t message_id,
1011                    uint64_t psycstore_flags)
1012 {
1013   struct Plugin *plugin = cls;
1014   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
1015
1016   int sql_ret;
1017   int ret = GNUNET_SYSERR;
1018
1019   struct GNUNET_MY_QueryParam params_update[] = {
1020     GNUNET_MY_query_param_uint64 (&psycstore_flags),
1021     GNUNET_MY_query_param_auto_from_type (channel_key),
1022     GNUNET_MY_query_param_uint64 (&message_id),
1023     GNUNET_MY_query_param_end
1024   };
1025
1026   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
1027   switch (sql_ret)
1028   {
1029     case GNUNET_OK:
1030       ret = GNUNET_OK;
1031       break;
1032
1033     default:
1034        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1035               "mysql execute prepared", stmt);
1036   }
1037
1038   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1039   {
1040     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1041               "mysql_stmt_reset", stmt);
1042     return GNUNET_SYSERR;
1043   }
1044
1045   return ret;
1046 }
1047
1048
1049 static int
1050 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
1051               GNUNET_PSYCSTORE_FragmentCallback cb,
1052               void *cb_cls)
1053 {
1054
1055   uint32_t hop_counter;
1056   void *signature = NULL;
1057   void *purpose = NULL;
1058   size_t signature_size;
1059   size_t purpose_size;
1060
1061   uint64_t fragment_id;
1062   uint64_t fragment_offset;
1063   uint64_t message_id;
1064   uint64_t group_generation;
1065   uint64_t flags;
1066   void *buf;
1067   size_t buf_size;
1068   int ret = GNUNET_SYSERR;
1069   int sql_ret;
1070   struct GNUNET_MULTICAST_MessageHeader *mp;
1071
1072   uint64_t msg_flags;
1073
1074
1075   struct GNUNET_MY_ResultSpec results[] = {
1076     GNUNET_MY_result_spec_uint32 (&hop_counter),
1077     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
1078     GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
1079     GNUNET_MY_result_spec_uint64 (&fragment_id),
1080     GNUNET_MY_result_spec_uint64 (&fragment_offset),
1081     GNUNET_MY_result_spec_uint64 (&message_id),
1082     GNUNET_MY_result_spec_uint64 (&group_generation),
1083     GNUNET_MY_result_spec_uint64 (&msg_flags),
1084     GNUNET_MY_result_spec_uint64 (&flags),
1085     GNUNET_MY_result_spec_variable_size (&buf,
1086                                          &buf_size),
1087     GNUNET_MY_result_spec_end
1088   };
1089
1090   sql_ret = GNUNET_MY_extract_result (stmt,
1091                                 results);
1092   switch (sql_ret)
1093   {
1094     case GNUNET_NO:
1095       if (ret != GNUNET_OK)
1096         ret = GNUNET_NO;
1097       break;
1098
1099     case GNUNET_OK:
1100       mp = GNUNET_malloc (sizeof (*mp) + buf_size);
1101
1102       mp->header.size = htons (sizeof (*mp) + buf_size);
1103       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1104       mp->hop_counter = htonl (hop_counter);
1105       GNUNET_memcpy (&mp->signature,
1106                     signature,
1107                     signature_size);
1108       GNUNET_memcpy (&mp->purpose,
1109                     purpose,
1110                     purpose_size);
1111       mp->fragment_id = GNUNET_htonll (fragment_id);
1112       mp->fragment_offset = GNUNET_htonll (fragment_offset);
1113       mp->message_id = GNUNET_htonll (message_id);
1114       mp->group_generation = GNUNET_htonll (group_generation);
1115       mp->flags = htonl(msg_flags);
1116
1117       GNUNET_memcpy (&mp[1],
1118                     buf,
1119                     buf_size);
1120       ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1121
1122       GNUNET_MY_cleanup_result (results);
1123       break;
1124
1125     default:
1126       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1127                     "mysql extract_result", stmt);
1128   }
1129
1130   return ret;
1131 }
1132
1133
1134 static int
1135 fragment_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1136                  struct GNUNET_MY_QueryParam *params,
1137                  uint64_t *returned_fragments,
1138                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1139 {
1140   int ret = GNUNET_SYSERR;
1141   int sql_ret;
1142
1143   // FIXME
1144   if (NULL == plugin->mc || NULL == stmt || NULL == params)
1145   {
1146     fprintf(stderr, "%p %p %p\n", plugin->mc, stmt, params);
1147     return GNUNET_SYSERR;
1148   }
1149
1150   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1151   switch (sql_ret)
1152   {
1153     case GNUNET_NO:
1154       if (ret != GNUNET_OK)
1155         ret = GNUNET_NO;
1156       break;
1157
1158     case GNUNET_YES:
1159        ret = fragment_row (stmt, cb, cb_cls);
1160        (*returned_fragments)++;
1161       break;
1162
1163     default:
1164       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1165                  "mysql exec_prepared", stmt);
1166   }
1167
1168   return ret;
1169 }
1170
1171 /**
1172  * Retrieve a message fragment range by fragment ID.
1173  *
1174  * @see GNUNET_PSYCSTORE_fragment_get()
1175  *
1176  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1177  */
1178 static int
1179 fragment_get (void *cls,
1180               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1181               uint64_t first_fragment_id,
1182               uint64_t last_fragment_id,
1183               uint64_t *returned_fragments,
1184               GNUNET_PSYCSTORE_FragmentCallback cb,
1185               void *cb_cls)
1186 {
1187   struct Plugin *plugin = cls;
1188   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1189   int ret = GNUNET_SYSERR;
1190   *returned_fragments = 0;
1191
1192   struct GNUNET_MY_QueryParam params_select[] = {
1193     GNUNET_MY_query_param_auto_from_type (channel_key),
1194     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1195     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1196     GNUNET_MY_query_param_end
1197   };
1198
1199   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1200
1201   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1202   {
1203     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1204               "mysql_stmt_reset", stmt);
1205     return GNUNET_SYSERR;
1206   }
1207
1208   return ret;
1209 }
1210
1211
1212 /**
1213  * Retrieve a message fragment range by fragment ID.
1214  *
1215  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1216  *
1217  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1218  */
1219 static int
1220 fragment_get_latest (void *cls,
1221                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222                      uint64_t fragment_limit,
1223                      uint64_t *returned_fragments,
1224                      GNUNET_PSYCSTORE_FragmentCallback cb,
1225                      void *cb_cls)
1226 {
1227   struct Plugin *plugin = cls;
1228
1229   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1230
1231   int ret = GNUNET_SYSERR;
1232   *returned_fragments = 0;
1233
1234   struct GNUNET_MY_QueryParam params_select[] = {
1235     GNUNET_MY_query_param_auto_from_type (channel_key),
1236     GNUNET_MY_query_param_uint64 (&fragment_limit),
1237     GNUNET_MY_query_param_end
1238   };
1239
1240   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1241
1242   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1243   {
1244     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1245               "mysql_stmt_reset", stmt);
1246     return GNUNET_SYSERR;
1247   }
1248
1249   return ret;
1250 }
1251
1252
1253 /**
1254  * Retrieve all fragments of a message ID range.
1255  *
1256  * @see GNUNET_PSYCSTORE_message_get()
1257  *
1258  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1259  */
1260 static int
1261 message_get (void *cls,
1262              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1263              uint64_t first_message_id,
1264              uint64_t last_message_id,
1265              uint64_t fragment_limit,
1266              uint64_t *returned_fragments,
1267              GNUNET_PSYCSTORE_FragmentCallback cb,
1268              void *cb_cls)
1269 {
1270   struct Plugin *plugin = cls;
1271
1272   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1273
1274   int ret = GNUNET_SYSERR;
1275   *returned_fragments = 0;
1276
1277   struct GNUNET_MY_QueryParam params_select[] = {
1278     GNUNET_MY_query_param_auto_from_type (channel_key),
1279     GNUNET_MY_query_param_uint64 (&first_message_id),
1280     GNUNET_MY_query_param_uint64 (&last_message_id),
1281     GNUNET_MY_query_param_uint64 (&fragment_limit),
1282     GNUNET_MY_query_param_end
1283   };
1284
1285   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1286
1287   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1288   {
1289     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1290               "mysql_stmt_reset", stmt);
1291     return GNUNET_SYSERR;
1292   }
1293
1294   return ret;
1295 }
1296
1297
1298 /**
1299  * Retrieve all fragments of the latest messages.
1300  *
1301  * @see GNUNET_PSYCSTORE_message_get_latest()
1302  *
1303  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1304  */
1305 static int
1306 message_get_latest (void *cls,
1307                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1308                     uint64_t message_limit,
1309                     uint64_t *returned_fragments,
1310                     GNUNET_PSYCSTORE_FragmentCallback cb,
1311                     void *cb_cls)
1312 {
1313   struct Plugin *plugin = cls;
1314
1315   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1316
1317   int ret = GNUNET_SYSERR;
1318   *returned_fragments = 0;
1319
1320   struct GNUNET_MY_QueryParam params_select[] = {
1321     GNUNET_MY_query_param_auto_from_type (channel_key),
1322     GNUNET_MY_query_param_auto_from_type (channel_key),
1323     GNUNET_MY_query_param_uint64 (&message_limit),
1324     GNUNET_MY_query_param_end
1325   };
1326
1327   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1328
1329   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1330   {
1331     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1332               "mysql_stmt_reset", stmt);
1333     return GNUNET_SYSERR;
1334   }
1335
1336   return ret;
1337 }
1338
1339
1340 /**
1341  * Retrieve a fragment of message specified by its message ID and fragment
1342  * offset.
1343  *
1344  * @see GNUNET_PSYCSTORE_message_get_fragment()
1345  *
1346  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1347  */
1348 static int
1349 message_get_fragment (void *cls,
1350                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1351                       uint64_t message_id,
1352                       uint64_t fragment_offset,
1353                       GNUNET_PSYCSTORE_FragmentCallback cb,
1354                       void *cb_cls)
1355 {
1356   struct Plugin *plugin = cls;
1357   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1358   int sql_ret;
1359   int ret = GNUNET_SYSERR;
1360
1361   struct GNUNET_MY_QueryParam params_select[] = {
1362     GNUNET_MY_query_param_auto_from_type (channel_key),
1363     GNUNET_MY_query_param_uint64 (&message_id),
1364     GNUNET_MY_query_param_uint64 (&fragment_offset),
1365     GNUNET_MY_query_param_end
1366   };
1367
1368   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1369   switch (sql_ret)
1370   {
1371     case GNUNET_NO:
1372       ret = GNUNET_NO;
1373       break;
1374
1375     case GNUNET_OK:
1376       ret = fragment_row (stmt, cb, cb_cls);
1377       break;
1378
1379     default:
1380       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1381               "mysql execute prepared", stmt);
1382   }
1383
1384   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1385   {
1386     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1387               "mysql_stmt_reset", stmt);
1388     return GNUNET_SYSERR;
1389   }
1390
1391   return ret;
1392 }
1393
1394 /**
1395  * Retrieve the max. values of message counters for a channel.
1396  *
1397  * @see GNUNET_PSYCSTORE_counters_get()
1398  *
1399  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1400  */
1401 static int
1402 counters_message_get (void *cls,
1403                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1404                       uint64_t *max_fragment_id,
1405                       uint64_t *max_message_id,
1406                       uint64_t *max_group_generation)
1407 {
1408   struct Plugin *plugin = cls;
1409
1410   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1411
1412   int ret = GNUNET_SYSERR;
1413
1414   struct GNUNET_MY_QueryParam params_select[] = {
1415     GNUNET_MY_query_param_auto_from_type (channel_key),
1416     GNUNET_MY_query_param_end
1417   };
1418
1419   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1420   {
1421     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422               "mysql execute prepared", stmt);
1423     return GNUNET_SYSERR;
1424   }
1425
1426   struct GNUNET_MY_ResultSpec results_select[] = {
1427     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1428     GNUNET_MY_result_spec_uint64 (max_message_id),
1429     GNUNET_MY_result_spec_uint64 (max_group_generation),
1430     GNUNET_MY_result_spec_end
1431   };
1432
1433   ret = GNUNET_MY_extract_result (stmt,
1434                                   results_select);
1435
1436   if (GNUNET_OK != ret)
1437   {
1438     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1439               "mysql extract_result", stmt);
1440     return GNUNET_SYSERR;
1441   }
1442
1443   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1444   {
1445     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1446               "mysql_stmt_reset", stmt);
1447     return GNUNET_SYSERR;
1448   }
1449
1450   return ret;
1451 }
1452
1453 /**
1454  * Retrieve the max. values of state counters for a channel.
1455  *
1456  * @see GNUNET_PSYCSTORE_counters_get()
1457  *
1458  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1459  */
1460 static int
1461 counters_state_get (void *cls,
1462                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1463                     uint64_t *max_state_message_id)
1464 {
1465   struct Plugin *plugin = cls;
1466
1467   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1468
1469   int ret = GNUNET_SYSERR;
1470
1471   struct GNUNET_MY_QueryParam params_select[] = {
1472     GNUNET_MY_query_param_auto_from_type (channel_key),
1473     GNUNET_MY_query_param_end
1474   };
1475
1476   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1477   {
1478     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1479               "mysql execute prepared", stmt);
1480     return GNUNET_SYSERR;
1481   }
1482
1483   struct GNUNET_MY_ResultSpec results_select[] = {
1484     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1485     GNUNET_MY_result_spec_end
1486   };
1487
1488   ret = GNUNET_MY_extract_result (stmt,
1489                                   results_select);
1490
1491   if (GNUNET_OK != ret)
1492   {
1493     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1494               "mysql extract_result", stmt);
1495     return GNUNET_SYSERR;
1496   }
1497
1498   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1499   {
1500     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1501               "mysql_stmt_reset", stmt);
1502     return GNUNET_SYSERR;
1503   }
1504
1505   return ret;
1506 }
1507
1508
1509 /**
1510  * Assign a value to a state variable.
1511  *
1512  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1513  */
1514 static int
1515 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1516               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1517               const char *name, const void *value, size_t value_size)
1518 {
1519   int ret = GNUNET_SYSERR;
1520
1521   struct GNUNET_MY_QueryParam params[] = {
1522     GNUNET_MY_query_param_auto_from_type (channel_key),
1523     GNUNET_MY_query_param_string (name),
1524     GNUNET_MY_query_param_auto_from_type (value),
1525     GNUNET_MY_query_param_end
1526   };
1527
1528   ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1529   if (GNUNET_OK != ret)
1530   {
1531     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1532               "mysql exec_prepared", stmt);
1533     return GNUNET_SYSERR;
1534   }
1535
1536   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1537   {
1538     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1539               "mysql_stmt_reset", stmt);
1540     return GNUNET_SYSERR;
1541   }
1542
1543   return ret;
1544 }
1545
1546
1547 static int
1548 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1549                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1550                    uint64_t message_id)
1551 {
1552   struct GNUNET_MY_QueryParam params[] = {
1553     GNUNET_MY_query_param_uint64 (&message_id),
1554     GNUNET_MY_query_param_auto_from_type (channel_key),
1555     GNUNET_MY_query_param_end
1556   };
1557
1558   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1559                                             stmt,
1560                                             params))
1561   {
1562     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1563               "mysql execute prepared", stmt);
1564     return GNUNET_SYSERR;
1565   }
1566
1567   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1568   {
1569     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1570               "mysql_stmt_reset", stmt);
1571     return GNUNET_SYSERR;
1572   }
1573
1574   return GNUNET_OK;
1575 }
1576
1577
1578 /**
1579  * Begin modifying current state.
1580  */
1581 static int
1582 state_modify_begin (void *cls,
1583                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1584                     uint64_t message_id, uint64_t state_delta)
1585 {
1586   struct Plugin *plugin = cls;
1587
1588   if (state_delta > 0)
1589   {
1590     /**
1591      * We can only apply state modifiers in the current message if modifiers in
1592      * the previous stateful message (message_id - state_delta) were already
1593      * applied.
1594      */
1595
1596     uint64_t max_state_message_id = 0;
1597     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1598     switch (ret)
1599     {
1600     case GNUNET_OK:
1601     case GNUNET_NO: // no state yet
1602       ret = GNUNET_OK;
1603       break;
1604     default:
1605       return ret;
1606     }
1607
1608     if (max_state_message_id < message_id - state_delta)
1609       return GNUNET_NO; /* some stateful messages not yet applied */
1610     else if (message_id - state_delta < max_state_message_id)
1611       return GNUNET_NO; /* changes already applied */
1612   }
1613
1614   if (TRANSACTION_NONE != plugin->transaction)
1615   {
1616     /** @todo FIXME: wait for other transaction to finish  */
1617     return GNUNET_SYSERR;
1618   }
1619   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1620 }
1621
1622
1623 /**
1624  * Set the current value of state variable.
1625  *
1626  * @see GNUNET_PSYCSTORE_state_modify()
1627  *
1628  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1629  */
1630 static int
1631 state_modify_op (void *cls,
1632                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1633                  enum GNUNET_PSYC_Operator op,
1634                  const char *name, const void *value, size_t value_size)
1635 {
1636   struct Plugin *plugin = cls;
1637   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1638
1639   switch (op)
1640   {
1641   case GNUNET_PSYC_OP_ASSIGN:
1642     return state_assign (plugin, plugin->insert_state_current, channel_key,
1643                          name, value, value_size);
1644
1645   default: /** @todo implement more state operations */
1646     GNUNET_break (0);
1647     return GNUNET_SYSERR;
1648   }
1649 }
1650
1651
1652 /**
1653  * End modifying current state.
1654  */
1655 static int
1656 state_modify_end (void *cls,
1657                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1658                   uint64_t message_id)
1659 {
1660   struct Plugin *plugin = cls;
1661   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1662
1663   return
1664     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1665     && GNUNET_OK == update_message_id (plugin,
1666                                        plugin->update_max_state_message_id,
1667                                        channel_key, message_id)
1668     && GNUNET_OK == transaction_commit (plugin)
1669     ? GNUNET_OK : GNUNET_SYSERR;
1670 }
1671
1672
1673 /**
1674  * Begin state synchronization.
1675  */
1676 static int
1677 state_sync_begin (void *cls,
1678                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1679 {
1680   struct Plugin *plugin = cls;
1681   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1682 }
1683
1684
1685 /**
1686  * Assign current value of a state variable.
1687  *
1688  * @see GNUNET_PSYCSTORE_state_modify()
1689  *
1690  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1691  */
1692 static int
1693 state_sync_assign (void *cls,
1694                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1695                 const char *name, const void *value, size_t value_size)
1696 {
1697   struct Plugin *plugin = cls;
1698   return state_assign (cls, plugin->insert_state_sync, channel_key,
1699                        name, value, value_size);
1700 }
1701
1702
1703 /**
1704  * End modifying current state.
1705  */
1706 static int
1707 state_sync_end (void *cls,
1708                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1709                 uint64_t max_state_message_id,
1710                 uint64_t state_hash_message_id)
1711 {
1712   struct Plugin *plugin = cls;
1713   int ret = GNUNET_SYSERR;
1714
1715   if (TRANSACTION_NONE != plugin->transaction)
1716   {
1717     /** @todo FIXME: wait for other transaction to finish  */
1718     return GNUNET_SYSERR;
1719   }
1720
1721   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1722     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1723     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1724                                   channel_key)
1725     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1726                                   channel_key)
1727     && GNUNET_OK == update_message_id (plugin,
1728                                        plugin->update_state_hash_message_id,
1729                                        channel_key, state_hash_message_id)
1730     && GNUNET_OK == update_message_id (plugin,
1731                                        plugin->update_max_state_message_id,
1732                                        channel_key, max_state_message_id)
1733     && GNUNET_OK == transaction_commit (plugin)
1734     ? ret = GNUNET_OK
1735     : transaction_rollback (plugin);
1736   return ret;
1737 }
1738
1739
1740 /**
1741  * Delete the whole state.
1742  *
1743  * @see GNUNET_PSYCSTORE_state_reset()
1744  *
1745  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1746  */
1747 static int
1748 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1749 {
1750   struct Plugin *plugin = cls;
1751   return exec_channel (plugin, plugin->delete_state, channel_key);
1752 }
1753
1754
1755 /**
1756  * Update signed values of state variables in the state store.
1757  *
1758  * @see GNUNET_PSYCSTORE_state_hash_update()
1759  *
1760  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1761  */
1762 static int
1763 state_update_signed (void *cls,
1764                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1765 {
1766   struct Plugin *plugin = cls;
1767   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1768 }
1769
1770
1771 /**
1772  * Retrieve a state variable by name.
1773  *
1774  * @see GNUNET_PSYCSTORE_state_get()
1775  *
1776  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1777  */
1778 static int
1779 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1780            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1781 {
1782   struct Plugin *plugin = cls;
1783   int ret = GNUNET_SYSERR;
1784   int sql_ret ;
1785
1786   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1787
1788   struct GNUNET_MY_QueryParam params_select[] = {
1789     GNUNET_MY_query_param_auto_from_type (channel_key),
1790     GNUNET_MY_query_param_string (name),
1791     GNUNET_MY_query_param_end
1792   };
1793
1794   void *value_current = NULL;
1795   size_t value_size = 0;
1796
1797   struct GNUNET_MY_ResultSpec results[] = {
1798     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1799     GNUNET_MY_result_spec_end
1800   };
1801
1802   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1803   {
1804     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1805               "mysql exec_prepared", stmt);
1806   }
1807   else
1808   {
1809     sql_ret = GNUNET_MY_extract_result (stmt, results);
1810     switch (sql_ret)
1811     {
1812     case GNUNET_NO:
1813       ret = GNUNET_NO;
1814       break;
1815     case GNUNET_YES:
1816       ret = cb (cb_cls, name, value_current,
1817                 value_size);
1818       break;
1819     default:
1820       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1821                 "mysql extract_result", stmt);
1822     }
1823   }
1824
1825   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1826   {
1827     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1828               "mysql_stmt_reset", stmt);
1829     return GNUNET_SYSERR;
1830   }
1831
1832   return ret;
1833 }
1834
1835
1836 /**
1837  * Retrieve all state variables for a channel with the given prefix.
1838  *
1839  * @see GNUNET_PSYCSTORE_state_get_prefix()
1840  *
1841  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1842  */
1843 static int
1844 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1845                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1846                   void *cb_cls)
1847 {
1848   struct Plugin *plugin = cls;
1849   int ret = GNUNET_SYSERR;
1850
1851   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1852
1853   uint32_t name_len = (uint32_t) strlen (name);
1854
1855   struct GNUNET_MY_QueryParam params_select[] = {
1856     GNUNET_MY_query_param_auto_from_type (channel_key),
1857     GNUNET_MY_query_param_string (name),
1858     GNUNET_MY_query_param_uint32 (&name_len),
1859     GNUNET_MY_query_param_string (name),
1860     GNUNET_MY_query_param_end
1861   };
1862
1863   char *name2 = "";
1864   void *value_current = NULL;
1865   size_t value_size = 0;
1866
1867   struct GNUNET_MY_ResultSpec results[] = {
1868     GNUNET_MY_result_spec_string (&name2),
1869     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1870     GNUNET_MY_result_spec_end
1871   };
1872
1873   int sql_ret;
1874
1875   do
1876   {
1877     if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1878     {
1879       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1880                 "mysql exec_prepared", stmt);
1881       break;
1882     }
1883     sql_ret = GNUNET_MY_extract_result (stmt, results);
1884     switch (sql_ret)
1885     {
1886       case GNUNET_NO:
1887         if (ret != GNUNET_OK)
1888           ret = GNUNET_NO;
1889         break;
1890
1891       case GNUNET_YES:
1892         ret = cb (cb_cls, (const char *) name2,
1893                   value_current,
1894                   value_size);
1895
1896         if (ret != GNUNET_YES)
1897           sql_ret = GNUNET_NO;
1898         break;
1899
1900       default:
1901         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1902               "mysql extract_result", stmt);
1903     }
1904   }
1905   while (sql_ret == GNUNET_YES);
1906
1907   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1908   {
1909     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1910               "mysql_stmt_reset", stmt);
1911     return GNUNET_SYSERR;
1912   }
1913
1914   return ret;
1915 }
1916
1917
1918 /**
1919  * Retrieve all signed state variables for a channel.
1920  *
1921  * @see GNUNET_PSYCSTORE_state_get_signed()
1922  *
1923  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1924  */
1925 static int
1926 state_get_signed (void *cls,
1927                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1928                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1929 {
1930   struct Plugin *plugin = cls;
1931   int ret = GNUNET_SYSERR;
1932
1933   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1934
1935   struct GNUNET_MY_QueryParam params_select[] = {
1936     GNUNET_MY_query_param_auto_from_type (channel_key),
1937     GNUNET_MY_query_param_end
1938   };
1939
1940   int sql_ret;
1941
1942   char *name = "";
1943   void *value_signed = NULL;
1944   size_t value_size = 0;
1945
1946   struct GNUNET_MY_ResultSpec results[] = {
1947     GNUNET_MY_result_spec_string (&name),
1948     GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1949     GNUNET_MY_result_spec_end
1950   };
1951
1952   do
1953   {
1954     if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1955     {
1956       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1957                 "mysql exec_prepared", stmt);
1958       break;
1959     }
1960     sql_ret = GNUNET_MY_extract_result (stmt, results);
1961     switch (sql_ret)
1962     {
1963       case GNUNET_NO:
1964         if (ret != GNUNET_OK)
1965           ret = GNUNET_NO;
1966         break;
1967       case GNUNET_YES:
1968         ret = cb (cb_cls, (const char *) name,
1969                   value_signed,
1970                   value_size);
1971
1972         if (ret != GNUNET_YES)
1973             sql_ret = GNUNET_NO;
1974         break;
1975       default:
1976          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1977               "mysql extract_result", stmt);
1978     }
1979   }
1980   while (sql_ret == GNUNET_YES);
1981
1982   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1983   {
1984     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1985               "mysql_stmt_reset", stmt);
1986     return GNUNET_SYSERR;
1987   }
1988
1989   return ret;
1990 }
1991
1992
1993 /**
1994  * Entry point for the plugin.
1995  *
1996  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1997  * @return NULL on error, otherwise the plugin context
1998  */
1999 void *
2000 libgnunet_plugin_psycstore_mysql_init (void *cls)
2001 {
2002   static struct Plugin plugin;
2003   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
2004   struct GNUNET_PSYCSTORE_PluginFunctions *api;
2005
2006   if (NULL != plugin.cfg)
2007     return NULL;                /* can only initialize once! */
2008   memset (&plugin, 0, sizeof (struct Plugin));
2009   plugin.cfg = cfg;
2010   if (GNUNET_OK != database_setup (&plugin))
2011   {
2012     database_shutdown (&plugin);
2013     return NULL;
2014   }
2015   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
2016   api->cls = &plugin;
2017   api->membership_store = &mysql_membership_store;
2018   api->membership_test = &membership_test;
2019   api->fragment_store = &fragment_store;
2020   api->message_add_flags = &message_add_flags;
2021   api->fragment_get = &fragment_get;
2022   api->fragment_get_latest = &fragment_get_latest;
2023   api->message_get = &message_get;
2024   api->message_get_latest = &message_get_latest;
2025   api->message_get_fragment = &message_get_fragment;
2026   api->counters_message_get = &counters_message_get;
2027   api->counters_state_get = &counters_state_get;
2028   api->state_modify_begin = &state_modify_begin;
2029   api->state_modify_op = &state_modify_op;
2030   api->state_modify_end = &state_modify_end;
2031   api->state_sync_begin = &state_sync_begin;
2032   api->state_sync_assign = &state_sync_assign;
2033   api->state_sync_end = &state_sync_end;
2034   api->state_reset = &state_reset;
2035   api->state_update_signed = &state_update_signed;
2036   api->state_get = &state_get;
2037   api->state_get_prefix = &state_get_prefix;
2038   api->state_get_signed = &state_get_signed;
2039
2040   LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
2041   return api;
2042 }
2043
2044
2045 /**
2046  * Exit point from the plugin.
2047  *
2048  * @param cls The plugin context (as returned by "init")
2049  * @return Always NULL
2050  */
2051 void *
2052 libgnunet_plugin_psycstore_mysql_done (void *cls)
2053 {
2054   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2055   struct Plugin *plugin = api->cls;
2056
2057   database_shutdown (plugin);
2058   plugin->cfg = NULL;
2059   GNUNET_free (api);
2060   LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
2061   return NULL;
2062 }
2063
2064 /* end of plugin_psycstore_mysql.c */