use gnunetcheck for the DB
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_mysql.c
23  * @brief mysql-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  */
28
29 #include "platform.h"
30 #include "gnunet_psycstore_plugin.h"
31 #include "gnunet_psycstore_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_crypto_lib.h"
34 #include "gnunet_psyc_util_lib.h"
35 #include "psycstore.h"
36 #include "gnunet_my_lib.h"
37 #include "gnunet_mysql_lib.h"
38 #include <mysql/mysql.h>
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 /**
55  * Log an error message at log-level 'level' that indicates
56  * a failure of the command 'cmd' on file 'filename'
57  * with the message given by strerror(errno).
58  */
59 #define LOG_MYSQL(db, level, cmd, stmt)                                 \
60   do {                                                                  \
61     GNUNET_log_from (level, "psycstore-mysql",                          \
62                      _("`%s' failed at %s:%d with error: %s\n"),        \
63                      cmd, __FILE__, __LINE__,                           \
64                      mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
65   } while (0)
66
67 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
68
69 enum Transactions {
70   TRANSACTION_NONE = 0,
71   TRANSACTION_STATE_MODIFY,
72   TRANSACTION_STATE_SYNC,
73 };
74
75 /**
76  * Context for all functions in this plugin.
77  */
78 struct Plugin
79 {
80
81   const struct GNUNET_CONFIGURATION_Handle *cfg;
82
83   /**
84    * MySQL context.
85    */
86   struct GNUNET_MYSQL_Context *mc;
87
88   /**
89    * Current transaction.
90    */
91   enum Transactions transaction;
92
93   struct GNUNET_MYSQL_StatementHandle *transaction_begin;
94
95   struct GNUNET_MYSQL_StatementHandle *transaction_commit;
96
97   struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
98
99   /**
100    * Precompiled SQL for channel_key_store()
101    */
102   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
103
104   /**
105    * Precompiled SQL for slave_key_store()
106    */
107   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   struct GNUNET_MYSQL_StatementHandle *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   struct GNUNET_MYSQL_StatementHandle *select_membership;
118
119   /**
120    * Precompiled SQL for fragment_store()
121    */
122   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
123
124   /**
125    * Precompiled SQL for message_add_flags()
126    */
127   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
128
129   /**
130    * Precompiled SQL for fragment_get()
131    */
132   struct GNUNET_MYSQL_StatementHandle *select_fragments;
133
134   /**
135    * Precompiled SQL for fragment_get()
136    */
137   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
138
139   /**
140    * Precompiled SQL for message_get()
141    */
142   struct GNUNET_MYSQL_StatementHandle *select_messages;
143
144   /**
145    * Precompiled SQL for message_get()
146    */
147   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
148
149   /**
150    * Precompiled SQL for message_get_fragment()
151    */
152   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
153
154   /**
155    * Precompiled SQL for counters_get_message()
156    */
157   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
158
159   /**
160    * Precompiled SQL for counters_get_state()
161    */
162   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
163
164   /**
165    * Precompiled SQL for state_modify_end()
166    */
167   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
168
169   /**
170    * Precompiled SQL for state_sync_end()
171    */
172   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
173
174   /**
175    * Precompiled SQL for state_modify_op()
176    */
177   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
178
179   /**
180    * Precompiled SQL for state_modify_end()
181    */
182   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
183
184   /**
185    * Precompiled SQL for state_set_signed()
186    */
187   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
188
189   /**
190    * Precompiled SQL for state_sync()
191    */
192   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
193
194   /**
195    * Precompiled SQL for state_sync()
196    */
197   struct GNUNET_MYSQL_StatementHandle *delete_state;
198
199   /**
200    * Precompiled SQL for state_sync()
201    */
202   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
203
204   /**
205    * Precompiled SQL for state_sync()
206    */
207   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
208
209   /**
210    * Precompiled SQL for state_get_signed()
211    */
212   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
213
214   /**
215    * Precompiled SQL for state_get()
216    */
217   struct GNUNET_MYSQL_StatementHandle *select_state_one;
218
219   /**
220    * Precompiled SQL for state_get_prefix()
221    */
222   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
223
224 };
225
226 #if DEBUG_PSYCSTORE
227
228 static void
229 mysql_trace (void *cls, const char *sql)
230 {
231   LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
232 }
233
234 #endif
235
236
237 /**
238  * @brief Prepare a SQL statement
239  *
240  * @param dbh handle to the database
241  * @param sql SQL statement, UTF-8 encoded
242  * @param stmt set to the prepared statement
243  * @return 0 on success
244  */
245 static int
246 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
247               const char *sql,
248               struct GNUNET_MYSQL_StatementHandle **stmt)
249 {
250   *stmt = GNUNET_MYSQL_statement_prepare (mc,
251                                           sql);
252
253   LOG(GNUNET_ERROR_TYPE_DEBUG,
254        "Prepared `%s' / %p\n", sql, stmt);
255   if(NULL == *stmt)
256     LOG(GNUNET_ERROR_TYPE_ERROR,
257    _("Error preparing SQL query: %s\n  %s\n"),
258    mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
259
260   return 0;
261 }
262
263
264 /**
265  * Initialize the database connections and associated
266  * data structures (create tables and indices
267  * as needed as well).
268  *
269  * @param plugin the plugin context (state for this module)
270  * @return #GNUNET_OK on success
271  */
272 static int
273 database_setup (struct Plugin *plugin)
274 {
275   /* Open database and precompile statements */
276   plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
277                                             "psycstore-mysql");
278
279   if (NULL == plugin->mc)
280   {
281     LOG (GNUNET_ERROR_TYPE_ERROR,
282          _("Unable to initialize Mysql.\n"));
283     return GNUNET_SYSERR;
284   }
285
286 #define STMT_RUN(sql) \
287   if (GNUNET_OK != \
288       GNUNET_MYSQL_statement_run (plugin->mc, \
289                                   sql)) \
290   { \
291     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
292                 _("Failed to run SQL statement `%s'\n"), \
293                 sql); \
294     return GNUNET_SYSERR; \
295   }
296
297   /* Create tables */
298   STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
299             " id INT AUTO_INCREMENT,\n"
300             " pub_key BLOB,\n"
301             " max_state_message_id INT,\n"
302             " state_hash_message_id INT,\n"
303             " PRIMARY KEY(id),\n"
304             " UNIQUE KEY(pub_key(5))\n"
305             ");");
306
307   STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
308             " id INT AUTO_INCREMENT,\n"
309             " pub_key BLOB,\n"
310             " PRIMARY KEY(id),\n"
311             " UNIQUE KEY(pub_key(5))\n"
312             ");");
313
314   STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
315             "  channel_id INT NOT NULL REFERENCES channels(id),\n"
316             "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
317             "  did_join INT NOT NULL,\n"
318             "  announced_at BIGINT UNSIGNED NOT NULL,\n"
319             "  effective_since BIGINT UNSIGNED NOT NULL,\n"
320             "  group_generation BIGINT UNSIGNED NOT NULL\n"
321             ");");
322
323 /*** FIX because IF NOT EXISTS doesn't work ***/
324   GNUNET_MYSQL_statement_run (plugin->mc,
325                               "CREATE INDEX idx_membership_channel_id_slave_id "
326                               "ON membership (channel_id, slave_id);");
327
328   /** @todo messages table: add method_name column */
329   STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
330             "  channel_id INT NOT NULL REFERENCES channels(id),\n"
331             "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
332             "  signature BLOB,\n"
333             "  purpose BLOB,\n"
334             "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
335             "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
336                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
337             "  group_generation BIGINT UNSIGNED NOT NULL,\n"
338             "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
339             "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
340             "  data BLOB,\n"
341             "  PRIMARY KEY (channel_id, fragment_id),\n"
342             "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
343             ");");
344
345   STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
346             "  channel_id INT NOT NULL REFERENCES channels(id),\n"
347             "  name TEXT NOT NULL,\n"
348             "  value_current BLOB,\n"
349             "  value_signed BLOB,\n"
350             "  PRIMARY KEY (channel_id, name(5))\n"
351             ");");
352
353   STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
354             "  channel_id INT NOT NULL REFERENCES channels(id),\n"
355             "  name TEXT NOT NULL,\n"
356             "  value BLOB,\n"
357             "  PRIMARY KEY (channel_id, name(5))\n"
358             ");");
359 #undef STMT_RUN
360
361   /* Prepare statements */
362   mysql_prepare (plugin->mc,
363                 "BEGIN",
364                 &plugin->transaction_begin);
365
366   mysql_prepare (plugin->mc,
367                 "COMMIT",
368                 &plugin->transaction_commit);
369
370   mysql_prepare (plugin->mc,
371                 "ROLLBACK;",
372                 &plugin->transaction_rollback);
373
374   mysql_prepare (plugin->mc,
375                 "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
376                 &plugin->insert_channel_key);
377
378   mysql_prepare (plugin->mc,
379                 "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
380                 &plugin->insert_slave_key);
381
382   mysql_prepare (plugin->mc,
383                 "INSERT INTO membership\n"
384                 " (channel_id, slave_id, did_join, announced_at,\n"
385                 "  effective_since, group_generation)\n"
386                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
387                 "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
388                 "        ?, ?, ?, ?);",
389                 &plugin->insert_membership);
390
391   mysql_prepare (plugin->mc,
392                 "SELECT did_join FROM membership\n"
393                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
394                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
395                "      AND effective_since <= ? AND did_join = 1\n"
396                "ORDER BY announced_at DESC LIMIT 1;",
397                &plugin->select_membership);
398
399   mysql_prepare (plugin->mc,
400                 "INSERT IGNORE INTO messages\n"
401                " (channel_id, hop_counter, signature, purpose,\n"
402                "  fragment_id, fragment_offset, message_id,\n"
403                "  group_generation, multicast_flags, psycstore_flags, data)\n"
404                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
405                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
406                 &plugin->insert_fragment);
407
408   mysql_prepare (plugin->mc,
409                 "UPDATE messages\n"
410                 "SET psycstore_flags = psycstore_flags | ?\n"
411                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
412                 "      AND message_id = ? AND fragment_offset = 0;",
413                 &plugin->update_message_flags);
414
415   mysql_prepare (plugin->mc,
416                   "SELECT hop_counter, signature, purpose, fragment_id,\n"
417                   "       fragment_offset, message_id, group_generation,\n"
418                   "       multicast_flags, psycstore_flags, data\n"
419                   "FROM messages\n"
420                   "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
421                   "      AND ? <= fragment_id AND fragment_id <= ?;",
422                 &plugin->select_fragments);
423
424   /** @todo select_messages: add method_prefix filter */
425   mysql_prepare (plugin->mc,
426                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
427                 "       fragment_offset, message_id, group_generation,\n"
428                 "       multicast_flags, psycstore_flags, data\n"
429                 "FROM messages\n"
430                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
431                 "      AND ? <= message_id AND message_id <= ?\n"
432                 "LIMIT ?;",
433                 &plugin->select_messages);
434
435   mysql_prepare (plugin->mc,
436                 "SELECT * FROM\n"
437                 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
438                 "        fragment_offset, message_id, group_generation,\n"
439                 "        multicast_flags, psycstore_flags, data\n"
440                 " FROM messages\n"
441                 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
442                 " ORDER BY fragment_id DESC\n"
443                 " LIMIT ?)\n"
444                 "ORDER BY fragment_id;",
445                 &plugin->select_latest_fragments);
446
447   /** @todo select_latest_messages: add method_prefix filter */
448   mysql_prepare (plugin->mc,
449                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
450                 "       fragment_offset, message_id, group_generation,\n"
451                 "        multicast_flags, psycstore_flags, data\n"
452                 "FROM messages\n"
453                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
454                 "      AND message_id IN\n"
455                 "      (SELECT message_id\n"
456                 "       FROM messages\n"
457                 "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
458                 "       GROUP BY message_id\n"
459                 "       ORDER BY message_id\n"
460                 "       DESC LIMIT ?)\n"
461                 "ORDER BY fragment_id;",
462                 &plugin->select_latest_messages);
463
464   mysql_prepare (plugin->mc,
465                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
466                 "       fragment_offset, message_id, group_generation,\n"
467                 "       multicast_flags, psycstore_flags, data\n"
468                 "FROM messages\n"
469                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
470                 "      AND message_id = ? AND fragment_offset = ?;",
471                 &plugin->select_message_fragment);
472
473   mysql_prepare (plugin->mc,
474                 "SELECT fragment_id, message_id, group_generation\n"
475                 "FROM messages\n"
476                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477                 "ORDER BY fragment_id DESC LIMIT 1;",
478                 &plugin->select_counters_message);
479
480   mysql_prepare (plugin->mc,
481                 "SELECT max_state_message_id\n"
482                 "FROM channels\n"
483                 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
484                 &plugin->select_counters_state);
485
486   mysql_prepare (plugin->mc,
487                 "UPDATE channels\n"
488                 "SET max_state_message_id = ?\n"
489                 "WHERE pub_key = ?;",
490                 &plugin->update_max_state_message_id);
491
492   mysql_prepare (plugin->mc,
493                 "UPDATE channels\n"
494                 "SET state_hash_message_id = ?\n"
495                 "WHERE pub_key = ?;",
496                 &plugin->update_state_hash_message_id);
497
498   mysql_prepare (plugin->mc,
499                 "REPLACE INTO state\n"
500                 "  (channel_id, name, value_current, value_signed)\n"
501                 "SELECT new.channel_id, new.name,\n"
502                 "       new.value_current, old.value_signed\n"
503                 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
504                 "             AS channel_id,\n"
505                 "             ? AS name, ? AS value_current) AS new\n"
506                 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
507                 "           FROM state) AS old\n"
508                 "ON new.channel_id = old.channel_id AND new.name = old.name;",
509                 &plugin->insert_state_current);
510
511   mysql_prepare (plugin->mc,
512                 "DELETE FROM state\n"
513                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
514                 "      AND (value_current IS NULL OR length(value_current) = 0)\n"
515                 "      AND (value_signed IS NULL OR length(value_signed) = 0);",
516                 &plugin->delete_state_empty);
517
518   mysql_prepare (plugin->mc,
519                 "UPDATE state\n"
520                 "SET value_signed = value_current\n"
521                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
522                 &plugin->update_state_signed);
523
524   mysql_prepare (plugin->mc,
525                 "DELETE FROM state\n"
526                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
527                 &plugin->delete_state);
528
529   mysql_prepare (plugin->mc,
530                 "INSERT INTO state_sync (channel_id, name, value)\n"
531                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
532                 &plugin->insert_state_sync);
533
534   mysql_prepare (plugin->mc,
535                 "INSERT INTO state\n"
536                 " (channel_id, name, value_current, value_signed)\n"
537                 "SELECT channel_id, name, value, value\n"
538                 "FROM state_sync\n"
539                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
540                 &plugin->insert_state_from_sync);
541
542   mysql_prepare (plugin->mc,
543                 "DELETE FROM state_sync\n"
544                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
545                 &plugin->delete_state_sync);
546
547   mysql_prepare (plugin->mc,
548                 "SELECT value_current\n"
549                 "FROM state\n"
550                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
551                 "      AND name = ?;",
552                 &plugin->select_state_one);
553
554   mysql_prepare (plugin->mc,
555                 "SELECT name, value_current\n"
556                 "FROM state\n"
557                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
558                 "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
559                 &plugin->select_state_prefix);
560
561   mysql_prepare (plugin->mc,
562                 "SELECT name, value_signed\n"
563                 "FROM state\n"
564                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
565                 "      AND value_signed IS NOT NULL;",
566                 &plugin->select_state_signed);
567
568   return GNUNET_OK;
569 }
570
571
572 /**
573  * Shutdown database connection and associate data
574  * structures.
575  * @param plugin the plugin context (state for this module)
576  */
577 static void
578 database_shutdown (struct Plugin *plugin)
579 {
580   GNUNET_MYSQL_context_destroy (plugin->mc);
581 }
582
583
584 /**
585  * Execute a prepared statement with a @a channel_key argument.
586  *
587  * @param plugin Plugin handle.
588  * @param stmt Statement to execute.
589  * @param channel_key Public key of the channel.
590  *
591  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
592  */
593 static int
594 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
595               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
596 {
597   struct GNUNET_MY_QueryParam params[] = {
598     GNUNET_MY_query_param_auto_from_type (channel_key),
599     GNUNET_MY_query_param_end
600   };
601
602   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
603   {
604     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
605               "mysql exec_channel", stmt);
606   }
607
608   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
609   {
610     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
611               "mysql_stmt_reset", stmt);
612     return GNUNET_SYSERR;
613   }
614
615   return GNUNET_OK;
616 }
617
618
619 /**
620  * Begin a transaction.
621  */
622 static int
623 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
624 {
625   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
626
627   struct GNUNET_MY_QueryParam params[] = {
628     GNUNET_MY_query_param_end
629   };
630
631   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
632   {
633     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
634                 "mysql exexc_prepared", stmt);
635     return GNUNET_SYSERR;
636   }
637
638   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
639   {
640     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
641               "mysql_stmt_reset", stmt);
642     return GNUNET_SYSERR;
643   }
644
645   plugin->transaction = transaction;
646   return GNUNET_OK;
647 }
648
649
650 /**
651  * Commit current transaction.
652  */
653 static int
654 transaction_commit (struct Plugin *plugin)
655 {
656   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
657
658   struct GNUNET_MY_QueryParam params[] = {
659     GNUNET_MY_query_param_end
660   };
661
662   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
663   {
664     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
665                 "mysql exec_prepared", stmt);
666     return GNUNET_SYSERR;
667   }
668
669   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
670   {
671     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
672               "mysql_stmt_reset", stmt);
673     return GNUNET_SYSERR;
674   }
675
676   plugin->transaction = TRANSACTION_NONE;
677   return GNUNET_OK;
678 }
679
680
681 /**
682  * Roll back current transaction.
683  */
684 static int
685 transaction_rollback (struct Plugin *plugin)
686 {
687   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
688
689   struct GNUNET_MY_QueryParam params[] = {
690     GNUNET_MY_query_param_end
691   };
692
693   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
694   {
695     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
696               "mysql exec_prepared", stmt);
697     return GNUNET_SYSERR;
698   }
699
700   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
701   {
702     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
703               "mysql_stmt_reset", stmt);
704     return GNUNET_SYSERR;
705   }
706
707   plugin->transaction = TRANSACTION_NONE;
708   return GNUNET_OK;
709 }
710
711
712 static int
713 channel_key_store (struct Plugin *plugin,
714                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
715 {
716   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
717
718   struct GNUNET_MY_QueryParam params[] = {
719     GNUNET_MY_query_param_auto_from_type (channel_key),
720     GNUNET_MY_query_param_end
721   };
722
723   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
724   {
725     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
726               "mysql exec_prepared", stmt);
727     return GNUNET_SYSERR;
728   }
729
730   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
731   {
732     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
733               "mysql_stmt_reset", stmt);
734     return GNUNET_SYSERR;
735   }
736
737   return GNUNET_OK;
738 }
739
740
741 static int
742 slave_key_store (struct Plugin *plugin,
743                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
744 {
745   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
746
747   struct GNUNET_MY_QueryParam params[] = {
748     GNUNET_MY_query_param_auto_from_type (slave_key),
749     GNUNET_MY_query_param_end
750   };
751
752   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
753   {
754     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
755               "mysql exec_prepared", stmt);
756     return GNUNET_SYSERR;
757   }
758
759   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
760   {
761     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
762               "mysql_stmt_reset", stmt);
763     return GNUNET_SYSERR;
764   }
765
766   return GNUNET_OK;
767 }
768
769
770 /**
771  * Store join/leave events for a PSYC channel in order to be able to answer
772  * membership test queries later.
773  *
774  * @see GNUNET_PSYCSTORE_membership_store()
775  *
776  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
777  */
778 static int
779 mysql_membership_store (void *cls,
780                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
781                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
782                          int did_join,
783                          uint64_t announced_at,
784                          uint64_t effective_since,
785                          uint64_t group_generation)
786 {
787   struct Plugin *plugin = cls;
788
789   uint32_t idid_join = (uint32_t)did_join;
790
791   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
792
793   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
794
795   if (announced_at > INT64_MAX ||
796       effective_since > INT64_MAX ||
797       group_generation > INT64_MAX)
798   {
799     GNUNET_break (0);
800     return GNUNET_SYSERR;
801   }
802
803   if (GNUNET_OK != channel_key_store (plugin, channel_key)
804       || GNUNET_OK != slave_key_store (plugin, slave_key))
805     return GNUNET_SYSERR;
806
807   struct GNUNET_MY_QueryParam params[] = {
808     GNUNET_MY_query_param_auto_from_type (channel_key),
809     GNUNET_MY_query_param_auto_from_type (slave_key),
810     GNUNET_MY_query_param_uint32 (&idid_join),
811     GNUNET_MY_query_param_uint64 (&announced_at),
812     GNUNET_MY_query_param_uint64 (&effective_since),
813     GNUNET_MY_query_param_uint64 (&group_generation),
814     GNUNET_MY_query_param_end
815   };
816
817   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
818   {
819     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
820               "mysql exec_prepared", stmt);
821     return GNUNET_SYSERR;
822   }
823
824   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
825   {
826     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
827               "mysql_stmt_reset", stmt);
828     return GNUNET_SYSERR;
829   }
830   return GNUNET_OK;
831 }
832
833 /**
834  * Test if a member was admitted to the channel at the given message ID.
835  *
836  * @see GNUNET_PSYCSTORE_membership_test()
837  *
838  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
839  *         #GNUNET_SYSERR if there was en error.
840  */
841 static int
842 membership_test (void *cls,
843                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
844                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
845                  uint64_t message_id)
846 {
847   struct Plugin *plugin = cls;
848
849   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
850
851   uint32_t did_join = 0;
852
853   int ret = GNUNET_SYSERR;
854
855   struct GNUNET_MY_QueryParam params_select[] = {
856     GNUNET_MY_query_param_auto_from_type (channel_key),
857     GNUNET_MY_query_param_auto_from_type (slave_key),
858     GNUNET_MY_query_param_uint64 (&message_id),
859     GNUNET_MY_query_param_end
860   };
861
862   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
863   {
864     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
865                 "mysql execute prepared", stmt);
866     return GNUNET_SYSERR;
867   }
868
869   struct GNUNET_MY_ResultSpec results_select[] = {
870     GNUNET_MY_result_spec_uint32 (&did_join),
871     GNUNET_MY_result_spec_end
872   };
873
874   switch(GNUNET_MY_extract_result (stmt,
875                                 results_select))
876   {
877     case GNUNET_NO:
878       ret = GNUNET_NO;
879       break;
880     case GNUNET_OK:
881       ret = GNUNET_YES;
882       break;
883     default:
884       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
885                 "mysql extract_result", stmt);
886       return GNUNET_SYSERR;
887   }
888
889   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
890   {
891     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
892               "mysql_stmt_reset", stmt);
893     return GNUNET_SYSERR;
894   }
895
896   return ret;
897 }
898
899 /**
900  * Store a message fragment sent to a channel.
901  *
902  * @see GNUNET_PSYCSTORE_fragment_store()
903  *
904  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
905  */
906 static int
907 fragment_store (void *cls,
908                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
909                 const struct GNUNET_MULTICAST_MessageHeader *msg,
910                 uint32_t psycstore_flags)
911 {
912   struct Plugin *plugin = cls;
913
914   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
915
916   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
917
918   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
919
920   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
921   uint64_t message_id = GNUNET_ntohll (msg->message_id);
922   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
923
924   uint64_t hop_counter = ntohl(msg->hop_counter);
925   uint64_t flags = ntohl(msg->flags);
926
927   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
928       message_id > INT64_MAX || group_generation > INT64_MAX)
929   {
930     LOG(GNUNET_ERROR_TYPE_ERROR,
931          "Tried to store fragment with a field > INT64_MAX: "
932          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
933          message_id, group_generation);
934     GNUNET_break (0);
935     return GNUNET_SYSERR;
936   }
937
938   if (GNUNET_OK != channel_key_store (plugin, channel_key))
939     return GNUNET_SYSERR;
940
941   struct GNUNET_MY_QueryParam params_insert[] = {
942     GNUNET_MY_query_param_auto_from_type (channel_key),
943     GNUNET_MY_query_param_uint64 (&hop_counter),
944     GNUNET_MY_query_param_auto_from_type (&msg->signature),
945     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
946     GNUNET_MY_query_param_uint64 (&fragment_id),
947     GNUNET_MY_query_param_uint64 (&fragment_offset),
948     GNUNET_MY_query_param_uint64 (&message_id),
949     GNUNET_MY_query_param_uint64 (&group_generation),
950     GNUNET_MY_query_param_uint64 (&flags),
951     GNUNET_MY_query_param_uint32 (&psycstore_flags),
952     GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
953                                                   - sizeof (*msg)),
954     GNUNET_MY_query_param_end
955   };
956
957   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
958   {
959     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
960               "mysql execute prepared", stmt);
961     return GNUNET_SYSERR;
962   }
963
964   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
965   {
966     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
967               "mysql_stmt_reset", stmt);
968     return GNUNET_SYSERR;
969   }
970
971   return GNUNET_OK;
972 }
973
974 /**
975  * Set additional flags for a given message.
976  *
977  * They are OR'd with any existing flags set.
978  *
979  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
980  */
981 static int
982 message_add_flags (void *cls,
983                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
984                    uint64_t message_id,
985                    uint64_t psycstore_flags)
986 {
987   struct Plugin *plugin = cls;
988   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
989
990   int sql_ret;
991   int ret = GNUNET_SYSERR;
992
993   struct GNUNET_MY_QueryParam params_update[] = {
994     GNUNET_MY_query_param_uint64 (&psycstore_flags),
995     GNUNET_MY_query_param_auto_from_type (channel_key),
996     GNUNET_MY_query_param_uint64 (&message_id),
997     GNUNET_MY_query_param_end
998   };
999
1000   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
1001   switch (sql_ret)
1002   {
1003     case GNUNET_OK:
1004       ret = GNUNET_OK;
1005       break;
1006
1007     default:
1008        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1009               "mysql execute prepared", stmt);
1010   }
1011
1012   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1013   {
1014     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1015               "mysql_stmt_reset", stmt);
1016     return GNUNET_SYSERR;
1017   }
1018
1019   return ret;
1020 }
1021
1022
1023 static int
1024 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
1025               GNUNET_PSYCSTORE_FragmentCallback cb,
1026               void *cb_cls)
1027 {
1028
1029   uint32_t hop_counter;
1030   void *signature = NULL;
1031   void *purpose = NULL;
1032   size_t signature_size;
1033   size_t purpose_size;
1034
1035   uint64_t fragment_id;
1036   uint64_t fragment_offset;
1037   uint64_t message_id;
1038   uint64_t group_generation;
1039   uint64_t flags;
1040   void *buf;
1041   size_t buf_size;
1042   int ret = GNUNET_SYSERR;
1043   int sql_ret;
1044   struct GNUNET_MULTICAST_MessageHeader *mp;
1045
1046   uint64_t msg_flags;
1047
1048
1049   struct GNUNET_MY_ResultSpec results[] = {
1050     GNUNET_MY_result_spec_uint32 (&hop_counter),
1051     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
1052     GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
1053     GNUNET_MY_result_spec_uint64 (&fragment_id),
1054     GNUNET_MY_result_spec_uint64 (&fragment_offset),
1055     GNUNET_MY_result_spec_uint64 (&message_id),
1056     GNUNET_MY_result_spec_uint64 (&group_generation),
1057     GNUNET_MY_result_spec_uint64 (&msg_flags),
1058     GNUNET_MY_result_spec_uint64 (&flags),
1059     GNUNET_MY_result_spec_variable_size (&buf,
1060                                          &buf_size),
1061     GNUNET_MY_result_spec_end
1062   };
1063
1064   sql_ret = GNUNET_MY_extract_result (stmt,
1065                                 results);
1066   switch (sql_ret)
1067   {
1068     case GNUNET_NO:
1069       if (ret != GNUNET_OK)
1070         ret = GNUNET_NO;
1071       break;
1072
1073     case GNUNET_OK:
1074       mp = GNUNET_malloc (sizeof (*mp) + buf_size);
1075
1076       mp->header.size = htons (sizeof (*mp) + buf_size);
1077       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1078       mp->hop_counter = htonl (hop_counter);
1079       GNUNET_memcpy (&mp->signature,
1080                     signature,
1081                     signature_size);
1082       GNUNET_memcpy (&mp->purpose,
1083                     purpose,
1084                     purpose_size);
1085       mp->fragment_id = GNUNET_htonll (fragment_id);
1086       mp->fragment_offset = GNUNET_htonll (fragment_offset);
1087       mp->message_id = GNUNET_htonll (message_id);
1088       mp->group_generation = GNUNET_htonll (group_generation);
1089       mp->flags = htonl(msg_flags);
1090
1091       GNUNET_memcpy (&mp[1],
1092                     buf,
1093                     buf_size);
1094       ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1095
1096       GNUNET_MY_cleanup_result (results);
1097       break;
1098
1099     default:
1100       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1101                     "mysql extract_result", stmt);
1102   }
1103
1104   return ret;
1105 }
1106
1107
1108 static int
1109 fragment_select (struct Plugin *plugin,
1110                  struct GNUNET_MYSQL_StatementHandle *stmt,
1111                  struct GNUNET_MY_QueryParam *params,
1112                  uint64_t *returned_fragments,
1113                  GNUNET_PSYCSTORE_FragmentCallback cb,
1114                  void *cb_cls)
1115 {
1116   int ret = GNUNET_SYSERR;
1117   int sql_ret;
1118
1119   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1120   switch (sql_ret)
1121   {
1122     case GNUNET_NO:
1123       if (ret != GNUNET_OK)
1124         ret = GNUNET_NO;
1125       break;
1126
1127     case GNUNET_YES:
1128        ret = fragment_row (stmt, cb, cb_cls);
1129        (*returned_fragments)++;
1130       break;
1131
1132     default:
1133       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1134                  "mysql exec_prepared", stmt);
1135   }
1136
1137   return ret;
1138 }
1139
1140 /**
1141  * Retrieve a message fragment range by fragment ID.
1142  *
1143  * @see GNUNET_PSYCSTORE_fragment_get()
1144  *
1145  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1146  */
1147 static int
1148 fragment_get (void *cls,
1149               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1150               uint64_t first_fragment_id,
1151               uint64_t last_fragment_id,
1152               uint64_t *returned_fragments,
1153               GNUNET_PSYCSTORE_FragmentCallback cb,
1154               void *cb_cls)
1155 {
1156   struct Plugin *plugin = cls;
1157   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1158   int ret = GNUNET_SYSERR;
1159   *returned_fragments = 0;
1160
1161   struct GNUNET_MY_QueryParam params_select[] = {
1162     GNUNET_MY_query_param_auto_from_type (channel_key),
1163     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1164     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1165     GNUNET_MY_query_param_end
1166   };
1167
1168   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1169
1170   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1171   {
1172     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1173               "mysql_stmt_reset", stmt);
1174     return GNUNET_SYSERR;
1175   }
1176
1177   return ret;
1178 }
1179
1180
1181 /**
1182  * Retrieve a message fragment range by fragment ID.
1183  *
1184  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1185  *
1186  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1187  */
1188 static int
1189 fragment_get_latest (void *cls,
1190                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1191                      uint64_t fragment_limit,
1192                      uint64_t *returned_fragments,
1193                      GNUNET_PSYCSTORE_FragmentCallback cb,
1194                      void *cb_cls)
1195 {
1196   struct Plugin *plugin = cls;
1197
1198   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1199
1200   int ret = GNUNET_SYSERR;
1201   *returned_fragments = 0;
1202
1203   struct GNUNET_MY_QueryParam params_select[] = {
1204     GNUNET_MY_query_param_auto_from_type (channel_key),
1205     GNUNET_MY_query_param_uint64 (&fragment_limit),
1206     GNUNET_MY_query_param_end
1207   };
1208
1209   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1210
1211   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1212   {
1213     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1214               "mysql_stmt_reset", stmt);
1215     return GNUNET_SYSERR;
1216   }
1217
1218   return ret;
1219 }
1220
1221
1222 /**
1223  * Retrieve all fragments of a message ID range.
1224  *
1225  * @see GNUNET_PSYCSTORE_message_get()
1226  *
1227  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1228  */
1229 static int
1230 message_get (void *cls,
1231              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1232              uint64_t first_message_id,
1233              uint64_t last_message_id,
1234              uint64_t fragment_limit,
1235              uint64_t *returned_fragments,
1236              GNUNET_PSYCSTORE_FragmentCallback cb,
1237              void *cb_cls)
1238 {
1239   struct Plugin *plugin = cls;
1240
1241   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1242
1243   int ret = GNUNET_SYSERR;
1244   *returned_fragments = 0;
1245
1246   struct GNUNET_MY_QueryParam params_select[] = {
1247     GNUNET_MY_query_param_auto_from_type (channel_key),
1248     GNUNET_MY_query_param_uint64 (&first_message_id),
1249     GNUNET_MY_query_param_uint64 (&last_message_id),
1250     GNUNET_MY_query_param_uint64 (&fragment_limit),
1251     GNUNET_MY_query_param_end
1252   };
1253
1254   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1255
1256   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1257   {
1258     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1259               "mysql_stmt_reset", stmt);
1260     return GNUNET_SYSERR;
1261   }
1262
1263   return ret;
1264 }
1265
1266
1267 /**
1268  * Retrieve all fragments of the latest messages.
1269  *
1270  * @see GNUNET_PSYCSTORE_message_get_latest()
1271  *
1272  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1273  */
1274 static int
1275 message_get_latest (void *cls,
1276                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1277                     uint64_t message_limit,
1278                     uint64_t *returned_fragments,
1279                     GNUNET_PSYCSTORE_FragmentCallback cb,
1280                     void *cb_cls)
1281 {
1282   struct Plugin *plugin = cls;
1283
1284   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1285
1286   int ret = GNUNET_SYSERR;
1287   *returned_fragments = 0;
1288
1289   struct GNUNET_MY_QueryParam params_select[] = {
1290     GNUNET_MY_query_param_auto_from_type (channel_key),
1291     GNUNET_MY_query_param_auto_from_type (channel_key),
1292     GNUNET_MY_query_param_uint64 (&message_limit),
1293     GNUNET_MY_query_param_end
1294   };
1295
1296   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1297
1298   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1299   {
1300     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1301               "mysql_stmt_reset", stmt);
1302     return GNUNET_SYSERR;
1303   }
1304
1305   return ret;
1306 }
1307
1308
1309 /**
1310  * Retrieve a fragment of message specified by its message ID and fragment
1311  * offset.
1312  *
1313  * @see GNUNET_PSYCSTORE_message_get_fragment()
1314  *
1315  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1316  */
1317 static int
1318 message_get_fragment (void *cls,
1319                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1320                       uint64_t message_id,
1321                       uint64_t fragment_offset,
1322                       GNUNET_PSYCSTORE_FragmentCallback cb,
1323                       void *cb_cls)
1324 {
1325   struct Plugin *plugin = cls;
1326   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1327   int sql_ret;
1328   int ret = GNUNET_SYSERR;
1329
1330   struct GNUNET_MY_QueryParam params_select[] = {
1331     GNUNET_MY_query_param_auto_from_type (channel_key),
1332     GNUNET_MY_query_param_uint64 (&message_id),
1333     GNUNET_MY_query_param_uint64 (&fragment_offset),
1334     GNUNET_MY_query_param_end
1335   };
1336
1337   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1338   switch (sql_ret)
1339   {
1340     case GNUNET_NO:
1341       ret = GNUNET_NO;
1342       break;
1343
1344     case GNUNET_OK:
1345       ret = fragment_row (stmt, cb, cb_cls);
1346       break;
1347
1348     default:
1349       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1350               "mysql execute prepared", stmt);
1351   }
1352
1353   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1354   {
1355     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1356               "mysql_stmt_reset", stmt);
1357     return GNUNET_SYSERR;
1358   }
1359
1360   return ret;
1361 }
1362
1363 /**
1364  * Retrieve the max. values of message counters for a channel.
1365  *
1366  * @see GNUNET_PSYCSTORE_counters_get()
1367  *
1368  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1369  */
1370 static int
1371 counters_message_get (void *cls,
1372                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1373                       uint64_t *max_fragment_id,
1374                       uint64_t *max_message_id,
1375                       uint64_t *max_group_generation)
1376 {
1377   struct Plugin *plugin = cls;
1378
1379   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1380
1381   int ret = GNUNET_SYSERR;
1382
1383   struct GNUNET_MY_QueryParam params_select[] = {
1384     GNUNET_MY_query_param_auto_from_type (channel_key),
1385     GNUNET_MY_query_param_end
1386   };
1387
1388   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1389   {
1390     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1391               "mysql execute prepared", stmt);
1392     return GNUNET_SYSERR;
1393   }
1394
1395   struct GNUNET_MY_ResultSpec results_select[] = {
1396     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1397     GNUNET_MY_result_spec_uint64 (max_message_id),
1398     GNUNET_MY_result_spec_uint64 (max_group_generation),
1399     GNUNET_MY_result_spec_end
1400   };
1401
1402   ret = GNUNET_MY_extract_result (stmt,
1403                                   results_select);
1404
1405   if (GNUNET_OK != ret)
1406   {
1407     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1408               "mysql extract_result", stmt);
1409     return GNUNET_SYSERR;
1410   }
1411
1412   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1413   {
1414     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1415               "mysql_stmt_reset", stmt);
1416     return GNUNET_SYSERR;
1417   }
1418
1419   return ret;
1420 }
1421
1422 /**
1423  * Retrieve the max. values of state counters for a channel.
1424  *
1425  * @see GNUNET_PSYCSTORE_counters_get()
1426  *
1427  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1428  */
1429 static int
1430 counters_state_get (void *cls,
1431                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1432                     uint64_t *max_state_message_id)
1433 {
1434   struct Plugin *plugin = cls;
1435
1436   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1437
1438   int ret = GNUNET_SYSERR;
1439
1440   struct GNUNET_MY_QueryParam params_select[] = {
1441     GNUNET_MY_query_param_auto_from_type (channel_key),
1442     GNUNET_MY_query_param_end
1443   };
1444
1445   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1446   {
1447     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1448               "mysql execute prepared", stmt);
1449     return GNUNET_SYSERR;
1450   }
1451
1452   struct GNUNET_MY_ResultSpec results_select[] = {
1453     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1454     GNUNET_MY_result_spec_end
1455   };
1456
1457   ret = GNUNET_MY_extract_result (stmt,
1458                                   results_select);
1459
1460   if (GNUNET_OK != ret)
1461   {
1462     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1463               "mysql extract_result", stmt);
1464     return GNUNET_SYSERR;
1465   }
1466
1467   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1468   {
1469     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1470               "mysql_stmt_reset", stmt);
1471     return GNUNET_SYSERR;
1472   }
1473
1474   return ret;
1475 }
1476
1477
1478 /**
1479  * Assign a value to a state variable.
1480  *
1481  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1482  */
1483 static int
1484 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1485               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1486               const char *name, const void *value, size_t value_size)
1487 {
1488   int ret = GNUNET_SYSERR;
1489
1490   struct GNUNET_MY_QueryParam params[] = {
1491     GNUNET_MY_query_param_auto_from_type (channel_key),
1492     GNUNET_MY_query_param_string (name),
1493     GNUNET_MY_query_param_auto_from_type (value),
1494     GNUNET_MY_query_param_end
1495   };
1496
1497   ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1498   if (GNUNET_OK != ret)
1499   {
1500     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1501               "mysql exec_prepared", stmt);
1502     return GNUNET_SYSERR;
1503   }
1504
1505   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1506   {
1507     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1508               "mysql_stmt_reset", stmt);
1509     return GNUNET_SYSERR;
1510   }
1511
1512   return ret;
1513 }
1514
1515
1516 static int
1517 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1518                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1519                    uint64_t message_id)
1520 {
1521   struct GNUNET_MY_QueryParam params[] = {
1522     GNUNET_MY_query_param_uint64 (&message_id),
1523     GNUNET_MY_query_param_auto_from_type (channel_key),
1524     GNUNET_MY_query_param_end
1525   };
1526
1527   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1528                                             stmt,
1529                                             params))
1530   {
1531     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1532               "mysql execute prepared", stmt);
1533     return GNUNET_SYSERR;
1534   }
1535
1536   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1537   {
1538     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1539               "mysql_stmt_reset", stmt);
1540     return GNUNET_SYSERR;
1541   }
1542
1543   return GNUNET_OK;
1544 }
1545
1546
1547 /**
1548  * Begin modifying current state.
1549  */
1550 static int
1551 state_modify_begin (void *cls,
1552                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1553                     uint64_t message_id, uint64_t state_delta)
1554 {
1555   struct Plugin *plugin = cls;
1556
1557   if (state_delta > 0)
1558   {
1559     /**
1560      * We can only apply state modifiers in the current message if modifiers in
1561      * the previous stateful message (message_id - state_delta) were already
1562      * applied.
1563      */
1564
1565     uint64_t max_state_message_id = 0;
1566     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1567     switch (ret)
1568     {
1569     case GNUNET_OK:
1570     case GNUNET_NO: // no state yet
1571       ret = GNUNET_OK;
1572       break;
1573     default:
1574       return ret;
1575     }
1576
1577     if (max_state_message_id < message_id - state_delta)
1578       return GNUNET_NO; /* some stateful messages not yet applied */
1579     else if (message_id - state_delta < max_state_message_id)
1580       return GNUNET_NO; /* changes already applied */
1581   }
1582
1583   if (TRANSACTION_NONE != plugin->transaction)
1584   {
1585     /** @todo FIXME: wait for other transaction to finish  */
1586     return GNUNET_SYSERR;
1587   }
1588   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1589 }
1590
1591
1592 /**
1593  * Set the current value of state variable.
1594  *
1595  * @see GNUNET_PSYCSTORE_state_modify()
1596  *
1597  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1598  */
1599 static int
1600 state_modify_op (void *cls,
1601                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1602                  enum GNUNET_PSYC_Operator op,
1603                  const char *name, const void *value, size_t value_size)
1604 {
1605   struct Plugin *plugin = cls;
1606   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1607
1608   switch (op)
1609   {
1610   case GNUNET_PSYC_OP_ASSIGN:
1611     return state_assign (plugin, plugin->insert_state_current, channel_key,
1612                          name, value, value_size);
1613
1614   default: /** @todo implement more state operations */
1615     GNUNET_break (0);
1616     return GNUNET_SYSERR;
1617   }
1618 }
1619
1620
1621 /**
1622  * End modifying current state.
1623  */
1624 static int
1625 state_modify_end (void *cls,
1626                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1627                   uint64_t message_id)
1628 {
1629   struct Plugin *plugin = cls;
1630   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1631
1632   return
1633     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1634     && GNUNET_OK == update_message_id (plugin,
1635                                        plugin->update_max_state_message_id,
1636                                        channel_key, message_id)
1637     && GNUNET_OK == transaction_commit (plugin)
1638     ? GNUNET_OK : GNUNET_SYSERR;
1639 }
1640
1641
1642 /**
1643  * Begin state synchronization.
1644  */
1645 static int
1646 state_sync_begin (void *cls,
1647                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1648 {
1649   struct Plugin *plugin = cls;
1650   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1651 }
1652
1653
1654 /**
1655  * Assign current value of a state variable.
1656  *
1657  * @see GNUNET_PSYCSTORE_state_modify()
1658  *
1659  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1660  */
1661 static int
1662 state_sync_assign (void *cls,
1663                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1664                 const char *name, const void *value, size_t value_size)
1665 {
1666   struct Plugin *plugin = cls;
1667   return state_assign (cls, plugin->insert_state_sync, channel_key,
1668                        name, value, value_size);
1669 }
1670
1671
1672 /**
1673  * End modifying current state.
1674  */
1675 static int
1676 state_sync_end (void *cls,
1677                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1678                 uint64_t max_state_message_id,
1679                 uint64_t state_hash_message_id)
1680 {
1681   struct Plugin *plugin = cls;
1682   int ret = GNUNET_SYSERR;
1683
1684   if (TRANSACTION_NONE != plugin->transaction)
1685   {
1686     /** @todo FIXME: wait for other transaction to finish  */
1687     return GNUNET_SYSERR;
1688   }
1689
1690   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1691     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1692     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1693                                   channel_key)
1694     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1695                                   channel_key)
1696     && GNUNET_OK == update_message_id (plugin,
1697                                        plugin->update_state_hash_message_id,
1698                                        channel_key, state_hash_message_id)
1699     && GNUNET_OK == update_message_id (plugin,
1700                                        plugin->update_max_state_message_id,
1701                                        channel_key, max_state_message_id)
1702     && GNUNET_OK == transaction_commit (plugin)
1703     ? ret = GNUNET_OK
1704     : transaction_rollback (plugin);
1705   return ret;
1706 }
1707
1708
1709 /**
1710  * Delete the whole state.
1711  *
1712  * @see GNUNET_PSYCSTORE_state_reset()
1713  *
1714  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1715  */
1716 static int
1717 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1718 {
1719   struct Plugin *plugin = cls;
1720   return exec_channel (plugin, plugin->delete_state, channel_key);
1721 }
1722
1723
1724 /**
1725  * Update signed values of state variables in the state store.
1726  *
1727  * @see GNUNET_PSYCSTORE_state_hash_update()
1728  *
1729  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1730  */
1731 static int
1732 state_update_signed (void *cls,
1733                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1734 {
1735   struct Plugin *plugin = cls;
1736   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1737 }
1738
1739
1740 /**
1741  * Retrieve a state variable by name.
1742  *
1743  * @see GNUNET_PSYCSTORE_state_get()
1744  *
1745  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1746  */
1747 static int
1748 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1749            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1750 {
1751   struct Plugin *plugin = cls;
1752   int ret = GNUNET_SYSERR;
1753   int sql_ret ;
1754
1755   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1756
1757   struct GNUNET_MY_QueryParam params_select[] = {
1758     GNUNET_MY_query_param_auto_from_type (channel_key),
1759     GNUNET_MY_query_param_string (name),
1760     GNUNET_MY_query_param_end
1761   };
1762
1763   void *value_current = NULL;
1764   size_t value_size = 0;
1765
1766   struct GNUNET_MY_ResultSpec results[] = {
1767     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1768     GNUNET_MY_result_spec_end
1769   };
1770
1771   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1772   {
1773     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1774               "mysql exec_prepared", stmt);
1775   }
1776   else
1777   {
1778     sql_ret = GNUNET_MY_extract_result (stmt, results);
1779     switch (sql_ret)
1780     {
1781     case GNUNET_NO:
1782       ret = GNUNET_NO;
1783       break;
1784     case GNUNET_YES:
1785       ret = cb (cb_cls, name, value_current,
1786                 value_size);
1787       break;
1788     default:
1789       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1790                 "mysql extract_result", stmt);
1791     }
1792   }
1793
1794   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1795   {
1796     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1797               "mysql_stmt_reset", stmt);
1798     return GNUNET_SYSERR;
1799   }
1800
1801   return ret;
1802 }
1803
1804
1805 /**
1806  * Retrieve all state variables for a channel with the given prefix.
1807  *
1808  * @see GNUNET_PSYCSTORE_state_get_prefix()
1809  *
1810  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1811  */
1812 static int
1813 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1814                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1815                   void *cb_cls)
1816 {
1817   struct Plugin *plugin = cls;
1818   int ret = GNUNET_SYSERR;
1819
1820   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1821
1822   uint32_t name_len = (uint32_t) strlen (name);
1823
1824   struct GNUNET_MY_QueryParam params_select[] = {
1825     GNUNET_MY_query_param_auto_from_type (channel_key),
1826     GNUNET_MY_query_param_string (name),
1827     GNUNET_MY_query_param_uint32 (&name_len),
1828     GNUNET_MY_query_param_string (name),
1829     GNUNET_MY_query_param_end
1830   };
1831
1832   char *name2 = "";
1833   void *value_current = NULL;
1834   size_t value_size = 0;
1835
1836   struct GNUNET_MY_ResultSpec results[] = {
1837     GNUNET_MY_result_spec_string (&name2),
1838     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1839     GNUNET_MY_result_spec_end
1840   };
1841
1842   int sql_ret;
1843
1844   do
1845   {
1846     if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1847     {
1848       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1849                 "mysql exec_prepared", stmt);
1850       break;
1851     }
1852     sql_ret = GNUNET_MY_extract_result (stmt, results);
1853     switch (sql_ret)
1854     {
1855       case GNUNET_NO:
1856         if (ret != GNUNET_OK)
1857           ret = GNUNET_NO;
1858         break;
1859
1860       case GNUNET_YES:
1861         ret = cb (cb_cls, (const char *) name2,
1862                   value_current,
1863                   value_size);
1864
1865         if (ret != GNUNET_YES)
1866           sql_ret = GNUNET_NO;
1867         break;
1868
1869       default:
1870         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1871               "mysql extract_result", stmt);
1872     }
1873   }
1874   while (sql_ret == GNUNET_YES);
1875
1876   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1877   {
1878     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1879               "mysql_stmt_reset", stmt);
1880     return GNUNET_SYSERR;
1881   }
1882
1883   return ret;
1884 }
1885
1886
1887 /**
1888  * Retrieve all signed state variables for a channel.
1889  *
1890  * @see GNUNET_PSYCSTORE_state_get_signed()
1891  *
1892  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1893  */
1894 static int
1895 state_get_signed (void *cls,
1896                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1897                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1898 {
1899   struct Plugin *plugin = cls;
1900   int ret = GNUNET_SYSERR;
1901
1902   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1903
1904   struct GNUNET_MY_QueryParam params_select[] = {
1905     GNUNET_MY_query_param_auto_from_type (channel_key),
1906     GNUNET_MY_query_param_end
1907   };
1908
1909   int sql_ret;
1910
1911   char *name = "";
1912   void *value_signed = NULL;
1913   size_t value_size = 0;
1914
1915   struct GNUNET_MY_ResultSpec results[] = {
1916     GNUNET_MY_result_spec_string (&name),
1917     GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1918     GNUNET_MY_result_spec_end
1919   };
1920
1921   do
1922   {
1923     if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1924     {
1925       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1926                 "mysql exec_prepared", stmt);
1927       break;
1928     }
1929     sql_ret = GNUNET_MY_extract_result (stmt, results);
1930     switch (sql_ret)
1931     {
1932       case GNUNET_NO:
1933         if (ret != GNUNET_OK)
1934           ret = GNUNET_NO;
1935         break;
1936       case GNUNET_YES:
1937         ret = cb (cb_cls, (const char *) name,
1938                   value_signed,
1939                   value_size);
1940
1941         if (ret != GNUNET_YES)
1942             sql_ret = GNUNET_NO;
1943         break;
1944       default:
1945          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1946               "mysql extract_result", stmt);
1947     }
1948   }
1949   while (sql_ret == GNUNET_YES);
1950
1951   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1952   {
1953     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1954               "mysql_stmt_reset", stmt);
1955     return GNUNET_SYSERR;
1956   }
1957
1958   return ret;
1959 }
1960
1961
1962 /**
1963  * Entry point for the plugin.
1964  *
1965  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1966  * @return NULL on error, otherwise the plugin context
1967  */
1968 void *
1969 libgnunet_plugin_psycstore_mysql_init (void *cls)
1970 {
1971   static struct Plugin plugin;
1972   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1973   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1974
1975   if (NULL != plugin.cfg)
1976     return NULL;                /* can only initialize once! */
1977   memset (&plugin, 0, sizeof (struct Plugin));
1978   plugin.cfg = cfg;
1979   if (GNUNET_OK != database_setup (&plugin))
1980   {
1981     database_shutdown (&plugin);
1982     return NULL;
1983   }
1984   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1985   api->cls = &plugin;
1986   api->membership_store = &mysql_membership_store;
1987   api->membership_test = &membership_test;
1988   api->fragment_store = &fragment_store;
1989   api->message_add_flags = &message_add_flags;
1990   api->fragment_get = &fragment_get;
1991   api->fragment_get_latest = &fragment_get_latest;
1992   api->message_get = &message_get;
1993   api->message_get_latest = &message_get_latest;
1994   api->message_get_fragment = &message_get_fragment;
1995   api->counters_message_get = &counters_message_get;
1996   api->counters_state_get = &counters_state_get;
1997   api->state_modify_begin = &state_modify_begin;
1998   api->state_modify_op = &state_modify_op;
1999   api->state_modify_end = &state_modify_end;
2000   api->state_sync_begin = &state_sync_begin;
2001   api->state_sync_assign = &state_sync_assign;
2002   api->state_sync_end = &state_sync_end;
2003   api->state_reset = &state_reset;
2004   api->state_update_signed = &state_update_signed;
2005   api->state_get = &state_get;
2006   api->state_get_prefix = &state_get_prefix;
2007   api->state_get_signed = &state_get_signed;
2008
2009   LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
2010   return api;
2011 }
2012
2013
2014 /**
2015  * Exit point from the plugin.
2016  *
2017  * @param cls The plugin context (as returned by "init")
2018  * @return Always NULL
2019  */
2020 void *
2021 libgnunet_plugin_psycstore_mysql_done (void *cls)
2022 {
2023   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2024   struct Plugin *plugin = api->cls;
2025
2026   database_shutdown (plugin);
2027   plugin->cfg = NULL;
2028   GNUNET_free (api);
2029   LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
2030   return NULL;
2031 }
2032
2033 /* end of plugin_psycstore_mysql.c */