psycstore: fix postgres
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_mysql.c
23  * @brief mysql-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  */
28
29 #include "platform.h"
30 #include "gnunet_psycstore_plugin.h"
31 #include "gnunet_psycstore_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_crypto_lib.h"
34 #include "gnunet_psyc_util_lib.h"
35 #include "psycstore.h"
36 #include "gnunet_my_lib.h"
37 #include "gnunet_mysql_lib.h"
38 #include <mysql/mysql.h>
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 /**
55  * Log an error message at log-level 'level' that indicates
56  * a failure of the command 'cmd' on file 'filename'
57  * with the message given by strerror(errno).
58  */
59 #define LOG_MYSQL(db, level, cmd, stmt)                                 \
60   do {                                                                  \
61     GNUNET_log_from (level, "psycstore-mysql",                          \
62                      _("`%s' failed at %s:%d with error: %s\n"),        \
63                      cmd, __FILE__, __LINE__,                           \
64                      mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
65   } while (0)
66
67 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
68
69 enum Transactions {
70   TRANSACTION_NONE = 0,
71   TRANSACTION_STATE_MODIFY,
72   TRANSACTION_STATE_SYNC,
73 };
74
75 /**
76  * Context for all functions in this plugin.
77  */
78 struct Plugin
79 {
80
81   const struct GNUNET_CONFIGURATION_Handle *cfg;
82
83   /**
84    * MySQL context.
85    */
86   struct GNUNET_MYSQL_Context *mc;
87
88   /**
89    * Current transaction.
90    */
91   enum Transactions transaction;
92
93   /**
94    * Precompiled SQL for channel_key_store()
95    */
96   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
97
98   /**
99    * Precompiled SQL for slave_key_store()
100    */
101   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
102
103   /**
104    * Precompiled SQL for membership_store()
105    */
106   struct GNUNET_MYSQL_StatementHandle *insert_membership;
107
108   /**
109    * Precompiled SQL for membership_test()
110    */
111   struct GNUNET_MYSQL_StatementHandle *select_membership;
112
113   /**
114    * Precompiled SQL for fragment_store()
115    */
116   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
117
118   /**
119    * Precompiled SQL for message_add_flags()
120    */
121   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
122
123   /**
124    * Precompiled SQL for fragment_get()
125    */
126   struct GNUNET_MYSQL_StatementHandle *select_fragments;
127
128   /**
129    * Precompiled SQL for fragment_get()
130    */
131   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
132
133   /**
134    * Precompiled SQL for message_get()
135    */
136   struct GNUNET_MYSQL_StatementHandle *select_messages;
137
138   /**
139    * Precompiled SQL for message_get()
140    */
141   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
142
143   /**
144    * Precompiled SQL for message_get_fragment()
145    */
146   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
147
148   /**
149    * Precompiled SQL for counters_get_message()
150    */
151   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
152
153   /**
154    * Precompiled SQL for counters_get_state()
155    */
156   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
157
158   /**
159    * Precompiled SQL for state_modify_end()
160    */
161   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
162
163   /**
164    * Precompiled SQL for state_sync_end()
165    */
166   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
167
168   /**
169    * Precompiled SQL for state_modify_op()
170    */
171   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
172
173   /**
174    * Precompiled SQL for state_modify_end()
175    */
176   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
177
178   /**
179    * Precompiled SQL for state_set_signed()
180    */
181   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
182
183   /**
184    * Precompiled SQL for state_sync()
185    */
186   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
187
188   /**
189    * Precompiled SQL for state_sync()
190    */
191   struct GNUNET_MYSQL_StatementHandle *delete_state;
192
193   /**
194    * Precompiled SQL for state_sync()
195    */
196   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
197
198   /**
199    * Precompiled SQL for state_sync()
200    */
201   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
202
203   /**
204    * Precompiled SQL for state_get_signed()
205    */
206   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
207
208   /**
209    * Precompiled SQL for state_get()
210    */
211   struct GNUNET_MYSQL_StatementHandle *select_state_one;
212
213   /**
214    * Precompiled SQL for state_get_prefix()
215    */
216   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
217
218 };
219
220 #if DEBUG_PSYCSTORE
221
222 static void
223 mysql_trace (void *cls, const char *sql)
224 {
225   LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
226 }
227
228 #endif
229
230
231 /**
232  * @brief Prepare a SQL statement
233  *
234  * @param dbh handle to the database
235  * @param sql SQL statement, UTF-8 encoded
236  * @param stmt set to the prepared statement
237  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
238  */
239 static int
240 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
241               const char *sql,
242               struct GNUNET_MYSQL_StatementHandle **stmt)
243 {
244   *stmt = GNUNET_MYSQL_statement_prepare (mc,
245                                           sql);
246
247   if (NULL == *stmt)
248   {
249     LOG (GNUNET_ERROR_TYPE_ERROR,
250          _("Error preparing SQL query: %s\n  %s\n"),
251          mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
252          sql);
253     return GNUNET_SYSERR;
254   }
255   LOG (GNUNET_ERROR_TYPE_DEBUG,
256        "Prepared `%s' / %p\n",
257        sql,
258        stmt);
259   return GNUNET_OK;
260 }
261
262
263 /**
264  * Initialize the database connections and associated
265  * data structures (create tables and indices
266  * as needed as well).
267  *
268  * @param plugin the plugin context (state for this module)
269  * @return #GNUNET_OK on success
270  */
271 static int
272 database_setup (struct Plugin *plugin)
273 {
274   /* Open database and precompile statements */
275   plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
276                                             "psycstore-mysql");
277
278   if (NULL == plugin->mc)
279   {
280     LOG (GNUNET_ERROR_TYPE_ERROR,
281          _("Unable to initialize Mysql.\n"));
282     return GNUNET_SYSERR;
283   }
284
285 #define STMT_RUN(sql) \
286   if (GNUNET_OK != \
287       GNUNET_MYSQL_statement_run (plugin->mc, \
288                                   sql)) \
289   { \
290     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
291                 _("Failed to run SQL statement `%s'\n"), \
292                 sql); \
293     return GNUNET_SYSERR; \
294   }
295
296   /* Create tables */
297   STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
298             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
299             " pub_key BLOB(23),\n"
300             " max_state_message_id BIGINT UNSIGNED,\n"
301             " state_hash_message_id BIGINT UNSIGNED,\n"
302             " PRIMARY KEY(id),\n"
303             " UNIQUE KEY(pub_key(32))\n"
304             ");");
305
306   STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
307             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
308             " pub_key BLOB(32),\n"
309             " PRIMARY KEY(id),\n"
310             " UNIQUE KEY(pub_key(32))\n"
311             ");");
312
313   STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
314             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
315             "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
316             "  did_join TINYINT NOT NULL,\n"
317             "  announced_at BIGINT UNSIGNED NOT NULL,\n"
318             "  effective_since BIGINT UNSIGNED NOT NULL,\n"
319             "  group_generation BIGINT UNSIGNED NOT NULL\n"
320             ");");
321
322 /*** FIX because IF NOT EXISTS doesn't work ***/
323   GNUNET_MYSQL_statement_run (plugin->mc,
324                               "CREATE INDEX idx_membership_channel_id_slave_id "
325                               "ON membership (channel_id, slave_id);");
326
327   /** @todo messages table: add method_name column */
328   STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
329             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
330             "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
331             "  signature BLOB,\n"
332             "  purpose BLOB,\n"
333             "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
334             "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
335                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
336             "  group_generation BIGINT UNSIGNED NOT NULL,\n"
337             "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
338             "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
339             "  data BLOB,\n"
340             "  PRIMARY KEY (channel_id, fragment_id),\n"
341             "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
342             ");");
343
344   STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
345             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
346             "  name TEXT NOT NULL,\n"
347             "  value_current BLOB,\n"
348             "  value_signed BLOB\n"
349             //"  PRIMARY KEY (channel_id, name(255))\n"
350             ");");
351
352   STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
353             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
354             "  name TEXT NOT NULL,\n"
355             "  value BLOB\n"
356             //"  PRIMARY KEY (channel_id, name(255))\n"
357             ");");
358 #undef STMT_RUN
359
360   /* Prepare statements */
361 #define PREP(stmt,handle)                                    \
362   if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
363   { \
364     GNUNET_break (0); \
365     return GNUNET_SYSERR; \
366   }
367   PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
368         &plugin->insert_channel_key);
369   PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
370         &plugin->insert_slave_key);
371   PREP ("INSERT INTO membership\n"
372         " (channel_id, slave_id, did_join, announced_at,\n"
373         "  effective_since, group_generation)\n"
374         "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
375         "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
376         "        ?, ?, ?, ?);",
377         &plugin->insert_membership);
378   PREP ("SELECT did_join FROM membership\n"
379         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
380         "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
381         "      AND effective_since <= ? AND did_join = 1\n"
382         "ORDER BY announced_at DESC LIMIT 1;",
383         &plugin->select_membership);
384
385   PREP ("INSERT IGNORE INTO messages\n"
386         " (channel_id, hop_counter, signature, purpose,\n"
387         "  fragment_id, fragment_offset, message_id,\n"
388         "  group_generation, multicast_flags, psycstore_flags, data)\n"
389         "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
390         "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
391         &plugin->insert_fragment);
392
393   PREP ("UPDATE messages\n"
394         "SET psycstore_flags = psycstore_flags | ?\n"
395         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
396         "      AND message_id = ? AND fragment_offset = 0;",
397         &plugin->update_message_flags);
398
399   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
400         "       fragment_offset, message_id, group_generation,\n"
401         "       multicast_flags, psycstore_flags, data\n"
402         "FROM messages\n"
403         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
404         "      AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
405         &plugin->select_fragments);
406
407   /** @todo select_messages: add method_prefix filter */
408   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
409         "       fragment_offset, message_id, group_generation,\n"
410         "       multicast_flags, psycstore_flags, data\n"
411         "FROM messages\n"
412         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
413         "      AND ? <= message_id AND message_id <= ?\n"
414         "LIMIT ?;",
415         &plugin->select_messages);
416
417   PREP ("SELECT * FROM\n"
418         "(SELECT hop_counter, signature, purpose, fragment_id,\n"
419         "        fragment_offset, message_id, group_generation,\n"
420         "        multicast_flags, psycstore_flags, data\n"
421         " FROM messages\n"
422         " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
423         " ORDER BY fragment_id DESC\n"
424         " LIMIT ?)\n"
425         "ORDER BY fragment_id;",
426         &plugin->select_latest_fragments);
427
428   /** @todo select_latest_messages: add method_prefix filter */
429   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
430         "       fragment_offset, message_id, group_generation,\n"
431         "        multicast_flags, psycstore_flags, data\n"
432         "FROM messages\n"
433         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
434         "      AND message_id IN\n"
435         "      (SELECT message_id\n"
436         "       FROM messages\n"
437         "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
438         "       GROUP BY message_id\n"
439         "       ORDER BY message_id\n"
440         "       DESC LIMIT ?)\n"
441         "ORDER BY fragment_id;",
442         &plugin->select_latest_messages);
443
444   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
445         "       fragment_offset, message_id, group_generation,\n"
446         "       multicast_flags, psycstore_flags, data\n"
447         "FROM messages\n"
448         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
449         "      AND message_id = ? AND fragment_offset = ?;",
450         &plugin->select_message_fragment);
451
452   PREP ("SELECT fragment_id, message_id, group_generation\n"
453         "FROM messages\n"
454         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
455         "ORDER BY fragment_id DESC LIMIT 1;",
456         &plugin->select_counters_message);
457
458   PREP ("SELECT max_state_message_id\n"
459         "FROM channels\n"
460         "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
461         &plugin->select_counters_state);
462
463   PREP ("UPDATE channels\n"
464         "SET max_state_message_id = ?\n"
465         "WHERE pub_key = ?;",
466         &plugin->update_max_state_message_id);
467
468   PREP ("UPDATE channels\n"
469         "SET state_hash_message_id = ?\n"
470         "WHERE pub_key = ?;",
471         &plugin->update_state_hash_message_id);
472
473   PREP ("REPLACE INTO state\n"
474         "  (channel_id, name, value_current, value_signed)\n"
475         "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
476         "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
477         "             (SELECT ?) AS name,\n"
478         "             (SELECT ?) AS value_current\n"
479         "     ) AS new\n"
480         "LEFT JOIN (SELECT channel_id, name, value_signed\n"
481         "           FROM state) AS old\n"
482         "ON new.channel_id = old.channel_id AND new.name = old.name;",
483         &plugin->insert_state_current);
484
485   PREP ("DELETE FROM state\n"
486         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
487         "      AND (value_current IS NULL OR length(value_current) = 0)\n"
488         "      AND (value_signed IS NULL OR length(value_signed) = 0);",
489         &plugin->delete_state_empty);
490
491   PREP ("UPDATE state\n"
492         "SET value_signed = value_current\n"
493         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
494         &plugin->update_state_signed);
495
496   PREP ("DELETE FROM state\n"
497         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
498         &plugin->delete_state);
499
500   PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
501         "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
502         &plugin->insert_state_sync);
503
504   PREP ("INSERT INTO state\n"
505         " (channel_id, name, value_current, value_signed)\n"
506         "SELECT channel_id, name, value, value\n"
507         "FROM state_sync\n"
508         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
509         &plugin->insert_state_from_sync);
510
511   PREP ("DELETE FROM state_sync\n"
512         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
513         &plugin->delete_state_sync);
514
515   PREP ("SELECT value_current\n"
516         "FROM state\n"
517         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
518         "      AND name = ?;",
519         &plugin->select_state_one);
520
521   PREP ("SELECT name, value_current\n"
522         "FROM state\n"
523         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
524         "      AND (name = ? OR substr(name, 1, ?) = ?);",
525         &plugin->select_state_prefix);
526
527   PREP ("SELECT name, value_signed\n"
528         "FROM state\n"
529         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
530         "      AND value_signed IS NOT NULL;",
531         &plugin->select_state_signed);
532 #undef PREP
533
534   return GNUNET_OK;
535 }
536
537
538 /**
539  * Shutdown database connection and associate data
540  * structures.
541  * @param plugin the plugin context (state for this module)
542  */
543 static void
544 database_shutdown (struct Plugin *plugin)
545 {
546   GNUNET_MYSQL_context_destroy (plugin->mc);
547 }
548
549
550 /**
551  * Execute a prepared statement with a @a channel_key argument.
552  *
553  * @param plugin Plugin handle.
554  * @param stmt Statement to execute.
555  * @param channel_key Public key of the channel.
556  *
557  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
558  */
559 static int
560 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
561               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
562 {
563   struct GNUNET_MY_QueryParam params[] = {
564     GNUNET_MY_query_param_auto_from_type (channel_key),
565     GNUNET_MY_query_param_end
566   };
567
568   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
569   {
570     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
571               "mysql exec_channel", stmt);
572   }
573
574   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
575   {
576     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
577               "mysql_stmt_reset", stmt);
578     return GNUNET_SYSERR;
579   }
580
581   return GNUNET_OK;
582 }
583
584
585 /**
586  * Begin a transaction.
587  */
588 static int
589 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
590 {
591   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
592   {
593     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
594     return GNUNET_SYSERR;
595   }
596
597   plugin->transaction = transaction;
598   return GNUNET_OK;
599 }
600
601
602 /**
603  * Commit current transaction.
604  */
605 static int
606 transaction_commit (struct Plugin *plugin)
607 {
608   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
609   {
610     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
611     return GNUNET_SYSERR;
612   }
613
614   plugin->transaction = TRANSACTION_NONE;
615   return GNUNET_OK;
616 }
617
618
619 /**
620  * Roll back current transaction.
621  */
622 static int
623 transaction_rollback (struct Plugin *plugin)
624 {
625   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
626   {
627     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
628     return GNUNET_SYSERR;
629   }
630
631   plugin->transaction = TRANSACTION_NONE;
632   return GNUNET_OK;
633 }
634
635
636 static int
637 channel_key_store (struct Plugin *plugin,
638                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
639 {
640   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
641
642   struct GNUNET_MY_QueryParam params[] = {
643     GNUNET_MY_query_param_auto_from_type (channel_key),
644     GNUNET_MY_query_param_end
645   };
646
647   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
648   {
649     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
650               "mysql exec_prepared", stmt);
651     return GNUNET_SYSERR;
652   }
653
654   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
655   {
656     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
657               "mysql_stmt_reset", stmt);
658     return GNUNET_SYSERR;
659   }
660
661   return GNUNET_OK;
662 }
663
664
665 static int
666 slave_key_store (struct Plugin *plugin,
667                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
668 {
669   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
670
671   struct GNUNET_MY_QueryParam params[] = {
672     GNUNET_MY_query_param_auto_from_type (slave_key),
673     GNUNET_MY_query_param_end
674   };
675
676   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
677   {
678     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
679               "mysql exec_prepared", stmt);
680     return GNUNET_SYSERR;
681   }
682
683   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
684   {
685     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
686               "mysql_stmt_reset", stmt);
687     return GNUNET_SYSERR;
688   }
689
690   return GNUNET_OK;
691 }
692
693
694 /**
695  * Store join/leave events for a PSYC channel in order to be able to answer
696  * membership test queries later.
697  *
698  * @see GNUNET_PSYCSTORE_membership_store()
699  *
700  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
701  */
702 static int
703 mysql_membership_store (void *cls,
704                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
705                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
706                          int did_join,
707                          uint64_t announced_at,
708                          uint64_t effective_since,
709                          uint64_t group_generation)
710 {
711   struct Plugin *plugin = cls;
712
713   uint32_t idid_join = (uint32_t)did_join;
714
715   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
716
717   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
718
719   if (announced_at > INT64_MAX ||
720       effective_since > INT64_MAX ||
721       group_generation > INT64_MAX)
722   {
723     GNUNET_break (0);
724     return GNUNET_SYSERR;
725   }
726
727   if (GNUNET_OK != channel_key_store (plugin, channel_key)
728       || GNUNET_OK != slave_key_store (plugin, slave_key))
729     return GNUNET_SYSERR;
730
731   struct GNUNET_MY_QueryParam params[] = {
732     GNUNET_MY_query_param_auto_from_type (channel_key),
733     GNUNET_MY_query_param_auto_from_type (slave_key),
734     GNUNET_MY_query_param_uint32 (&idid_join),
735     GNUNET_MY_query_param_uint64 (&announced_at),
736     GNUNET_MY_query_param_uint64 (&effective_since),
737     GNUNET_MY_query_param_uint64 (&group_generation),
738     GNUNET_MY_query_param_end
739   };
740
741   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
742   {
743     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
744               "mysql exec_prepared", stmt);
745     return GNUNET_SYSERR;
746   }
747
748   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
749   {
750     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
751               "mysql_stmt_reset", stmt);
752     return GNUNET_SYSERR;
753   }
754   return GNUNET_OK;
755 }
756
757 /**
758  * Test if a member was admitted to the channel at the given message ID.
759  *
760  * @see GNUNET_PSYCSTORE_membership_test()
761  *
762  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
763  *         #GNUNET_SYSERR if there was en error.
764  */
765 static int
766 membership_test (void *cls,
767                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
768                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
769                  uint64_t message_id)
770 {
771   struct Plugin *plugin = cls;
772
773   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
774
775   uint32_t did_join = 0;
776
777   int ret = GNUNET_SYSERR;
778
779   struct GNUNET_MY_QueryParam params_select[] = {
780     GNUNET_MY_query_param_auto_from_type (channel_key),
781     GNUNET_MY_query_param_auto_from_type (slave_key),
782     GNUNET_MY_query_param_uint64 (&message_id),
783     GNUNET_MY_query_param_end
784   };
785
786   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
787   {
788     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
789                 "mysql execute prepared", stmt);
790     return GNUNET_SYSERR;
791   }
792
793   struct GNUNET_MY_ResultSpec results_select[] = {
794     GNUNET_MY_result_spec_uint32 (&did_join),
795     GNUNET_MY_result_spec_end
796   };
797
798   switch (GNUNET_MY_extract_result (stmt, results_select))
799   {
800     case GNUNET_NO:
801       ret = GNUNET_NO;
802       break;
803     case GNUNET_OK:
804       ret = GNUNET_YES;
805       break;
806     default:
807       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
808                 "mysql extract_result", stmt);
809       return GNUNET_SYSERR;
810   }
811
812   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
813   {
814     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
815               "mysql_stmt_reset", stmt);
816     return GNUNET_SYSERR;
817   }
818
819   return ret;
820 }
821
822 /**
823  * Store a message fragment sent to a channel.
824  *
825  * @see GNUNET_PSYCSTORE_fragment_store()
826  *
827  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
828  */
829 static int
830 fragment_store (void *cls,
831                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
832                 const struct GNUNET_MULTICAST_MessageHeader *msg,
833                 uint32_t psycstore_flags)
834 {
835   struct Plugin *plugin = cls;
836
837   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
838
839   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
840
841   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
842
843   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
844   uint64_t message_id = GNUNET_ntohll (msg->message_id);
845   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
846
847   uint64_t hop_counter = ntohl(msg->hop_counter);
848   uint64_t flags = ntohl(msg->flags);
849
850   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
851       message_id > INT64_MAX || group_generation > INT64_MAX)
852   {
853     LOG(GNUNET_ERROR_TYPE_ERROR,
854          "Tried to store fragment with a field > INT64_MAX: "
855          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
856          message_id, group_generation);
857     GNUNET_break (0);
858     return GNUNET_SYSERR;
859   }
860
861   if (GNUNET_OK != channel_key_store (plugin, channel_key))
862     return GNUNET_SYSERR;
863
864   struct GNUNET_MY_QueryParam params_insert[] = {
865     GNUNET_MY_query_param_auto_from_type (channel_key),
866     GNUNET_MY_query_param_uint64 (&hop_counter),
867     GNUNET_MY_query_param_auto_from_type (&msg->signature),
868     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
869     GNUNET_MY_query_param_uint64 (&fragment_id),
870     GNUNET_MY_query_param_uint64 (&fragment_offset),
871     GNUNET_MY_query_param_uint64 (&message_id),
872     GNUNET_MY_query_param_uint64 (&group_generation),
873     GNUNET_MY_query_param_uint64 (&flags),
874     GNUNET_MY_query_param_uint32 (&psycstore_flags),
875     GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
876                                                   - sizeof (*msg)),
877     GNUNET_MY_query_param_end
878   };
879
880   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
881   {
882     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
883               "mysql execute prepared", stmt);
884     return GNUNET_SYSERR;
885   }
886
887   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
888   {
889     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
890               "mysql_stmt_reset", stmt);
891     return GNUNET_SYSERR;
892   }
893
894   return GNUNET_OK;
895 }
896
897 /**
898  * Set additional flags for a given message.
899  *
900  * They are OR'd with any existing flags set.
901  *
902  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
903  */
904 static int
905 message_add_flags (void *cls,
906                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
907                    uint64_t message_id,
908                    uint32_t psycstore_flags)
909 {
910   struct Plugin *plugin = cls;
911   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
912
913   int sql_ret;
914   int ret = GNUNET_SYSERR;
915
916   struct GNUNET_MY_QueryParam params_update[] = {
917     GNUNET_MY_query_param_uint32 (&psycstore_flags),
918     GNUNET_MY_query_param_auto_from_type (channel_key),
919     GNUNET_MY_query_param_uint64 (&message_id),
920     GNUNET_MY_query_param_end
921   };
922
923   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
924   switch (sql_ret)
925   {
926     case GNUNET_OK:
927       ret = GNUNET_OK;
928       break;
929
930     default:
931        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
932               "mysql execute prepared", stmt);
933   }
934
935   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
936   {
937     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
938               "mysql_stmt_reset", stmt);
939     return GNUNET_SYSERR;
940   }
941
942   return ret;
943 }
944
945
946 static int
947 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
948               GNUNET_PSYCSTORE_FragmentCallback cb,
949               void *cb_cls,
950               uint64_t *returned_fragments)
951 {
952
953   uint32_t hop_counter;
954   void *signature = NULL;
955   void *purpose = NULL;
956   size_t signature_size;
957   size_t purpose_size;
958   uint64_t fragment_id;
959   uint64_t fragment_offset;
960   uint64_t message_id;
961   uint64_t group_generation;
962   uint64_t flags;
963   void *buf;
964   size_t buf_size;
965   int ret = GNUNET_SYSERR;
966   int sql_ret;
967   struct GNUNET_MULTICAST_MessageHeader *mp;
968   uint64_t msg_flags;
969   struct GNUNET_MY_ResultSpec results[] = {
970     GNUNET_MY_result_spec_uint32 (&hop_counter),
971     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
972     GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
973     GNUNET_MY_result_spec_uint64 (&fragment_id),
974     GNUNET_MY_result_spec_uint64 (&fragment_offset),
975     GNUNET_MY_result_spec_uint64 (&message_id),
976     GNUNET_MY_result_spec_uint64 (&group_generation),
977     GNUNET_MY_result_spec_uint64 (&msg_flags),
978     GNUNET_MY_result_spec_uint64 (&flags),
979     GNUNET_MY_result_spec_variable_size (&buf,
980                                          &buf_size),
981     GNUNET_MY_result_spec_end
982   };
983
984   do
985   {
986     sql_ret = GNUNET_MY_extract_result (stmt, results);
987     switch (sql_ret)
988     {
989     case GNUNET_NO:
990       if (ret != GNUNET_YES)
991         ret = GNUNET_NO;
992       break;
993
994     case GNUNET_YES:
995       mp = GNUNET_malloc (sizeof (*mp) + buf_size);
996
997       mp->header.size = htons (sizeof (*mp) + buf_size);
998       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
999       mp->hop_counter = htonl (hop_counter);
1000       GNUNET_memcpy (&mp->signature,
1001                      signature,
1002                      signature_size);
1003       GNUNET_memcpy (&mp->purpose,
1004                      purpose,
1005                      purpose_size);
1006       mp->fragment_id = GNUNET_htonll (fragment_id);
1007       mp->fragment_offset = GNUNET_htonll (fragment_offset);
1008       mp->message_id = GNUNET_htonll (message_id);
1009       mp->group_generation = GNUNET_htonll (group_generation);
1010       mp->flags = htonl(msg_flags);
1011
1012       GNUNET_memcpy (&mp[1],
1013                      buf,
1014                      buf_size);
1015       ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1016       if (NULL != returned_fragments)
1017         (*returned_fragments)++;
1018       GNUNET_MY_cleanup_result (results);
1019       break;
1020
1021     default:
1022       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1023                  "mysql extract_result", stmt);
1024     }
1025   }
1026   while (GNUNET_YES == sql_ret);
1027
1028   // for debugging
1029   if (GNUNET_NO == ret)
1030     GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1031                "Empty result set\n");
1032
1033   return ret;
1034 }
1035
1036
1037 static int
1038 fragment_select (struct Plugin *plugin,
1039                  struct GNUNET_MYSQL_StatementHandle *stmt,
1040                  struct GNUNET_MY_QueryParam *params,
1041                  uint64_t *returned_fragments,
1042                  GNUNET_PSYCSTORE_FragmentCallback cb,
1043                  void *cb_cls)
1044 {
1045   int ret = GNUNET_SYSERR;
1046   int sql_ret;
1047
1048   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1049   switch (sql_ret)
1050   {
1051     case GNUNET_NO:
1052       if (ret != GNUNET_YES)
1053         ret = GNUNET_NO;
1054       break;
1055
1056     case GNUNET_YES:
1057       ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
1058       break;
1059
1060     default:
1061       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1062                  "mysql exec_prepared", stmt);
1063   }
1064   return ret;
1065 }
1066
1067
1068 /**
1069  * Retrieve a message fragment range by fragment ID.
1070  *
1071  * @see GNUNET_PSYCSTORE_fragment_get()
1072  *
1073  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1074  */
1075 static int
1076 fragment_get (void *cls,
1077               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1078               uint64_t first_fragment_id,
1079               uint64_t last_fragment_id,
1080               uint64_t *returned_fragments,
1081               GNUNET_PSYCSTORE_FragmentCallback cb,
1082               void *cb_cls)
1083 {
1084   struct Plugin *plugin = cls;
1085   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1086   int ret = GNUNET_SYSERR;
1087   struct GNUNET_MY_QueryParam params_select[] = {
1088     GNUNET_MY_query_param_auto_from_type (channel_key),
1089     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1090     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1091     GNUNET_MY_query_param_end
1092   };
1093
1094   *returned_fragments = 0;
1095   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1096
1097   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1098   {
1099     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1100               "mysql_stmt_reset", stmt);
1101     return GNUNET_SYSERR;
1102   }
1103
1104   return ret;
1105 }
1106
1107
1108 /**
1109  * Retrieve a message fragment range by fragment ID.
1110  *
1111  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1112  *
1113  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1114  */
1115 static int
1116 fragment_get_latest (void *cls,
1117                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1118                      uint64_t fragment_limit,
1119                      uint64_t *returned_fragments,
1120                      GNUNET_PSYCSTORE_FragmentCallback cb,
1121                      void *cb_cls)
1122 {
1123   struct Plugin *plugin = cls;
1124
1125   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1126
1127   int ret = GNUNET_SYSERR;
1128   *returned_fragments = 0;
1129
1130   struct GNUNET_MY_QueryParam params_select[] = {
1131     GNUNET_MY_query_param_auto_from_type (channel_key),
1132     GNUNET_MY_query_param_uint64 (&fragment_limit),
1133     GNUNET_MY_query_param_end
1134   };
1135
1136   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1137
1138   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1139   {
1140     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1141               "mysql_stmt_reset", stmt);
1142     return GNUNET_SYSERR;
1143   }
1144
1145   return ret;
1146 }
1147
1148
1149 /**
1150  * Retrieve all fragments of a message ID range.
1151  *
1152  * @see GNUNET_PSYCSTORE_message_get()
1153  *
1154  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1155  */
1156 static int
1157 message_get (void *cls,
1158              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1159              uint64_t first_message_id,
1160              uint64_t last_message_id,
1161              uint64_t fragment_limit,
1162              uint64_t *returned_fragments,
1163              GNUNET_PSYCSTORE_FragmentCallback cb,
1164              void *cb_cls)
1165 {
1166   struct Plugin *plugin = cls;
1167   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1168   int ret;
1169
1170   if (0 == fragment_limit)
1171     fragment_limit = UINT64_MAX;
1172
1173   struct GNUNET_MY_QueryParam params_select[] = {
1174     GNUNET_MY_query_param_auto_from_type (channel_key),
1175     GNUNET_MY_query_param_uint64 (&first_message_id),
1176     GNUNET_MY_query_param_uint64 (&last_message_id),
1177     GNUNET_MY_query_param_uint64 (&fragment_limit),
1178     GNUNET_MY_query_param_end
1179   };
1180
1181   *returned_fragments = 0;
1182   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1183
1184   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1185   {
1186     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1187               "mysql_stmt_reset", stmt);
1188     return GNUNET_SYSERR;
1189   }
1190
1191   return ret;
1192 }
1193
1194
1195 /**
1196  * Retrieve all fragments of the latest messages.
1197  *
1198  * @see GNUNET_PSYCSTORE_message_get_latest()
1199  *
1200  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1201  */
1202 static int
1203 message_get_latest (void *cls,
1204                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1205                     uint64_t message_limit,
1206                     uint64_t *returned_fragments,
1207                     GNUNET_PSYCSTORE_FragmentCallback cb,
1208                     void *cb_cls)
1209 {
1210   struct Plugin *plugin = cls;
1211
1212   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1213
1214   int ret = GNUNET_SYSERR;
1215   *returned_fragments = 0;
1216
1217   struct GNUNET_MY_QueryParam params_select[] = {
1218     GNUNET_MY_query_param_auto_from_type (channel_key),
1219     GNUNET_MY_query_param_auto_from_type (channel_key),
1220     GNUNET_MY_query_param_uint64 (&message_limit),
1221     GNUNET_MY_query_param_end
1222   };
1223
1224   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1225
1226   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1227   {
1228     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1229               "mysql_stmt_reset", stmt);
1230     return GNUNET_SYSERR;
1231   }
1232
1233   return ret;
1234 }
1235
1236
1237 /**
1238  * Retrieve a fragment of message specified by its message ID and fragment
1239  * offset.
1240  *
1241  * @see GNUNET_PSYCSTORE_message_get_fragment()
1242  *
1243  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1244  */
1245 static int
1246 message_get_fragment (void *cls,
1247                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1248                       uint64_t message_id,
1249                       uint64_t fragment_offset,
1250                       GNUNET_PSYCSTORE_FragmentCallback cb,
1251                       void *cb_cls)
1252 {
1253   struct Plugin *plugin = cls;
1254   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1255   int sql_ret;
1256   int ret = GNUNET_SYSERR;
1257
1258   struct GNUNET_MY_QueryParam params_select[] = {
1259     GNUNET_MY_query_param_auto_from_type (channel_key),
1260     GNUNET_MY_query_param_uint64 (&message_id),
1261     GNUNET_MY_query_param_uint64 (&fragment_offset),
1262     GNUNET_MY_query_param_end
1263   };
1264
1265   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1266   switch (sql_ret)
1267   {
1268     case GNUNET_NO:
1269       ret = GNUNET_NO;
1270       break;
1271
1272     case GNUNET_OK:
1273       ret = fragment_row (stmt, cb, cb_cls, NULL);
1274       break;
1275
1276     default:
1277       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1278               "mysql execute prepared", stmt);
1279   }
1280
1281   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1282   {
1283     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1284               "mysql_stmt_reset", stmt);
1285     return GNUNET_SYSERR;
1286   }
1287
1288   return ret;
1289 }
1290
1291 /**
1292  * Retrieve the max. values of message counters for a channel.
1293  *
1294  * @see GNUNET_PSYCSTORE_counters_get()
1295  *
1296  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1297  */
1298 static int
1299 counters_message_get (void *cls,
1300                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1301                       uint64_t *max_fragment_id,
1302                       uint64_t *max_message_id,
1303                       uint64_t *max_group_generation)
1304 {
1305   struct Plugin *plugin = cls;
1306
1307   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1308
1309   int ret = GNUNET_SYSERR;
1310
1311   struct GNUNET_MY_QueryParam params_select[] = {
1312     GNUNET_MY_query_param_auto_from_type (channel_key),
1313     GNUNET_MY_query_param_end
1314   };
1315
1316   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1317   {
1318     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1319               "mysql execute prepared", stmt);
1320     return GNUNET_SYSERR;
1321   }
1322
1323   struct GNUNET_MY_ResultSpec results_select[] = {
1324     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1325     GNUNET_MY_result_spec_uint64 (max_message_id),
1326     GNUNET_MY_result_spec_uint64 (max_group_generation),
1327     GNUNET_MY_result_spec_end
1328   };
1329
1330   ret = GNUNET_MY_extract_result (stmt, results_select);
1331
1332   if (GNUNET_OK != ret)
1333   {
1334     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1335               "mysql extract_result", stmt);
1336     return GNUNET_SYSERR;
1337   }
1338
1339   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1340   {
1341     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1342               "mysql_stmt_reset", stmt);
1343     return GNUNET_SYSERR;
1344   }
1345
1346   return ret;
1347 }
1348
1349 /**
1350  * Retrieve the max. values of state counters for a channel.
1351  *
1352  * @see GNUNET_PSYCSTORE_counters_get()
1353  *
1354  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1355  */
1356 static int
1357 counters_state_get (void *cls,
1358                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1359                     uint64_t *max_state_message_id)
1360 {
1361   struct Plugin *plugin = cls;
1362
1363   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1364
1365   int ret = GNUNET_SYSERR;
1366
1367   struct GNUNET_MY_QueryParam params_select[] = {
1368     GNUNET_MY_query_param_auto_from_type (channel_key),
1369     GNUNET_MY_query_param_end
1370   };
1371
1372   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1373   {
1374     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1375               "mysql execute prepared", stmt);
1376     return GNUNET_SYSERR;
1377   }
1378
1379   struct GNUNET_MY_ResultSpec results_select[] = {
1380     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1381     GNUNET_MY_result_spec_end
1382   };
1383
1384   ret = GNUNET_MY_extract_result (stmt, results_select);
1385
1386   if (GNUNET_OK != ret)
1387   {
1388     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1389               "mysql extract_result", stmt);
1390     return GNUNET_SYSERR;
1391   }
1392
1393   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1394   {
1395     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1396               "mysql_stmt_reset", stmt);
1397     return GNUNET_SYSERR;
1398   }
1399
1400   return ret;
1401 }
1402
1403
1404 /**
1405  * Assign a value to a state variable.
1406  *
1407  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1408  */
1409 static int
1410 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1411               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1412               const char *name, const void *value, size_t value_size)
1413 {
1414   int ret = GNUNET_SYSERR;
1415
1416   struct GNUNET_MY_QueryParam params[] = {
1417     GNUNET_MY_query_param_auto_from_type (channel_key),
1418     GNUNET_MY_query_param_string (name),
1419     GNUNET_MY_query_param_fixed_size(value, value_size),
1420     GNUNET_MY_query_param_end
1421   };
1422
1423   ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1424   if (GNUNET_OK != ret)
1425   {
1426     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1427               "mysql exec_prepared", stmt);
1428     return GNUNET_SYSERR;
1429   }
1430
1431   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1432   {
1433     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1434               "mysql_stmt_reset", stmt);
1435     return GNUNET_SYSERR;
1436   }
1437
1438   return ret;
1439 }
1440
1441
1442 static int
1443 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1444                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1445                    uint64_t message_id)
1446 {
1447   struct GNUNET_MY_QueryParam params[] = {
1448     GNUNET_MY_query_param_uint64 (&message_id),
1449     GNUNET_MY_query_param_auto_from_type (channel_key),
1450     GNUNET_MY_query_param_end
1451   };
1452
1453   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1454                                             stmt,
1455                                             params))
1456   {
1457     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1458               "mysql execute prepared", stmt);
1459     return GNUNET_SYSERR;
1460   }
1461
1462   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1463   {
1464     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1465               "mysql_stmt_reset", stmt);
1466     return GNUNET_SYSERR;
1467   }
1468
1469   return GNUNET_OK;
1470 }
1471
1472
1473 /**
1474  * Begin modifying current state.
1475  */
1476 static int
1477 state_modify_begin (void *cls,
1478                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1479                     uint64_t message_id, uint64_t state_delta)
1480 {
1481   struct Plugin *plugin = cls;
1482
1483   if (state_delta > 0)
1484   {
1485     /**
1486      * We can only apply state modifiers in the current message if modifiers in
1487      * the previous stateful message (message_id - state_delta) were already
1488      * applied.
1489      */
1490
1491     uint64_t max_state_message_id = 0;
1492     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1493     switch (ret)
1494     {
1495     case GNUNET_OK:
1496     case GNUNET_NO: // no state yet
1497       ret = GNUNET_OK;
1498       break;
1499     default:
1500       return ret;
1501     }
1502
1503     if (max_state_message_id < message_id - state_delta)
1504       return GNUNET_NO; /* some stateful messages not yet applied */
1505     else if (message_id - state_delta < max_state_message_id)
1506       return GNUNET_NO; /* changes already applied */
1507   }
1508
1509   if (TRANSACTION_NONE != plugin->transaction)
1510   {
1511     /** @todo FIXME: wait for other transaction to finish  */
1512     return GNUNET_SYSERR;
1513   }
1514   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1515 }
1516
1517
1518 /**
1519  * Set the current value of state variable.
1520  *
1521  * @see GNUNET_PSYCSTORE_state_modify()
1522  *
1523  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1524  */
1525 static int
1526 state_modify_op (void *cls,
1527                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1528                  enum GNUNET_PSYC_Operator op,
1529                  const char *name, const void *value, size_t value_size)
1530 {
1531   struct Plugin *plugin = cls;
1532   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1533
1534   switch (op)
1535   {
1536   case GNUNET_PSYC_OP_ASSIGN:
1537     return state_assign (plugin, plugin->insert_state_current,
1538                          channel_key, name, value, value_size);
1539
1540   default: /** @todo implement more state operations */
1541     GNUNET_break (0);
1542     return GNUNET_SYSERR;
1543   }
1544 }
1545
1546
1547 /**
1548  * End modifying current state.
1549  */
1550 static int
1551 state_modify_end (void *cls,
1552                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1553                   uint64_t message_id)
1554 {
1555   struct Plugin *plugin = cls;
1556   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1557
1558   return
1559     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1560     && GNUNET_OK == update_message_id (plugin,
1561                                        plugin->update_max_state_message_id,
1562                                        channel_key, message_id)
1563     && GNUNET_OK == transaction_commit (plugin)
1564     ? GNUNET_OK : GNUNET_SYSERR;
1565 }
1566
1567
1568 /**
1569  * Begin state synchronization.
1570  */
1571 static int
1572 state_sync_begin (void *cls,
1573                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1574 {
1575   struct Plugin *plugin = cls;
1576   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1577 }
1578
1579
1580 /**
1581  * Assign current value of a state variable.
1582  *
1583  * @see GNUNET_PSYCSTORE_state_modify()
1584  *
1585  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1586  */
1587 static int
1588 state_sync_assign (void *cls,
1589                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1590                 const char *name, const void *value, size_t value_size)
1591 {
1592   struct Plugin *plugin = cls;
1593   return state_assign (cls, plugin->insert_state_sync,
1594                        channel_key, name, value, value_size);
1595 }
1596
1597
1598 /**
1599  * End modifying current state.
1600  */
1601 static int
1602 state_sync_end (void *cls,
1603                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1604                 uint64_t max_state_message_id,
1605                 uint64_t state_hash_message_id)
1606 {
1607   struct Plugin *plugin = cls;
1608   int ret = GNUNET_SYSERR;
1609
1610   if (TRANSACTION_NONE != plugin->transaction)
1611   {
1612     /** @todo FIXME: wait for other transaction to finish  */
1613     return GNUNET_SYSERR;
1614   }
1615
1616   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1617     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1618     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1619                                   channel_key)
1620     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1621                                   channel_key)
1622     && GNUNET_OK == update_message_id (plugin,
1623                                        plugin->update_state_hash_message_id,
1624                                        channel_key, state_hash_message_id)
1625     && GNUNET_OK == update_message_id (plugin,
1626                                        plugin->update_max_state_message_id,
1627                                        channel_key, max_state_message_id)
1628     && GNUNET_OK == transaction_commit (plugin)
1629     ? ret = GNUNET_OK
1630     : transaction_rollback (plugin);
1631   return ret;
1632 }
1633
1634
1635 /**
1636  * Delete the whole state.
1637  *
1638  * @see GNUNET_PSYCSTORE_state_reset()
1639  *
1640  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1641  */
1642 static int
1643 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1644 {
1645   struct Plugin *plugin = cls;
1646   return exec_channel (plugin, plugin->delete_state, channel_key);
1647 }
1648
1649
1650 /**
1651  * Update signed values of state variables in the state store.
1652  *
1653  * @see GNUNET_PSYCSTORE_state_hash_update()
1654  *
1655  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1656  */
1657 static int
1658 state_update_signed (void *cls,
1659                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1660 {
1661   struct Plugin *plugin = cls;
1662   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1663 }
1664
1665
1666 /**
1667  * Retrieve a state variable by name.
1668  *
1669  * @see GNUNET_PSYCSTORE_state_get()
1670  *
1671  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1672  */
1673 static int
1674 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1675            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1676 {
1677   struct Plugin *plugin = cls;
1678   int ret = GNUNET_SYSERR;
1679   int sql_ret ;
1680
1681   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1682
1683   struct GNUNET_MY_QueryParam params_select[] = {
1684     GNUNET_MY_query_param_auto_from_type (channel_key),
1685     GNUNET_MY_query_param_string (name),
1686     GNUNET_MY_query_param_end
1687   };
1688
1689   void *value_current = NULL;
1690   size_t value_size = 0;
1691
1692   struct GNUNET_MY_ResultSpec results[] = {
1693     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1694     GNUNET_MY_result_spec_end
1695   };
1696
1697   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1698   {
1699     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1700               "mysql exec_prepared", stmt);
1701   }
1702   else
1703   {
1704     sql_ret = GNUNET_MY_extract_result (stmt, results);
1705     switch (sql_ret)
1706     {
1707     case GNUNET_NO:
1708       ret = GNUNET_NO;
1709       break;
1710
1711     case GNUNET_YES:
1712       ret = cb (cb_cls, name, value_current, value_size);
1713       break;
1714
1715     default:
1716       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1717                 "mysql extract_result", stmt);
1718     }
1719   }
1720
1721   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1722   {
1723     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1724               "mysql_stmt_reset", stmt);
1725     return GNUNET_SYSERR;
1726   }
1727
1728   return ret;
1729 }
1730
1731
1732 /**
1733  * Retrieve all state variables for a channel with the given prefix.
1734  *
1735  * @see GNUNET_PSYCSTORE_state_get_prefix()
1736  *
1737  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1738  */
1739 static int
1740 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1741                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1742                   void *cb_cls)
1743 {
1744   struct Plugin *plugin = cls;
1745   int ret = GNUNET_SYSERR;
1746
1747   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1748
1749   uint32_t name_len = (uint32_t) strlen (name);
1750
1751   struct GNUNET_MY_QueryParam params_select[] = {
1752     GNUNET_MY_query_param_auto_from_type (channel_key),
1753     GNUNET_MY_query_param_string (name),
1754     GNUNET_MY_query_param_uint32 (&name_len),
1755     GNUNET_MY_query_param_string (name),
1756     GNUNET_MY_query_param_end
1757   };
1758
1759   char *name2 = "";
1760   void *value_current = NULL;
1761   size_t value_size = 0;
1762
1763   struct GNUNET_MY_ResultSpec results[] = {
1764     GNUNET_MY_result_spec_string (&name2),
1765     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1766     GNUNET_MY_result_spec_end
1767   };;
1768
1769   int sql_ret;
1770
1771   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1772   {
1773     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1774               "mysql exec_prepared", stmt);
1775     return GNUNET_SYSERR;
1776   }
1777
1778   do
1779   {
1780     sql_ret = GNUNET_MY_extract_result (stmt, results);
1781     switch (sql_ret)
1782     {
1783       case GNUNET_NO:
1784         if (ret != GNUNET_YES)
1785           ret = GNUNET_NO;
1786         break;
1787
1788       case GNUNET_YES:
1789         ret = cb (cb_cls, (const char *) name2, value_current, value_size);
1790
1791         if (ret != GNUNET_YES)
1792           sql_ret = GNUNET_NO;
1793         break;
1794
1795       default:
1796         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1797                   "mysql extract_result", stmt);
1798     }
1799   }
1800   while (sql_ret == GNUNET_YES);
1801
1802   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1803   {
1804     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1805               "mysql_stmt_reset", stmt);
1806     return GNUNET_SYSERR;
1807   }
1808
1809   return ret;
1810 }
1811
1812
1813 /**
1814  * Retrieve all signed state variables for a channel.
1815  *
1816  * @see GNUNET_PSYCSTORE_state_get_signed()
1817  *
1818  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1819  */
1820 static int
1821 state_get_signed (void *cls,
1822                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1823                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1824 {
1825   struct Plugin *plugin = cls;
1826   int ret = GNUNET_SYSERR;
1827
1828   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1829
1830   struct GNUNET_MY_QueryParam params_select[] = {
1831     GNUNET_MY_query_param_auto_from_type (channel_key),
1832     GNUNET_MY_query_param_end
1833   };
1834
1835   int sql_ret;
1836
1837   char *name = "";
1838   void *value_signed = NULL;
1839   size_t value_size = 0;
1840
1841   struct GNUNET_MY_ResultSpec results[] = {
1842     GNUNET_MY_result_spec_string (&name),
1843     GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1844     GNUNET_MY_result_spec_end
1845   };
1846
1847   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1848   {
1849     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1850               "mysql exec_prepared", stmt);
1851     return GNUNET_SYSERR;
1852   }
1853
1854   do
1855   {
1856     sql_ret = GNUNET_MY_extract_result (stmt, results);
1857     switch (sql_ret)
1858     {
1859       case GNUNET_NO:
1860         if (ret != GNUNET_YES)
1861           ret = GNUNET_NO;
1862         break;
1863
1864       case GNUNET_YES:
1865         ret = cb (cb_cls, (const char *) name, value_signed, value_size);
1866
1867         if (ret != GNUNET_YES)
1868             sql_ret = GNUNET_NO;
1869         break;
1870
1871       default:
1872          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1873               "mysql extract_result", stmt);
1874     }
1875   }
1876   while (sql_ret == GNUNET_YES);
1877
1878   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1879   {
1880     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1881               "mysql_stmt_reset", stmt);
1882     return GNUNET_SYSERR;
1883   }
1884
1885   return ret;
1886 }
1887
1888
1889 /**
1890  * Entry point for the plugin.
1891  *
1892  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1893  * @return NULL on error, otherwise the plugin context
1894  */
1895 void *
1896 libgnunet_plugin_psycstore_mysql_init (void *cls)
1897 {
1898   static struct Plugin plugin;
1899   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1900   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1901
1902   if (NULL != plugin.cfg)
1903     return NULL;                /* can only initialize once! */
1904   memset (&plugin, 0, sizeof (struct Plugin));
1905   plugin.cfg = cfg;
1906   if (GNUNET_OK != database_setup (&plugin))
1907   {
1908     database_shutdown (&plugin);
1909     return NULL;
1910   }
1911   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1912   api->cls = &plugin;
1913   api->membership_store = &mysql_membership_store;
1914   api->membership_test = &membership_test;
1915   api->fragment_store = &fragment_store;
1916   api->message_add_flags = &message_add_flags;
1917   api->fragment_get = &fragment_get;
1918   api->fragment_get_latest = &fragment_get_latest;
1919   api->message_get = &message_get;
1920   api->message_get_latest = &message_get_latest;
1921   api->message_get_fragment = &message_get_fragment;
1922   api->counters_message_get = &counters_message_get;
1923   api->counters_state_get = &counters_state_get;
1924   api->state_modify_begin = &state_modify_begin;
1925   api->state_modify_op = &state_modify_op;
1926   api->state_modify_end = &state_modify_end;
1927   api->state_sync_begin = &state_sync_begin;
1928   api->state_sync_assign = &state_sync_assign;
1929   api->state_sync_end = &state_sync_end;
1930   api->state_reset = &state_reset;
1931   api->state_update_signed = &state_update_signed;
1932   api->state_get = &state_get;
1933   api->state_get_prefix = &state_get_prefix;
1934   api->state_get_signed = &state_get_signed;
1935
1936   LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
1937   return api;
1938 }
1939
1940
1941 /**
1942  * Exit point from the plugin.
1943  *
1944  * @param cls The plugin context (as returned by "init")
1945  * @return Always NULL
1946  */
1947 void *
1948 libgnunet_plugin_psycstore_mysql_done (void *cls)
1949 {
1950   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1951   struct Plugin *plugin = api->cls;
1952
1953   database_shutdown (plugin);
1954   plugin->cfg = NULL;
1955   GNUNET_free (api);
1956   LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
1957   return NULL;
1958 }
1959
1960 /* end of plugin_psycstore_mysql.c */