Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 /**
20  * @file psycstore/plugin_psycstore_mysql.c
21  * @brief mysql-based psycstore backend
22  * @author Gabor X Toth
23  * @author Christian Grothoff
24  * @author Christophe Genevey
25  */
26
27 #include "platform.h"
28 #include "gnunet_psycstore_plugin.h"
29 #include "gnunet_psycstore_service.h"
30 #include "gnunet_multicast_service.h"
31 #include "gnunet_crypto_lib.h"
32 #include "gnunet_psyc_util_lib.h"
33 #include "psycstore.h"
34 #include "gnunet_my_lib.h"
35 #include "gnunet_mysql_lib.h"
36 #include <mysql/mysql.h>
37
38 /**
39  * After how many ms "busy" should a DB operation fail for good?  A
40  * low value makes sure that we are more responsive to requests
41  * (especially PUTs).  A high value guarantees a higher success rate
42  * (SELECTs in iterate can take several seconds despite LIMIT=1).
43  *
44  * The default value of 1s should ensure that users do not experience
45  * huge latencies while at the same time allowing operations to
46  * succeed with reasonable probability.
47  */
48 #define BUSY_TIMEOUT_MS 1000
49
50 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
51
52 /**
53  * Log an error message at log-level 'level' that indicates
54  * a failure of the command 'cmd' on file 'filename'
55  * with the message given by strerror(errno).
56  */
57 #define LOG_MYSQL(db, level, cmd, stmt)                                 \
58   do {                                                                  \
59     GNUNET_log_from (level, "psycstore-mysql",                          \
60                      _("`%s' failed at %s:%d with error: %s\n"),        \
61                      cmd, __FILE__, __LINE__,                           \
62                      mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
63   } while (0)
64
65 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
66
67 enum Transactions {
68   TRANSACTION_NONE = 0,
69   TRANSACTION_STATE_MODIFY,
70   TRANSACTION_STATE_SYNC,
71 };
72
73 /**
74  * Context for all functions in this plugin.
75  */
76 struct Plugin
77 {
78
79   const struct GNUNET_CONFIGURATION_Handle *cfg;
80
81   /**
82    * MySQL context.
83    */
84   struct GNUNET_MYSQL_Context *mc;
85
86   /**
87    * Current transaction.
88    */
89   enum Transactions transaction;
90
91   /**
92    * Precompiled SQL for channel_key_store()
93    */
94   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
95
96   /**
97    * Precompiled SQL for slave_key_store()
98    */
99   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
100
101   /**
102    * Precompiled SQL for membership_store()
103    */
104   struct GNUNET_MYSQL_StatementHandle *insert_membership;
105
106   /**
107    * Precompiled SQL for membership_test()
108    */
109   struct GNUNET_MYSQL_StatementHandle *select_membership;
110
111   /**
112    * Precompiled SQL for fragment_store()
113    */
114   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
115
116   /**
117    * Precompiled SQL for message_add_flags()
118    */
119   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
120
121   /**
122    * Precompiled SQL for fragment_get()
123    */
124   struct GNUNET_MYSQL_StatementHandle *select_fragments;
125
126   /**
127    * Precompiled SQL for fragment_get()
128    */
129   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
130
131   /**
132    * Precompiled SQL for message_get()
133    */
134   struct GNUNET_MYSQL_StatementHandle *select_messages;
135
136   /**
137    * Precompiled SQL for message_get()
138    */
139   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
140
141   /**
142    * Precompiled SQL for message_get_fragment()
143    */
144   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
145
146   /**
147    * Precompiled SQL for counters_get_message()
148    */
149   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
150
151   /**
152    * Precompiled SQL for counters_get_state()
153    */
154   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
155
156   /**
157    * Precompiled SQL for state_modify_end()
158    */
159   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
160
161   /**
162    * Precompiled SQL for state_sync_end()
163    */
164   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
165
166   /**
167    * Precompiled SQL for state_modify_op()
168    */
169   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
170
171   /**
172    * Precompiled SQL for state_modify_end()
173    */
174   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
175
176   /**
177    * Precompiled SQL for state_set_signed()
178    */
179   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
180
181   /**
182    * Precompiled SQL for state_sync()
183    */
184   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
185
186   /**
187    * Precompiled SQL for state_sync()
188    */
189   struct GNUNET_MYSQL_StatementHandle *delete_state;
190
191   /**
192    * Precompiled SQL for state_sync()
193    */
194   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
195
196   /**
197    * Precompiled SQL for state_sync()
198    */
199   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
200
201   /**
202    * Precompiled SQL for state_get_signed()
203    */
204   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
205
206   /**
207    * Precompiled SQL for state_get()
208    */
209   struct GNUNET_MYSQL_StatementHandle *select_state_one;
210
211   /**
212    * Precompiled SQL for state_get_prefix()
213    */
214   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
215
216 };
217
218 #if DEBUG_PSYCSTORE
219
220 static void
221 mysql_trace (void *cls, const char *sql)
222 {
223   LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
224 }
225
226 #endif
227
228
229 /**
230  * @brief Prepare a SQL statement
231  *
232  * @param dbh handle to the database
233  * @param sql SQL statement, UTF-8 encoded
234  * @param stmt set to the prepared statement
235  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
236  */
237 static int
238 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
239               const char *sql,
240               struct GNUNET_MYSQL_StatementHandle **stmt)
241 {
242   *stmt = GNUNET_MYSQL_statement_prepare (mc,
243                                           sql);
244
245   if (NULL == *stmt)
246   {
247     LOG (GNUNET_ERROR_TYPE_ERROR,
248          _("Error preparing SQL query: %s\n  %s\n"),
249          mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
250          sql);
251     return GNUNET_SYSERR;
252   }
253   LOG (GNUNET_ERROR_TYPE_DEBUG,
254        "Prepared `%s' / %p\n",
255        sql,
256        stmt);
257   return GNUNET_OK;
258 }
259
260
261 /**
262  * Initialize the database connections and associated
263  * data structures (create tables and indices
264  * as needed as well).
265  *
266  * @param plugin the plugin context (state for this module)
267  * @return #GNUNET_OK on success
268  */
269 static int
270 database_setup (struct Plugin *plugin)
271 {
272   /* Open database and precompile statements */
273   plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
274                                             "psycstore-mysql");
275
276   if (NULL == plugin->mc)
277   {
278     LOG (GNUNET_ERROR_TYPE_ERROR,
279          _("Unable to initialize Mysql.\n"));
280     return GNUNET_SYSERR;
281   }
282
283 #define STMT_RUN(sql) \
284   if (GNUNET_OK != \
285       GNUNET_MYSQL_statement_run (plugin->mc, \
286                                   sql)) \
287   { \
288     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
289                 _("Failed to run SQL statement `%s'\n"), \
290                 sql); \
291     return GNUNET_SYSERR; \
292   }
293
294   /* Create tables */
295   STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
296             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
297             " pub_key BLOB(32),\n"
298             " max_state_message_id BIGINT UNSIGNED,\n"
299             " state_hash_message_id BIGINT UNSIGNED,\n"
300             " PRIMARY KEY(id),\n"
301             " UNIQUE KEY(pub_key(32))\n"
302             ");");
303
304   STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
305             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
306             " pub_key BLOB(32),\n"
307             " PRIMARY KEY(id),\n"
308             " UNIQUE KEY(pub_key(32))\n"
309             ");");
310
311   STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
312             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
313             "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
314             "  did_join TINYINT NOT NULL,\n"
315             "  announced_at BIGINT UNSIGNED NOT NULL,\n"
316             "  effective_since BIGINT UNSIGNED NOT NULL,\n"
317             "  group_generation BIGINT UNSIGNED NOT NULL\n"
318             ");");
319
320 /*** FIX because IF NOT EXISTS doesn't work ***/
321   GNUNET_MYSQL_statement_run (plugin->mc,
322                               "CREATE INDEX idx_membership_channel_id_slave_id "
323                               "ON membership (channel_id, slave_id);");
324
325   /** @todo messages table: add method_name column */
326   STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
327             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
328             "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
329             "  signature BLOB,\n"
330             "  purpose BLOB,\n"
331             "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
332             "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
333                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
334             "  group_generation BIGINT UNSIGNED NOT NULL,\n"
335             "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
336             "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
337             "  data BLOB,\n"
338             "  PRIMARY KEY (channel_id, fragment_id),\n"
339             "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
340             ");");
341
342   STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
343             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
344             "  name TEXT NOT NULL,\n"
345             "  value_current BLOB,\n"
346             "  value_signed BLOB\n"
347             //"  PRIMARY KEY (channel_id, name(255))\n"
348             ");");
349
350   STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
351             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
352             "  name TEXT NOT NULL,\n"
353             "  value BLOB\n"
354             //"  PRIMARY KEY (channel_id, name(255))\n"
355             ");");
356 #undef STMT_RUN
357
358   /* Prepare statements */
359 #define PREP(stmt,handle)                                    \
360   if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
361   { \
362     GNUNET_break (0); \
363     return GNUNET_SYSERR; \
364   }
365   PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
366         &plugin->insert_channel_key);
367   PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
368         &plugin->insert_slave_key);
369   PREP ("INSERT INTO membership\n"
370         " (channel_id, slave_id, did_join, announced_at,\n"
371         "  effective_since, group_generation)\n"
372         "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
373         "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
374         "        ?, ?, ?, ?);",
375         &plugin->insert_membership);
376   PREP ("SELECT did_join FROM membership\n"
377         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
378         "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
379         "      AND effective_since <= ? AND did_join = 1\n"
380         "ORDER BY announced_at DESC LIMIT 1;",
381         &plugin->select_membership);
382
383   PREP ("INSERT IGNORE INTO messages\n"
384         " (channel_id, hop_counter, signature, purpose,\n"
385         "  fragment_id, fragment_offset, message_id,\n"
386         "  group_generation, multicast_flags, psycstore_flags, data)\n"
387         "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
388         "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
389         &plugin->insert_fragment);
390
391   PREP ("UPDATE messages\n"
392         "SET psycstore_flags = psycstore_flags | ?\n"
393         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
394         "      AND message_id = ? AND fragment_offset = 0;",
395         &plugin->update_message_flags);
396
397   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
398         "       fragment_offset, message_id, group_generation,\n"
399         "       multicast_flags, psycstore_flags, data\n"
400         "FROM messages\n"
401         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
402         "      AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
403         &plugin->select_fragments);
404
405   /** @todo select_messages: add method_prefix filter */
406   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
407         "       fragment_offset, message_id, group_generation,\n"
408         "       multicast_flags, psycstore_flags, data\n"
409         "FROM messages\n"
410         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
411         "      AND ? <= message_id AND message_id <= ?\n"
412         "LIMIT ?;",
413         &plugin->select_messages);
414
415   PREP ("SELECT * FROM\n"
416         "(SELECT hop_counter, signature, purpose, fragment_id,\n"
417         "        fragment_offset, message_id, group_generation,\n"
418         "        multicast_flags, psycstore_flags, data\n"
419         " FROM messages\n"
420         " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
421         " ORDER BY fragment_id DESC\n"
422         " LIMIT ?)\n"
423         "ORDER BY fragment_id;",
424         &plugin->select_latest_fragments);
425
426   /** @todo select_latest_messages: add method_prefix filter */
427   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
428         "       fragment_offset, message_id, group_generation,\n"
429         "        multicast_flags, psycstore_flags, data\n"
430         "FROM messages\n"
431         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
432         "      AND message_id IN\n"
433         "      (SELECT message_id\n"
434         "       FROM messages\n"
435         "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
436         "       GROUP BY message_id\n"
437         "       ORDER BY message_id\n"
438         "       DESC LIMIT ?)\n"
439         "ORDER BY fragment_id;",
440         &plugin->select_latest_messages);
441
442   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
443         "       fragment_offset, message_id, group_generation,\n"
444         "       multicast_flags, psycstore_flags, data\n"
445         "FROM messages\n"
446         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
447         "      AND message_id = ? AND fragment_offset = ?;",
448         &plugin->select_message_fragment);
449
450   PREP ("SELECT fragment_id, message_id, group_generation\n"
451         "FROM messages\n"
452         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
453         "ORDER BY fragment_id DESC LIMIT 1;",
454         &plugin->select_counters_message);
455
456   PREP ("SELECT max_state_message_id\n"
457         "FROM channels\n"
458         "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
459         &plugin->select_counters_state);
460
461   PREP ("UPDATE channels\n"
462         "SET max_state_message_id = ?\n"
463         "WHERE pub_key = ?;",
464         &plugin->update_max_state_message_id);
465
466   PREP ("UPDATE channels\n"
467         "SET state_hash_message_id = ?\n"
468         "WHERE pub_key = ?;",
469         &plugin->update_state_hash_message_id);
470
471   PREP ("REPLACE INTO state\n"
472         "  (channel_id, name, value_current, value_signed)\n"
473         "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
474         "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
475         "             (SELECT ?) AS name,\n"
476         "             (SELECT ?) AS value_current\n"
477         "     ) AS new\n"
478         "LEFT JOIN (SELECT channel_id, name, value_signed\n"
479         "           FROM state) AS old\n"
480         "ON new.channel_id = old.channel_id AND new.name = old.name;",
481         &plugin->insert_state_current);
482
483   PREP ("DELETE FROM state\n"
484         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
485         "      AND (value_current IS NULL OR length(value_current) = 0)\n"
486         "      AND (value_signed IS NULL OR length(value_signed) = 0);",
487         &plugin->delete_state_empty);
488
489   PREP ("UPDATE state\n"
490         "SET value_signed = value_current\n"
491         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
492         &plugin->update_state_signed);
493
494   PREP ("DELETE FROM state\n"
495         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
496         &plugin->delete_state);
497
498   PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
499         "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
500         &plugin->insert_state_sync);
501
502   PREP ("INSERT INTO state\n"
503         " (channel_id, name, value_current, value_signed)\n"
504         "SELECT channel_id, name, value, value\n"
505         "FROM state_sync\n"
506         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
507         &plugin->insert_state_from_sync);
508
509   PREP ("DELETE FROM state_sync\n"
510         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
511         &plugin->delete_state_sync);
512
513   PREP ("SELECT value_current\n"
514         "FROM state\n"
515         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
516         "      AND name = ?;",
517         &plugin->select_state_one);
518
519   PREP ("SELECT name, value_current\n"
520         "FROM state\n"
521         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
522         "      AND (name = ? OR substr(name, 1, ?) = ?);",
523         &plugin->select_state_prefix);
524
525   PREP ("SELECT name, value_signed\n"
526         "FROM state\n"
527         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
528         "      AND value_signed IS NOT NULL;",
529         &plugin->select_state_signed);
530 #undef PREP
531
532   return GNUNET_OK;
533 }
534
535
536 /**
537  * Shutdown database connection and associate data
538  * structures.
539  * @param plugin the plugin context (state for this module)
540  */
541 static void
542 database_shutdown (struct Plugin *plugin)
543 {
544   GNUNET_MYSQL_context_destroy (plugin->mc);
545 }
546
547
548 /**
549  * Execute a prepared statement with a @a channel_key argument.
550  *
551  * @param plugin Plugin handle.
552  * @param stmt Statement to execute.
553  * @param channel_key Public key of the channel.
554  *
555  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
556  */
557 static int
558 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
559               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
560 {
561   struct GNUNET_MY_QueryParam params[] = {
562     GNUNET_MY_query_param_auto_from_type (channel_key),
563     GNUNET_MY_query_param_end
564   };
565
566   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
567   {
568     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
569               "mysql exec_channel", stmt);
570   }
571
572   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
573   {
574     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
575               "mysql_stmt_reset", stmt);
576     return GNUNET_SYSERR;
577   }
578
579   return GNUNET_OK;
580 }
581
582
583 /**
584  * Begin a transaction.
585  */
586 static int
587 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
588 {
589   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
590   {
591     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
592     return GNUNET_SYSERR;
593   }
594
595   plugin->transaction = transaction;
596   return GNUNET_OK;
597 }
598
599
600 /**
601  * Commit current transaction.
602  */
603 static int
604 transaction_commit (struct Plugin *plugin)
605 {
606   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
607   {
608     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
609     return GNUNET_SYSERR;
610   }
611
612   plugin->transaction = TRANSACTION_NONE;
613   return GNUNET_OK;
614 }
615
616
617 /**
618  * Roll back current transaction.
619  */
620 static int
621 transaction_rollback (struct Plugin *plugin)
622 {
623   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
624   {
625     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
626     return GNUNET_SYSERR;
627   }
628
629   plugin->transaction = TRANSACTION_NONE;
630   return GNUNET_OK;
631 }
632
633
634 static int
635 channel_key_store (struct Plugin *plugin,
636                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
637 {
638   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
639
640   struct GNUNET_MY_QueryParam params[] = {
641     GNUNET_MY_query_param_auto_from_type (channel_key),
642     GNUNET_MY_query_param_end
643   };
644
645   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
646   {
647     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
648               "mysql exec_prepared", stmt);
649     return GNUNET_SYSERR;
650   }
651
652   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
653   {
654     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
655               "mysql_stmt_reset", stmt);
656     return GNUNET_SYSERR;
657   }
658
659   return GNUNET_OK;
660 }
661
662
663 static int
664 slave_key_store (struct Plugin *plugin,
665                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
666 {
667   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
668
669   struct GNUNET_MY_QueryParam params[] = {
670     GNUNET_MY_query_param_auto_from_type (slave_key),
671     GNUNET_MY_query_param_end
672   };
673
674   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
675   {
676     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
677               "mysql exec_prepared", stmt);
678     return GNUNET_SYSERR;
679   }
680
681   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
682   {
683     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
684               "mysql_stmt_reset", stmt);
685     return GNUNET_SYSERR;
686   }
687
688   return GNUNET_OK;
689 }
690
691
692 /**
693  * Store join/leave events for a PSYC channel in order to be able to answer
694  * membership test queries later.
695  *
696  * @see GNUNET_PSYCSTORE_membership_store()
697  *
698  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
699  */
700 static int
701 mysql_membership_store (void *cls,
702                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
703                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
704                          int did_join,
705                          uint64_t announced_at,
706                          uint64_t effective_since,
707                          uint64_t group_generation)
708 {
709   struct Plugin *plugin = cls;
710
711   uint32_t idid_join = (uint32_t)did_join;
712
713   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
714
715   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
716
717   if (announced_at > INT64_MAX ||
718       effective_since > INT64_MAX ||
719       group_generation > INT64_MAX)
720   {
721     GNUNET_break (0);
722     return GNUNET_SYSERR;
723   }
724
725   if (GNUNET_OK != channel_key_store (plugin, channel_key)
726       || GNUNET_OK != slave_key_store (plugin, slave_key))
727     return GNUNET_SYSERR;
728
729   struct GNUNET_MY_QueryParam params[] = {
730     GNUNET_MY_query_param_auto_from_type (channel_key),
731     GNUNET_MY_query_param_auto_from_type (slave_key),
732     GNUNET_MY_query_param_uint32 (&idid_join),
733     GNUNET_MY_query_param_uint64 (&announced_at),
734     GNUNET_MY_query_param_uint64 (&effective_since),
735     GNUNET_MY_query_param_uint64 (&group_generation),
736     GNUNET_MY_query_param_end
737   };
738
739   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
740   {
741     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
742               "mysql exec_prepared", stmt);
743     return GNUNET_SYSERR;
744   }
745
746   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
747   {
748     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
749               "mysql_stmt_reset", stmt);
750     return GNUNET_SYSERR;
751   }
752   return GNUNET_OK;
753 }
754
755 /**
756  * Test if a member was admitted to the channel at the given message ID.
757  *
758  * @see GNUNET_PSYCSTORE_membership_test()
759  *
760  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
761  *         #GNUNET_SYSERR if there was en error.
762  */
763 static int
764 membership_test (void *cls,
765                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
766                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
767                  uint64_t message_id)
768 {
769   struct Plugin *plugin = cls;
770
771   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
772
773   uint32_t did_join = 0;
774
775   int ret = GNUNET_SYSERR;
776
777   struct GNUNET_MY_QueryParam params_select[] = {
778     GNUNET_MY_query_param_auto_from_type (channel_key),
779     GNUNET_MY_query_param_auto_from_type (slave_key),
780     GNUNET_MY_query_param_uint64 (&message_id),
781     GNUNET_MY_query_param_end
782   };
783
784   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
785   {
786     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
787                 "mysql execute prepared", stmt);
788     return GNUNET_SYSERR;
789   }
790
791   struct GNUNET_MY_ResultSpec results_select[] = {
792     GNUNET_MY_result_spec_uint32 (&did_join),
793     GNUNET_MY_result_spec_end
794   };
795
796   switch (GNUNET_MY_extract_result (stmt, results_select))
797   {
798     case GNUNET_NO:
799       ret = GNUNET_NO;
800       break;
801     case GNUNET_OK:
802       ret = GNUNET_YES;
803       break;
804     default:
805       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
806                 "mysql extract_result", stmt);
807       return GNUNET_SYSERR;
808   }
809
810   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
811   {
812     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
813               "mysql_stmt_reset", stmt);
814     return GNUNET_SYSERR;
815   }
816
817   return ret;
818 }
819
820 /**
821  * Store a message fragment sent to a channel.
822  *
823  * @see GNUNET_PSYCSTORE_fragment_store()
824  *
825  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
826  */
827 static int
828 fragment_store (void *cls,
829                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
830                 const struct GNUNET_MULTICAST_MessageHeader *msg,
831                 uint32_t psycstore_flags)
832 {
833   struct Plugin *plugin = cls;
834
835   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
836
837   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
838
839   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
840
841   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
842   uint64_t message_id = GNUNET_ntohll (msg->message_id);
843   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
844
845   uint64_t hop_counter = ntohl(msg->hop_counter);
846   uint64_t flags = ntohl(msg->flags);
847
848   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
849       message_id > INT64_MAX || group_generation > INT64_MAX)
850   {
851     LOG(GNUNET_ERROR_TYPE_ERROR,
852          "Tried to store fragment with a field > INT64_MAX: "
853          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
854          message_id, group_generation);
855     GNUNET_break (0);
856     return GNUNET_SYSERR;
857   }
858
859   if (GNUNET_OK != channel_key_store (plugin, channel_key))
860     return GNUNET_SYSERR;
861
862   struct GNUNET_MY_QueryParam params_insert[] = {
863     GNUNET_MY_query_param_auto_from_type (channel_key),
864     GNUNET_MY_query_param_uint64 (&hop_counter),
865     GNUNET_MY_query_param_auto_from_type (&msg->signature),
866     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
867     GNUNET_MY_query_param_uint64 (&fragment_id),
868     GNUNET_MY_query_param_uint64 (&fragment_offset),
869     GNUNET_MY_query_param_uint64 (&message_id),
870     GNUNET_MY_query_param_uint64 (&group_generation),
871     GNUNET_MY_query_param_uint64 (&flags),
872     GNUNET_MY_query_param_uint32 (&psycstore_flags),
873     GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
874                                                   - sizeof (*msg)),
875     GNUNET_MY_query_param_end
876   };
877
878   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
879   {
880     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
881               "mysql execute prepared", stmt);
882     return GNUNET_SYSERR;
883   }
884
885   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
886   {
887     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
888               "mysql_stmt_reset", stmt);
889     return GNUNET_SYSERR;
890   }
891
892   return GNUNET_OK;
893 }
894
895 /**
896  * Set additional flags for a given message.
897  *
898  * They are OR'd with any existing flags set.
899  *
900  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
901  */
902 static int
903 message_add_flags (void *cls,
904                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
905                    uint64_t message_id,
906                    uint32_t psycstore_flags)
907 {
908   struct Plugin *plugin = cls;
909   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
910
911   int sql_ret;
912   int ret = GNUNET_SYSERR;
913
914   struct GNUNET_MY_QueryParam params_update[] = {
915     GNUNET_MY_query_param_uint32 (&psycstore_flags),
916     GNUNET_MY_query_param_auto_from_type (channel_key),
917     GNUNET_MY_query_param_uint64 (&message_id),
918     GNUNET_MY_query_param_end
919   };
920
921   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
922   switch (sql_ret)
923   {
924     case GNUNET_OK:
925       ret = GNUNET_OK;
926       break;
927
928     default:
929        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
930               "mysql execute prepared", stmt);
931   }
932
933   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
934   {
935     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
936               "mysql_stmt_reset", stmt);
937     return GNUNET_SYSERR;
938   }
939
940   return ret;
941 }
942
943
944 static int
945 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
946               GNUNET_PSYCSTORE_FragmentCallback cb,
947               void *cb_cls,
948               uint64_t *returned_fragments)
949 {
950
951   uint32_t hop_counter;
952   void *signature = NULL;
953   void *purpose = NULL;
954   size_t signature_size;
955   size_t purpose_size;
956   uint64_t fragment_id;
957   uint64_t fragment_offset;
958   uint64_t message_id;
959   uint64_t group_generation;
960   uint64_t flags;
961   void *buf;
962   size_t buf_size;
963   int ret = GNUNET_SYSERR;
964   int sql_ret;
965   struct GNUNET_MULTICAST_MessageHeader *mp;
966   uint64_t msg_flags;
967   struct GNUNET_MY_ResultSpec results[] = {
968     GNUNET_MY_result_spec_uint32 (&hop_counter),
969     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
970     GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
971     GNUNET_MY_result_spec_uint64 (&fragment_id),
972     GNUNET_MY_result_spec_uint64 (&fragment_offset),
973     GNUNET_MY_result_spec_uint64 (&message_id),
974     GNUNET_MY_result_spec_uint64 (&group_generation),
975     GNUNET_MY_result_spec_uint64 (&msg_flags),
976     GNUNET_MY_result_spec_uint64 (&flags),
977     GNUNET_MY_result_spec_variable_size (&buf,
978                                          &buf_size),
979     GNUNET_MY_result_spec_end
980   };
981
982   do
983   {
984     sql_ret = GNUNET_MY_extract_result (stmt, results);
985     switch (sql_ret)
986     {
987     case GNUNET_NO:
988       if (ret != GNUNET_YES)
989         ret = GNUNET_NO;
990       break;
991
992     case GNUNET_YES:
993       mp = GNUNET_malloc (sizeof (*mp) + buf_size);
994
995       mp->header.size = htons (sizeof (*mp) + buf_size);
996       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
997       mp->hop_counter = htonl (hop_counter);
998       GNUNET_memcpy (&mp->signature,
999                      signature,
1000                      signature_size);
1001       GNUNET_memcpy (&mp->purpose,
1002                      purpose,
1003                      purpose_size);
1004       mp->fragment_id = GNUNET_htonll (fragment_id);
1005       mp->fragment_offset = GNUNET_htonll (fragment_offset);
1006       mp->message_id = GNUNET_htonll (message_id);
1007       mp->group_generation = GNUNET_htonll (group_generation);
1008       mp->flags = htonl(msg_flags);
1009
1010       GNUNET_memcpy (&mp[1],
1011                      buf,
1012                      buf_size);
1013       ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1014       if (NULL != returned_fragments)
1015         (*returned_fragments)++;
1016       GNUNET_MY_cleanup_result (results);
1017       break;
1018
1019     default:
1020       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1021                  "mysql extract_result", stmt);
1022     }
1023   }
1024   while (GNUNET_YES == sql_ret);
1025
1026   // for debugging
1027   if (GNUNET_NO == ret)
1028     GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1029                "Empty result set\n");
1030
1031   return ret;
1032 }
1033
1034
1035 static int
1036 fragment_select (struct Plugin *plugin,
1037                  struct GNUNET_MYSQL_StatementHandle *stmt,
1038                  struct GNUNET_MY_QueryParam *params,
1039                  uint64_t *returned_fragments,
1040                  GNUNET_PSYCSTORE_FragmentCallback cb,
1041                  void *cb_cls)
1042 {
1043   int ret = GNUNET_SYSERR;
1044   int sql_ret;
1045
1046   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1047   switch (sql_ret)
1048   {
1049     case GNUNET_NO:
1050       if (ret != GNUNET_YES)
1051         ret = GNUNET_NO;
1052       break;
1053
1054     case GNUNET_YES:
1055       ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
1056       break;
1057
1058     default:
1059       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1060                  "mysql exec_prepared", stmt);
1061   }
1062   return ret;
1063 }
1064
1065
1066 /**
1067  * Retrieve a message fragment range by fragment ID.
1068  *
1069  * @see GNUNET_PSYCSTORE_fragment_get()
1070  *
1071  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1072  */
1073 static int
1074 fragment_get (void *cls,
1075               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1076               uint64_t first_fragment_id,
1077               uint64_t last_fragment_id,
1078               uint64_t *returned_fragments,
1079               GNUNET_PSYCSTORE_FragmentCallback cb,
1080               void *cb_cls)
1081 {
1082   struct Plugin *plugin = cls;
1083   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1084   int ret = GNUNET_SYSERR;
1085   struct GNUNET_MY_QueryParam params_select[] = {
1086     GNUNET_MY_query_param_auto_from_type (channel_key),
1087     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1088     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1089     GNUNET_MY_query_param_end
1090   };
1091
1092   *returned_fragments = 0;
1093   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1094
1095   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1096   {
1097     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1098               "mysql_stmt_reset", stmt);
1099     return GNUNET_SYSERR;
1100   }
1101
1102   return ret;
1103 }
1104
1105
1106 /**
1107  * Retrieve a message fragment range by fragment ID.
1108  *
1109  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1110  *
1111  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1112  */
1113 static int
1114 fragment_get_latest (void *cls,
1115                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1116                      uint64_t fragment_limit,
1117                      uint64_t *returned_fragments,
1118                      GNUNET_PSYCSTORE_FragmentCallback cb,
1119                      void *cb_cls)
1120 {
1121   struct Plugin *plugin = cls;
1122
1123   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1124
1125   int ret = GNUNET_SYSERR;
1126   *returned_fragments = 0;
1127
1128   struct GNUNET_MY_QueryParam params_select[] = {
1129     GNUNET_MY_query_param_auto_from_type (channel_key),
1130     GNUNET_MY_query_param_uint64 (&fragment_limit),
1131     GNUNET_MY_query_param_end
1132   };
1133
1134   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1135
1136   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1137   {
1138     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1139               "mysql_stmt_reset", stmt);
1140     return GNUNET_SYSERR;
1141   }
1142
1143   return ret;
1144 }
1145
1146
1147 /**
1148  * Retrieve all fragments of a message ID range.
1149  *
1150  * @see GNUNET_PSYCSTORE_message_get()
1151  *
1152  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1153  */
1154 static int
1155 message_get (void *cls,
1156              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1157              uint64_t first_message_id,
1158              uint64_t last_message_id,
1159              uint64_t fragment_limit,
1160              uint64_t *returned_fragments,
1161              GNUNET_PSYCSTORE_FragmentCallback cb,
1162              void *cb_cls)
1163 {
1164   struct Plugin *plugin = cls;
1165   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1166   int ret;
1167
1168   if (0 == fragment_limit)
1169     fragment_limit = UINT64_MAX;
1170
1171   struct GNUNET_MY_QueryParam params_select[] = {
1172     GNUNET_MY_query_param_auto_from_type (channel_key),
1173     GNUNET_MY_query_param_uint64 (&first_message_id),
1174     GNUNET_MY_query_param_uint64 (&last_message_id),
1175     GNUNET_MY_query_param_uint64 (&fragment_limit),
1176     GNUNET_MY_query_param_end
1177   };
1178
1179   *returned_fragments = 0;
1180   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1181
1182   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1183   {
1184     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1185               "mysql_stmt_reset", stmt);
1186     return GNUNET_SYSERR;
1187   }
1188
1189   return ret;
1190 }
1191
1192
1193 /**
1194  * Retrieve all fragments of the latest messages.
1195  *
1196  * @see GNUNET_PSYCSTORE_message_get_latest()
1197  *
1198  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1199  */
1200 static int
1201 message_get_latest (void *cls,
1202                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1203                     uint64_t message_limit,
1204                     uint64_t *returned_fragments,
1205                     GNUNET_PSYCSTORE_FragmentCallback cb,
1206                     void *cb_cls)
1207 {
1208   struct Plugin *plugin = cls;
1209
1210   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1211
1212   int ret = GNUNET_SYSERR;
1213   *returned_fragments = 0;
1214
1215   struct GNUNET_MY_QueryParam params_select[] = {
1216     GNUNET_MY_query_param_auto_from_type (channel_key),
1217     GNUNET_MY_query_param_auto_from_type (channel_key),
1218     GNUNET_MY_query_param_uint64 (&message_limit),
1219     GNUNET_MY_query_param_end
1220   };
1221
1222   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1223
1224   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1225   {
1226     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1227               "mysql_stmt_reset", stmt);
1228     return GNUNET_SYSERR;
1229   }
1230
1231   return ret;
1232 }
1233
1234
1235 /**
1236  * Retrieve a fragment of message specified by its message ID and fragment
1237  * offset.
1238  *
1239  * @see GNUNET_PSYCSTORE_message_get_fragment()
1240  *
1241  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1242  */
1243 static int
1244 message_get_fragment (void *cls,
1245                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1246                       uint64_t message_id,
1247                       uint64_t fragment_offset,
1248                       GNUNET_PSYCSTORE_FragmentCallback cb,
1249                       void *cb_cls)
1250 {
1251   struct Plugin *plugin = cls;
1252   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1253   int sql_ret;
1254   int ret = GNUNET_SYSERR;
1255
1256   struct GNUNET_MY_QueryParam params_select[] = {
1257     GNUNET_MY_query_param_auto_from_type (channel_key),
1258     GNUNET_MY_query_param_uint64 (&message_id),
1259     GNUNET_MY_query_param_uint64 (&fragment_offset),
1260     GNUNET_MY_query_param_end
1261   };
1262
1263   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1264   switch (sql_ret)
1265   {
1266     case GNUNET_NO:
1267       ret = GNUNET_NO;
1268       break;
1269
1270     case GNUNET_OK:
1271       ret = fragment_row (stmt, cb, cb_cls, NULL);
1272       break;
1273
1274     default:
1275       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1276               "mysql execute prepared", stmt);
1277   }
1278
1279   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1280   {
1281     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1282               "mysql_stmt_reset", stmt);
1283     return GNUNET_SYSERR;
1284   }
1285
1286   return ret;
1287 }
1288
1289 /**
1290  * Retrieve the max. values of message counters for a channel.
1291  *
1292  * @see GNUNET_PSYCSTORE_counters_get()
1293  *
1294  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1295  */
1296 static int
1297 counters_message_get (void *cls,
1298                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1299                       uint64_t *max_fragment_id,
1300                       uint64_t *max_message_id,
1301                       uint64_t *max_group_generation)
1302 {
1303   struct Plugin *plugin = cls;
1304
1305   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1306
1307   int ret = GNUNET_SYSERR;
1308
1309   struct GNUNET_MY_QueryParam params_select[] = {
1310     GNUNET_MY_query_param_auto_from_type (channel_key),
1311     GNUNET_MY_query_param_end
1312   };
1313
1314   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1315   {
1316     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1317               "mysql execute prepared", stmt);
1318     return GNUNET_SYSERR;
1319   }
1320
1321   struct GNUNET_MY_ResultSpec results_select[] = {
1322     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1323     GNUNET_MY_result_spec_uint64 (max_message_id),
1324     GNUNET_MY_result_spec_uint64 (max_group_generation),
1325     GNUNET_MY_result_spec_end
1326   };
1327
1328   ret = GNUNET_MY_extract_result (stmt, results_select);
1329
1330   if (GNUNET_OK != ret)
1331   {
1332     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1333               "mysql extract_result", stmt);
1334     return GNUNET_SYSERR;
1335   }
1336
1337   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1338   {
1339     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1340               "mysql_stmt_reset", stmt);
1341     return GNUNET_SYSERR;
1342   }
1343
1344   return ret;
1345 }
1346
1347 /**
1348  * Retrieve the max. values of state counters for a channel.
1349  *
1350  * @see GNUNET_PSYCSTORE_counters_get()
1351  *
1352  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1353  */
1354 static int
1355 counters_state_get (void *cls,
1356                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1357                     uint64_t *max_state_message_id)
1358 {
1359   struct Plugin *plugin = cls;
1360
1361   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1362
1363   int ret = GNUNET_SYSERR;
1364
1365   struct GNUNET_MY_QueryParam params_select[] = {
1366     GNUNET_MY_query_param_auto_from_type (channel_key),
1367     GNUNET_MY_query_param_end
1368   };
1369
1370   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1371   {
1372     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1373               "mysql execute prepared", stmt);
1374     return GNUNET_SYSERR;
1375   }
1376
1377   struct GNUNET_MY_ResultSpec results_select[] = {
1378     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1379     GNUNET_MY_result_spec_end
1380   };
1381
1382   ret = GNUNET_MY_extract_result (stmt, results_select);
1383
1384   if (GNUNET_OK != ret)
1385   {
1386     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1387               "mysql extract_result", stmt);
1388     return GNUNET_SYSERR;
1389   }
1390
1391   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1392   {
1393     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1394               "mysql_stmt_reset", stmt);
1395     return GNUNET_SYSERR;
1396   }
1397
1398   return ret;
1399 }
1400
1401
1402 /**
1403  * Assign a value to a state variable.
1404  *
1405  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1406  */
1407 static int
1408 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1409               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1410               const char *name, const void *value, size_t value_size)
1411 {
1412   int ret = GNUNET_SYSERR;
1413
1414   struct GNUNET_MY_QueryParam params[] = {
1415     GNUNET_MY_query_param_auto_from_type (channel_key),
1416     GNUNET_MY_query_param_string (name),
1417     GNUNET_MY_query_param_fixed_size(value, value_size),
1418     GNUNET_MY_query_param_end
1419   };
1420
1421   ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1422   if (GNUNET_OK != ret)
1423   {
1424     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1425               "mysql exec_prepared", stmt);
1426     return GNUNET_SYSERR;
1427   }
1428
1429   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1430   {
1431     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1432               "mysql_stmt_reset", stmt);
1433     return GNUNET_SYSERR;
1434   }
1435
1436   return ret;
1437 }
1438
1439
1440 static int
1441 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1442                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1443                    uint64_t message_id)
1444 {
1445   struct GNUNET_MY_QueryParam params[] = {
1446     GNUNET_MY_query_param_uint64 (&message_id),
1447     GNUNET_MY_query_param_auto_from_type (channel_key),
1448     GNUNET_MY_query_param_end
1449   };
1450
1451   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1452                                             stmt,
1453                                             params))
1454   {
1455     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1456               "mysql execute prepared", stmt);
1457     return GNUNET_SYSERR;
1458   }
1459
1460   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1461   {
1462     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1463               "mysql_stmt_reset", stmt);
1464     return GNUNET_SYSERR;
1465   }
1466
1467   return GNUNET_OK;
1468 }
1469
1470
1471 /**
1472  * Begin modifying current state.
1473  */
1474 static int
1475 state_modify_begin (void *cls,
1476                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1477                     uint64_t message_id, uint64_t state_delta)
1478 {
1479   struct Plugin *plugin = cls;
1480
1481   if (state_delta > 0)
1482   {
1483     /**
1484      * We can only apply state modifiers in the current message if modifiers in
1485      * the previous stateful message (message_id - state_delta) were already
1486      * applied.
1487      */
1488
1489     uint64_t max_state_message_id = 0;
1490     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1491     switch (ret)
1492     {
1493     case GNUNET_OK:
1494     case GNUNET_NO: // no state yet
1495       ret = GNUNET_OK;
1496       break;
1497     default:
1498       return ret;
1499     }
1500
1501     if (max_state_message_id < message_id - state_delta)
1502       return GNUNET_NO; /* some stateful messages not yet applied */
1503     else if (message_id - state_delta < max_state_message_id)
1504       return GNUNET_NO; /* changes already applied */
1505   }
1506
1507   if (TRANSACTION_NONE != plugin->transaction)
1508   {
1509     /** @todo FIXME: wait for other transaction to finish  */
1510     return GNUNET_SYSERR;
1511   }
1512   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1513 }
1514
1515
1516 /**
1517  * Set the current value of state variable.
1518  *
1519  * @see GNUNET_PSYCSTORE_state_modify()
1520  *
1521  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1522  */
1523 static int
1524 state_modify_op (void *cls,
1525                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1526                  enum GNUNET_PSYC_Operator op,
1527                  const char *name, const void *value, size_t value_size)
1528 {
1529   struct Plugin *plugin = cls;
1530   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1531
1532   switch (op)
1533   {
1534   case GNUNET_PSYC_OP_ASSIGN:
1535     return state_assign (plugin, plugin->insert_state_current,
1536                          channel_key, name, value, value_size);
1537
1538   default: /** @todo implement more state operations */
1539     GNUNET_break (0);
1540     return GNUNET_SYSERR;
1541   }
1542 }
1543
1544
1545 /**
1546  * End modifying current state.
1547  */
1548 static int
1549 state_modify_end (void *cls,
1550                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1551                   uint64_t message_id)
1552 {
1553   struct Plugin *plugin = cls;
1554   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1555
1556   return
1557     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1558     && GNUNET_OK == update_message_id (plugin,
1559                                        plugin->update_max_state_message_id,
1560                                        channel_key, message_id)
1561     && GNUNET_OK == transaction_commit (plugin)
1562     ? GNUNET_OK : GNUNET_SYSERR;
1563 }
1564
1565
1566 /**
1567  * Begin state synchronization.
1568  */
1569 static int
1570 state_sync_begin (void *cls,
1571                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1572 {
1573   struct Plugin *plugin = cls;
1574   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1575 }
1576
1577
1578 /**
1579  * Assign current value of a state variable.
1580  *
1581  * @see GNUNET_PSYCSTORE_state_modify()
1582  *
1583  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1584  */
1585 static int
1586 state_sync_assign (void *cls,
1587                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1588                 const char *name, const void *value, size_t value_size)
1589 {
1590   struct Plugin *plugin = cls;
1591   return state_assign (cls, plugin->insert_state_sync,
1592                        channel_key, name, value, value_size);
1593 }
1594
1595
1596 /**
1597  * End modifying current state.
1598  */
1599 static int
1600 state_sync_end (void *cls,
1601                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1602                 uint64_t max_state_message_id,
1603                 uint64_t state_hash_message_id)
1604 {
1605   struct Plugin *plugin = cls;
1606   int ret = GNUNET_SYSERR;
1607
1608   if (TRANSACTION_NONE != plugin->transaction)
1609   {
1610     /** @todo FIXME: wait for other transaction to finish  */
1611     return GNUNET_SYSERR;
1612   }
1613
1614   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1615     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1616     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1617                                   channel_key)
1618     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1619                                   channel_key)
1620     && GNUNET_OK == update_message_id (plugin,
1621                                        plugin->update_state_hash_message_id,
1622                                        channel_key, state_hash_message_id)
1623     && GNUNET_OK == update_message_id (plugin,
1624                                        plugin->update_max_state_message_id,
1625                                        channel_key, max_state_message_id)
1626     && GNUNET_OK == transaction_commit (plugin)
1627     ? ret = GNUNET_OK
1628     : transaction_rollback (plugin);
1629   return ret;
1630 }
1631
1632
1633 /**
1634  * Delete the whole state.
1635  *
1636  * @see GNUNET_PSYCSTORE_state_reset()
1637  *
1638  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1639  */
1640 static int
1641 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1642 {
1643   struct Plugin *plugin = cls;
1644   return exec_channel (plugin, plugin->delete_state, channel_key);
1645 }
1646
1647
1648 /**
1649  * Update signed values of state variables in the state store.
1650  *
1651  * @see GNUNET_PSYCSTORE_state_hash_update()
1652  *
1653  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1654  */
1655 static int
1656 state_update_signed (void *cls,
1657                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1658 {
1659   struct Plugin *plugin = cls;
1660   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1661 }
1662
1663
1664 /**
1665  * Retrieve a state variable by name.
1666  *
1667  * @see GNUNET_PSYCSTORE_state_get()
1668  *
1669  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1670  */
1671 static int
1672 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1673            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1674 {
1675   struct Plugin *plugin = cls;
1676   int ret = GNUNET_SYSERR;
1677   int sql_ret ;
1678
1679   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1680
1681   struct GNUNET_MY_QueryParam params_select[] = {
1682     GNUNET_MY_query_param_auto_from_type (channel_key),
1683     GNUNET_MY_query_param_string (name),
1684     GNUNET_MY_query_param_end
1685   };
1686
1687   void *value_current = NULL;
1688   size_t value_size = 0;
1689
1690   struct GNUNET_MY_ResultSpec results[] = {
1691     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1692     GNUNET_MY_result_spec_end
1693   };
1694
1695   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1696   {
1697     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1698               "mysql exec_prepared", stmt);
1699   }
1700   else
1701   {
1702     sql_ret = GNUNET_MY_extract_result (stmt, results);
1703     switch (sql_ret)
1704     {
1705     case GNUNET_NO:
1706       ret = GNUNET_NO;
1707       break;
1708
1709     case GNUNET_YES:
1710       ret = cb (cb_cls, name, value_current, value_size);
1711       break;
1712
1713     default:
1714       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1715                 "mysql extract_result", stmt);
1716     }
1717   }
1718
1719   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1720   {
1721     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1722               "mysql_stmt_reset", stmt);
1723     return GNUNET_SYSERR;
1724   }
1725
1726   return ret;
1727 }
1728
1729
1730 /**
1731  * Retrieve all state variables for a channel with the given prefix.
1732  *
1733  * @see GNUNET_PSYCSTORE_state_get_prefix()
1734  *
1735  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1736  */
1737 static int
1738 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1739                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1740                   void *cb_cls)
1741 {
1742   struct Plugin *plugin = cls;
1743   int ret = GNUNET_SYSERR;
1744
1745   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1746
1747   uint32_t name_len = (uint32_t) strlen (name);
1748
1749   struct GNUNET_MY_QueryParam params_select[] = {
1750     GNUNET_MY_query_param_auto_from_type (channel_key),
1751     GNUNET_MY_query_param_string (name),
1752     GNUNET_MY_query_param_uint32 (&name_len),
1753     GNUNET_MY_query_param_string (name),
1754     GNUNET_MY_query_param_end
1755   };
1756
1757   char *name2 = "";
1758   void *value_current = NULL;
1759   size_t value_size = 0;
1760
1761   struct GNUNET_MY_ResultSpec results[] = {
1762     GNUNET_MY_result_spec_string (&name2),
1763     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1764     GNUNET_MY_result_spec_end
1765   };;
1766
1767   int sql_ret;
1768
1769   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1770   {
1771     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1772               "mysql exec_prepared", stmt);
1773     return GNUNET_SYSERR;
1774   }
1775
1776   do
1777   {
1778     sql_ret = GNUNET_MY_extract_result (stmt, results);
1779     switch (sql_ret)
1780     {
1781       case GNUNET_NO:
1782         if (ret != GNUNET_YES)
1783           ret = GNUNET_NO;
1784         break;
1785
1786       case GNUNET_YES:
1787         ret = cb (cb_cls, (const char *) name2, value_current, value_size);
1788
1789         if (ret != GNUNET_YES)
1790           sql_ret = GNUNET_NO;
1791         break;
1792
1793       default:
1794         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1795                   "mysql extract_result", stmt);
1796     }
1797   }
1798   while (sql_ret == GNUNET_YES);
1799
1800   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1801   {
1802     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1803               "mysql_stmt_reset", stmt);
1804     return GNUNET_SYSERR;
1805   }
1806
1807   return ret;
1808 }
1809
1810
1811 /**
1812  * Retrieve all signed state variables for a channel.
1813  *
1814  * @see GNUNET_PSYCSTORE_state_get_signed()
1815  *
1816  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1817  */
1818 static int
1819 state_get_signed (void *cls,
1820                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1821                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1822 {
1823   struct Plugin *plugin = cls;
1824   int ret = GNUNET_SYSERR;
1825
1826   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1827
1828   struct GNUNET_MY_QueryParam params_select[] = {
1829     GNUNET_MY_query_param_auto_from_type (channel_key),
1830     GNUNET_MY_query_param_end
1831   };
1832
1833   int sql_ret;
1834
1835   char *name = "";
1836   void *value_signed = NULL;
1837   size_t value_size = 0;
1838
1839   struct GNUNET_MY_ResultSpec results[] = {
1840     GNUNET_MY_result_spec_string (&name),
1841     GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1842     GNUNET_MY_result_spec_end
1843   };
1844
1845   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1846   {
1847     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1848               "mysql exec_prepared", stmt);
1849     return GNUNET_SYSERR;
1850   }
1851
1852   do
1853   {
1854     sql_ret = GNUNET_MY_extract_result (stmt, results);
1855     switch (sql_ret)
1856     {
1857       case GNUNET_NO:
1858         if (ret != GNUNET_YES)
1859           ret = GNUNET_NO;
1860         break;
1861
1862       case GNUNET_YES:
1863         ret = cb (cb_cls, (const char *) name, value_signed, value_size);
1864
1865         if (ret != GNUNET_YES)
1866             sql_ret = GNUNET_NO;
1867         break;
1868
1869       default:
1870          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1871               "mysql extract_result", stmt);
1872     }
1873   }
1874   while (sql_ret == GNUNET_YES);
1875
1876   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1877   {
1878     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1879               "mysql_stmt_reset", stmt);
1880     return GNUNET_SYSERR;
1881   }
1882
1883   return ret;
1884 }
1885
1886
1887 /**
1888  * Entry point for the plugin.
1889  *
1890  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1891  * @return NULL on error, otherwise the plugin context
1892  */
1893 void *
1894 libgnunet_plugin_psycstore_mysql_init (void *cls)
1895 {
1896   static struct Plugin plugin;
1897   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1898   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1899
1900   if (NULL != plugin.cfg)
1901     return NULL;                /* can only initialize once! */
1902   memset (&plugin, 0, sizeof (struct Plugin));
1903   plugin.cfg = cfg;
1904   if (GNUNET_OK != database_setup (&plugin))
1905   {
1906     database_shutdown (&plugin);
1907     return NULL;
1908   }
1909   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1910   api->cls = &plugin;
1911   api->membership_store = &mysql_membership_store;
1912   api->membership_test = &membership_test;
1913   api->fragment_store = &fragment_store;
1914   api->message_add_flags = &message_add_flags;
1915   api->fragment_get = &fragment_get;
1916   api->fragment_get_latest = &fragment_get_latest;
1917   api->message_get = &message_get;
1918   api->message_get_latest = &message_get_latest;
1919   api->message_get_fragment = &message_get_fragment;
1920   api->counters_message_get = &counters_message_get;
1921   api->counters_state_get = &counters_state_get;
1922   api->state_modify_begin = &state_modify_begin;
1923   api->state_modify_op = &state_modify_op;
1924   api->state_modify_end = &state_modify_end;
1925   api->state_sync_begin = &state_sync_begin;
1926   api->state_sync_assign = &state_sync_assign;
1927   api->state_sync_end = &state_sync_end;
1928   api->state_reset = &state_reset;
1929   api->state_update_signed = &state_update_signed;
1930   api->state_get = &state_get;
1931   api->state_get_prefix = &state_get_prefix;
1932   api->state_get_signed = &state_get_signed;
1933
1934   LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
1935   return api;
1936 }
1937
1938
1939 /**
1940  * Exit point from the plugin.
1941  *
1942  * @param cls The plugin context (as returned by "init")
1943  * @return Always NULL
1944  */
1945 void *
1946 libgnunet_plugin_psycstore_mysql_done (void *cls)
1947 {
1948   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1949   struct Plugin *plugin = api->cls;
1950
1951   database_shutdown (plugin);
1952   plugin->cfg = NULL;
1953   GNUNET_free (api);
1954   LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
1955   return NULL;
1956 }
1957
1958 /* end of plugin_psycstore_mysql.c */