- fix coverity
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_mysql.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  */
28
29 /*
30  * FIXME: SQLite3 only supports signed 64-bit integers natively,
31  *        thus it can only store 63 bits of the uint64_t's.
32  */
33
34 #include "platform.h"
35 #include "gnunet_psycstore_plugin.h"
36 #include "gnunet_psycstore_service.h"
37 #include "gnunet_multicast_service.h"
38 #include "gnunet_crypto_lib.h"
39 #include "gnunet_psyc_util_lib.h"
40 #include "psycstore.h"
41 #include "gnunet_my_lib.h"
42 #include "gnunet_mysql_lib.h"
43
44 #include <sqlite3.h>
45 #include <mysql/mysql.h>
46
47 /**
48  * After how many ms "busy" should a DB operation fail for good?  A
49  * low value makes sure that we are more responsive to requests
50  * (especially PUTs).  A high value guarantees a higher success rate
51  * (SELECTs in iterate can take several seconds despite LIMIT=1).
52  *
53  * The default value of 1s should ensure that users do not experience
54  * huge latencies while at the same time allowing operations to
55  * succeed with reasonable probability.
56  */
57 #define BUSY_TIMEOUT_MS 1000
58
59 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
60
61 /**
62  * Log an error message at log-level 'level' that indicates
63  * a failure of the command 'cmd' on file 'filename'
64  * with the message given by strerror(errno).
65  */
66 #define LOG_MYSQL(db, level, cmd, stmt) do { GNUNET_log_from (level, "psycstore-mysql", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_stmt_error (stmt)); } while(0)
67
68 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
69
70 enum Transactions {
71   TRANSACTION_NONE = 0,
72   TRANSACTION_STATE_MODIFY,
73   TRANSACTION_STATE_SYNC,
74 };
75
76 /**
77  * Context for all functions in this plugin.
78  */
79 struct Plugin
80 {
81
82   const struct GNUNET_CONFIGURATION_Handle *cfg;
83
84   /**
85    * Database filename.
86    */
87   char *fn;
88
89   /**
90     *Handle to talk to Mysql
91     */
92   struct GNUNET_MYSQL_Context *mc;
93
94   /**
95    * Current transaction.
96    */
97   enum Transactions transaction;
98
99   struct GNUNET_MYSQL_StatementHandle *transaction_begin;
100
101   struct GNUNET_MYSQL_StatementHandle *transaction_commit;
102
103   struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
104
105   /**
106    * Precompiled SQL for channel_key_store()
107    */
108   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
109
110
111   /**
112    * Precompiled SQL for slave_key_store()
113    */
114   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
115
116   /**
117    * Precompiled SQL for membership_store()
118    */
119   struct GNUNET_MYSQL_StatementHandle *insert_membership;
120
121   /**
122    * Precompiled SQL for membership_test()
123    */
124   struct GNUNET_MYSQL_StatementHandle *select_membership;
125
126   /**
127    * Precompiled SQL for fragment_store()
128    */
129   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
130
131   /**
132    * Precompiled SQL for message_add_flags()
133    */
134   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
135
136   /**
137    * Precompiled SQL for fragment_get()
138    */
139   struct GNUNET_MYSQL_StatementHandle *select_fragments;
140
141   /**
142    * Precompiled SQL for fragment_get()
143    */
144   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
145
146   /**
147    * Precompiled SQL for message_get()
148    */
149   struct GNUNET_MYSQL_StatementHandle *select_messages;
150
151   /**
152    * Precompiled SQL for message_get()
153    */
154   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
155
156   /**
157    * Precompiled SQL for message_get_fragment()
158    */
159   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
160
161   /**
162    * Precompiled SQL for counters_get_message()
163    */
164   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
165
166   /**
167    * Precompiled SQL for counters_get_state()
168    */
169   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
170
171   /**
172    * Precompiled SQL for state_modify_end()
173    */
174   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
175
176   /**
177    * Precompiled SQL for state_sync_end()
178    */
179   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
180
181   /**
182    * Precompiled SQL for state_modify_op()
183    */
184   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
185
186   /**
187    * Precompiled SQL for state_modify_end()
188    */
189   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
190
191   /**
192    * Precompiled SQL for state_set_signed()
193    */
194   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
195
196   /**
197    * Precompiled SQL for state_sync()
198    */
199   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
200
201   /**
202    * Precompiled SQL for state_sync()
203    */
204   struct GNUNET_MYSQL_StatementHandle *delete_state;
205
206   /**
207    * Precompiled SQL for state_sync()
208    */
209   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
210
211   /**
212    * Precompiled SQL for state_sync()
213    */
214   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
215
216   /**
217    * Precompiled SQL for state_get_signed()
218    */
219   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
220
221   /**
222    * Precompiled SQL for state_get()
223    */
224   struct GNUNET_MYSQL_StatementHandle *select_state_one;
225
226   /**
227    * Precompiled SQL for state_get_prefix()
228    */
229   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
230
231 };
232
233 #if DEBUG_PSYCSTORE
234
235 static void
236 sql_trace (void *cls, const char *sql)
237 {
238   LOG (GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
239 }
240
241 #endif
242
243
244 /**
245  * @brief Prepare a SQL statement
246  *
247  * @param dbh handle to the database
248  * @param sql SQL statement, UTF-8 encoded
249  * @param stmt set to the prepared statement
250  * @return 0 on success
251  */
252 static int
253 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
254               const char *sql, 
255               struct GNUNET_MYSQL_StatementHandle *stmt)
256 {
257   stmt = GNUNET_MYSQL_statement_prepare (mc,
258                                           sql);
259
260   LOG (GNUNET_ERROR_TYPE_DEBUG,
261        "Prepared `%s' / %p\n", sql, stmt);
262   if(NULL == stmt)
263     LOG (GNUNET_ERROR_TYPE_ERROR,
264    _("Error preparing SQL query: %s\n  %s\n"),
265    mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (stmt)), sql);
266
267   return 1;
268 }
269
270
271 /**
272  * @brief Prepare a SQL statement
273  *
274  * @param dbh handle to the database
275  * @param sql SQL statement, UTF-8 encoded
276  * @return 0 on success
277  */
278 static int
279 mysql_exec (struct GNUNET_MYSQL_Context *mc,
280             struct GNUNET_MYSQL_StatementHandle *sh,
281             struct GNUNET_MY_QueryParam *qp)
282 {
283   int result;
284
285   result = GNUNET_MY_exec_prepared (mc, sh, qp);
286   LOG (GNUNET_ERROR_TYPE_DEBUG,
287        "Executed `GNUNET_MY_exec_prepared`' / %d\n", result);
288   if (GNUNET_OK != result)
289     LOG (GNUNET_ERROR_TYPE_ERROR,
290    _("Error executing SQL query: %s\n"),
291    mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (sh)));
292   return result;
293 }
294
295
296 /**
297  * Initialize the database connections and associated
298  * data structures (create tables and indices
299  * as needed as well).
300  *
301  * @param plugin the plugin context (state for this module)
302  * @return GNUNET_OK on success
303  */
304 static int
305 database_setup (struct Plugin *plugin)
306 {
307   char *filename;
308
309   if (GNUNET_OK !=
310       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
311                                                "FILENAME", &filename))
312   {
313     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
314                                "psycstore-sqlite", "FILENAME");
315     return GNUNET_SYSERR;
316   }
317   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
318   {
319     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
320     {
321       GNUNET_break (0);
322       GNUNET_free (filename);
323       return GNUNET_SYSERR;
324     }
325   }
326   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
327   plugin->fn = filename;
328
329   /* Open database and precompile statements */
330   plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
331
332   if (NULL == plugin->mc)
333   {
334     LOG (GNUNET_ERROR_TYPE_ERROR,
335    _("Unable to initialize SQLite: %s.\n"),
336     sqlite3_errmsg (plugin->dbh));
337     return GNUNET_SYSERR; 
338   }
339   
340 /*
341 #if DEBUG_PSYCSTORE
342   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
343 #endif
344
345   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
346   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
347   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
348   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
349   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
350 #if ! DEBUG_PSYCSTORE
351   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
352 #endif
353   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
354
355   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
356
357 */
358   /* Create tables */
359
360   GNUNET_MYSQL_statement_run (plugin->mc,
361                               "CREATE TABLE IF NOT EXISTS channels (\n"
362                               "  id INT PRIMARY KEY,\n"
363                               "  pub_key BLOB UNIQUE,\n"
364                               "  max_state_message_id INT,\n" // last applied state message ID
365                               "  state_hash_message_id INT\n" // last message ID with a state hash
366                               ");");
367
368   GNUNET_MYSQL_statement_run (plugin->mc,
369                               "CREATE TABLE IF NOT EXISTS slaves (\n"
370                               "  id INT PRIMARY KEY,\n"
371                               "  pub_key BLOB UNIQUE\n"
372                               ");");
373
374   GNUNET_MYSQL_statement_run (plugin->mc,
375                               "CREATE TABLE IF NOT EXISTS membership (\n"
376                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
377                               "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
378                               "  did_join INT NOT NULL,\n"
379                               "  announced_at INT NOT NULL,\n"
380                               "  effective_since INT NOT NULL,\n"
381                               "  group_generation INT NOT NULL\n"
382                               ");");
383
384   GNUNET_MYSQL_statement_run (plugin->mc,
385                               "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
386                               "ON membership (channel_id, slave_id);");
387
388   /** @todo messages table: add method_name column */
389   GNUNET_MYSQL_statement_run (plugin->mc,
390                               "CREATE TABLE IF NOT EXISTS messages (\n"
391                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
392                               "  hop_counter INT NOT NULL,\n"
393                               "  signature BLOB,\n"
394                               "  purpose BLOB,\n"
395                               "  fragment_id INT NOT NULL,\n"
396                               "  fragment_offset INT NOT NULL,\n"
397                               "  message_id INT NOT NULL,\n"
398                               "  group_generation INT NOT NULL,\n"
399                               "  multicast_flags INT NOT NULL,\n"
400                               "  psycstore_flags INT NOT NULL,\n"
401                               "  data BLOB,\n"
402                               "  PRIMARY KEY (channel_id, fragment_id),\n"
403                               "  UNIQUE (channel_id, message_id, fragment_offset)\n"
404                               ");");
405
406   GNUNET_MYSQL_statement_run (plugin->mc,
407                               "CREATE TABLE IF NOT EXISTS state (\n"
408                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
409                               "  name TEXT NOT NULL,\n"
410                               "  value_current BLOB,\n"
411                               "  value_signed BLOB,\n"
412                               "  PRIMARY KEY (channel_id, name)\n"
413                               ");");
414
415   GNUNET_MYSQL_statement_run (plugin->mc,
416                               "CREATE TABLE IF NOT EXISTS state_sync (\n"
417                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
418                               "  name TEXT NOT NULL,\n"
419                               "  value BLOB,\n"
420                               "  PRIMARY KEY (channel_id, name)\n"
421                               ");");
422
423   /* Prepare statements */
424   mysql_prepare (plugin->mc, 
425                 "BEGIN", 
426                 plugin->transaction_begin);
427
428   mysql_prepare (plugin->mc, 
429                 "COMMIT", 
430                 plugin->transaction_commit);
431
432   mysql_prepare (plugin->mc, 
433                 "ROLLBACK;", 
434                 plugin->transaction_rollback);
435
436   mysql_prepare (plugin->mc, 
437                 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);", 
438                 plugin->insert_channel_key);
439
440   mysql_prepare (plugin->mc, 
441                 "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);", 
442                 plugin->insert_slave_key);
443  
444   mysql_prepare (plugin->mc, 
445                 "INSERT INTO membership\n"
446                 " (channel_id, slave_id, did_join, announced_at,\n"
447                 "  effective_since, group_generation)\n"
448                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
449                 "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
450                 "        ?, ?, ?, ?);",
451                 plugin->insert_membership);
452
453   mysql_prepare (plugin->mc,
454                 "SELECT did_join FROM membership\n"
455                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
456                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
457                "      AND effective_since <= ? AND did_join = 1\n"
458                "ORDER BY announced_at DESC LIMIT 1;",
459                plugin->select_membership);
460
461   mysql_prepare (plugin->mc,
462                 "INSERT OR IGNORE INTO messages\n"
463                " (channel_id, hop_counter, signature, purpose,\n"
464                "  fragment_id, fragment_offset, message_id,\n"
465                "  group_generation, multicast_flags, psycstore_flags, data)\n"
466                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
467                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
468                 plugin->insert_fragment);
469
470   mysql_prepare (plugin->mc,
471                 "UPDATE messages\n"
472                 "SET psycstore_flags = psycstore_flags | ?\n"
473                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
474                 "      AND message_id = ? AND fragment_offset = 0;",
475                 plugin->update_message_flags);
476
477   mysql_prepare (plugin->mc,
478                   "SELECT hop_counter, signature, purpose, fragment_id,\n"
479                   "       fragment_offset, message_id, group_generation,\n"
480                   "       multicast_flags, psycstore_flags, data\n"
481                   "FROM messages\n"
482                   "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
483                   "      AND ? <= fragment_id AND fragment_id <= ?;",
484                plugin->select_fragments);
485
486   /** @todo select_messages: add method_prefix filter */
487   mysql_prepare (plugin->mc,
488                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
489                 "       fragment_offset, message_id, group_generation,\n"
490                 "       multicast_flags, psycstore_flags, data\n"
491                 "FROM messages\n"
492                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
493                 "      AND ? <= message_id AND message_id <= ?"
494                 "LIMIT ?;",
495                 plugin->select_messages);
496
497   mysql_prepare (plugin->mc,
498                 "SELECT * FROM\n"
499                 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
500                 "        fragment_offset, message_id, group_generation,\n"
501                 "        multicast_flags, psycstore_flags, data\n"
502                 " FROM messages\n"
503                 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
504                 " ORDER BY fragment_id DESC\n"
505                 " LIMIT ?)\n"
506                 "ORDER BY fragment_id;",
507                 plugin->select_latest_fragments);
508
509   /** @todo select_latest_messages: add method_prefix filter */
510   mysql_prepare (plugin->mc,
511                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
512                 "       fragment_offset, message_id, group_generation,\n"
513                 "        multicast_flags, psycstore_flags, data\n"
514                 "FROM messages\n"
515                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
516                 "      AND message_id IN\n"
517                 "      (SELECT message_id\n"
518                 "       FROM messages\n"
519                 "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
520                 "       GROUP BY message_id\n"
521                 "       ORDER BY message_id\n"
522                 "       DESC LIMIT ?)\n"
523                 "ORDER BY fragment_id;",
524                plugin->select_latest_messages);
525
526   mysql_prepare (plugin->mc,
527                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
528                 "       fragment_offset, message_id, group_generation,\n"
529                 "       multicast_flags, psycstore_flags, data\n"
530                 "FROM messages\n"
531                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
532                 "      AND message_id = ? AND fragment_offset = ?;",
533                 plugin->select_message_fragment);
534
535   mysql_prepare (plugin->mc,
536                 "SELECT fragment_id, message_id, group_generation\n"
537                "FROM messages\n"
538                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
539                "ORDER BY fragment_id DESC LIMIT 1;",
540                plugin->select_counters_message);
541
542   mysql_prepare (plugin->mc,
543                 "SELECT max_state_message_id\n"
544                 "FROM channels\n"
545                 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
546                plugin->select_counters_state);
547
548   mysql_prepare (plugin->mc,
549                 "UPDATE channels\n"
550                 "SET max_state_message_id = ?\n"
551                 "WHERE pub_key = ?;",
552                plugin->update_max_state_message_id);
553
554   mysql_prepare (plugin->mc,
555                 "UPDATE channels\n"
556                 "SET state_hash_message_id = ?\n"
557                 "WHERE pub_key = ?;",
558                 plugin->update_state_hash_message_id);
559
560   mysql_prepare (plugin->mc,
561                 "INSERT OR REPLACE INTO state\n"
562                 "  (channel_id, name, value_current, value_signed)\n"
563                 "SELECT new.channel_id, new.name,\n"
564                 "       new.value_current, old.value_signed\n"
565                 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
566                 "             AS channel_id,\n"
567                 "             ? AS name, ? AS value_current) AS new\n"
568                 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
569                 "           FROM state) AS old\n"
570                 "ON new.channel_id = old.channel_id AND new.name = old.name;",
571                 plugin->insert_state_current);
572
573   mysql_prepare (plugin->mc,
574                 "DELETE FROM state\n"
575                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
576                 "      AND (value_current IS NULL OR length(value_current) = 0)\n"
577                 "      AND (value_signed IS NULL OR length(value_signed) = 0);",
578                plugin->delete_state_empty);
579
580   mysql_prepare (plugin->mc,
581                 "UPDATE state\n"
582                 "SET value_signed = value_current\n"
583                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
584                 plugin->update_state_signed);
585
586   mysql_prepare (plugin->mc,
587                 "DELETE FROM state\n"
588                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
589                 plugin->delete_state);
590
591   mysql_prepare (plugin->mc,
592                 "INSERT INTO state_sync (channel_id, name, value)\n"
593                 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
594                 plugin->insert_state_sync);
595
596   mysql_prepare (plugin->mc,
597                 "INSERT INTO state\n"
598                 " (channel_id, name, value_current, value_signed)\n"
599                 "SELECT channel_id, name, value, value\n"
600                 "FROM state_sync\n"
601                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
602                 plugin->insert_state_from_sync);
603
604   mysql_prepare (plugin->mc,
605                 "DELETE FROM state_sync\n"
606                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
607                 plugin->delete_state_sync);
608
609   mysql_prepare (plugin->mc,
610                 "SELECT value_current\n"
611                 "FROM state\n"
612                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
613                 "      AND name = ?;",
614                 plugin->select_state_one);
615
616   mysql_prepare (plugin->mc,
617                 "SELECT name, value_current\n"
618                 "FROM state\n"
619                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
620                 "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
621                 plugin->select_state_prefix);
622
623   mysql_prepare (plugin->mc,
624                 "SELECT name, value_signed\n"
625                 "FROM state\n"
626                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
627                 "      AND value_signed IS NOT NULL;",
628                 plugin->select_state_signed);
629
630   return GNUNET_OK;
631 }
632
633
634 /**
635  * Shutdown database connection and associate data
636  * structures.
637  * @param plugin the plugin context (state for this module)
638  */
639 static void
640 database_shutdown (struct Plugin *plugin)
641 {
642   int result;
643   sqlite3_stmt *stmt;
644   
645   //MYSQL_STMT *stmt;
646
647   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
648   {
649     result = sqlite3_finalize (stmt);
650     if (SQLITE_OK != result)
651       LOG (GNUNET_ERROR_TYPE_WARNING,
652            "Failed to close statement %p: %d\n", stmt, result);
653   }
654   if (SQLITE_OK != sqlite3_close (plugin->dbh))
655     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
656
657   GNUNET_free_non_null (plugin->fn);
658 }
659
660
661 /**
662  * Execute a prepared statement with a @a channel_key argument.
663  *
664  * @param plugin Plugin handle.
665  * @param stmt Statement to execute.
666  * @param channel_key Public key of the channel.
667  *
668  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
669  */
670 static int
671 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
672               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
673 {
674   MYSQL_STMT * statement = NULL;
675   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
676
677   if (NULL == statement)
678   {
679      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
680                 "mysql statement invalide", statement);
681     return GNUNET_SYSERR;
682   }
683
684   struct GNUNET_MY_QueryParam params[] = {
685     GNUNET_MY_query_param_auto_from_type (channel_key),
686     GNUNET_MY_query_param_end
687   };
688
689   if(GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
690                                           stmt,
691                                           params))
692   {
693     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
694                 "mysql exec_channel", stmt); 
695   }
696
697   if (0 != mysql_stmt_reset (statement))
698   {
699     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
700               "mysql_stmt_reset", statement);
701     return GNUNET_SYSERR; 
702   }
703
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Begin a transaction.
710  */
711 static int
712 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
713 {
714   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
715   MYSQL_STMT * statement = NULL;
716
717   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
718
719   if (NULL == statement)
720   {
721      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
722                 "mysql statement invalide", statement);
723     return GNUNET_SYSERR;
724   }
725
726   struct GNUNET_MY_QueryParam params[] = {
727     GNUNET_MY_query_param_end
728   };
729
730   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
731                                             stmt,
732                                             params))
733   {
734     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
735                 "mysql extract_result", statement);
736     return GNUNET_SYSERR; 
737   }
738
739   if (0 != mysql_stmt_reset (statement))
740   {
741     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
742               "mysql_stmt_reset", statement);
743     return GNUNET_SYSERR; 
744   }
745
746   plugin->transaction = transaction;
747   return GNUNET_OK;
748 }
749
750
751 /**
752  * Commit current transaction.
753  */
754 static int
755 transaction_commit (struct Plugin *plugin)
756 {
757   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
758   MYSQL_STMT *statement = NULL;
759
760   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
761
762   if (NULL == statement)
763   {
764     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
765                 "mysql statement invalide", statement);
766     return GNUNET_SYSERR; 
767   }
768
769   struct GNUNET_MY_QueryParam params[] = {
770     GNUNET_MY_query_param_end
771   };
772
773   if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
774                                             stmt,
775                                             params))
776   {
777     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
778                 "mysql extract_result", statement);
779     return GNUNET_SYSERR; 
780   }
781
782   if (0 != mysql_stmt_reset (statement))
783   {
784     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
785               "mysql_stmt_reset", statement);
786     return GNUNET_SYSERR; 
787   }
788
789   plugin->transaction = TRANSACTION_NONE;
790   return GNUNET_OK;
791 }
792
793
794 /**
795  * Roll back current transaction.
796  */
797 static int
798 transaction_rollback (struct Plugin *plugin)
799 {
800   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
801   MYSQL_STMT* statement = NULL;
802
803   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
804   if (NULL == statement)
805   {
806     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
807                 "mysql statement invalide", statement);
808     return GNUNET_SYSERR; 
809   }
810
811   struct GNUNET_MY_QueryParam params[] = {
812     GNUNET_MY_query_param_end
813   };
814
815   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
816                                             stmt,
817                                             params))
818   {
819     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
820                 "mysql extract_result", statement);
821     return GNUNET_SYSERR;  
822   }
823
824   if (0 != mysql_stmt_reset (statement))
825   {
826     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
827               "mysql_stmt_reset", statement);
828     return GNUNET_SYSERR; 
829   }
830
831   plugin->transaction = TRANSACTION_NONE;
832   return GNUNET_OK;
833 }
834
835
836 static int
837 channel_key_store (struct Plugin *plugin,
838                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
839 {
840   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
841
842   MYSQL_STMT *statement = NULL;
843   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
844
845   if(NULL == statement)
846   {
847    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
848                 "mysql statement invalide", statement);
849     return GNUNET_SYSERR;  
850   }
851
852   struct GNUNET_MY_QueryParam params[] = {
853     GNUNET_MY_query_param_auto_from_type (channel_key),
854     GNUNET_MY_query_param_end
855   };
856
857   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
858                                             stmt,
859                                             params))
860   {
861     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
862                 "mysql extract_result", statement);
863     return GNUNET_SYSERR;
864   }
865
866   if (0 != mysql_stmt_reset (statement))
867   {
868     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
869               "mysql_stmt_reset", statement);
870     return GNUNET_SYSERR; 
871   }
872
873   return GNUNET_OK;
874 }
875
876
877 static int
878 slave_key_store (struct Plugin *plugin,
879                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
880 {
881   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
882
883   MYSQL_STMT *statement = NULL;
884   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
885
886   if(NULL == statement)
887   {
888    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
889                 "mysql statement invalide", statement);
890     return GNUNET_SYSERR;  
891   }
892
893   struct GNUNET_MY_QueryParam params[] = {
894     GNUNET_MY_query_param_auto_from_type (slave_key),
895     GNUNET_MY_query_param_end
896   };
897
898   if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
899                                             stmt,
900                                             params))
901   {
902     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
903                 "mysql extract_result", statement);
904     return GNUNET_SYSERR;
905   }
906
907   if (0 != mysql_stmt_reset (statement))
908   {
909     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
910               "mysql_stmt_reset", statement);
911     return GNUNET_SYSERR; 
912   }
913
914   return GNUNET_OK;
915 }
916
917
918 /**
919  * Store join/leave events for a PSYC channel in order to be able to answer
920  * membership test queries later.
921  *
922  * @see GNUNET_PSYCSTORE_membership_store()
923  *
924  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
925  */
926 static int
927 sqlite_membership_store (void *cls,
928                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
929                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
930                          int did_join,
931                          uint64_t announced_at,
932                          uint64_t effective_since,
933                          uint64_t group_generation)
934 {
935   struct Plugin *plugin = cls;
936   
937   uint32_t idid_join = (uint32_t)did_join;
938   uint64_t iannounced_at = (uint64_t)announced_at;
939   uint64_t ieffective_since = (uint64_t)effective_since;
940   uint64_t igroup_generation = (uint64_t)group_generation;
941
942   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
943   MYSQL_STMT *statement = NULL;
944
945   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
946
947   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
948
949   if (announced_at > INT64_MAX ||
950       effective_since > INT64_MAX ||
951       group_generation > INT64_MAX)
952   {
953     GNUNET_break (0);
954     return GNUNET_SYSERR;
955   }
956
957   if (GNUNET_OK != channel_key_store (plugin, channel_key)
958       || GNUNET_OK != slave_key_store (plugin, slave_key))
959     return GNUNET_SYSERR;
960
961   struct GNUNET_MY_QueryParam params[] = {
962     GNUNET_MY_query_param_auto_from_type (channel_key),
963     GNUNET_MY_query_param_auto_from_type (slave_key),
964     GNUNET_MY_query_param_uint32 (&idid_join),
965     GNUNET_MY_query_param_uint64 (&iannounced_at),
966     GNUNET_MY_query_param_uint64 (&ieffective_since),
967     GNUNET_MY_query_param_uint64 (&igroup_generation),
968     GNUNET_MY_query_param_end
969   };
970
971   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
972                                             stmt,
973                                             params))
974   {
975     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
976                 "mysql extract_result", statement);
977     return GNUNET_SYSERR; 
978   }
979
980   if (0 != mysql_stmt_reset (statement))
981   {
982     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
983               "mysql_stmt_reset", statement);
984     return GNUNET_SYSERR; 
985   }
986   return GNUNET_OK;
987 }
988
989 /**
990  * Test if a member was admitted to the channel at the given message ID.
991  *
992  * @see GNUNET_PSYCSTORE_membership_test()
993  *
994  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
995  *         #GNUNET_SYSERR if there was en error.
996  */
997 static int
998 membership_test (void *cls,
999                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1000                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1001                  uint64_t message_id)
1002 {
1003   struct Plugin *plugin = cls;
1004
1005   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
1006   MYSQL_STMT *statement = NULL;
1007   
1008   uint32_t did_join = 0;
1009
1010   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1011
1012   if(NULL == statement)
1013   {
1014    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1015                 "mysql statement invalide", statement);
1016     return GNUNET_SYSERR;  
1017   }
1018
1019   int ret = GNUNET_SYSERR;
1020
1021   struct GNUNET_MY_QueryParam params_select[] = {
1022     GNUNET_MY_query_param_auto_from_type (channel_key),
1023     GNUNET_MY_query_param_auto_from_type (slave_key),
1024     GNUNET_MY_query_param_uint64 (&message_id),
1025     GNUNET_MY_query_param_end
1026   };
1027
1028   if (GNUNET_MY_exec_prepared (plugin->mc,
1029                               stmt,
1030                               params_select))
1031   {
1032     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1033                 "mysql execute prepared", statement);
1034     return GNUNET_SYSERR; 
1035   }
1036
1037   struct GNUNET_MY_ResultSpec results_select[] = {
1038     GNUNET_MY_result_spec_uint32 (&did_join),
1039     GNUNET_MY_result_spec_end
1040   };
1041
1042   if (GNUNET_OK != GNUNET_MY_extract_result (stmt,
1043                                 results_select))
1044   {
1045     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1046                 "mysql extract_result", statement);
1047     return GNUNET_SYSERR; 
1048   }
1049
1050   if(0 != did_join)
1051   {
1052     ret = GNUNET_YES;
1053   }
1054   else
1055   {
1056     ret = GNUNET_NO;
1057   }
1058
1059   if (0 != mysql_stmt_reset (statement))
1060   {
1061     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1062               "mysql_stmt_reset", statement);
1063     return GNUNET_SYSERR; 
1064   }
1065
1066   return ret;
1067 }
1068
1069 /**
1070  * Store a message fragment sent to a channel.
1071  *
1072  * @see GNUNET_PSYCSTORE_fragment_store()
1073  *
1074  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1075  */
1076 static int
1077 fragment_store (void *cls,
1078                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1079                 const struct GNUNET_MULTICAST_MessageHeader *msg,
1080                 uint32_t psycstore_flags)
1081 {
1082   struct Plugin *plugin = cls;
1083   
1084   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
1085   MYSQL_STMT *statement = NULL;
1086
1087   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1088
1089   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
1090
1091   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
1092   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
1093   uint64_t message_id = GNUNET_ntohll (msg->message_id);
1094   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
1095
1096   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
1097       message_id > INT64_MAX || group_generation > INT64_MAX)
1098   {
1099     LOG (GNUNET_ERROR_TYPE_ERROR,
1100          "Tried to store fragment with a field > INT64_MAX: "
1101          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
1102          message_id, group_generation);
1103     GNUNET_break (0);
1104     return GNUNET_SYSERR;
1105   }
1106
1107   if (GNUNET_OK != channel_key_store (plugin, channel_key))
1108     return GNUNET_SYSERR;
1109
1110   struct GNUNET_MY_QueryParam params_insert[] = {
1111     GNUNET_MY_query_param_auto_from_type (channel_key),
1112     GNUNET_MY_query_param_uint32 (msg->hop_counter),
1113     GNUNET_MY_query_param_auto_from_type (&msg->signature),
1114     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
1115     GNUNET_MY_query_param_uint64 (&fragment_id),
1116     GNUNET_MY_query_param_uint64 (&fragment_offset),
1117     GNUNET_MY_query_param_uint64 (&message_id),
1118     GNUNET_MY_query_param_uint64 (&group_generation),
1119     GNUNET_MY_query_param_uint32 ( msg->flags),
1120     GNUNET_MY_query_param_uint32 (&psycstore_flags),
1121     GNUNET_MY_query_param_auto_from_type (&msg[1]),
1122     GNUNET_MY_query_param_end
1123   };
1124
1125   if (GNUNET_MY_exec_prepared (plugin->mc,
1126                               stmt,
1127                               params_insert))
1128   {
1129     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1130               "mysql execute prepared", statement);
1131     return GNUNET_SYSERR;    
1132   }
1133
1134   if (0 != mysql_stmt_reset (statement))
1135   {
1136     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1137               "mysql_stmt_reset", statement);
1138     return GNUNET_SYSERR; 
1139   }
1140   
1141   return GNUNET_OK;
1142 }
1143
1144 /**
1145  * Set additional flags for a given message.
1146  *
1147  * They are OR'd with any existing flags set.
1148  *
1149  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1150  */
1151 static int
1152 message_add_flags (void *cls,
1153                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1154                    uint64_t message_id,
1155                    uint64_t psycstore_flags)
1156 {
1157   struct Plugin *plugin = cls;
1158
1159   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
1160   MYSQL_STMT *statement = NULL;
1161
1162   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1163
1164   int ret = GNUNET_SYSERR;
1165
1166   struct GNUNET_MY_QueryParam params_update[] = {
1167     GNUNET_MY_query_param_uint64 (&psycstore_flags),
1168     GNUNET_MY_query_param_auto_from_type (channel_key),
1169     GNUNET_MY_query_param_uint64 (&message_id),
1170     GNUNET_MY_query_param_end
1171   };
1172
1173   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1174                                             stmt,
1175                                             params_update))
1176   {
1177     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1178               "mysql execute prepared", statement);
1179     return GNUNET_SYSERR;   
1180   }
1181
1182   if (0 != mysql_stmt_reset (statement))
1183   {
1184     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1185               "mysql_stmt_reset", statement);
1186     return GNUNET_SYSERR; 
1187   }
1188
1189   return ret;
1190 }
1191
1192 /** Extract result from statement **/
1193 static int
1194 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1195               void *cb_cls)
1196 {
1197   int data_size = sqlite3_column_bytes (stmt, 9);
1198   struct GNUNET_MULTICAST_MessageHeader *msg
1199     = GNUNET_malloc (sizeof (*msg) + data_size);
1200
1201   msg->header.size = htons (sizeof (*msg) + data_size);
1202   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1203   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1204   memcpy (&msg->signature,
1205           sqlite3_column_blob (stmt, 1),
1206           sqlite3_column_bytes (stmt, 1));
1207   memcpy (&msg->purpose,
1208           sqlite3_column_blob (stmt, 2),
1209           sqlite3_column_bytes (stmt, 2));
1210   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1211   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1212   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1213   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1214   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1215   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1216
1217   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1218 }
1219
1220
1221 static int
1222 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1223                  uint64_t *returned_fragments,
1224                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1225 {
1226   int ret = GNUNET_SYSERR;
1227   int sql_ret;
1228
1229   do
1230   {
1231     sql_ret = sqlite3_step (stmt);
1232     switch (sql_ret)
1233     {
1234     case SQLITE_DONE:
1235       if (ret != GNUNET_OK)
1236         ret = GNUNET_NO;
1237       break;
1238     case SQLITE_ROW:
1239       ret = fragment_row (stmt, cb, cb_cls);
1240       (*returned_fragments)++;
1241       if (ret != GNUNET_YES)
1242         sql_ret = SQLITE_DONE;
1243       break;
1244     default:
1245       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1246                   "sqlite3_step");
1247     }
1248   }
1249   while (sql_ret == SQLITE_ROW);
1250
1251   return ret;
1252 }
1253
1254 /**
1255  * Retrieve a message fragment range by fragment ID.
1256  *
1257  * @see GNUNET_PSYCSTORE_fragment_get()
1258  *
1259  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1260  */
1261 static int
1262 fragment_get (void *cls,
1263               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1264               uint64_t first_fragment_id,
1265               uint64_t last_fragment_id,
1266               uint64_t *returned_fragments,
1267               GNUNET_PSYCSTORE_FragmentCallback cb,
1268               void *cb_cls)
1269 {
1270   struct Plugin *plugin = cls;
1271
1272   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1273   MYSQL_STMT *statement = NULL;
1274
1275   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1276   if (NULL == statement)
1277   {
1278    LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1279               "mysql get_stmt", statement);
1280     return GNUNET_SYSERR;  
1281   }
1282
1283   int ret = GNUNET_SYSERR;
1284   *returned_fragments = 0;
1285
1286   struct GNUNET_MY_QueryParam params_select[] = {
1287     GNUNET_MY_query_param_auto_from_type (channel_key),
1288     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1289     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1290     GNUNET_MY_query_param_end
1291   };
1292
1293   ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1294
1295   if (0 != mysql_stmt_reset (statement))
1296   {
1297     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1298               "mysql_stmt_reset", statement);
1299     return GNUNET_SYSERR; 
1300   }
1301
1302   return ret;
1303 }
1304
1305
1306 /**
1307  * Retrieve a message fragment range by fragment ID.
1308  *
1309  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1310  *
1311  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1312  */
1313 static int
1314 fragment_get_latest (void *cls,
1315                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1316                      uint64_t fragment_limit,
1317                      uint64_t *returned_fragments,
1318                      GNUNET_PSYCSTORE_FragmentCallback cb,
1319                      void *cb_cls)
1320 {
1321   struct Plugin *plugin = cls;
1322
1323   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1324   MYSQL_STMT * statement = NULL;
1325
1326   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1327
1328   int ret = GNUNET_SYSERR;
1329   *returned_fragments = 0;
1330
1331   struct GNUNET_MY_QueryParam params_select[] = {
1332     GNUNET_MY_query_param_auto_from_type (channel_key),
1333     GNUNET_MY_query_param_uint64 (&fragment_limit),
1334     GNUNET_MY_query_param_end
1335   };
1336
1337   ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1338
1339   if (0 != mysql_stmt_reset (statement))
1340   {
1341     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1342               "mysql_stmt_reset", statement);
1343     return GNUNET_SYSERR; 
1344   }  
1345
1346   return ret;
1347 }
1348
1349
1350 /**
1351  * Retrieve all fragments of a message ID range.
1352  *
1353  * @see GNUNET_PSYCSTORE_message_get()
1354  *
1355  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1356  */
1357 static int
1358 message_get (void *cls,
1359              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1360              uint64_t first_message_id,
1361              uint64_t last_message_id,
1362              uint64_t fragment_limit,
1363              uint64_t *returned_fragments,
1364              GNUNET_PSYCSTORE_FragmentCallback cb,
1365              void *cb_cls)
1366 {
1367   struct Plugin *plugin = cls;
1368
1369   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1370   MYSQL_STMT *statement = NULL;
1371
1372   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1373
1374   int ret = GNUNET_SYSERR;
1375   *returned_fragments = 0;
1376
1377   struct GNUNET_MY_QueryParam params_select[] = {
1378     GNUNET_MY_query_param_auto_from_type (channel_key),
1379     GNUNET_MY_query_param_uint64 (&first_message_id),
1380     GNUNET_MY_query_param_uint64 (&last_message_id),
1381     GNUNET_MY_query_param_uint64 (&fragment_limit),
1382     GNUNET_MY_query_param_end
1383   };
1384
1385   ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1386
1387   if (0 != mysql_stmt_reset (statement))
1388   {
1389     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1390               "mysql_stmt_reset", statement);
1391     return GNUNET_SYSERR; 
1392   }  
1393
1394   return ret;
1395 }
1396
1397
1398 /**
1399  * Retrieve all fragments of the latest messages.
1400  *
1401  * @see GNUNET_PSYCSTORE_message_get_latest()
1402  *
1403  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1404  */
1405 static int
1406 message_get_latest (void *cls,
1407                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1408                     uint64_t message_limit,
1409                     uint64_t *returned_fragments,
1410                     GNUNET_PSYCSTORE_FragmentCallback cb,
1411                     void *cb_cls)
1412 {
1413   struct Plugin *plugin = cls;
1414
1415   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1416   MYSQL_STMT *statement;
1417
1418   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1419
1420   int ret = GNUNET_SYSERR;
1421   *returned_fragments = 0;
1422
1423   struct GNUNET_MY_QueryParam params_select[] = {
1424     GNUNET_MY_query_param_auto_from_type (channel_key),
1425     GNUNET_MY_query_param_auto_from_type (channel_key),
1426     GNUNET_MY_query_param_uint64 (&message_limit),
1427     GNUNET_MY_query_param_end
1428   };
1429
1430   ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1431
1432   if (0 != mysql_stmt_reset (statement))
1433   {
1434     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1435               "mysql_stmt_reset", statement);
1436     return GNUNET_SYSERR; 
1437   }  
1438
1439   return ret;
1440 }
1441
1442
1443 /**
1444  * Retrieve a fragment of message specified by its message ID and fragment
1445  * offset.
1446  *
1447  * @see GNUNET_PSYCSTORE_message_get_fragment()
1448  *
1449  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1450  */
1451 static int
1452 message_get_fragment (void *cls,
1453                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1454                       uint64_t message_id,
1455                       uint64_t fragment_offset,
1456                       GNUNET_PSYCSTORE_FragmentCallback cb,
1457                       void *cb_cls)
1458 {
1459   struct Plugin *plugin = cls;
1460
1461   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1462   MYSQL_STMT *statement = NULL;
1463
1464   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1465
1466   int ret = GNUNET_SYSERR;
1467
1468   struct GNUNET_MY_QueryParam params_select[] = {
1469     GNUNET_MY_query_param_auto_from_type (channel_key),
1470     GNUNET_MY_query_param_uint64 (&message_id),
1471     GNUNET_MY_query_param_uint64 (&fragment_offset),
1472     GNUNET_MY_query_param_end
1473   };
1474
1475   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1476                                             stmt,
1477                                             params_select))
1478   {
1479     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1480               "mysql execute prepared", statement);
1481     return GNUNET_SYSERR;    
1482   }
1483
1484   ret = fragment_row (stmt, cb, cb_cls);
1485
1486   if (0 != mysql_stmt_reset (statement))
1487   {
1488     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1489               "mysql_stmt_reset", statement);
1490     return GNUNET_SYSERR; 
1491   }  
1492
1493   return ret;
1494 }
1495
1496 /**
1497  * Retrieve the max. values of message counters for a channel.
1498  *
1499  * @see GNUNET_PSYCSTORE_counters_get()
1500  *
1501  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1502  */
1503 static int
1504 counters_message_get (void *cls,
1505                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1506                       uint64_t *max_fragment_id,
1507                       uint64_t *max_message_id,
1508                       uint64_t *max_group_generation)
1509 {
1510   struct Plugin *plugin = cls;
1511   
1512   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1513   MYSQL_STMT *statement = NULL;
1514
1515   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1516   if (NULL == statement)
1517   {
1518     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1519               "mysql get statement", statement);
1520     return GNUNET_SYSERR;
1521   }  
1522
1523   int ret = GNUNET_SYSERR;
1524
1525   struct GNUNET_MY_QueryParam params_select[] = {
1526     GNUNET_MY_query_param_auto_from_type (channel_key),
1527     GNUNET_MY_query_param_end
1528   };
1529
1530   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1531                                             stmt,
1532                                             params_select))
1533   {
1534     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1535               "mysql execute prepared", statement);
1536     return GNUNET_SYSERR;
1537   }
1538
1539   struct GNUNET_MY_ResultSpec results_select[] = {
1540     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1541     GNUNET_MY_result_spec_uint64 (max_message_id),
1542     GNUNET_MY_result_spec_uint64 (max_group_generation),
1543     GNUNET_MY_result_spec_end
1544   };
1545
1546   ret = GNUNET_MY_extract_result (stmt,
1547                                   results_select);
1548
1549   if (GNUNET_OK != ret)
1550   {
1551     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1552               "mysql extract_result", statement);
1553     return GNUNET_SYSERR;
1554   }
1555
1556   if (0 != mysql_stmt_reset (statement))
1557   {
1558     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1559               "mysql_stmt_reset", statement);
1560     return GNUNET_SYSERR; 
1561   }  
1562
1563   return ret;
1564 }
1565
1566 /**
1567  * Retrieve the max. values of state counters for a channel.
1568  *
1569  * @see GNUNET_PSYCSTORE_counters_get()
1570  *
1571  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1572  */
1573 static int
1574 counters_state_get (void *cls,
1575                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1576                     uint64_t *max_state_message_id)
1577 {
1578   struct Plugin *plugin = cls;
1579
1580   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1581   MYSQL_STMT *statement = NULL;
1582
1583   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1584   if (NULL == statement)
1585   {
1586     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1587               "mysql get_stmt", statement);
1588     return GNUNET_SYSERR; 
1589   }
1590
1591   int ret = GNUNET_SYSERR;
1592
1593   struct GNUNET_MY_QueryParam params_select[] = {
1594     GNUNET_MY_query_param_auto_from_type (channel_key),
1595     GNUNET_MY_query_param_end
1596   };
1597
1598   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1599                                             stmt,
1600                                             params_select))
1601   {
1602     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1603               "mysql execute prepared", statement);
1604     return GNUNET_SYSERR;
1605   }
1606
1607   struct GNUNET_MY_ResultSpec results_select[] = {
1608     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1609     GNUNET_MY_result_spec_end
1610   };
1611
1612   ret = GNUNET_MY_extract_result (stmt,
1613                                   results_select);
1614
1615   if (GNUNET_OK != ret)
1616   {
1617     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1618               "mysql extract_result", statement);
1619     return GNUNET_SYSERR;
1620   }
1621
1622   if (0 != mysql_stmt_reset (statement))
1623   {
1624     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1625               "mysql_stmt_reset", statement);
1626     return GNUNET_SYSERR; 
1627   }  
1628
1629   return ret;
1630 }
1631
1632
1633 /**
1634  * Assign a value to a state variable.
1635  *
1636  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1637  */
1638 static int
1639 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1640               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1641               const char *name, const void *value, size_t value_size)
1642 {
1643   int ret = GNUNET_SYSERR;
1644
1645   MYSQL_STMT *statement = NULL;
1646   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1647
1648   if (NULL == statement)
1649   {
1650     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1651               "mysql get_stmt", statement);
1652     return GNUNET_SYSERR;
1653   }
1654
1655   struct GNUNET_MY_QueryParam params[] = {
1656     GNUNET_MY_query_param_auto_from_type (channel_key),
1657     GNUNET_MY_query_param_string (name),
1658     GNUNET_MY_query_param_auto_from_type (value_size),
1659     GNUNET_MY_query_param_end
1660   };
1661
1662   ret = GNUNET_MY_exec_prepared (plugin->mc,
1663                                             stmt,
1664                                             params);
1665
1666   if (GNUNET_OK != ret)
1667   {
1668     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1669               "mysql execute prepared", statement);
1670     return GNUNET_SYSERR;
1671   }
1672
1673   if (0 != mysql_stmt_reset (statement))
1674   {
1675     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1676               "mysql_stmt_reset", statement);
1677     return GNUNET_SYSERR; 
1678   }    
1679
1680   return ret;
1681 }
1682
1683
1684 static int
1685 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1686                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1687                    uint64_t message_id)
1688 {
1689   MYSQL_STMT *statement = NULL;
1690   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1691
1692   if (NULL == statement)
1693   {
1694     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1695               "mysql get_stmt", statement);
1696     return GNUNET_SYSERR;
1697   }
1698
1699   struct GNUNET_MY_QueryParam params[] = {
1700     GNUNET_MY_query_param_uint64 (&message_id),
1701     GNUNET_MY_query_param_auto_from_type (channel_key),
1702     GNUNET_MY_query_param_end
1703   };
1704
1705   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1706                                             stmt,
1707                                             params))
1708   {
1709     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1710               "mysql execute prepared", statement);
1711     return GNUNET_SYSERR;
1712   }
1713
1714   if (0 != mysql_stmt_reset (statement))
1715   {
1716     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1717               "mysql_stmt_reset", statement);
1718     return GNUNET_SYSERR; 
1719   }   
1720   
1721   return GNUNET_OK;
1722 }
1723
1724
1725 /**
1726  * Begin modifying current state.
1727  */
1728 static int
1729 state_modify_begin (void *cls,
1730                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1731                     uint64_t message_id, uint64_t state_delta)
1732 {
1733   struct Plugin *plugin = cls;
1734
1735   if (state_delta > 0)
1736   {
1737     /**
1738      * We can only apply state modifiers in the current message if modifiers in
1739      * the previous stateful message (message_id - state_delta) were already
1740      * applied.
1741      */
1742
1743     uint64_t max_state_message_id = 0;
1744     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1745     switch (ret)
1746     {
1747     case GNUNET_OK:
1748     case GNUNET_NO: // no state yet
1749       ret = GNUNET_OK;
1750       break;
1751     default:
1752       return ret;
1753     }
1754
1755     if (max_state_message_id < message_id - state_delta)
1756       return GNUNET_NO; /* some stateful messages not yet applied */
1757     else if (message_id - state_delta < max_state_message_id)
1758       return GNUNET_NO; /* changes already applied */
1759   }
1760
1761   if (TRANSACTION_NONE != plugin->transaction)
1762   {
1763     /** @todo FIXME: wait for other transaction to finish  */
1764     return GNUNET_SYSERR;
1765   }
1766   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1767 }
1768
1769
1770 /**
1771  * Set the current value of state variable.
1772  *
1773  * @see GNUNET_PSYCSTORE_state_modify()
1774  *
1775  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1776  */
1777 static int
1778 state_modify_op (void *cls,
1779                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1780                  enum GNUNET_PSYC_Operator op,
1781                  const char *name, const void *value, size_t value_size)
1782 {
1783   struct Plugin *plugin = cls;
1784   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1785
1786   switch (op)
1787   {
1788   case GNUNET_PSYC_OP_ASSIGN:
1789     return state_assign (plugin, plugin->insert_state_current, channel_key,
1790                          name, value, value_size);
1791
1792   default: /** @todo implement more state operations */
1793     GNUNET_break (0);
1794     return GNUNET_SYSERR;
1795   }
1796 }
1797
1798
1799 /**
1800  * End modifying current state.
1801  */
1802 static int
1803 state_modify_end (void *cls,
1804                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1805                   uint64_t message_id)
1806 {
1807   struct Plugin *plugin = cls;
1808   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1809
1810   return
1811     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1812     && GNUNET_OK == update_message_id (plugin,
1813                                        plugin->update_max_state_message_id,
1814                                        channel_key, message_id)
1815     && GNUNET_OK == transaction_commit (plugin)
1816     ? GNUNET_OK : GNUNET_SYSERR;
1817 }
1818
1819
1820 /**
1821  * Begin state synchronization.
1822  */
1823 static int
1824 state_sync_begin (void *cls,
1825                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1826 {
1827   struct Plugin *plugin = cls;
1828   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1829 }
1830
1831
1832 /**
1833  * Assign current value of a state variable.
1834  *
1835  * @see GNUNET_PSYCSTORE_state_modify()
1836  *
1837  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1838  */
1839 static int
1840 state_sync_assign (void *cls,
1841                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1842                 const char *name, const void *value, size_t value_size)
1843 {
1844   struct Plugin *plugin = cls;
1845   return state_assign (cls, plugin->insert_state_sync, channel_key,
1846                        name, value, value_size);
1847 }
1848
1849
1850 /**
1851  * End modifying current state.
1852  */
1853 static int
1854 state_sync_end (void *cls,
1855                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1856                 uint64_t max_state_message_id,
1857                 uint64_t state_hash_message_id)
1858 {
1859   struct Plugin *plugin = cls;
1860   int ret = GNUNET_SYSERR;
1861
1862   if (TRANSACTION_NONE != plugin->transaction)
1863   {
1864     /** @todo FIXME: wait for other transaction to finish  */
1865     return GNUNET_SYSERR;
1866   }
1867
1868   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1869     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1870     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1871                                   channel_key)
1872     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1873                                   channel_key)
1874     && GNUNET_OK == update_message_id (plugin,
1875                                        plugin->update_state_hash_message_id,
1876                                        channel_key, state_hash_message_id)
1877     && GNUNET_OK == update_message_id (plugin,
1878                                        plugin->update_max_state_message_id,
1879                                        channel_key, max_state_message_id)
1880     && GNUNET_OK == transaction_commit (plugin)
1881     ? ret = GNUNET_OK
1882     : transaction_rollback (plugin);
1883   return ret;
1884 }
1885
1886
1887 /**
1888  * Delete the whole state.
1889  *
1890  * @see GNUNET_PSYCSTORE_state_reset()
1891  *
1892  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1893  */
1894 static int
1895 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1896 {
1897   struct Plugin *plugin = cls;
1898   return exec_channel (plugin, plugin->delete_state, channel_key);
1899 }
1900
1901
1902 /**
1903  * Update signed values of state variables in the state store.
1904  *
1905  * @see GNUNET_PSYCSTORE_state_hash_update()
1906  *
1907  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1908  */
1909 static int
1910 state_update_signed (void *cls,
1911                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1912 {
1913   struct Plugin *plugin = cls;
1914   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1915 }
1916
1917
1918 /**
1919  * Retrieve a state variable by name.
1920  *
1921  * @see GNUNET_PSYCSTORE_state_get()
1922  *
1923  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1924  */
1925 static int
1926 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1927            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1928 {
1929   struct Plugin *plugin = cls;
1930   int ret = GNUNET_SYSERR;
1931
1932   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1933   MYSQL_STMT *statement = NULL;
1934
1935   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1936
1937   struct GNUNET_MY_QueryParam params_select[] = {
1938     GNUNET_MY_query_param_auto_from_type (channel_key),
1939     GNUNET_MY_query_param_string (name),
1940     GNUNET_MY_query_param_end
1941   };
1942
1943   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1944                                             stmt,
1945                                             params_select))
1946   {
1947
1948   }
1949
1950   ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1951                 sqlite3_column_bytes (stmt, 0));
1952
1953   if (0 != mysql_stmt_reset (statement))
1954   {
1955     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1956               "mysql_stmt_reset", statement);
1957     return GNUNET_SYSERR; 
1958   }   
1959
1960   return ret;
1961 }
1962
1963
1964 /**
1965  * Retrieve all state variables for a channel with the given prefix.
1966  *
1967  * @see GNUNET_PSYCSTORE_state_get_prefix()
1968  *
1969  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1970  */
1971 static int
1972 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1973                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1974                   void *cb_cls)
1975 {
1976   struct Plugin *plugin = cls;
1977   int ret = GNUNET_SYSERR;
1978
1979   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1980   MYSQL_STMT *statement = NULL;
1981   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1982
1983   if (NULL == statement)
1984   {
1985     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1986               "mysql get_stmt", statement);
1987     return GNUNET_SYSERR;
1988   }
1989
1990   uint32_t name_len = (uint32_t) strlen (name);
1991
1992   struct GNUNET_MY_QueryParam params_select[] = {
1993     GNUNET_MY_query_param_auto_from_type (channel_key),
1994     GNUNET_MY_query_param_string (name),
1995     GNUNET_MY_query_param_uint32 (&name_len),
1996     GNUNET_MY_query_param_string (name),
1997     GNUNET_MY_query_param_end
1998   };
1999
2000   int sql_ret;
2001
2002   sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
2003                                     stmt,
2004                                     params_select);
2005
2006   if (GNUNET_OK != sql_ret)
2007   {
2008     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2009               "mysql exec_prepared", statement);
2010     return GNUNET_SYSERR;
2011   }
2012
2013   ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
2014                   sqlite3_column_blob (stmt, 1),
2015                   sqlite3_column_bytes (stmt, 1));
2016
2017   if (0 != mysql_stmt_reset (statement))
2018   {
2019     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2020               "mysql_stmt_reset", statement);
2021     return GNUNET_SYSERR; 
2022   }
2023
2024   return ret;
2025 }
2026
2027
2028 /**
2029  * Retrieve all signed state variables for a channel.
2030  *
2031  * @see GNUNET_PSYCSTORE_state_get_signed()
2032  *
2033  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
2034  */
2035 static int
2036 state_get_signed (void *cls,
2037                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
2038                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
2039 {
2040   struct Plugin *plugin = cls;
2041   int ret = GNUNET_SYSERR;
2042
2043   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
2044   MYSQL_STMT *statement = NULL;
2045
2046   statement = GNUNET_MYSQL_statement_get_stmt (stmt);
2047   if (NULL == statement)
2048   {
2049     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2050               "mysql get_stmt", statement);
2051     return GNUNET_SYSERR; 
2052   }
2053
2054   struct GNUNET_MY_QueryParam params_select[] = {
2055     GNUNET_MY_query_param_auto_from_type (channel_key),
2056     GNUNET_MY_query_param_end
2057   };
2058
2059   int sql_ret;
2060
2061   sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
2062                                       stmt,
2063                                       params_select);
2064
2065   if (GNUNET_OK != sql_ret)
2066   {
2067     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2068               "mysql_exec_prepared", statement);
2069     return GNUNET_SYSERR;
2070   }
2071
2072   ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
2073                   sqlite3_column_blob (stmt, 1),
2074                   sqlite3_column_bytes (stmt, 1));
2075
2076   if (0 != mysql_stmt_reset (statement))
2077   {
2078     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2079               "mysql_stmt_reset", statement);
2080     return GNUNET_SYSERR; 
2081   }
2082
2083   return ret;
2084 }
2085
2086
2087 /**
2088  * Entry point for the plugin.
2089  *
2090  * @param cls The struct GNUNET_CONFIGURATION_Handle.
2091  * @return NULL on error, otherwise the plugin context
2092  */
2093 void *
2094 libgnunet_plugin_psycstore_sqlite_init (void *cls)
2095 {
2096   static struct Plugin plugin;
2097   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
2098   struct GNUNET_PSYCSTORE_PluginFunctions *api;
2099
2100   if (NULL != plugin.cfg)
2101     return NULL;                /* can only initialize once! */
2102   memset (&plugin, 0, sizeof (struct Plugin));
2103   plugin.cfg = cfg;
2104   if (GNUNET_OK != database_setup (&plugin))
2105   {
2106     database_shutdown (&plugin);
2107     return NULL;
2108   }
2109   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
2110   api->cls = &plugin;
2111   api->membership_store = &sqlite_membership_store;
2112   api->membership_test = &membership_test;
2113   api->fragment_store = &fragment_store;
2114   api->message_add_flags = &message_add_flags;
2115   api->fragment_get = &fragment_get;
2116   api->fragment_get_latest = &fragment_get_latest;
2117   api->message_get = &message_get;
2118   api->message_get_latest = &message_get_latest;
2119   api->message_get_fragment = &message_get_fragment;
2120   api->counters_message_get = &counters_message_get;
2121   api->counters_state_get = &counters_state_get;
2122   api->state_modify_begin = &state_modify_begin;
2123   api->state_modify_op = &state_modify_op;
2124   api->state_modify_end = &state_modify_end;
2125   api->state_sync_begin = &state_sync_begin;
2126   api->state_sync_assign = &state_sync_assign;
2127   api->state_sync_end = &state_sync_end;
2128   api->state_reset = &state_reset;
2129   api->state_update_signed = &state_update_signed;
2130   api->state_get = &state_get;
2131   api->state_get_prefix = &state_get_prefix;
2132   api->state_get_signed = &state_get_signed;
2133
2134   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
2135   return api;
2136 }
2137
2138
2139 /**
2140  * Exit point from the plugin.
2141  *
2142  * @param cls The plugin context (as returned by "init")
2143  * @return Always NULL
2144  */
2145 void *
2146 libgnunet_plugin_psycstore_sqlite_done (void *cls)
2147 {
2148   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2149   struct Plugin *plugin = api->cls;
2150
2151   database_shutdown (plugin);
2152   plugin->cfg = NULL;
2153   GNUNET_free (api);
2154   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
2155   return NULL;
2156 }
2157
2158 /* end of plugin_psycstore_mysql.c */