-rps doxygen
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_mysql.c
23  * @brief mysql-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  */
28
29 #include "platform.h"
30 #include "gnunet_psycstore_plugin.h"
31 #include "gnunet_psycstore_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_crypto_lib.h"
34 #include "gnunet_psyc_util_lib.h"
35 #include "psycstore.h"
36 #include "gnunet_my_lib.h"
37 #include "gnunet_mysql_lib.h"
38 #include <mysql/mysql.h>
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 /**
55  * Log an error message at log-level 'level' that indicates
56  * a failure of the command 'cmd' on file 'filename'
57  * with the message given by strerror(errno).
58  */
59 #define LOG_MYSQL(db, level, cmd, stmt) do { GNUNET_log_from (level, "psycstore-mysql", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); } while(0)
60
61 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
62
63 enum Transactions {
64   TRANSACTION_NONE = 0,
65   TRANSACTION_STATE_MODIFY,
66   TRANSACTION_STATE_SYNC,
67 };
68
69 /**
70  * Context for all functions in this plugin.
71  */
72 struct Plugin
73 {
74
75   const struct GNUNET_CONFIGURATION_Handle *cfg;
76
77   /**
78    * Database filename.
79    */
80   char *fn;
81
82   /**
83     *Handle to talk to Mysql
84     */
85   struct GNUNET_MYSQL_Context *mc;
86
87   /**
88    * Current transaction.
89    */
90   enum Transactions transaction;
91
92   struct GNUNET_MYSQL_StatementHandle *transaction_begin;
93
94   struct GNUNET_MYSQL_StatementHandle *transaction_commit;
95
96   struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
97
98   /**
99    * Precompiled SQL for channel_key_store()
100    */
101   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
102
103
104   /**
105    * Precompiled SQL for slave_key_store()
106    */
107   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   struct GNUNET_MYSQL_StatementHandle *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   struct GNUNET_MYSQL_StatementHandle *select_membership;
118
119   /**
120    * Precompiled SQL for fragment_store()
121    */
122   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
123
124   /**
125    * Precompiled SQL for message_add_flags()
126    */
127   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
128
129   /**
130    * Precompiled SQL for fragment_get()
131    */
132   struct GNUNET_MYSQL_StatementHandle *select_fragments;
133
134   /**
135    * Precompiled SQL for fragment_get()
136    */
137   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
138
139   /**
140    * Precompiled SQL for message_get()
141    */
142   struct GNUNET_MYSQL_StatementHandle *select_messages;
143
144   /**
145    * Precompiled SQL for message_get()
146    */
147   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
148
149   /**
150    * Precompiled SQL for message_get_fragment()
151    */
152   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
153
154   /**
155    * Precompiled SQL for counters_get_message()
156    */
157   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
158
159   /**
160    * Precompiled SQL for counters_get_state()
161    */
162   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
163
164   /**
165    * Precompiled SQL for state_modify_end()
166    */
167   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
168
169   /**
170    * Precompiled SQL for state_sync_end()
171    */
172   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
173
174   /**
175    * Precompiled SQL for state_modify_op()
176    */
177   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
178
179   /**
180    * Precompiled SQL for state_modify_end()
181    */
182   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
183
184   /**
185    * Precompiled SQL for state_set_signed()
186    */
187   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
188
189   /**
190    * Precompiled SQL for state_sync()
191    */
192   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
193
194   /**
195    * Precompiled SQL for state_sync()
196    */
197   struct GNUNET_MYSQL_StatementHandle *delete_state;
198
199   /**
200    * Precompiled SQL for state_sync()
201    */
202   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
203
204   /**
205    * Precompiled SQL for state_sync()
206    */
207   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
208
209   /**
210    * Precompiled SQL for state_get_signed()
211    */
212   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
213
214   /**
215    * Precompiled SQL for state_get()
216    */
217   struct GNUNET_MYSQL_StatementHandle *select_state_one;
218
219   /**
220    * Precompiled SQL for state_get_prefix()
221    */
222   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
223
224 };
225
226 #if DEBUG_PSYCSTORE
227
228 static void
229 mysql_trace (void *cls, const char *sql)
230 {
231   LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
232 }
233
234 #endif
235
236
237 /**
238  * @brief Prepare a SQL statement
239  *
240  * @param dbh handle to the database
241  * @param sql SQL statement, UTF-8 encoded
242  * @param stmt set to the prepared statement
243  * @return 0 on success
244  */
245 static int
246 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
247               const char *sql,
248               struct GNUNET_MYSQL_StatementHandle **stmt)
249 {
250   *stmt = GNUNET_MYSQL_statement_prepare (mc,
251                                           sql);
252
253   LOG(GNUNET_ERROR_TYPE_DEBUG,
254        "Prepared `%s' / %p\n", sql, stmt);
255   if(NULL == *stmt)
256     LOG(GNUNET_ERROR_TYPE_ERROR,
257    _("Error preparing SQL query: %s\n  %s\n"),
258    mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
259
260   return 0;
261 }
262
263
264 /**
265  * Initialize the database connections and associated
266  * data structures (create tables and indices
267  * as needed as well).
268  *
269  * @param plugin the plugin context (state for this module)
270  * @return GNUNET_OK on success
271  */
272 static int
273 database_setup (struct Plugin *plugin)
274 {
275   char *filename;
276
277   if (GNUNET_OK !=
278       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
279                                                "FILENAME", &filename))
280   {
281     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
282                                "psycstore-mysql", "FILENAME");
283     return GNUNET_SYSERR;
284   }
285   
286   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
287   {
288     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
289     {
290       GNUNET_break (0);
291       GNUNET_free (filename);
292       return GNUNET_SYSERR;
293     }
294   }
295   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
296   plugin->fn = filename;
297
298   /* Open database and precompile statements */
299   plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
300
301   if (NULL == plugin->mc)
302   {
303     LOG(GNUNET_ERROR_TYPE_ERROR,
304    _("Unable to initialize Mysql.\n"));
305     return GNUNET_SYSERR;
306   }
307
308   /* Create tables */
309
310   GNUNET_MYSQL_statement_run (plugin->mc,
311                               "CREATE TABLE IF NOT EXISTS channels (\n"
312                               " id INT AUTO_INCREMENT,\n"
313                               " pub_key BLOB,\n"
314                               " max_state_message_id INT,\n"
315                               " state_hash_message_id INT,\n"
316                               " PRIMARY KEY(id),\n"
317                               " UNIQUE KEY(pub_key(5))\n"
318                               ");");
319
320   GNUNET_MYSQL_statement_run (plugin->mc,
321                               "CREATE TABLE IF NOT EXISTS slaves (\n"
322                               " id INT AUTO_INCREMENT,\n"
323                               " pub_key BLOB,\n"
324                               " PRIMARY KEY(id),\n"
325                               " UNIQUE KEY(pub_key(5))\n"
326                               ");");
327
328   GNUNET_MYSQL_statement_run (plugin->mc,
329                               "CREATE TABLE IF NOT EXISTS membership (\n"
330                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
331                               "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
332                               "  did_join INT NOT NULL,\n"
333                               "  announced_at BIGINT UNSIGNED NOT NULL,\n"
334                               "  effective_since BIGINT UNSIGNED NOT NULL,\n"
335                               "  group_generation BIGINT UNSIGNED NOT NULL\n"
336                               ");");
337
338 /*** FIX because IF NOT EXISTS doesn't work ***/
339   GNUNET_MYSQL_statement_run (plugin->mc,
340                               "CREATE INDEX idx_membership_channel_id_slave_id "
341                               "ON membership (channel_id, slave_id);");
342
343   /** @todo messages table: add method_name column */
344   GNUNET_MYSQL_statement_run (plugin->mc,
345                               "CREATE TABLE IF NOT EXISTS messages (\n"
346                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
347                               "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
348                               "  signature BLOB,\n"
349                               "  purpose BLOB,\n"
350                               "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
351                               "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
352                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
353                               "  group_generation BIGINT UNSIGNED NOT NULL,\n"
354                               "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
355                               "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
356                               "  data BLOB,\n"
357                               "  PRIMARY KEY (channel_id, fragment_id),\n"
358                               "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
359                               ");");
360
361   GNUNET_MYSQL_statement_run (plugin->mc,
362                               "CREATE TABLE IF NOT EXISTS state (\n"
363                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
364                               "  name TEXT NOT NULL,\n"
365                               "  value_current BLOB,\n"
366                               "  value_signed BLOB,\n"
367                               "  PRIMARY KEY (channel_id, name(5))\n"
368                               ");");
369
370   GNUNET_MYSQL_statement_run (plugin->mc,
371                               "CREATE TABLE IF NOT EXISTS state_sync (\n"
372                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
373                               "  name TEXT NOT NULL,\n"
374                               "  value BLOB,\n"
375                               "  PRIMARY KEY (channel_id, name(5))\n"
376                               ");");
377
378   /* Prepare statements */
379   mysql_prepare (plugin->mc,
380                 "BEGIN",
381                 &plugin->transaction_begin);
382
383   mysql_prepare (plugin->mc,
384                 "COMMIT",
385                 &plugin->transaction_commit);
386
387   mysql_prepare (plugin->mc,
388                 "ROLLBACK;",
389                 &plugin->transaction_rollback);
390
391   mysql_prepare (plugin->mc,
392                 "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
393                 &plugin->insert_channel_key);
394
395   mysql_prepare (plugin->mc,
396                 "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
397                 &plugin->insert_slave_key);
398
399   mysql_prepare (plugin->mc,
400                 "INSERT INTO membership\n"
401                 " (channel_id, slave_id, did_join, announced_at,\n"
402                 "  effective_since, group_generation)\n"
403                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
404                 "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
405                 "        ?, ?, ?, ?);",
406                 &plugin->insert_membership);
407
408   mysql_prepare (plugin->mc,
409                 "SELECT did_join FROM membership\n"
410                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
411                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
412                "      AND effective_since <= ? AND did_join = 1\n"
413                "ORDER BY announced_at DESC LIMIT 1;",
414                &plugin->select_membership);
415
416   mysql_prepare (plugin->mc,
417                 "INSERT IGNORE INTO messages\n"
418                " (channel_id, hop_counter, signature, purpose,\n"
419                "  fragment_id, fragment_offset, message_id,\n"
420                "  group_generation, multicast_flags, psycstore_flags, data)\n"
421                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
422                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
423                 &plugin->insert_fragment);
424
425   mysql_prepare (plugin->mc,
426                 "UPDATE messages\n"
427                 "SET psycstore_flags = psycstore_flags | ?\n"
428                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
429                 "      AND message_id = ? AND fragment_offset = 0;",
430                 &plugin->update_message_flags);
431
432   mysql_prepare (plugin->mc,
433                   "SELECT hop_counter, signature, purpose, fragment_id,\n"
434                   "       fragment_offset, message_id, group_generation,\n"
435                   "       multicast_flags, psycstore_flags, data\n"
436                   "FROM messages\n"
437                   "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
438                   "      AND ? <= fragment_id AND fragment_id <= ?;",
439                 &plugin->select_fragments);
440
441   /** @todo select_messages: add method_prefix filter */
442   mysql_prepare (plugin->mc,
443                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
444                 "       fragment_offset, message_id, group_generation,\n"
445                 "       multicast_flags, psycstore_flags, data\n"
446                 "FROM messages\n"
447                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
448                 "      AND ? <= message_id AND message_id <= ?"
449                 "LIMIT ?;",
450                 &plugin->select_messages);
451
452   mysql_prepare (plugin->mc,
453                 "SELECT * FROM\n"
454                 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
455                 "        fragment_offset, message_id, group_generation,\n"
456                 "        multicast_flags, psycstore_flags, data\n"
457                 " FROM messages\n"
458                 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
459                 " ORDER BY fragment_id DESC\n"
460                 " LIMIT ?)\n"
461                 "ORDER BY fragment_id;",
462                 &plugin->select_latest_fragments);
463
464   /** @todo select_latest_messages: add method_prefix filter */
465   mysql_prepare (plugin->mc,
466                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
467                 "       fragment_offset, message_id, group_generation,\n"
468                 "        multicast_flags, psycstore_flags, data\n"
469                 "FROM messages\n"
470                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
471                 "      AND message_id IN\n"
472                 "      (SELECT message_id\n"
473                 "       FROM messages\n"
474                 "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
475                 "       GROUP BY message_id\n"
476                 "       ORDER BY message_id\n"
477                 "       DESC LIMIT ?)\n"
478                 "ORDER BY fragment_id;",
479                 &plugin->select_latest_messages);
480
481   mysql_prepare (plugin->mc,
482                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
483                 "       fragment_offset, message_id, group_generation,\n"
484                 "       multicast_flags, psycstore_flags, data\n"
485                 "FROM messages\n"
486                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
487                 "      AND message_id = ? AND fragment_offset = ?;",
488                 &plugin->select_message_fragment);
489
490   mysql_prepare (plugin->mc,
491                 "SELECT fragment_id, message_id, group_generation\n"
492                 "FROM messages\n"
493                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
494                 "ORDER BY fragment_id DESC LIMIT 1;",
495                 &plugin->select_counters_message);
496
497   mysql_prepare (plugin->mc,
498                 "SELECT max_state_message_id\n"
499                 "FROM channels\n"
500                 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
501                 &plugin->select_counters_state);
502
503   mysql_prepare (plugin->mc,
504                 "UPDATE channels\n"
505                 "SET max_state_message_id = ?\n"
506                 "WHERE pub_key = ?;",
507                 &plugin->update_max_state_message_id);
508
509   mysql_prepare (plugin->mc,
510                 "UPDATE channels\n"
511                 "SET state_hash_message_id = ?\n"
512                 "WHERE pub_key = ?;",
513                 &plugin->update_state_hash_message_id);
514
515   mysql_prepare (plugin->mc,
516                 "REPLACE INTO state\n"
517                 "  (channel_id, name, value_current, value_signed)\n"
518                 "SELECT new.channel_id, new.name,\n"
519                 "       new.value_current, old.value_signed\n"
520                 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
521                 "             AS channel_id,\n"
522                 "             ? AS name, ? AS value_current) AS new\n"
523                 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
524                 "           FROM state) AS old\n"
525                 "ON new.channel_id = old.channel_id AND new.name = old.name;",
526                 &plugin->insert_state_current);
527
528   mysql_prepare (plugin->mc,
529                 "DELETE FROM state\n"
530                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
531                 "      AND (value_current IS NULL OR length(value_current) = 0)\n"
532                 "      AND (value_signed IS NULL OR length(value_signed) = 0);",
533                 &plugin->delete_state_empty);
534
535   mysql_prepare (plugin->mc,
536                 "UPDATE state\n"
537                 "SET value_signed = value_current\n"
538                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
539                 &plugin->update_state_signed);
540
541   mysql_prepare (plugin->mc,
542                 "DELETE FROM state\n"
543                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
544                 &plugin->delete_state);
545
546   mysql_prepare (plugin->mc,
547                 "INSERT INTO state_sync (channel_id, name, value)\n"
548                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
549                 &plugin->insert_state_sync);
550
551   mysql_prepare (plugin->mc,
552                 "INSERT INTO state\n"
553                 " (channel_id, name, value_current, value_signed)\n"
554                 "SELECT channel_id, name, value, value\n"
555                 "FROM state_sync\n"
556                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
557                 &plugin->insert_state_from_sync);
558
559   mysql_prepare (plugin->mc,
560                 "DELETE FROM state_sync\n"
561                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
562                 &plugin->delete_state_sync);
563
564   mysql_prepare (plugin->mc,
565                 "SELECT value_current\n"
566                 "FROM state\n"
567                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
568                 "      AND name = ?;",
569                 &plugin->select_state_one);
570
571   mysql_prepare (plugin->mc,
572                 "SELECT name, value_current\n"
573                 "FROM state\n"
574                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
575                 "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
576                 &plugin->select_state_prefix);
577
578   mysql_prepare (plugin->mc,
579                 "SELECT name, value_signed\n"
580                 "FROM state\n"
581                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
582                 "      AND value_signed IS NOT NULL;",
583                 &plugin->select_state_signed);
584
585   return GNUNET_OK;
586 }
587
588
589 /**
590  * Shutdown database connection and associate data
591  * structures.
592  * @param plugin the plugin context (state for this module)
593  */
594 static void
595 database_shutdown (struct Plugin *plugin)
596 {
597   GNUNET_MYSQL_context_destroy (plugin->mc);
598
599   GNUNET_free_non_null (plugin->fn);
600
601 }
602
603
604 /**
605  * Execute a prepared statement with a @a channel_key argument.
606  *
607  * @param plugin Plugin handle.
608  * @param stmt Statement to execute.
609  * @param channel_key Public key of the channel.
610  *
611  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
612  */
613 static int
614 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
615               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
616 {
617   struct GNUNET_MY_QueryParam params[] = {
618     GNUNET_MY_query_param_auto_from_type (channel_key),
619     GNUNET_MY_query_param_end
620   };
621
622   if(GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
623                                           stmt,
624                                           params))
625   {
626     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
627                 "mysql exec_channel", stmt);
628   }
629
630   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
631   {
632     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
633               "mysql_stmt_reset", stmt);
634     return GNUNET_SYSERR;
635   }
636
637   return GNUNET_OK;
638 }
639
640
641 /**
642  * Begin a transaction.
643  */
644 static int
645 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
646 {
647   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
648  
649   struct GNUNET_MY_QueryParam params[] = {
650     GNUNET_MY_query_param_end
651   };
652
653   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
654                                             stmt,
655                                             params))
656   {
657     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
658                 "mysql extract_result", stmt);
659     return GNUNET_SYSERR;
660   }
661
662   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
663   {
664     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
665               "mysql_stmt_reset", stmt);
666     return GNUNET_SYSERR;
667   }
668
669   plugin->transaction = transaction;
670   return GNUNET_OK;
671 }
672
673
674 /**
675  * Commit current transaction.
676  */
677 static int
678 transaction_commit (struct Plugin *plugin)
679 {
680   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
681
682   struct GNUNET_MY_QueryParam params[] = {
683     GNUNET_MY_query_param_end
684   };
685
686   if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
687                                             stmt,
688                                             params))
689   {
690     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
691                 "mysql extract_result", stmt);
692     return GNUNET_SYSERR;
693   }
694
695   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
696   {
697     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
698               "mysql_stmt_reset", stmt);
699     return GNUNET_SYSERR;
700   }
701
702   plugin->transaction = TRANSACTION_NONE;
703   return GNUNET_OK;
704 }
705
706
707 /**
708  * Roll back current transaction.
709  */
710 static int
711 transaction_rollback (struct Plugin *plugin)
712 {
713   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
714
715   struct GNUNET_MY_QueryParam params[] = {
716     GNUNET_MY_query_param_end
717   };
718
719   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
720                                             stmt,
721                                             params))
722   {
723     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
724                 "mysql extract_result", stmt);
725     return GNUNET_SYSERR;
726   }
727
728   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
729   {
730     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
731               "mysql_stmt_reset", stmt);
732     return GNUNET_SYSERR;
733   }
734
735   plugin->transaction = TRANSACTION_NONE;
736   return GNUNET_OK;
737 }
738
739
740 static int
741 channel_key_store (struct Plugin *plugin,
742                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
743 {
744   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
745
746   struct GNUNET_MY_QueryParam params[] = {
747     GNUNET_MY_query_param_auto_from_type (channel_key),
748     GNUNET_MY_query_param_end
749   };
750
751   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
752                                             stmt,
753                                             params))
754   {
755     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
756                 "mysql exec_prepared", stmt);
757     return GNUNET_SYSERR;
758   }
759
760   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
761   {
762     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
763               "mysql_stmt_reset", stmt);
764     return GNUNET_SYSERR;
765   }
766
767   return GNUNET_OK;
768 }
769
770
771 static int
772 slave_key_store (struct Plugin *plugin,
773                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
774 {
775   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
776
777   struct GNUNET_MY_QueryParam params[] = {
778     GNUNET_MY_query_param_auto_from_type (slave_key),
779     GNUNET_MY_query_param_end
780   };
781
782   if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
783                                             stmt,
784                                             params))
785   {
786     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
787                 "mysql exec_prepared", stmt);
788     return GNUNET_SYSERR;
789   }
790
791   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
792   {
793     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
794               "mysql_stmt_reset", stmt);
795     return GNUNET_SYSERR;
796   }
797
798   return GNUNET_OK;
799 }
800
801
802 /**
803  * Store join/leave events for a PSYC channel in order to be able to answer
804  * membership test queries later.
805  *
806  * @see GNUNET_PSYCSTORE_membership_store()
807  *
808  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
809  */
810 static int
811 mysql_membership_store (void *cls,
812                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
813                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
814                          int did_join,
815                          uint64_t announced_at,
816                          uint64_t effective_since,
817                          uint64_t group_generation)
818 {
819   struct Plugin *plugin = cls;
820
821   uint32_t idid_join = (uint32_t)did_join;
822
823   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
824
825   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
826
827   if (announced_at > INT64_MAX ||
828       effective_since > INT64_MAX ||
829       group_generation > INT64_MAX)
830   {
831     GNUNET_break (0);
832     return GNUNET_SYSERR;
833   }
834
835   if (GNUNET_OK != channel_key_store (plugin, channel_key)
836       || GNUNET_OK != slave_key_store (plugin, slave_key))
837     return GNUNET_SYSERR;
838
839   struct GNUNET_MY_QueryParam params[] = {
840     GNUNET_MY_query_param_auto_from_type (channel_key),
841     GNUNET_MY_query_param_auto_from_type (slave_key),
842     GNUNET_MY_query_param_uint32 (&idid_join),
843     GNUNET_MY_query_param_uint64 (&announced_at),
844     GNUNET_MY_query_param_uint64 (&effective_since),
845     GNUNET_MY_query_param_uint64 (&group_generation),
846     GNUNET_MY_query_param_end
847   };
848
849   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
850                                             stmt,
851                                             params))
852   {
853     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
854                 "mysql exec_prepared", stmt);
855     return GNUNET_SYSERR;
856   }
857
858   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
859   {
860     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
861               "mysql_stmt_reset", stmt);
862     return GNUNET_SYSERR;
863   }
864   return GNUNET_OK;
865 }
866
867 /**
868  * Test if a member was admitted to the channel at the given message ID.
869  *
870  * @see GNUNET_PSYCSTORE_membership_test()
871  *
872  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
873  *         #GNUNET_SYSERR if there was en error.
874  */
875 static int
876 membership_test (void *cls,
877                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
878                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
879                  uint64_t message_id)
880 {
881   struct Plugin *plugin = cls;
882
883   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
884
885   uint32_t did_join = 0;
886
887   int ret = GNUNET_SYSERR;
888
889   struct GNUNET_MY_QueryParam params_select[] = {
890     GNUNET_MY_query_param_auto_from_type (channel_key),
891     GNUNET_MY_query_param_auto_from_type (slave_key),
892     GNUNET_MY_query_param_uint64 (&message_id),
893     GNUNET_MY_query_param_end
894   };
895
896   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
897                               stmt,
898                               params_select))
899   {
900     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
901                 "mysql execute prepared", stmt);
902     return GNUNET_SYSERR;
903   }
904
905   struct GNUNET_MY_ResultSpec results_select[] = {
906     GNUNET_MY_result_spec_uint32 (&did_join),
907     GNUNET_MY_result_spec_end
908   };
909
910   switch(GNUNET_MY_extract_result (stmt,
911                                 results_select))
912   {
913     case GNUNET_NO:
914       ret = GNUNET_NO;
915       break;
916     case GNUNET_OK:
917       ret = GNUNET_YES;
918       break;
919     default:
920       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
921                 "mysql extract_result", stmt);
922       return GNUNET_SYSERR;
923   }
924
925   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
926   {
927     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
928               "mysql_stmt_reset", stmt);
929     return GNUNET_SYSERR;
930   }
931
932   return ret;
933 }
934
935 /**
936  * Store a message fragment sent to a channel.
937  *
938  * @see GNUNET_PSYCSTORE_fragment_store()
939  *
940  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
941  */
942 static int
943 fragment_store (void *cls,
944                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
945                 const struct GNUNET_MULTICAST_MessageHeader *msg,
946                 uint32_t psycstore_flags)
947 {
948   struct Plugin *plugin = cls;
949
950   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
951
952   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
953
954   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
955
956   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
957   uint64_t message_id = GNUNET_ntohll (msg->message_id);
958   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
959
960   uint64_t hop_counter = ntohl(msg->hop_counter);
961   uint64_t flags = ntohl(msg->flags);
962
963   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
964       message_id > INT64_MAX || group_generation > INT64_MAX)
965   {
966     LOG(GNUNET_ERROR_TYPE_ERROR,
967          "Tried to store fragment with a field > INT64_MAX: "
968          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
969          message_id, group_generation);
970     GNUNET_break (0);
971     return GNUNET_SYSERR;
972   }
973
974   if (GNUNET_OK != channel_key_store (plugin, channel_key))
975     return GNUNET_SYSERR;
976
977   struct GNUNET_MY_QueryParam params_insert[] = {
978     GNUNET_MY_query_param_auto_from_type (channel_key),
979     GNUNET_MY_query_param_uint64 (&hop_counter),
980     GNUNET_MY_query_param_auto_from_type (&msg->signature),
981     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
982     GNUNET_MY_query_param_uint64 (&fragment_id),
983     GNUNET_MY_query_param_uint64 (&fragment_offset),
984     GNUNET_MY_query_param_uint64 (&message_id),
985     GNUNET_MY_query_param_uint64 (&group_generation),
986     GNUNET_MY_query_param_uint64 (&flags),
987     GNUNET_MY_query_param_uint32 (&psycstore_flags),
988     GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
989                                                   - sizeof (*msg)),
990     GNUNET_MY_query_param_end
991   };
992
993   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
994                               stmt,
995                               params_insert))
996   {
997     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
998               "mysql execute prepared", stmt);
999     return GNUNET_SYSERR;
1000   }
1001
1002   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1003   {
1004     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1005               "mysql_stmt_reset", stmt);
1006     return GNUNET_SYSERR;
1007   }
1008
1009   return GNUNET_OK;
1010 }
1011
1012 /**
1013  * Set additional flags for a given message.
1014  *
1015  * They are OR'd with any existing flags set.
1016  *
1017  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1018  */
1019 static int
1020 message_add_flags (void *cls,
1021                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1022                    uint64_t message_id,
1023                    uint64_t psycstore_flags)
1024 {
1025   struct Plugin *plugin = cls;
1026   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
1027
1028   int sql_ret;
1029   int ret = GNUNET_SYSERR;
1030
1031   struct GNUNET_MY_QueryParam params_update[] = {
1032     GNUNET_MY_query_param_uint64 (&psycstore_flags),
1033     GNUNET_MY_query_param_auto_from_type (channel_key),
1034     GNUNET_MY_query_param_uint64 (&message_id),
1035     GNUNET_MY_query_param_end
1036   };
1037
1038   sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1039                                             stmt,
1040                                             params_update);
1041   switch(sql_ret)
1042   {
1043     case GNUNET_OK:
1044       ret = GNUNET_OK;
1045       break;
1046     default:
1047        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1048               "mysql execute prepared", stmt);
1049   }
1050
1051   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1052   {
1053     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1054               "mysql_stmt_reset", stmt);
1055     return GNUNET_SYSERR;
1056   }
1057
1058   return ret;
1059 }
1060
1061
1062 static int
1063 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
1064               GNUNET_PSYCSTORE_FragmentCallback cb,
1065               void *cb_cls)
1066 {
1067
1068   uint32_t hop_counter;
1069   void *signature = NULL;
1070   void *purpose = NULL;
1071   size_t signature_size;
1072   size_t purpose_size;
1073
1074   uint64_t fragment_id;
1075   uint64_t fragment_offset;
1076   uint64_t message_id;
1077   uint64_t group_generation;
1078   uint64_t flags;
1079   void *buf;
1080   size_t buf_size;
1081   int ret = GNUNET_SYSERR;
1082   int sql_ret;
1083   struct GNUNET_MULTICAST_MessageHeader *mp;
1084
1085   uint64_t msg_flags;
1086
1087
1088   struct GNUNET_MY_ResultSpec results[] = {
1089     GNUNET_MY_result_spec_uint32 (&hop_counter),
1090     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
1091     GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
1092     GNUNET_MY_result_spec_uint64 (&fragment_id),
1093     GNUNET_MY_result_spec_uint64 (&fragment_offset),
1094     GNUNET_MY_result_spec_uint64 (&message_id),
1095     GNUNET_MY_result_spec_uint64 (&group_generation),
1096     GNUNET_MY_result_spec_uint64 (&msg_flags),
1097     GNUNET_MY_result_spec_uint64 (&flags),
1098     GNUNET_MY_result_spec_variable_size (&buf,
1099                                          &buf_size),
1100     GNUNET_MY_result_spec_end
1101   };
1102
1103   sql_ret = GNUNET_MY_extract_result (stmt,
1104                                 results);
1105   switch (sql_ret)
1106   {
1107     case GNUNET_NO:
1108       if (ret != GNUNET_OK)
1109           ret = GNUNET_NO;
1110       break;
1111     case GNUNET_OK:
1112
1113       mp = GNUNET_malloc (sizeof (*mp) + buf_size); 
1114       
1115       mp->header.size = htons (sizeof (*mp) + buf_size);
1116       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1117       mp->hop_counter = htonl (hop_counter);
1118       GNUNET_memcpy (&mp->signature,
1119                     signature,
1120                     signature_size);
1121       GNUNET_memcpy (&mp->purpose,
1122                     purpose,
1123                     purpose_size);
1124       mp->fragment_id = GNUNET_htonll (fragment_id);
1125       mp->fragment_offset = GNUNET_htonll (fragment_offset);
1126       mp->message_id = GNUNET_htonll (message_id);
1127       mp->group_generation = GNUNET_htonll (group_generation);
1128       mp->flags = htonl(msg_flags);
1129
1130       GNUNET_memcpy (&mp[1],
1131                     buf,
1132                     buf_size);
1133       ret = cb (cb_cls,
1134                 mp,
1135                 (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1136       
1137       GNUNET_MY_cleanup_result (results);
1138  
1139       break;
1140     default:
1141       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1142                     "mysql extract_result", stmt);
1143   }
1144
1145   return ret;
1146 }
1147
1148
1149 static int
1150 fragment_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1151                  struct GNUNET_MY_QueryParam *params,
1152                  uint64_t *returned_fragments,
1153                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1154 {
1155   int ret = GNUNET_SYSERR;
1156   int sql_ret;
1157
1158   if(NULL == plugin->mc)
1159   {
1160     fprintf(stderr, "bla\n");
1161   }
1162
1163   if(NULL == stmt)
1164   {
1165     fprintf(stderr, "blo\n" );
1166   }
1167
1168   if(NULL == params)
1169   {
1170     fprintf(stderr, "toot\n" );
1171   }
1172
1173   sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1174                                     stmt,
1175                                     params);
1176   switch(sql_ret)
1177   {
1178     case GNUNET_NO:
1179       if (ret != GNUNET_OK)
1180           ret = GNUNET_NO;
1181       break;
1182     case GNUNET_YES:
1183        ret = fragment_row (stmt, cb, cb_cls);
1184        (*returned_fragments)++;
1185       break;
1186     default:
1187       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1188                 "mysql exec_prepared", stmt);   
1189   }
1190
1191   return ret;
1192 }
1193
1194 /**
1195  * Retrieve a message fragment range by fragment ID.
1196  *
1197  * @see GNUNET_PSYCSTORE_fragment_get()
1198  *
1199  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1200  */
1201 static int
1202 fragment_get (void *cls,
1203               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1204               uint64_t first_fragment_id,
1205               uint64_t last_fragment_id,
1206               uint64_t *returned_fragments,
1207               GNUNET_PSYCSTORE_FragmentCallback cb,
1208               void *cb_cls)
1209 {
1210   struct Plugin *plugin = cls;
1211   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1212   int ret = GNUNET_SYSERR;
1213   *returned_fragments = 0;
1214
1215   struct GNUNET_MY_QueryParam params_select[] = {
1216     GNUNET_MY_query_param_auto_from_type (channel_key),
1217     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1218     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1219     GNUNET_MY_query_param_end
1220   };
1221
1222   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1223
1224   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1225   {
1226     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1227               "mysql_stmt_reset", stmt);
1228     return GNUNET_SYSERR;
1229   }
1230
1231   return ret;
1232 }
1233
1234
1235 /**
1236  * Retrieve a message fragment range by fragment ID.
1237  *
1238  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1239  *
1240  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1241  */
1242 static int
1243 fragment_get_latest (void *cls,
1244                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1245                      uint64_t fragment_limit,
1246                      uint64_t *returned_fragments,
1247                      GNUNET_PSYCSTORE_FragmentCallback cb,
1248                      void *cb_cls)
1249 {
1250   struct Plugin *plugin = cls;
1251
1252   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1253
1254   int ret = GNUNET_SYSERR;
1255   *returned_fragments = 0;
1256
1257   struct GNUNET_MY_QueryParam params_select[] = {
1258     GNUNET_MY_query_param_auto_from_type (channel_key),
1259     GNUNET_MY_query_param_uint64 (&fragment_limit),
1260     GNUNET_MY_query_param_end
1261   };
1262
1263   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1264
1265   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1266   {
1267     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1268               "mysql_stmt_reset", stmt);
1269     return GNUNET_SYSERR;
1270   }
1271
1272   return ret;
1273 }
1274
1275
1276 /**
1277  * Retrieve all fragments of a message ID range.
1278  *
1279  * @see GNUNET_PSYCSTORE_message_get()
1280  *
1281  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1282  */
1283 static int
1284 message_get (void *cls,
1285              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1286              uint64_t first_message_id,
1287              uint64_t last_message_id,
1288              uint64_t fragment_limit,
1289              uint64_t *returned_fragments,
1290              GNUNET_PSYCSTORE_FragmentCallback cb,
1291              void *cb_cls)
1292 {
1293   struct Plugin *plugin = cls;
1294
1295   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1296
1297   int ret = GNUNET_SYSERR;
1298   *returned_fragments = 0;
1299
1300   struct GNUNET_MY_QueryParam params_select[] = {
1301     GNUNET_MY_query_param_auto_from_type (channel_key),
1302     GNUNET_MY_query_param_uint64 (&first_message_id),
1303     GNUNET_MY_query_param_uint64 (&last_message_id),
1304     GNUNET_MY_query_param_uint64 (&fragment_limit),
1305     GNUNET_MY_query_param_end
1306   };
1307
1308   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1309
1310   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1311   {
1312     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1313               "mysql_stmt_reset", stmt);
1314     return GNUNET_SYSERR;
1315   }
1316
1317   return ret;
1318 }
1319
1320
1321 /**
1322  * Retrieve all fragments of the latest messages.
1323  *
1324  * @see GNUNET_PSYCSTORE_message_get_latest()
1325  *
1326  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1327  */
1328 static int
1329 message_get_latest (void *cls,
1330                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1331                     uint64_t message_limit,
1332                     uint64_t *returned_fragments,
1333                     GNUNET_PSYCSTORE_FragmentCallback cb,
1334                     void *cb_cls)
1335 {
1336   struct Plugin *plugin = cls;
1337
1338   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1339
1340   int ret = GNUNET_SYSERR;
1341   *returned_fragments = 0;
1342
1343   struct GNUNET_MY_QueryParam params_select[] = {
1344     GNUNET_MY_query_param_auto_from_type (channel_key),
1345     GNUNET_MY_query_param_auto_from_type (channel_key),
1346     GNUNET_MY_query_param_uint64 (&message_limit),
1347     GNUNET_MY_query_param_end
1348   };
1349
1350   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1351
1352   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1353   {
1354     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1355               "mysql_stmt_reset", stmt);
1356     return GNUNET_SYSERR;
1357   }
1358
1359   return ret;
1360 }
1361
1362
1363 /**
1364  * Retrieve a fragment of message specified by its message ID and fragment
1365  * offset.
1366  *
1367  * @see GNUNET_PSYCSTORE_message_get_fragment()
1368  *
1369  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1370  */
1371 static int
1372 message_get_fragment (void *cls,
1373                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1374                       uint64_t message_id,
1375                       uint64_t fragment_offset,
1376                       GNUNET_PSYCSTORE_FragmentCallback cb,
1377                       void *cb_cls)
1378 {
1379   struct Plugin *plugin = cls;
1380   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1381   int sql_ret;
1382   int ret = GNUNET_SYSERR;
1383
1384   struct GNUNET_MY_QueryParam params_select[] = {
1385     GNUNET_MY_query_param_auto_from_type (channel_key),
1386     GNUNET_MY_query_param_uint64 (&message_id),
1387     GNUNET_MY_query_param_uint64 (&fragment_offset),
1388     GNUNET_MY_query_param_end
1389   };
1390
1391   sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1392                                             stmt,
1393                                             params_select);
1394   switch(sql_ret)
1395   {
1396     case GNUNET_NO:
1397       ret = GNUNET_NO;
1398       break;
1399     case GNUNET_OK:
1400       ret = fragment_row (stmt, cb, cb_cls);
1401       break;
1402     default:
1403       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1404               "mysql execute prepared", stmt);
1405   }
1406
1407   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1408   {
1409     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1410               "mysql_stmt_reset", stmt);
1411     return GNUNET_SYSERR;
1412   }
1413
1414   return ret;
1415 }
1416
1417 /**
1418  * Retrieve the max. values of message counters for a channel.
1419  *
1420  * @see GNUNET_PSYCSTORE_counters_get()
1421  *
1422  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1423  */
1424 static int
1425 counters_message_get (void *cls,
1426                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1427                       uint64_t *max_fragment_id,
1428                       uint64_t *max_message_id,
1429                       uint64_t *max_group_generation)
1430 {
1431   struct Plugin *plugin = cls;
1432
1433   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1434
1435   int ret = GNUNET_SYSERR;
1436
1437   struct GNUNET_MY_QueryParam params_select[] = {
1438     GNUNET_MY_query_param_auto_from_type (channel_key),
1439     GNUNET_MY_query_param_end
1440   };
1441
1442   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1443                                             stmt,
1444                                             params_select))
1445   {
1446     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1447               "mysql execute prepared", stmt);
1448     return GNUNET_SYSERR;
1449   }
1450
1451   struct GNUNET_MY_ResultSpec results_select[] = {
1452     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1453     GNUNET_MY_result_spec_uint64 (max_message_id),
1454     GNUNET_MY_result_spec_uint64 (max_group_generation),
1455     GNUNET_MY_result_spec_end
1456   };
1457
1458   ret = GNUNET_MY_extract_result (stmt,
1459                                   results_select);
1460
1461   if (GNUNET_OK != ret)
1462   {
1463     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1464               "mysql extract_result", stmt);
1465     return GNUNET_SYSERR;
1466   }
1467
1468   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1469   {
1470     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1471               "mysql_stmt_reset", stmt);
1472     return GNUNET_SYSERR;
1473   }
1474
1475   return ret;
1476 }
1477
1478 /**
1479  * Retrieve the max. values of state counters for a channel.
1480  *
1481  * @see GNUNET_PSYCSTORE_counters_get()
1482  *
1483  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1484  */
1485 static int
1486 counters_state_get (void *cls,
1487                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1488                     uint64_t *max_state_message_id)
1489 {
1490   struct Plugin *plugin = cls;
1491
1492   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1493
1494   int ret = GNUNET_SYSERR;
1495
1496   struct GNUNET_MY_QueryParam params_select[] = {
1497     GNUNET_MY_query_param_auto_from_type (channel_key),
1498     GNUNET_MY_query_param_end
1499   };
1500
1501   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1502                                             stmt,
1503                                             params_select))
1504   {
1505     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1506               "mysql execute prepared", stmt);
1507     return GNUNET_SYSERR;
1508   }
1509
1510   struct GNUNET_MY_ResultSpec results_select[] = {
1511     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1512     GNUNET_MY_result_spec_end
1513   };
1514
1515   ret = GNUNET_MY_extract_result (stmt,
1516                                   results_select);
1517
1518   if (GNUNET_OK != ret)
1519   {
1520     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1521               "mysql extract_result", stmt);
1522     return GNUNET_SYSERR;
1523   }
1524
1525   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1526   {
1527     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1528               "mysql_stmt_reset", stmt);
1529     return GNUNET_SYSERR;
1530   }
1531
1532   return ret;
1533 }
1534
1535
1536 /**
1537  * Assign a value to a state variable.
1538  *
1539  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1540  */
1541 static int
1542 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1543               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1544               const char *name, const void *value, size_t value_size)
1545 {
1546   int ret = GNUNET_SYSERR;
1547
1548   struct GNUNET_MY_QueryParam params[] = {
1549     GNUNET_MY_query_param_auto_from_type (channel_key),
1550     GNUNET_MY_query_param_string (name),
1551     GNUNET_MY_query_param_auto_from_type (value),
1552     GNUNET_MY_query_param_end
1553   };
1554
1555   ret = GNUNET_MY_exec_prepared (plugin->mc,
1556                                             stmt,
1557                                             params);
1558
1559   if (GNUNET_OK != ret)
1560   {
1561     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1562               "mysql execute prepared", stmt);
1563     return GNUNET_SYSERR;
1564   }
1565
1566   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1567   {
1568     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1569               "mysql_stmt_reset", stmt);
1570     return GNUNET_SYSERR;
1571   }
1572
1573   return ret;
1574 }
1575
1576
1577 static int
1578 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1579                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1580                    uint64_t message_id)
1581 {
1582   struct GNUNET_MY_QueryParam params[] = {
1583     GNUNET_MY_query_param_uint64 (&message_id),
1584     GNUNET_MY_query_param_auto_from_type (channel_key),
1585     GNUNET_MY_query_param_end
1586   };
1587
1588   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1589                                             stmt,
1590                                             params))
1591   {
1592     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1593               "mysql execute prepared", stmt);
1594     return GNUNET_SYSERR;
1595   }
1596
1597   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1598   {
1599     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1600               "mysql_stmt_reset", stmt);
1601     return GNUNET_SYSERR;
1602   }
1603
1604   return GNUNET_OK;
1605 }
1606
1607
1608 /**
1609  * Begin modifying current state.
1610  */
1611 static int
1612 state_modify_begin (void *cls,
1613                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1614                     uint64_t message_id, uint64_t state_delta)
1615 {
1616   struct Plugin *plugin = cls;
1617
1618   if (state_delta > 0)
1619   {
1620     /**
1621      * We can only apply state modifiers in the current message if modifiers in
1622      * the previous stateful message (message_id - state_delta) were already
1623      * applied.
1624      */
1625
1626     uint64_t max_state_message_id = 0;
1627     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1628     switch (ret)
1629     {
1630     case GNUNET_OK:
1631     case GNUNET_NO: // no state yet
1632       ret = GNUNET_OK;
1633       break;
1634     default:
1635       return ret;
1636     }
1637
1638     if (max_state_message_id < message_id - state_delta)
1639       return GNUNET_NO; /* some stateful messages not yet applied */
1640     else if (message_id - state_delta < max_state_message_id)
1641       return GNUNET_NO; /* changes already applied */
1642   }
1643
1644   if (TRANSACTION_NONE != plugin->transaction)
1645   {
1646     /** @todo FIXME: wait for other transaction to finish  */
1647     return GNUNET_SYSERR;
1648   }
1649   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1650 }
1651
1652
1653 /**
1654  * Set the current value of state variable.
1655  *
1656  * @see GNUNET_PSYCSTORE_state_modify()
1657  *
1658  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1659  */
1660 static int
1661 state_modify_op (void *cls,
1662                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1663                  enum GNUNET_PSYC_Operator op,
1664                  const char *name, const void *value, size_t value_size)
1665 {
1666   struct Plugin *plugin = cls;
1667   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1668
1669   switch (op)
1670   {
1671   case GNUNET_PSYC_OP_ASSIGN:
1672     return state_assign (plugin, plugin->insert_state_current, channel_key,
1673                          name, value, value_size);
1674
1675   default: /** @todo implement more state operations */
1676     GNUNET_break (0);
1677     return GNUNET_SYSERR;
1678   }
1679 }
1680
1681
1682 /**
1683  * End modifying current state.
1684  */
1685 static int
1686 state_modify_end (void *cls,
1687                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1688                   uint64_t message_id)
1689 {
1690   struct Plugin *plugin = cls;
1691   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1692
1693   return
1694     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1695     && GNUNET_OK == update_message_id (plugin,
1696                                        plugin->update_max_state_message_id,
1697                                        channel_key, message_id)
1698     && GNUNET_OK == transaction_commit (plugin)
1699     ? GNUNET_OK : GNUNET_SYSERR;
1700 }
1701
1702
1703 /**
1704  * Begin state synchronization.
1705  */
1706 static int
1707 state_sync_begin (void *cls,
1708                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1709 {
1710   struct Plugin *plugin = cls;
1711   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1712 }
1713
1714
1715 /**
1716  * Assign current value of a state variable.
1717  *
1718  * @see GNUNET_PSYCSTORE_state_modify()
1719  *
1720  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1721  */
1722 static int
1723 state_sync_assign (void *cls,
1724                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1725                 const char *name, const void *value, size_t value_size)
1726 {
1727   struct Plugin *plugin = cls;
1728   return state_assign (cls, plugin->insert_state_sync, channel_key,
1729                        name, value, value_size);
1730 }
1731
1732
1733 /**
1734  * End modifying current state.
1735  */
1736 static int
1737 state_sync_end (void *cls,
1738                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1739                 uint64_t max_state_message_id,
1740                 uint64_t state_hash_message_id)
1741 {
1742   struct Plugin *plugin = cls;
1743   int ret = GNUNET_SYSERR;
1744
1745   if (TRANSACTION_NONE != plugin->transaction)
1746   {
1747     /** @todo FIXME: wait for other transaction to finish  */
1748     return GNUNET_SYSERR;
1749   }
1750
1751   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1752     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1753     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1754                                   channel_key)
1755     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1756                                   channel_key)
1757     && GNUNET_OK == update_message_id (plugin,
1758                                        plugin->update_state_hash_message_id,
1759                                        channel_key, state_hash_message_id)
1760     && GNUNET_OK == update_message_id (plugin,
1761                                        plugin->update_max_state_message_id,
1762                                        channel_key, max_state_message_id)
1763     && GNUNET_OK == transaction_commit (plugin)
1764     ? ret = GNUNET_OK
1765     : transaction_rollback (plugin);
1766   return ret;
1767 }
1768
1769
1770 /**
1771  * Delete the whole state.
1772  *
1773  * @see GNUNET_PSYCSTORE_state_reset()
1774  *
1775  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1776  */
1777 static int
1778 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1779 {
1780   struct Plugin *plugin = cls;
1781   return exec_channel (plugin, plugin->delete_state, channel_key);
1782 }
1783
1784
1785 /**
1786  * Update signed values of state variables in the state store.
1787  *
1788  * @see GNUNET_PSYCSTORE_state_hash_update()
1789  *
1790  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1791  */
1792 static int
1793 state_update_signed (void *cls,
1794                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1795 {
1796   struct Plugin *plugin = cls;
1797   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1798 }
1799
1800
1801 /**
1802  * Retrieve a state variable by name.
1803  *
1804  * @see GNUNET_PSYCSTORE_state_get()
1805  *
1806  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1807  */
1808 static int
1809 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1810            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1811 {
1812   struct Plugin *plugin = cls;
1813   int ret = GNUNET_SYSERR;
1814   int sql_ret ;
1815
1816   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1817
1818   struct GNUNET_MY_QueryParam params_select[] = {
1819     GNUNET_MY_query_param_auto_from_type (channel_key),
1820     GNUNET_MY_query_param_string (name),
1821     GNUNET_MY_query_param_end
1822   };
1823
1824   void *value_current = NULL;
1825   size_t value_size = 0;
1826
1827   struct GNUNET_MY_ResultSpec results[] = {
1828     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1829     GNUNET_MY_result_spec_end
1830   };
1831
1832   GNUNET_MY_exec_prepared (plugin->mc,
1833                           stmt,
1834                           params_select);
1835
1836
1837   sql_ret = GNUNET_MY_extract_result (stmt,
1838                                       results);
1839
1840   switch (sql_ret)
1841   {
1842     case GNUNET_NO:
1843       ret = GNUNET_NO;
1844       break;
1845     case GNUNET_YES:
1846       ret = cb (cb_cls, name, value_current,
1847                 value_size);
1848       break;
1849     default:
1850       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1851               "mysql extract_result", stmt);
1852   }
1853
1854   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1855   {
1856     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1857               "mysql_stmt_reset", stmt);
1858     return GNUNET_SYSERR;
1859   }
1860
1861   return ret;
1862 }
1863
1864
1865 /**
1866  * Retrieve all state variables for a channel with the given prefix.
1867  *
1868  * @see GNUNET_PSYCSTORE_state_get_prefix()
1869  *
1870  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1871  */
1872 static int
1873 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1874                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1875                   void *cb_cls)
1876 {
1877   struct Plugin *plugin = cls;
1878   int ret = GNUNET_SYSERR;
1879
1880   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1881
1882   uint32_t name_len = (uint32_t) strlen (name);
1883
1884   struct GNUNET_MY_QueryParam params_select[] = {
1885     GNUNET_MY_query_param_auto_from_type (channel_key),
1886     GNUNET_MY_query_param_string (name),
1887     GNUNET_MY_query_param_uint32 (&name_len),
1888     GNUNET_MY_query_param_string (name),
1889     GNUNET_MY_query_param_end
1890   };
1891
1892   char *name2 = "";
1893   void *value_current = NULL;
1894   size_t value_size = 0;
1895
1896   struct GNUNET_MY_ResultSpec results[] = {
1897     GNUNET_MY_result_spec_string (&name2),
1898     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1899     GNUNET_MY_result_spec_end
1900   };
1901
1902   int sql_ret;
1903
1904   do
1905   {
1906     GNUNET_MY_exec_prepared (plugin->mc,
1907                             stmt,
1908                             params_select);
1909     sql_ret = GNUNET_MY_extract_result (stmt,
1910                                         results);
1911     switch (sql_ret)
1912     {
1913       case GNUNET_NO:
1914         if (ret != GNUNET_OK)
1915           ret = GNUNET_NO;
1916         break;
1917       case GNUNET_YES:
1918         ret = cb (cb_cls, (const char *) name2,
1919                   value_current,
1920                   value_size);
1921
1922         if (ret != GNUNET_YES)
1923           sql_ret = GNUNET_NO;
1924         break;
1925       default:
1926         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1927               "mysql extract_result", stmt);
1928     }
1929   }
1930   while (sql_ret == GNUNET_YES);
1931
1932   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1933   {
1934     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1935               "mysql_stmt_reset", stmt);
1936     return GNUNET_SYSERR;
1937   }
1938
1939   return ret;
1940 }
1941
1942
1943 /**
1944  * Retrieve all signed state variables for a channel.
1945  *
1946  * @see GNUNET_PSYCSTORE_state_get_signed()
1947  *
1948  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1949  */
1950 static int
1951 state_get_signed (void *cls,
1952                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1953                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1954 {
1955   struct Plugin *plugin = cls;
1956   int ret = GNUNET_SYSERR;
1957
1958   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1959
1960   struct GNUNET_MY_QueryParam params_select[] = {
1961     GNUNET_MY_query_param_auto_from_type (channel_key),
1962     GNUNET_MY_query_param_end
1963   };
1964
1965   int sql_ret;
1966
1967   char *name = "";
1968   void *value_signed = NULL;
1969   size_t value_size = 0;
1970
1971   struct GNUNET_MY_ResultSpec results[] = {
1972     GNUNET_MY_result_spec_string (&name),
1973     GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1974     GNUNET_MY_result_spec_end
1975   };
1976
1977   do
1978   {
1979     GNUNET_MY_exec_prepared (plugin->mc,
1980                              stmt,
1981                              params_select);
1982     sql_ret = GNUNET_MY_extract_result (stmt,
1983                                         results);
1984
1985     switch (sql_ret)
1986     {
1987       case GNUNET_NO:
1988         if (ret != GNUNET_OK)
1989           ret = GNUNET_NO;
1990         break;
1991       case GNUNET_YES:
1992         ret = cb (cb_cls, (const char *) name,
1993                   value_signed,
1994                   value_size);
1995
1996         if (ret != GNUNET_YES)
1997             sql_ret = GNUNET_NO;
1998         break;
1999       default:
2000          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2001               "mysql extract_result", stmt);
2002     }
2003   }
2004   while (sql_ret == GNUNET_YES);
2005
2006   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
2007   {
2008     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2009               "mysql_stmt_reset", stmt);
2010     return GNUNET_SYSERR;
2011   }
2012
2013   return ret;
2014 }
2015
2016
2017 /**
2018  * Entry point for the plugin.
2019  *
2020  * @param cls The struct GNUNET_CONFIGURATION_Handle.
2021  * @return NULL on error, otherwise the plugin context
2022  */
2023 void *
2024 libgnunet_plugin_psycstore_mysql_init (void *cls)
2025 {
2026   static struct Plugin plugin;
2027   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
2028   struct GNUNET_PSYCSTORE_PluginFunctions *api;
2029
2030   if (NULL != plugin.cfg)
2031     return NULL;                /* can only initialize once! */
2032   memset (&plugin, 0, sizeof (struct Plugin));
2033   plugin.cfg = cfg;
2034   if (GNUNET_OK != database_setup (&plugin))
2035   {
2036     database_shutdown (&plugin);
2037     return NULL;
2038   }
2039   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
2040   api->cls = &plugin;
2041   api->membership_store = &mysql_membership_store;
2042   api->membership_test = &membership_test;
2043   api->fragment_store = &fragment_store;
2044   api->message_add_flags = &message_add_flags;
2045   api->fragment_get = &fragment_get;
2046   api->fragment_get_latest = &fragment_get_latest;
2047   api->message_get = &message_get;
2048   api->message_get_latest = &message_get_latest;
2049   api->message_get_fragment = &message_get_fragment;
2050   api->counters_message_get = &counters_message_get;
2051   api->counters_state_get = &counters_state_get;
2052   api->state_modify_begin = &state_modify_begin;
2053   api->state_modify_op = &state_modify_op;
2054   api->state_modify_end = &state_modify_end;
2055   api->state_sync_begin = &state_sync_begin;
2056   api->state_sync_assign = &state_sync_assign;
2057   api->state_sync_end = &state_sync_end;
2058   api->state_reset = &state_reset;
2059   api->state_update_signed = &state_update_signed;
2060   api->state_get = &state_get;
2061   api->state_get_prefix = &state_get_prefix;
2062   api->state_get_signed = &state_get_signed;
2063
2064   LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
2065   return api;
2066 }
2067
2068
2069 /**
2070  * Exit point from the plugin.
2071  *
2072  * @param cls The plugin context (as returned by "init")
2073  * @return Always NULL
2074  */
2075 void *
2076 libgnunet_plugin_psycstore_mysql_done (void *cls)
2077 {
2078   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2079   struct Plugin *plugin = api->cls;
2080
2081   database_shutdown (plugin);
2082   plugin->cfg = NULL;
2083   GNUNET_free (api);
2084   LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
2085   return NULL;
2086 }
2087
2088 /* end of plugin_psycstore_mysql.c */