4a75e9997d373ac337f657d3d2060b80800f87a0
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  */
15
16 /**
17  * @file psycstore/plugin_psycstore_mysql.c
18  * @brief mysql-based psycstore backend
19  * @author Gabor X Toth
20  * @author Christian Grothoff
21  * @author Christophe Genevey
22  */
23
24 #include "platform.h"
25 #include "gnunet_psycstore_plugin.h"
26 #include "gnunet_psycstore_service.h"
27 #include "gnunet_multicast_service.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_psyc_util_lib.h"
30 #include "psycstore.h"
31 #include "gnunet_my_lib.h"
32 #include "gnunet_mysql_lib.h"
33 #include <mysql/mysql.h>
34
35 /**
36  * After how many ms "busy" should a DB operation fail for good?  A
37  * low value makes sure that we are more responsive to requests
38  * (especially PUTs).  A high value guarantees a higher success rate
39  * (SELECTs in iterate can take several seconds despite LIMIT=1).
40  *
41  * The default value of 1s should ensure that users do not experience
42  * huge latencies while at the same time allowing operations to
43  * succeed with reasonable probability.
44  */
45 #define BUSY_TIMEOUT_MS 1000
46
47 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
48
49 /**
50  * Log an error message at log-level 'level' that indicates
51  * a failure of the command 'cmd' on file 'filename'
52  * with the message given by strerror(errno).
53  */
54 #define LOG_MYSQL(db, level, cmd, stmt)                                 \
55   do {                                                                  \
56     GNUNET_log_from (level, "psycstore-mysql",                          \
57                      _("`%s' failed at %s:%d with error: %s\n"),        \
58                      cmd, __FILE__, __LINE__,                           \
59                      mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
60   } while (0)
61
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
63
64 enum Transactions {
65   TRANSACTION_NONE = 0,
66   TRANSACTION_STATE_MODIFY,
67   TRANSACTION_STATE_SYNC,
68 };
69
70 /**
71  * Context for all functions in this plugin.
72  */
73 struct Plugin
74 {
75
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * MySQL context.
80    */
81   struct GNUNET_MYSQL_Context *mc;
82
83   /**
84    * Current transaction.
85    */
86   enum Transactions transaction;
87
88   /**
89    * Precompiled SQL for channel_key_store()
90    */
91   struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
92
93   /**
94    * Precompiled SQL for slave_key_store()
95    */
96   struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
97
98   /**
99    * Precompiled SQL for membership_store()
100    */
101   struct GNUNET_MYSQL_StatementHandle *insert_membership;
102
103   /**
104    * Precompiled SQL for membership_test()
105    */
106   struct GNUNET_MYSQL_StatementHandle *select_membership;
107
108   /**
109    * Precompiled SQL for fragment_store()
110    */
111   struct GNUNET_MYSQL_StatementHandle *insert_fragment;
112
113   /**
114    * Precompiled SQL for message_add_flags()
115    */
116   struct GNUNET_MYSQL_StatementHandle *update_message_flags;
117
118   /**
119    * Precompiled SQL for fragment_get()
120    */
121   struct GNUNET_MYSQL_StatementHandle *select_fragments;
122
123   /**
124    * Precompiled SQL for fragment_get()
125    */
126   struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
127
128   /**
129    * Precompiled SQL for message_get()
130    */
131   struct GNUNET_MYSQL_StatementHandle *select_messages;
132
133   /**
134    * Precompiled SQL for message_get()
135    */
136   struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
137
138   /**
139    * Precompiled SQL for message_get_fragment()
140    */
141   struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
142
143   /**
144    * Precompiled SQL for counters_get_message()
145    */
146   struct GNUNET_MYSQL_StatementHandle *select_counters_message;
147
148   /**
149    * Precompiled SQL for counters_get_state()
150    */
151   struct GNUNET_MYSQL_StatementHandle *select_counters_state;
152
153   /**
154    * Precompiled SQL for state_modify_end()
155    */
156   struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
157
158   /**
159    * Precompiled SQL for state_sync_end()
160    */
161   struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
162
163   /**
164    * Precompiled SQL for state_modify_op()
165    */
166   struct GNUNET_MYSQL_StatementHandle *insert_state_current;
167
168   /**
169    * Precompiled SQL for state_modify_end()
170    */
171   struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
172
173   /**
174    * Precompiled SQL for state_set_signed()
175    */
176   struct GNUNET_MYSQL_StatementHandle *update_state_signed;
177
178   /**
179    * Precompiled SQL for state_sync()
180    */
181   struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
182
183   /**
184    * Precompiled SQL for state_sync()
185    */
186   struct GNUNET_MYSQL_StatementHandle *delete_state;
187
188   /**
189    * Precompiled SQL for state_sync()
190    */
191   struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
192
193   /**
194    * Precompiled SQL for state_sync()
195    */
196   struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
197
198   /**
199    * Precompiled SQL for state_get_signed()
200    */
201   struct GNUNET_MYSQL_StatementHandle *select_state_signed;
202
203   /**
204    * Precompiled SQL for state_get()
205    */
206   struct GNUNET_MYSQL_StatementHandle *select_state_one;
207
208   /**
209    * Precompiled SQL for state_get_prefix()
210    */
211   struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
212
213 };
214
215 #if DEBUG_PSYCSTORE
216
217 static void
218 mysql_trace (void *cls, const char *sql)
219 {
220   LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
221 }
222
223 #endif
224
225
226 /**
227  * @brief Prepare a SQL statement
228  *
229  * @param dbh handle to the database
230  * @param sql SQL statement, UTF-8 encoded
231  * @param stmt set to the prepared statement
232  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
233  */
234 static int
235 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
236               const char *sql,
237               struct GNUNET_MYSQL_StatementHandle **stmt)
238 {
239   *stmt = GNUNET_MYSQL_statement_prepare (mc,
240                                           sql);
241
242   if (NULL == *stmt)
243   {
244     LOG (GNUNET_ERROR_TYPE_ERROR,
245          _("Error preparing SQL query: %s\n  %s\n"),
246          mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
247          sql);
248     return GNUNET_SYSERR;
249   }
250   LOG (GNUNET_ERROR_TYPE_DEBUG,
251        "Prepared `%s' / %p\n",
252        sql,
253        stmt);
254   return GNUNET_OK;
255 }
256
257
258 /**
259  * Initialize the database connections and associated
260  * data structures (create tables and indices
261  * as needed as well).
262  *
263  * @param plugin the plugin context (state for this module)
264  * @return #GNUNET_OK on success
265  */
266 static int
267 database_setup (struct Plugin *plugin)
268 {
269   /* Open database and precompile statements */
270   plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
271                                             "psycstore-mysql");
272
273   if (NULL == plugin->mc)
274   {
275     LOG (GNUNET_ERROR_TYPE_ERROR,
276          _("Unable to initialize Mysql.\n"));
277     return GNUNET_SYSERR;
278   }
279
280 #define STMT_RUN(sql) \
281   if (GNUNET_OK != \
282       GNUNET_MYSQL_statement_run (plugin->mc, \
283                                   sql)) \
284   { \
285     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
286                 _("Failed to run SQL statement `%s'\n"), \
287                 sql); \
288     return GNUNET_SYSERR; \
289   }
290
291   /* Create tables */
292   STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
293             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
294             " pub_key BLOB(32),\n"
295             " max_state_message_id BIGINT UNSIGNED,\n"
296             " state_hash_message_id BIGINT UNSIGNED,\n"
297             " PRIMARY KEY(id),\n"
298             " UNIQUE KEY(pub_key(32))\n"
299             ");");
300
301   STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
302             " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
303             " pub_key BLOB(32),\n"
304             " PRIMARY KEY(id),\n"
305             " UNIQUE KEY(pub_key(32))\n"
306             ");");
307
308   STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
309             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
310             "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
311             "  did_join TINYINT NOT NULL,\n"
312             "  announced_at BIGINT UNSIGNED NOT NULL,\n"
313             "  effective_since BIGINT UNSIGNED NOT NULL,\n"
314             "  group_generation BIGINT UNSIGNED NOT NULL\n"
315             ");");
316
317 /*** FIX because IF NOT EXISTS doesn't work ***/
318   GNUNET_MYSQL_statement_run (plugin->mc,
319                               "CREATE INDEX idx_membership_channel_id_slave_id "
320                               "ON membership (channel_id, slave_id);");
321
322   /** @todo messages table: add method_name column */
323   STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
324             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
325             "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
326             "  signature BLOB,\n"
327             "  purpose BLOB,\n"
328             "  fragment_id BIGINT UNSIGNED NOT NULL,\n"
329             "  fragment_offset BIGINT UNSIGNED NOT NULL,\n"
330                               "  message_id BIGINT UNSIGNED NOT NULL,\n"
331             "  group_generation BIGINT UNSIGNED NOT NULL,\n"
332             "  multicast_flags BIGINT UNSIGNED NOT NULL,\n"
333             "  psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
334             "  data BLOB,\n"
335             "  PRIMARY KEY (channel_id, fragment_id),\n"
336             "  UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
337             ");");
338
339   STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
340             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
341             "  name TEXT NOT NULL,\n"
342             "  value_current BLOB,\n"
343             "  value_signed BLOB\n"
344             //"  PRIMARY KEY (channel_id, name(255))\n"
345             ");");
346
347   STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
348             "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
349             "  name TEXT NOT NULL,\n"
350             "  value BLOB\n"
351             //"  PRIMARY KEY (channel_id, name(255))\n"
352             ");");
353 #undef STMT_RUN
354
355   /* Prepare statements */
356 #define PREP(stmt,handle)                                    \
357   if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
358   { \
359     GNUNET_break (0); \
360     return GNUNET_SYSERR; \
361   }
362   PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
363         &plugin->insert_channel_key);
364   PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
365         &plugin->insert_slave_key);
366   PREP ("INSERT INTO membership\n"
367         " (channel_id, slave_id, did_join, announced_at,\n"
368         "  effective_since, group_generation)\n"
369         "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
370         "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
371         "        ?, ?, ?, ?);",
372         &plugin->insert_membership);
373   PREP ("SELECT did_join FROM membership\n"
374         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
375         "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
376         "      AND effective_since <= ? AND did_join = 1\n"
377         "ORDER BY announced_at DESC LIMIT 1;",
378         &plugin->select_membership);
379
380   PREP ("INSERT IGNORE INTO messages\n"
381         " (channel_id, hop_counter, signature, purpose,\n"
382         "  fragment_id, fragment_offset, message_id,\n"
383         "  group_generation, multicast_flags, psycstore_flags, data)\n"
384         "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
385         "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
386         &plugin->insert_fragment);
387
388   PREP ("UPDATE messages\n"
389         "SET psycstore_flags = psycstore_flags | ?\n"
390         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
391         "      AND message_id = ? AND fragment_offset = 0;",
392         &plugin->update_message_flags);
393
394   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
395         "       fragment_offset, message_id, group_generation,\n"
396         "       multicast_flags, psycstore_flags, data\n"
397         "FROM messages\n"
398         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
399         "      AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
400         &plugin->select_fragments);
401
402   /** @todo select_messages: add method_prefix filter */
403   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
404         "       fragment_offset, message_id, group_generation,\n"
405         "       multicast_flags, psycstore_flags, data\n"
406         "FROM messages\n"
407         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
408         "      AND ? <= message_id AND message_id <= ?\n"
409         "LIMIT ?;",
410         &plugin->select_messages);
411
412   PREP ("SELECT * FROM\n"
413         "(SELECT hop_counter, signature, purpose, fragment_id,\n"
414         "        fragment_offset, message_id, group_generation,\n"
415         "        multicast_flags, psycstore_flags, data\n"
416         " FROM messages\n"
417         " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
418         " ORDER BY fragment_id DESC\n"
419         " LIMIT ?)\n"
420         "ORDER BY fragment_id;",
421         &plugin->select_latest_fragments);
422
423   /** @todo select_latest_messages: add method_prefix filter */
424   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
425         "       fragment_offset, message_id, group_generation,\n"
426         "        multicast_flags, psycstore_flags, data\n"
427         "FROM messages\n"
428         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
429         "      AND message_id IN\n"
430         "      (SELECT message_id\n"
431         "       FROM messages\n"
432         "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
433         "       GROUP BY message_id\n"
434         "       ORDER BY message_id\n"
435         "       DESC LIMIT ?)\n"
436         "ORDER BY fragment_id;",
437         &plugin->select_latest_messages);
438
439   PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
440         "       fragment_offset, message_id, group_generation,\n"
441         "       multicast_flags, psycstore_flags, data\n"
442         "FROM messages\n"
443         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
444         "      AND message_id = ? AND fragment_offset = ?;",
445         &plugin->select_message_fragment);
446
447   PREP ("SELECT fragment_id, message_id, group_generation\n"
448         "FROM messages\n"
449         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
450         "ORDER BY fragment_id DESC LIMIT 1;",
451         &plugin->select_counters_message);
452
453   PREP ("SELECT max_state_message_id\n"
454         "FROM channels\n"
455         "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
456         &plugin->select_counters_state);
457
458   PREP ("UPDATE channels\n"
459         "SET max_state_message_id = ?\n"
460         "WHERE pub_key = ?;",
461         &plugin->update_max_state_message_id);
462
463   PREP ("UPDATE channels\n"
464         "SET state_hash_message_id = ?\n"
465         "WHERE pub_key = ?;",
466         &plugin->update_state_hash_message_id);
467
468   PREP ("REPLACE INTO state\n"
469         "  (channel_id, name, value_current, value_signed)\n"
470         "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
471         "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
472         "             (SELECT ?) AS name,\n"
473         "             (SELECT ?) AS value_current\n"
474         "     ) AS new\n"
475         "LEFT JOIN (SELECT channel_id, name, value_signed\n"
476         "           FROM state) AS old\n"
477         "ON new.channel_id = old.channel_id AND new.name = old.name;",
478         &plugin->insert_state_current);
479
480   PREP ("DELETE FROM state\n"
481         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
482         "      AND (value_current IS NULL OR length(value_current) = 0)\n"
483         "      AND (value_signed IS NULL OR length(value_signed) = 0);",
484         &plugin->delete_state_empty);
485
486   PREP ("UPDATE state\n"
487         "SET value_signed = value_current\n"
488         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
489         &plugin->update_state_signed);
490
491   PREP ("DELETE FROM state\n"
492         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
493         &plugin->delete_state);
494
495   PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
496         "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
497         &plugin->insert_state_sync);
498
499   PREP ("INSERT INTO state\n"
500         " (channel_id, name, value_current, value_signed)\n"
501         "SELECT channel_id, name, value, value\n"
502         "FROM state_sync\n"
503         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
504         &plugin->insert_state_from_sync);
505
506   PREP ("DELETE FROM state_sync\n"
507         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
508         &plugin->delete_state_sync);
509
510   PREP ("SELECT value_current\n"
511         "FROM state\n"
512         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
513         "      AND name = ?;",
514         &plugin->select_state_one);
515
516   PREP ("SELECT name, value_current\n"
517         "FROM state\n"
518         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
519         "      AND (name = ? OR substr(name, 1, ?) = ?);",
520         &plugin->select_state_prefix);
521
522   PREP ("SELECT name, value_signed\n"
523         "FROM state\n"
524         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
525         "      AND value_signed IS NOT NULL;",
526         &plugin->select_state_signed);
527 #undef PREP
528
529   return GNUNET_OK;
530 }
531
532
533 /**
534  * Shutdown database connection and associate data
535  * structures.
536  * @param plugin the plugin context (state for this module)
537  */
538 static void
539 database_shutdown (struct Plugin *plugin)
540 {
541   GNUNET_MYSQL_context_destroy (plugin->mc);
542 }
543
544
545 /**
546  * Execute a prepared statement with a @a channel_key argument.
547  *
548  * @param plugin Plugin handle.
549  * @param stmt Statement to execute.
550  * @param channel_key Public key of the channel.
551  *
552  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
553  */
554 static int
555 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
556               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
557 {
558   struct GNUNET_MY_QueryParam params[] = {
559     GNUNET_MY_query_param_auto_from_type (channel_key),
560     GNUNET_MY_query_param_end
561   };
562
563   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
564   {
565     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
566               "mysql exec_channel", stmt);
567   }
568
569   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
570   {
571     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
572               "mysql_stmt_reset", stmt);
573     return GNUNET_SYSERR;
574   }
575
576   return GNUNET_OK;
577 }
578
579
580 /**
581  * Begin a transaction.
582  */
583 static int
584 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
585 {
586   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
587   {
588     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
589     return GNUNET_SYSERR;
590   }
591
592   plugin->transaction = transaction;
593   return GNUNET_OK;
594 }
595
596
597 /**
598  * Commit current transaction.
599  */
600 static int
601 transaction_commit (struct Plugin *plugin)
602 {
603   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
604   {
605     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
606     return GNUNET_SYSERR;
607   }
608
609   plugin->transaction = TRANSACTION_NONE;
610   return GNUNET_OK;
611 }
612
613
614 /**
615  * Roll back current transaction.
616  */
617 static int
618 transaction_rollback (struct Plugin *plugin)
619 {
620   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
621   {
622     LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
623     return GNUNET_SYSERR;
624   }
625
626   plugin->transaction = TRANSACTION_NONE;
627   return GNUNET_OK;
628 }
629
630
631 static int
632 channel_key_store (struct Plugin *plugin,
633                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
634 {
635   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
636
637   struct GNUNET_MY_QueryParam params[] = {
638     GNUNET_MY_query_param_auto_from_type (channel_key),
639     GNUNET_MY_query_param_end
640   };
641
642   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
643   {
644     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
645               "mysql exec_prepared", stmt);
646     return GNUNET_SYSERR;
647   }
648
649   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
650   {
651     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
652               "mysql_stmt_reset", stmt);
653     return GNUNET_SYSERR;
654   }
655
656   return GNUNET_OK;
657 }
658
659
660 static int
661 slave_key_store (struct Plugin *plugin,
662                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
663 {
664   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
665
666   struct GNUNET_MY_QueryParam params[] = {
667     GNUNET_MY_query_param_auto_from_type (slave_key),
668     GNUNET_MY_query_param_end
669   };
670
671   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
672   {
673     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
674               "mysql exec_prepared", stmt);
675     return GNUNET_SYSERR;
676   }
677
678   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
679   {
680     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
681               "mysql_stmt_reset", stmt);
682     return GNUNET_SYSERR;
683   }
684
685   return GNUNET_OK;
686 }
687
688
689 /**
690  * Store join/leave events for a PSYC channel in order to be able to answer
691  * membership test queries later.
692  *
693  * @see GNUNET_PSYCSTORE_membership_store()
694  *
695  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
696  */
697 static int
698 mysql_membership_store (void *cls,
699                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
700                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
701                          int did_join,
702                          uint64_t announced_at,
703                          uint64_t effective_since,
704                          uint64_t group_generation)
705 {
706   struct Plugin *plugin = cls;
707
708   uint32_t idid_join = (uint32_t)did_join;
709
710   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
711
712   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
713
714   if (announced_at > INT64_MAX ||
715       effective_since > INT64_MAX ||
716       group_generation > INT64_MAX)
717   {
718     GNUNET_break (0);
719     return GNUNET_SYSERR;
720   }
721
722   if (GNUNET_OK != channel_key_store (plugin, channel_key)
723       || GNUNET_OK != slave_key_store (plugin, slave_key))
724     return GNUNET_SYSERR;
725
726   struct GNUNET_MY_QueryParam params[] = {
727     GNUNET_MY_query_param_auto_from_type (channel_key),
728     GNUNET_MY_query_param_auto_from_type (slave_key),
729     GNUNET_MY_query_param_uint32 (&idid_join),
730     GNUNET_MY_query_param_uint64 (&announced_at),
731     GNUNET_MY_query_param_uint64 (&effective_since),
732     GNUNET_MY_query_param_uint64 (&group_generation),
733     GNUNET_MY_query_param_end
734   };
735
736   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
737   {
738     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
739               "mysql exec_prepared", stmt);
740     return GNUNET_SYSERR;
741   }
742
743   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
744   {
745     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
746               "mysql_stmt_reset", stmt);
747     return GNUNET_SYSERR;
748   }
749   return GNUNET_OK;
750 }
751
752 /**
753  * Test if a member was admitted to the channel at the given message ID.
754  *
755  * @see GNUNET_PSYCSTORE_membership_test()
756  *
757  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
758  *         #GNUNET_SYSERR if there was en error.
759  */
760 static int
761 membership_test (void *cls,
762                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
763                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
764                  uint64_t message_id)
765 {
766   struct Plugin *plugin = cls;
767
768   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
769
770   uint32_t did_join = 0;
771
772   int ret = GNUNET_SYSERR;
773
774   struct GNUNET_MY_QueryParam params_select[] = {
775     GNUNET_MY_query_param_auto_from_type (channel_key),
776     GNUNET_MY_query_param_auto_from_type (slave_key),
777     GNUNET_MY_query_param_uint64 (&message_id),
778     GNUNET_MY_query_param_end
779   };
780
781   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
782   {
783     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
784                 "mysql execute prepared", stmt);
785     return GNUNET_SYSERR;
786   }
787
788   struct GNUNET_MY_ResultSpec results_select[] = {
789     GNUNET_MY_result_spec_uint32 (&did_join),
790     GNUNET_MY_result_spec_end
791   };
792
793   switch (GNUNET_MY_extract_result (stmt, results_select))
794   {
795     case GNUNET_NO:
796       ret = GNUNET_NO;
797       break;
798     case GNUNET_OK:
799       ret = GNUNET_YES;
800       break;
801     default:
802       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
803                 "mysql extract_result", stmt);
804       return GNUNET_SYSERR;
805   }
806
807   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
808   {
809     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
810               "mysql_stmt_reset", stmt);
811     return GNUNET_SYSERR;
812   }
813
814   return ret;
815 }
816
817 /**
818  * Store a message fragment sent to a channel.
819  *
820  * @see GNUNET_PSYCSTORE_fragment_store()
821  *
822  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
823  */
824 static int
825 fragment_store (void *cls,
826                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
827                 const struct GNUNET_MULTICAST_MessageHeader *msg,
828                 uint32_t psycstore_flags)
829 {
830   struct Plugin *plugin = cls;
831
832   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
833
834   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
835
836   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
837
838   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
839   uint64_t message_id = GNUNET_ntohll (msg->message_id);
840   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
841
842   uint64_t hop_counter = ntohl(msg->hop_counter);
843   uint64_t flags = ntohl(msg->flags);
844
845   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
846       message_id > INT64_MAX || group_generation > INT64_MAX)
847   {
848     LOG(GNUNET_ERROR_TYPE_ERROR,
849          "Tried to store fragment with a field > INT64_MAX: "
850          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
851          message_id, group_generation);
852     GNUNET_break (0);
853     return GNUNET_SYSERR;
854   }
855
856   if (GNUNET_OK != channel_key_store (plugin, channel_key))
857     return GNUNET_SYSERR;
858
859   struct GNUNET_MY_QueryParam params_insert[] = {
860     GNUNET_MY_query_param_auto_from_type (channel_key),
861     GNUNET_MY_query_param_uint64 (&hop_counter),
862     GNUNET_MY_query_param_auto_from_type (&msg->signature),
863     GNUNET_MY_query_param_auto_from_type (&msg->purpose),
864     GNUNET_MY_query_param_uint64 (&fragment_id),
865     GNUNET_MY_query_param_uint64 (&fragment_offset),
866     GNUNET_MY_query_param_uint64 (&message_id),
867     GNUNET_MY_query_param_uint64 (&group_generation),
868     GNUNET_MY_query_param_uint64 (&flags),
869     GNUNET_MY_query_param_uint32 (&psycstore_flags),
870     GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
871                                                   - sizeof (*msg)),
872     GNUNET_MY_query_param_end
873   };
874
875   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
876   {
877     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
878               "mysql execute prepared", stmt);
879     return GNUNET_SYSERR;
880   }
881
882   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
883   {
884     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
885               "mysql_stmt_reset", stmt);
886     return GNUNET_SYSERR;
887   }
888
889   return GNUNET_OK;
890 }
891
892 /**
893  * Set additional flags for a given message.
894  *
895  * They are OR'd with any existing flags set.
896  *
897  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
898  */
899 static int
900 message_add_flags (void *cls,
901                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
902                    uint64_t message_id,
903                    uint32_t psycstore_flags)
904 {
905   struct Plugin *plugin = cls;
906   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
907
908   int sql_ret;
909   int ret = GNUNET_SYSERR;
910
911   struct GNUNET_MY_QueryParam params_update[] = {
912     GNUNET_MY_query_param_uint32 (&psycstore_flags),
913     GNUNET_MY_query_param_auto_from_type (channel_key),
914     GNUNET_MY_query_param_uint64 (&message_id),
915     GNUNET_MY_query_param_end
916   };
917
918   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
919   switch (sql_ret)
920   {
921     case GNUNET_OK:
922       ret = GNUNET_OK;
923       break;
924
925     default:
926        LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
927               "mysql execute prepared", stmt);
928   }
929
930   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
931   {
932     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
933               "mysql_stmt_reset", stmt);
934     return GNUNET_SYSERR;
935   }
936
937   return ret;
938 }
939
940
941 static int
942 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
943               GNUNET_PSYCSTORE_FragmentCallback cb,
944               void *cb_cls,
945               uint64_t *returned_fragments)
946 {
947
948   uint32_t hop_counter;
949   void *signature = NULL;
950   void *purpose = NULL;
951   size_t signature_size;
952   size_t purpose_size;
953   uint64_t fragment_id;
954   uint64_t fragment_offset;
955   uint64_t message_id;
956   uint64_t group_generation;
957   uint64_t flags;
958   void *buf;
959   size_t buf_size;
960   int ret = GNUNET_SYSERR;
961   int sql_ret;
962   struct GNUNET_MULTICAST_MessageHeader *mp;
963   uint64_t msg_flags;
964   struct GNUNET_MY_ResultSpec results[] = {
965     GNUNET_MY_result_spec_uint32 (&hop_counter),
966     GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
967     GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
968     GNUNET_MY_result_spec_uint64 (&fragment_id),
969     GNUNET_MY_result_spec_uint64 (&fragment_offset),
970     GNUNET_MY_result_spec_uint64 (&message_id),
971     GNUNET_MY_result_spec_uint64 (&group_generation),
972     GNUNET_MY_result_spec_uint64 (&msg_flags),
973     GNUNET_MY_result_spec_uint64 (&flags),
974     GNUNET_MY_result_spec_variable_size (&buf,
975                                          &buf_size),
976     GNUNET_MY_result_spec_end
977   };
978
979   do
980   {
981     sql_ret = GNUNET_MY_extract_result (stmt, results);
982     switch (sql_ret)
983     {
984     case GNUNET_NO:
985       if (ret != GNUNET_YES)
986         ret = GNUNET_NO;
987       break;
988
989     case GNUNET_YES:
990       mp = GNUNET_malloc (sizeof (*mp) + buf_size);
991
992       mp->header.size = htons (sizeof (*mp) + buf_size);
993       mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
994       mp->hop_counter = htonl (hop_counter);
995       GNUNET_memcpy (&mp->signature,
996                      signature,
997                      signature_size);
998       GNUNET_memcpy (&mp->purpose,
999                      purpose,
1000                      purpose_size);
1001       mp->fragment_id = GNUNET_htonll (fragment_id);
1002       mp->fragment_offset = GNUNET_htonll (fragment_offset);
1003       mp->message_id = GNUNET_htonll (message_id);
1004       mp->group_generation = GNUNET_htonll (group_generation);
1005       mp->flags = htonl(msg_flags);
1006
1007       GNUNET_memcpy (&mp[1],
1008                      buf,
1009                      buf_size);
1010       ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1011       if (NULL != returned_fragments)
1012         (*returned_fragments)++;
1013       GNUNET_MY_cleanup_result (results);
1014       break;
1015
1016     default:
1017       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1018                  "mysql extract_result", stmt);
1019     }
1020   }
1021   while (GNUNET_YES == sql_ret);
1022
1023   // for debugging
1024   if (GNUNET_NO == ret)
1025     GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1026                "Empty result set\n");
1027
1028   return ret;
1029 }
1030
1031
1032 static int
1033 fragment_select (struct Plugin *plugin,
1034                  struct GNUNET_MYSQL_StatementHandle *stmt,
1035                  struct GNUNET_MY_QueryParam *params,
1036                  uint64_t *returned_fragments,
1037                  GNUNET_PSYCSTORE_FragmentCallback cb,
1038                  void *cb_cls)
1039 {
1040   int ret = GNUNET_SYSERR;
1041   int sql_ret;
1042
1043   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1044   switch (sql_ret)
1045   {
1046     case GNUNET_NO:
1047       if (ret != GNUNET_YES)
1048         ret = GNUNET_NO;
1049       break;
1050
1051     case GNUNET_YES:
1052       ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
1053       break;
1054
1055     default:
1056       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1057                  "mysql exec_prepared", stmt);
1058   }
1059   return ret;
1060 }
1061
1062
1063 /**
1064  * Retrieve a message fragment range by fragment ID.
1065  *
1066  * @see GNUNET_PSYCSTORE_fragment_get()
1067  *
1068  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1069  */
1070 static int
1071 fragment_get (void *cls,
1072               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1073               uint64_t first_fragment_id,
1074               uint64_t last_fragment_id,
1075               uint64_t *returned_fragments,
1076               GNUNET_PSYCSTORE_FragmentCallback cb,
1077               void *cb_cls)
1078 {
1079   struct Plugin *plugin = cls;
1080   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1081   int ret = GNUNET_SYSERR;
1082   struct GNUNET_MY_QueryParam params_select[] = {
1083     GNUNET_MY_query_param_auto_from_type (channel_key),
1084     GNUNET_MY_query_param_uint64 (&first_fragment_id),
1085     GNUNET_MY_query_param_uint64 (&last_fragment_id),
1086     GNUNET_MY_query_param_end
1087   };
1088
1089   *returned_fragments = 0;
1090   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1091
1092   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1093   {
1094     LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1095               "mysql_stmt_reset", stmt);
1096     return GNUNET_SYSERR;
1097   }
1098
1099   return ret;
1100 }
1101
1102
1103 /**
1104  * Retrieve a message fragment range by fragment ID.
1105  *
1106  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1107  *
1108  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1109  */
1110 static int
1111 fragment_get_latest (void *cls,
1112                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1113                      uint64_t fragment_limit,
1114                      uint64_t *returned_fragments,
1115                      GNUNET_PSYCSTORE_FragmentCallback cb,
1116                      void *cb_cls)
1117 {
1118   struct Plugin *plugin = cls;
1119
1120   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1121
1122   int ret = GNUNET_SYSERR;
1123   *returned_fragments = 0;
1124
1125   struct GNUNET_MY_QueryParam params_select[] = {
1126     GNUNET_MY_query_param_auto_from_type (channel_key),
1127     GNUNET_MY_query_param_uint64 (&fragment_limit),
1128     GNUNET_MY_query_param_end
1129   };
1130
1131   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1132
1133   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1134   {
1135     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1136               "mysql_stmt_reset", stmt);
1137     return GNUNET_SYSERR;
1138   }
1139
1140   return ret;
1141 }
1142
1143
1144 /**
1145  * Retrieve all fragments of a message ID range.
1146  *
1147  * @see GNUNET_PSYCSTORE_message_get()
1148  *
1149  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1150  */
1151 static int
1152 message_get (void *cls,
1153              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1154              uint64_t first_message_id,
1155              uint64_t last_message_id,
1156              uint64_t fragment_limit,
1157              uint64_t *returned_fragments,
1158              GNUNET_PSYCSTORE_FragmentCallback cb,
1159              void *cb_cls)
1160 {
1161   struct Plugin *plugin = cls;
1162   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1163   int ret;
1164
1165   if (0 == fragment_limit)
1166     fragment_limit = UINT64_MAX;
1167
1168   struct GNUNET_MY_QueryParam params_select[] = {
1169     GNUNET_MY_query_param_auto_from_type (channel_key),
1170     GNUNET_MY_query_param_uint64 (&first_message_id),
1171     GNUNET_MY_query_param_uint64 (&last_message_id),
1172     GNUNET_MY_query_param_uint64 (&fragment_limit),
1173     GNUNET_MY_query_param_end
1174   };
1175
1176   *returned_fragments = 0;
1177   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1178
1179   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1180   {
1181     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1182               "mysql_stmt_reset", stmt);
1183     return GNUNET_SYSERR;
1184   }
1185
1186   return ret;
1187 }
1188
1189
1190 /**
1191  * Retrieve all fragments of the latest messages.
1192  *
1193  * @see GNUNET_PSYCSTORE_message_get_latest()
1194  *
1195  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1196  */
1197 static int
1198 message_get_latest (void *cls,
1199                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1200                     uint64_t message_limit,
1201                     uint64_t *returned_fragments,
1202                     GNUNET_PSYCSTORE_FragmentCallback cb,
1203                     void *cb_cls)
1204 {
1205   struct Plugin *plugin = cls;
1206
1207   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1208
1209   int ret = GNUNET_SYSERR;
1210   *returned_fragments = 0;
1211
1212   struct GNUNET_MY_QueryParam params_select[] = {
1213     GNUNET_MY_query_param_auto_from_type (channel_key),
1214     GNUNET_MY_query_param_auto_from_type (channel_key),
1215     GNUNET_MY_query_param_uint64 (&message_limit),
1216     GNUNET_MY_query_param_end
1217   };
1218
1219   ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1220
1221   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1222   {
1223     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1224               "mysql_stmt_reset", stmt);
1225     return GNUNET_SYSERR;
1226   }
1227
1228   return ret;
1229 }
1230
1231
1232 /**
1233  * Retrieve a fragment of message specified by its message ID and fragment
1234  * offset.
1235  *
1236  * @see GNUNET_PSYCSTORE_message_get_fragment()
1237  *
1238  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1239  */
1240 static int
1241 message_get_fragment (void *cls,
1242                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1243                       uint64_t message_id,
1244                       uint64_t fragment_offset,
1245                       GNUNET_PSYCSTORE_FragmentCallback cb,
1246                       void *cb_cls)
1247 {
1248   struct Plugin *plugin = cls;
1249   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1250   int sql_ret;
1251   int ret = GNUNET_SYSERR;
1252
1253   struct GNUNET_MY_QueryParam params_select[] = {
1254     GNUNET_MY_query_param_auto_from_type (channel_key),
1255     GNUNET_MY_query_param_uint64 (&message_id),
1256     GNUNET_MY_query_param_uint64 (&fragment_offset),
1257     GNUNET_MY_query_param_end
1258   };
1259
1260   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1261   switch (sql_ret)
1262   {
1263     case GNUNET_NO:
1264       ret = GNUNET_NO;
1265       break;
1266
1267     case GNUNET_OK:
1268       ret = fragment_row (stmt, cb, cb_cls, NULL);
1269       break;
1270
1271     default:
1272       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1273               "mysql execute prepared", stmt);
1274   }
1275
1276   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1277   {
1278     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1279               "mysql_stmt_reset", stmt);
1280     return GNUNET_SYSERR;
1281   }
1282
1283   return ret;
1284 }
1285
1286 /**
1287  * Retrieve the max. values of message counters for a channel.
1288  *
1289  * @see GNUNET_PSYCSTORE_counters_get()
1290  *
1291  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1292  */
1293 static int
1294 counters_message_get (void *cls,
1295                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1296                       uint64_t *max_fragment_id,
1297                       uint64_t *max_message_id,
1298                       uint64_t *max_group_generation)
1299 {
1300   struct Plugin *plugin = cls;
1301
1302   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1303
1304   int ret = GNUNET_SYSERR;
1305
1306   struct GNUNET_MY_QueryParam params_select[] = {
1307     GNUNET_MY_query_param_auto_from_type (channel_key),
1308     GNUNET_MY_query_param_end
1309   };
1310
1311   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1312   {
1313     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1314               "mysql execute prepared", stmt);
1315     return GNUNET_SYSERR;
1316   }
1317
1318   struct GNUNET_MY_ResultSpec results_select[] = {
1319     GNUNET_MY_result_spec_uint64 (max_fragment_id),
1320     GNUNET_MY_result_spec_uint64 (max_message_id),
1321     GNUNET_MY_result_spec_uint64 (max_group_generation),
1322     GNUNET_MY_result_spec_end
1323   };
1324
1325   ret = GNUNET_MY_extract_result (stmt, results_select);
1326
1327   if (GNUNET_OK != ret)
1328   {
1329     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1330               "mysql extract_result", stmt);
1331     return GNUNET_SYSERR;
1332   }
1333
1334   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1335   {
1336     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1337               "mysql_stmt_reset", stmt);
1338     return GNUNET_SYSERR;
1339   }
1340
1341   return ret;
1342 }
1343
1344 /**
1345  * Retrieve the max. values of state counters for a channel.
1346  *
1347  * @see GNUNET_PSYCSTORE_counters_get()
1348  *
1349  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1350  */
1351 static int
1352 counters_state_get (void *cls,
1353                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1354                     uint64_t *max_state_message_id)
1355 {
1356   struct Plugin *plugin = cls;
1357
1358   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1359
1360   int ret = GNUNET_SYSERR;
1361
1362   struct GNUNET_MY_QueryParam params_select[] = {
1363     GNUNET_MY_query_param_auto_from_type (channel_key),
1364     GNUNET_MY_query_param_end
1365   };
1366
1367   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1368   {
1369     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1370               "mysql execute prepared", stmt);
1371     return GNUNET_SYSERR;
1372   }
1373
1374   struct GNUNET_MY_ResultSpec results_select[] = {
1375     GNUNET_MY_result_spec_uint64 (max_state_message_id),
1376     GNUNET_MY_result_spec_end
1377   };
1378
1379   ret = GNUNET_MY_extract_result (stmt, results_select);
1380
1381   if (GNUNET_OK != ret)
1382   {
1383     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1384               "mysql extract_result", stmt);
1385     return GNUNET_SYSERR;
1386   }
1387
1388   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1389   {
1390     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1391               "mysql_stmt_reset", stmt);
1392     return GNUNET_SYSERR;
1393   }
1394
1395   return ret;
1396 }
1397
1398
1399 /**
1400  * Assign a value to a state variable.
1401  *
1402  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1403  */
1404 static int
1405 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1406               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1407               const char *name, const void *value, size_t value_size)
1408 {
1409   int ret = GNUNET_SYSERR;
1410
1411   struct GNUNET_MY_QueryParam params[] = {
1412     GNUNET_MY_query_param_auto_from_type (channel_key),
1413     GNUNET_MY_query_param_string (name),
1414     GNUNET_MY_query_param_fixed_size(value, value_size),
1415     GNUNET_MY_query_param_end
1416   };
1417
1418   ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1419   if (GNUNET_OK != ret)
1420   {
1421     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422               "mysql exec_prepared", stmt);
1423     return GNUNET_SYSERR;
1424   }
1425
1426   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1427   {
1428     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1429               "mysql_stmt_reset", stmt);
1430     return GNUNET_SYSERR;
1431   }
1432
1433   return ret;
1434 }
1435
1436
1437 static int
1438 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1439                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1440                    uint64_t message_id)
1441 {
1442   struct GNUNET_MY_QueryParam params[] = {
1443     GNUNET_MY_query_param_uint64 (&message_id),
1444     GNUNET_MY_query_param_auto_from_type (channel_key),
1445     GNUNET_MY_query_param_end
1446   };
1447
1448   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1449                                             stmt,
1450                                             params))
1451   {
1452     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1453               "mysql execute prepared", stmt);
1454     return GNUNET_SYSERR;
1455   }
1456
1457   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1458   {
1459     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1460               "mysql_stmt_reset", stmt);
1461     return GNUNET_SYSERR;
1462   }
1463
1464   return GNUNET_OK;
1465 }
1466
1467
1468 /**
1469  * Begin modifying current state.
1470  */
1471 static int
1472 state_modify_begin (void *cls,
1473                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1474                     uint64_t message_id, uint64_t state_delta)
1475 {
1476   struct Plugin *plugin = cls;
1477
1478   if (state_delta > 0)
1479   {
1480     /**
1481      * We can only apply state modifiers in the current message if modifiers in
1482      * the previous stateful message (message_id - state_delta) were already
1483      * applied.
1484      */
1485
1486     uint64_t max_state_message_id = 0;
1487     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1488     switch (ret)
1489     {
1490     case GNUNET_OK:
1491     case GNUNET_NO: // no state yet
1492       ret = GNUNET_OK;
1493       break;
1494     default:
1495       return ret;
1496     }
1497
1498     if (max_state_message_id < message_id - state_delta)
1499       return GNUNET_NO; /* some stateful messages not yet applied */
1500     else if (message_id - state_delta < max_state_message_id)
1501       return GNUNET_NO; /* changes already applied */
1502   }
1503
1504   if (TRANSACTION_NONE != plugin->transaction)
1505   {
1506     /** @todo FIXME: wait for other transaction to finish  */
1507     return GNUNET_SYSERR;
1508   }
1509   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1510 }
1511
1512
1513 /**
1514  * Set the current value of state variable.
1515  *
1516  * @see GNUNET_PSYCSTORE_state_modify()
1517  *
1518  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1519  */
1520 static int
1521 state_modify_op (void *cls,
1522                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1523                  enum GNUNET_PSYC_Operator op,
1524                  const char *name, const void *value, size_t value_size)
1525 {
1526   struct Plugin *plugin = cls;
1527   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1528
1529   switch (op)
1530   {
1531   case GNUNET_PSYC_OP_ASSIGN:
1532     return state_assign (plugin, plugin->insert_state_current,
1533                          channel_key, name, value, value_size);
1534
1535   default: /** @todo implement more state operations */
1536     GNUNET_break (0);
1537     return GNUNET_SYSERR;
1538   }
1539 }
1540
1541
1542 /**
1543  * End modifying current state.
1544  */
1545 static int
1546 state_modify_end (void *cls,
1547                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1548                   uint64_t message_id)
1549 {
1550   struct Plugin *plugin = cls;
1551   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1552
1553   return
1554     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1555     && GNUNET_OK == update_message_id (plugin,
1556                                        plugin->update_max_state_message_id,
1557                                        channel_key, message_id)
1558     && GNUNET_OK == transaction_commit (plugin)
1559     ? GNUNET_OK : GNUNET_SYSERR;
1560 }
1561
1562
1563 /**
1564  * Begin state synchronization.
1565  */
1566 static int
1567 state_sync_begin (void *cls,
1568                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1569 {
1570   struct Plugin *plugin = cls;
1571   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1572 }
1573
1574
1575 /**
1576  * Assign current value of a state variable.
1577  *
1578  * @see GNUNET_PSYCSTORE_state_modify()
1579  *
1580  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1581  */
1582 static int
1583 state_sync_assign (void *cls,
1584                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1585                 const char *name, const void *value, size_t value_size)
1586 {
1587   struct Plugin *plugin = cls;
1588   return state_assign (cls, plugin->insert_state_sync,
1589                        channel_key, name, value, value_size);
1590 }
1591
1592
1593 /**
1594  * End modifying current state.
1595  */
1596 static int
1597 state_sync_end (void *cls,
1598                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1599                 uint64_t max_state_message_id,
1600                 uint64_t state_hash_message_id)
1601 {
1602   struct Plugin *plugin = cls;
1603   int ret = GNUNET_SYSERR;
1604
1605   if (TRANSACTION_NONE != plugin->transaction)
1606   {
1607     /** @todo FIXME: wait for other transaction to finish  */
1608     return GNUNET_SYSERR;
1609   }
1610
1611   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1612     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1613     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1614                                   channel_key)
1615     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1616                                   channel_key)
1617     && GNUNET_OK == update_message_id (plugin,
1618                                        plugin->update_state_hash_message_id,
1619                                        channel_key, state_hash_message_id)
1620     && GNUNET_OK == update_message_id (plugin,
1621                                        plugin->update_max_state_message_id,
1622                                        channel_key, max_state_message_id)
1623     && GNUNET_OK == transaction_commit (plugin)
1624     ? ret = GNUNET_OK
1625     : transaction_rollback (plugin);
1626   return ret;
1627 }
1628
1629
1630 /**
1631  * Delete the whole state.
1632  *
1633  * @see GNUNET_PSYCSTORE_state_reset()
1634  *
1635  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1636  */
1637 static int
1638 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1639 {
1640   struct Plugin *plugin = cls;
1641   return exec_channel (plugin, plugin->delete_state, channel_key);
1642 }
1643
1644
1645 /**
1646  * Update signed values of state variables in the state store.
1647  *
1648  * @see GNUNET_PSYCSTORE_state_hash_update()
1649  *
1650  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1651  */
1652 static int
1653 state_update_signed (void *cls,
1654                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1655 {
1656   struct Plugin *plugin = cls;
1657   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1658 }
1659
1660
1661 /**
1662  * Retrieve a state variable by name.
1663  *
1664  * @see GNUNET_PSYCSTORE_state_get()
1665  *
1666  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1667  */
1668 static int
1669 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1670            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1671 {
1672   struct Plugin *plugin = cls;
1673   int ret = GNUNET_SYSERR;
1674   int sql_ret ;
1675
1676   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1677
1678   struct GNUNET_MY_QueryParam params_select[] = {
1679     GNUNET_MY_query_param_auto_from_type (channel_key),
1680     GNUNET_MY_query_param_string (name),
1681     GNUNET_MY_query_param_end
1682   };
1683
1684   void *value_current = NULL;
1685   size_t value_size = 0;
1686
1687   struct GNUNET_MY_ResultSpec results[] = {
1688     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1689     GNUNET_MY_result_spec_end
1690   };
1691
1692   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1693   {
1694     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1695               "mysql exec_prepared", stmt);
1696   }
1697   else
1698   {
1699     sql_ret = GNUNET_MY_extract_result (stmt, results);
1700     switch (sql_ret)
1701     {
1702     case GNUNET_NO:
1703       ret = GNUNET_NO;
1704       break;
1705
1706     case GNUNET_YES:
1707       ret = cb (cb_cls, name, value_current, value_size);
1708       break;
1709
1710     default:
1711       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1712                 "mysql extract_result", stmt);
1713     }
1714   }
1715
1716   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1717   {
1718     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1719               "mysql_stmt_reset", stmt);
1720     return GNUNET_SYSERR;
1721   }
1722
1723   return ret;
1724 }
1725
1726
1727 /**
1728  * Retrieve all state variables for a channel with the given prefix.
1729  *
1730  * @see GNUNET_PSYCSTORE_state_get_prefix()
1731  *
1732  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1733  */
1734 static int
1735 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1736                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1737                   void *cb_cls)
1738 {
1739   struct Plugin *plugin = cls;
1740   int ret = GNUNET_SYSERR;
1741
1742   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1743
1744   uint32_t name_len = (uint32_t) strlen (name);
1745
1746   struct GNUNET_MY_QueryParam params_select[] = {
1747     GNUNET_MY_query_param_auto_from_type (channel_key),
1748     GNUNET_MY_query_param_string (name),
1749     GNUNET_MY_query_param_uint32 (&name_len),
1750     GNUNET_MY_query_param_string (name),
1751     GNUNET_MY_query_param_end
1752   };
1753
1754   char *name2 = "";
1755   void *value_current = NULL;
1756   size_t value_size = 0;
1757
1758   struct GNUNET_MY_ResultSpec results[] = {
1759     GNUNET_MY_result_spec_string (&name2),
1760     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1761     GNUNET_MY_result_spec_end
1762   };;
1763
1764   int sql_ret;
1765
1766   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1767   {
1768     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1769               "mysql exec_prepared", stmt);
1770     return GNUNET_SYSERR;
1771   }
1772
1773   do
1774   {
1775     sql_ret = GNUNET_MY_extract_result (stmt, results);
1776     switch (sql_ret)
1777     {
1778       case GNUNET_NO:
1779         if (ret != GNUNET_YES)
1780           ret = GNUNET_NO;
1781         break;
1782
1783       case GNUNET_YES:
1784         ret = cb (cb_cls, (const char *) name2, value_current, value_size);
1785
1786         if (ret != GNUNET_YES)
1787           sql_ret = GNUNET_NO;
1788         break;
1789
1790       default:
1791         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1792                   "mysql extract_result", stmt);
1793     }
1794   }
1795   while (sql_ret == GNUNET_YES);
1796
1797   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1798   {
1799     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1800               "mysql_stmt_reset", stmt);
1801     return GNUNET_SYSERR;
1802   }
1803
1804   return ret;
1805 }
1806
1807
1808 /**
1809  * Retrieve all signed state variables for a channel.
1810  *
1811  * @see GNUNET_PSYCSTORE_state_get_signed()
1812  *
1813  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1814  */
1815 static int
1816 state_get_signed (void *cls,
1817                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1818                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1819 {
1820   struct Plugin *plugin = cls;
1821   int ret = GNUNET_SYSERR;
1822
1823   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1824
1825   struct GNUNET_MY_QueryParam params_select[] = {
1826     GNUNET_MY_query_param_auto_from_type (channel_key),
1827     GNUNET_MY_query_param_end
1828   };
1829
1830   int sql_ret;
1831
1832   char *name = "";
1833   void *value_signed = NULL;
1834   size_t value_size = 0;
1835
1836   struct GNUNET_MY_ResultSpec results[] = {
1837     GNUNET_MY_result_spec_string (&name),
1838     GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1839     GNUNET_MY_result_spec_end
1840   };
1841
1842   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1843   {
1844     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1845               "mysql exec_prepared", stmt);
1846     return GNUNET_SYSERR;
1847   }
1848
1849   do
1850   {
1851     sql_ret = GNUNET_MY_extract_result (stmt, results);
1852     switch (sql_ret)
1853     {
1854       case GNUNET_NO:
1855         if (ret != GNUNET_YES)
1856           ret = GNUNET_NO;
1857         break;
1858
1859       case GNUNET_YES:
1860         ret = cb (cb_cls, (const char *) name, value_signed, value_size);
1861
1862         if (ret != GNUNET_YES)
1863             sql_ret = GNUNET_NO;
1864         break;
1865
1866       default:
1867          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1868               "mysql extract_result", stmt);
1869     }
1870   }
1871   while (sql_ret == GNUNET_YES);
1872
1873   if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1874   {
1875     LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1876               "mysql_stmt_reset", stmt);
1877     return GNUNET_SYSERR;
1878   }
1879
1880   return ret;
1881 }
1882
1883
1884 /**
1885  * Entry point for the plugin.
1886  *
1887  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1888  * @return NULL on error, otherwise the plugin context
1889  */
1890 void *
1891 libgnunet_plugin_psycstore_mysql_init (void *cls)
1892 {
1893   static struct Plugin plugin;
1894   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1895   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1896
1897   if (NULL != plugin.cfg)
1898     return NULL;                /* can only initialize once! */
1899   memset (&plugin, 0, sizeof (struct Plugin));
1900   plugin.cfg = cfg;
1901   if (GNUNET_OK != database_setup (&plugin))
1902   {
1903     database_shutdown (&plugin);
1904     return NULL;
1905   }
1906   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1907   api->cls = &plugin;
1908   api->membership_store = &mysql_membership_store;
1909   api->membership_test = &membership_test;
1910   api->fragment_store = &fragment_store;
1911   api->message_add_flags = &message_add_flags;
1912   api->fragment_get = &fragment_get;
1913   api->fragment_get_latest = &fragment_get_latest;
1914   api->message_get = &message_get;
1915   api->message_get_latest = &message_get_latest;
1916   api->message_get_fragment = &message_get_fragment;
1917   api->counters_message_get = &counters_message_get;
1918   api->counters_state_get = &counters_state_get;
1919   api->state_modify_begin = &state_modify_begin;
1920   api->state_modify_op = &state_modify_op;
1921   api->state_modify_end = &state_modify_end;
1922   api->state_sync_begin = &state_sync_begin;
1923   api->state_sync_assign = &state_sync_assign;
1924   api->state_sync_end = &state_sync_end;
1925   api->state_reset = &state_reset;
1926   api->state_update_signed = &state_update_signed;
1927   api->state_get = &state_get;
1928   api->state_get_prefix = &state_get_prefix;
1929   api->state_get_signed = &state_get_signed;
1930
1931   LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
1932   return api;
1933 }
1934
1935
1936 /**
1937  * Exit point from the plugin.
1938  *
1939  * @param cls The plugin context (as returned by "init")
1940  * @return Always NULL
1941  */
1942 void *
1943 libgnunet_plugin_psycstore_mysql_done (void *cls)
1944 {
1945   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1946   struct Plugin *plugin = api->cls;
1947
1948   database_shutdown (plugin);
1949   plugin->cfg = NULL;
1950   GNUNET_free (api);
1951   LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
1952   return NULL;
1953 }
1954
1955 /* end of plugin_psycstore_mysql.c */