2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/plugin_psycstore_mysql.c
23 * @brief mysql-based psycstore backend
24 * @author Gabor X Toth
25 * @author Christian Grothoff
26 * @author Christophe Genevey
30 #include "gnunet_psycstore_plugin.h"
31 #include "gnunet_psycstore_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_crypto_lib.h"
34 #include "gnunet_psyc_util_lib.h"
35 #include "psycstore.h"
36 #include "gnunet_my_lib.h"
37 #include "gnunet_mysql_lib.h"
38 #include <mysql/mysql.h>
41 * After how many ms "busy" should a DB operation fail for good? A
42 * low value makes sure that we are more responsive to requests
43 * (especially PUTs). A high value guarantees a higher success rate
44 * (SELECTs in iterate can take several seconds despite LIMIT=1).
46 * The default value of 1s should ensure that users do not experience
47 * huge latencies while at the same time allowing operations to
48 * succeed with reasonable probability.
50 #define BUSY_TIMEOUT_MS 1000
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
55 * Log an error message at log-level 'level' that indicates
56 * a failure of the command 'cmd' on file 'filename'
57 * with the message given by strerror(errno).
59 #define LOG_MYSQL(db, level, cmd, stmt) \
61 GNUNET_log_from (level, "psycstore-mysql", \
62 _("`%s' failed at %s:%d with error: %s\n"), \
63 cmd, __FILE__, __LINE__, \
64 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
67 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
71 TRANSACTION_STATE_MODIFY,
72 TRANSACTION_STATE_SYNC,
76 * Context for all functions in this plugin.
81 const struct GNUNET_CONFIGURATION_Handle *cfg;
89 *Handle to talk to Mysql
91 struct GNUNET_MYSQL_Context *mc;
94 * Current transaction.
96 enum Transactions transaction;
98 struct GNUNET_MYSQL_StatementHandle *transaction_begin;
100 struct GNUNET_MYSQL_StatementHandle *transaction_commit;
102 struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
105 * Precompiled SQL for channel_key_store()
107 struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
111 * Precompiled SQL for slave_key_store()
113 struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
116 * Precompiled SQL for membership_store()
118 struct GNUNET_MYSQL_StatementHandle *insert_membership;
121 * Precompiled SQL for membership_test()
123 struct GNUNET_MYSQL_StatementHandle *select_membership;
126 * Precompiled SQL for fragment_store()
128 struct GNUNET_MYSQL_StatementHandle *insert_fragment;
131 * Precompiled SQL for message_add_flags()
133 struct GNUNET_MYSQL_StatementHandle *update_message_flags;
136 * Precompiled SQL for fragment_get()
138 struct GNUNET_MYSQL_StatementHandle *select_fragments;
141 * Precompiled SQL for fragment_get()
143 struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
146 * Precompiled SQL for message_get()
148 struct GNUNET_MYSQL_StatementHandle *select_messages;
151 * Precompiled SQL for message_get()
153 struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
156 * Precompiled SQL for message_get_fragment()
158 struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
161 * Precompiled SQL for counters_get_message()
163 struct GNUNET_MYSQL_StatementHandle *select_counters_message;
166 * Precompiled SQL for counters_get_state()
168 struct GNUNET_MYSQL_StatementHandle *select_counters_state;
171 * Precompiled SQL for state_modify_end()
173 struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
176 * Precompiled SQL for state_sync_end()
178 struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
181 * Precompiled SQL for state_modify_op()
183 struct GNUNET_MYSQL_StatementHandle *insert_state_current;
186 * Precompiled SQL for state_modify_end()
188 struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
191 * Precompiled SQL for state_set_signed()
193 struct GNUNET_MYSQL_StatementHandle *update_state_signed;
196 * Precompiled SQL for state_sync()
198 struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
201 * Precompiled SQL for state_sync()
203 struct GNUNET_MYSQL_StatementHandle *delete_state;
206 * Precompiled SQL for state_sync()
208 struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
211 * Precompiled SQL for state_sync()
213 struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
216 * Precompiled SQL for state_get_signed()
218 struct GNUNET_MYSQL_StatementHandle *select_state_signed;
221 * Precompiled SQL for state_get()
223 struct GNUNET_MYSQL_StatementHandle *select_state_one;
226 * Precompiled SQL for state_get_prefix()
228 struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
235 mysql_trace (void *cls, const char *sql)
237 LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
244 * @brief Prepare a SQL statement
246 * @param dbh handle to the database
247 * @param sql SQL statement, UTF-8 encoded
248 * @param stmt set to the prepared statement
249 * @return 0 on success
252 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
254 struct GNUNET_MYSQL_StatementHandle **stmt)
256 *stmt = GNUNET_MYSQL_statement_prepare (mc,
259 LOG(GNUNET_ERROR_TYPE_DEBUG,
260 "Prepared `%s' / %p\n", sql, stmt);
262 LOG(GNUNET_ERROR_TYPE_ERROR,
263 _("Error preparing SQL query: %s\n %s\n"),
264 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
271 * Initialize the database connections and associated
272 * data structures (create tables and indices
273 * as needed as well).
275 * @param plugin the plugin context (state for this module)
276 * @return GNUNET_OK on success
279 database_setup (struct Plugin *plugin)
284 GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
285 "FILENAME", &filename))
287 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
288 "psycstore-mysql", "FILENAME");
289 return GNUNET_SYSERR;
292 if (GNUNET_OK != GNUNET_DISK_file_test (filename))
294 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
297 GNUNET_free (filename);
298 return GNUNET_SYSERR;
301 /* filename should be UTF-8-encoded. If it isn't, it's a bug */
302 plugin->fn = filename;
304 /* Open database and precompile statements */
305 plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
307 if (NULL == plugin->mc)
309 LOG(GNUNET_ERROR_TYPE_ERROR,
310 _("Unable to initialize Mysql.\n"));
311 return GNUNET_SYSERR;
316 GNUNET_MYSQL_statement_run (plugin->mc,
317 "CREATE TABLE IF NOT EXISTS channels (\n"
318 " id INT AUTO_INCREMENT,\n"
320 " max_state_message_id INT,\n"
321 " state_hash_message_id INT,\n"
322 " PRIMARY KEY(id),\n"
323 " UNIQUE KEY(pub_key(5))\n"
326 GNUNET_MYSQL_statement_run (plugin->mc,
327 "CREATE TABLE IF NOT EXISTS slaves (\n"
328 " id INT AUTO_INCREMENT,\n"
330 " PRIMARY KEY(id),\n"
331 " UNIQUE KEY(pub_key(5))\n"
334 GNUNET_MYSQL_statement_run (plugin->mc,
335 "CREATE TABLE IF NOT EXISTS membership (\n"
336 " channel_id INT NOT NULL REFERENCES channels(id),\n"
337 " slave_id INT NOT NULL REFERENCES slaves(id),\n"
338 " did_join INT NOT NULL,\n"
339 " announced_at BIGINT UNSIGNED NOT NULL,\n"
340 " effective_since BIGINT UNSIGNED NOT NULL,\n"
341 " group_generation BIGINT UNSIGNED NOT NULL\n"
344 /*** FIX because IF NOT EXISTS doesn't work ***/
345 GNUNET_MYSQL_statement_run (plugin->mc,
346 "CREATE INDEX idx_membership_channel_id_slave_id "
347 "ON membership (channel_id, slave_id);");
349 /** @todo messages table: add method_name column */
350 GNUNET_MYSQL_statement_run (plugin->mc,
351 "CREATE TABLE IF NOT EXISTS messages (\n"
352 " channel_id INT NOT NULL REFERENCES channels(id),\n"
353 " hop_counter BIGINT UNSIGNED NOT NULL,\n"
356 " fragment_id BIGINT UNSIGNED NOT NULL,\n"
357 " fragment_offset BIGINT UNSIGNED NOT NULL,\n"
358 " message_id BIGINT UNSIGNED NOT NULL,\n"
359 " group_generation BIGINT UNSIGNED NOT NULL,\n"
360 " multicast_flags BIGINT UNSIGNED NOT NULL,\n"
361 " psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
363 " PRIMARY KEY (channel_id, fragment_id),\n"
364 " UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
367 GNUNET_MYSQL_statement_run (plugin->mc,
368 "CREATE TABLE IF NOT EXISTS state (\n"
369 " channel_id INT NOT NULL REFERENCES channels(id),\n"
370 " name TEXT NOT NULL,\n"
371 " value_current BLOB,\n"
372 " value_signed BLOB,\n"
373 " PRIMARY KEY (channel_id, name(5))\n"
376 GNUNET_MYSQL_statement_run (plugin->mc,
377 "CREATE TABLE IF NOT EXISTS state_sync (\n"
378 " channel_id INT NOT NULL REFERENCES channels(id),\n"
379 " name TEXT NOT NULL,\n"
381 " PRIMARY KEY (channel_id, name(5))\n"
384 /* Prepare statements */
385 mysql_prepare (plugin->mc,
387 &plugin->transaction_begin);
389 mysql_prepare (plugin->mc,
391 &plugin->transaction_commit);
393 mysql_prepare (plugin->mc,
395 &plugin->transaction_rollback);
397 mysql_prepare (plugin->mc,
398 "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
399 &plugin->insert_channel_key);
401 mysql_prepare (plugin->mc,
402 "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
403 &plugin->insert_slave_key);
405 mysql_prepare (plugin->mc,
406 "INSERT INTO membership\n"
407 " (channel_id, slave_id, did_join, announced_at,\n"
408 " effective_since, group_generation)\n"
409 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
410 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
412 &plugin->insert_membership);
414 mysql_prepare (plugin->mc,
415 "SELECT did_join FROM membership\n"
416 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
417 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
418 " AND effective_since <= ? AND did_join = 1\n"
419 "ORDER BY announced_at DESC LIMIT 1;",
420 &plugin->select_membership);
422 mysql_prepare (plugin->mc,
423 "INSERT IGNORE INTO messages\n"
424 " (channel_id, hop_counter, signature, purpose,\n"
425 " fragment_id, fragment_offset, message_id,\n"
426 " group_generation, multicast_flags, psycstore_flags, data)\n"
427 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
428 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
429 &plugin->insert_fragment);
431 mysql_prepare (plugin->mc,
433 "SET psycstore_flags = psycstore_flags | ?\n"
434 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
435 " AND message_id = ? AND fragment_offset = 0;",
436 &plugin->update_message_flags);
438 mysql_prepare (plugin->mc,
439 "SELECT hop_counter, signature, purpose, fragment_id,\n"
440 " fragment_offset, message_id, group_generation,\n"
441 " multicast_flags, psycstore_flags, data\n"
443 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
444 " AND ? <= fragment_id AND fragment_id <= ?;",
445 &plugin->select_fragments);
447 /** @todo select_messages: add method_prefix filter */
448 mysql_prepare (plugin->mc,
449 "SELECT hop_counter, signature, purpose, fragment_id,\n"
450 " fragment_offset, message_id, group_generation,\n"
451 " multicast_flags, psycstore_flags, data\n"
453 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
454 " AND ? <= message_id AND message_id <= ?"
456 &plugin->select_messages);
458 mysql_prepare (plugin->mc,
460 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
461 " fragment_offset, message_id, group_generation,\n"
462 " multicast_flags, psycstore_flags, data\n"
464 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
465 " ORDER BY fragment_id DESC\n"
467 "ORDER BY fragment_id;",
468 &plugin->select_latest_fragments);
470 /** @todo select_latest_messages: add method_prefix filter */
471 mysql_prepare (plugin->mc,
472 "SELECT hop_counter, signature, purpose, fragment_id,\n"
473 " fragment_offset, message_id, group_generation,\n"
474 " multicast_flags, psycstore_flags, data\n"
476 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477 " AND message_id IN\n"
478 " (SELECT message_id\n"
480 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
481 " GROUP BY message_id\n"
482 " ORDER BY message_id\n"
484 "ORDER BY fragment_id;",
485 &plugin->select_latest_messages);
487 mysql_prepare (plugin->mc,
488 "SELECT hop_counter, signature, purpose, fragment_id,\n"
489 " fragment_offset, message_id, group_generation,\n"
490 " multicast_flags, psycstore_flags, data\n"
492 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
493 " AND message_id = ? AND fragment_offset = ?;",
494 &plugin->select_message_fragment);
496 mysql_prepare (plugin->mc,
497 "SELECT fragment_id, message_id, group_generation\n"
499 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
500 "ORDER BY fragment_id DESC LIMIT 1;",
501 &plugin->select_counters_message);
503 mysql_prepare (plugin->mc,
504 "SELECT max_state_message_id\n"
506 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
507 &plugin->select_counters_state);
509 mysql_prepare (plugin->mc,
511 "SET max_state_message_id = ?\n"
512 "WHERE pub_key = ?;",
513 &plugin->update_max_state_message_id);
515 mysql_prepare (plugin->mc,
517 "SET state_hash_message_id = ?\n"
518 "WHERE pub_key = ?;",
519 &plugin->update_state_hash_message_id);
521 mysql_prepare (plugin->mc,
522 "REPLACE INTO state\n"
523 " (channel_id, name, value_current, value_signed)\n"
524 "SELECT new.channel_id, new.name,\n"
525 " new.value_current, old.value_signed\n"
526 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
528 " ? AS name, ? AS value_current) AS new\n"
529 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
530 " FROM state) AS old\n"
531 "ON new.channel_id = old.channel_id AND new.name = old.name;",
532 &plugin->insert_state_current);
534 mysql_prepare (plugin->mc,
535 "DELETE FROM state\n"
536 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
537 " AND (value_current IS NULL OR length(value_current) = 0)\n"
538 " AND (value_signed IS NULL OR length(value_signed) = 0);",
539 &plugin->delete_state_empty);
541 mysql_prepare (plugin->mc,
543 "SET value_signed = value_current\n"
544 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
545 &plugin->update_state_signed);
547 mysql_prepare (plugin->mc,
548 "DELETE FROM state\n"
549 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
550 &plugin->delete_state);
552 mysql_prepare (plugin->mc,
553 "INSERT INTO state_sync (channel_id, name, value)\n"
554 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
555 &plugin->insert_state_sync);
557 mysql_prepare (plugin->mc,
558 "INSERT INTO state\n"
559 " (channel_id, name, value_current, value_signed)\n"
560 "SELECT channel_id, name, value, value\n"
562 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
563 &plugin->insert_state_from_sync);
565 mysql_prepare (plugin->mc,
566 "DELETE FROM state_sync\n"
567 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
568 &plugin->delete_state_sync);
570 mysql_prepare (plugin->mc,
571 "SELECT value_current\n"
573 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
575 &plugin->select_state_one);
577 mysql_prepare (plugin->mc,
578 "SELECT name, value_current\n"
580 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
581 " AND (name = ? OR substr(name, 1, ?) = ? || '_');",
582 &plugin->select_state_prefix);
584 mysql_prepare (plugin->mc,
585 "SELECT name, value_signed\n"
587 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
588 " AND value_signed IS NOT NULL;",
589 &plugin->select_state_signed);
596 * Shutdown database connection and associate data
598 * @param plugin the plugin context (state for this module)
601 database_shutdown (struct Plugin *plugin)
603 GNUNET_MYSQL_context_destroy (plugin->mc);
605 GNUNET_free_non_null (plugin->fn);
611 * Execute a prepared statement with a @a channel_key argument.
613 * @param plugin Plugin handle.
614 * @param stmt Statement to execute.
615 * @param channel_key Public key of the channel.
617 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
620 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
621 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
623 struct GNUNET_MY_QueryParam params[] = {
624 GNUNET_MY_query_param_auto_from_type (channel_key),
625 GNUNET_MY_query_param_end
628 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
630 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
631 "mysql exec_channel", stmt);
634 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
636 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
637 "mysql_stmt_reset", stmt);
638 return GNUNET_SYSERR;
646 * Begin a transaction.
649 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
651 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
653 struct GNUNET_MY_QueryParam params[] = {
654 GNUNET_MY_query_param_end
657 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
659 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
660 "mysql exexc_prepared", stmt);
661 return GNUNET_SYSERR;
664 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
666 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
667 "mysql_stmt_reset", stmt);
668 return GNUNET_SYSERR;
671 plugin->transaction = transaction;
677 * Commit current transaction.
680 transaction_commit (struct Plugin *plugin)
682 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
684 struct GNUNET_MY_QueryParam params[] = {
685 GNUNET_MY_query_param_end
688 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
690 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
691 "mysql exec_prepared", stmt);
692 return GNUNET_SYSERR;
695 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
697 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
698 "mysql_stmt_reset", stmt);
699 return GNUNET_SYSERR;
702 plugin->transaction = TRANSACTION_NONE;
708 * Roll back current transaction.
711 transaction_rollback (struct Plugin *plugin)
713 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
715 struct GNUNET_MY_QueryParam params[] = {
716 GNUNET_MY_query_param_end
719 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
721 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
722 "mysql exec_prepared", stmt);
723 return GNUNET_SYSERR;
726 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
728 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
729 "mysql_stmt_reset", stmt);
730 return GNUNET_SYSERR;
733 plugin->transaction = TRANSACTION_NONE;
739 channel_key_store (struct Plugin *plugin,
740 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
742 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
744 struct GNUNET_MY_QueryParam params[] = {
745 GNUNET_MY_query_param_auto_from_type (channel_key),
746 GNUNET_MY_query_param_end
749 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
751 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
752 "mysql exec_prepared", stmt);
753 return GNUNET_SYSERR;
756 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
758 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
759 "mysql_stmt_reset", stmt);
760 return GNUNET_SYSERR;
768 slave_key_store (struct Plugin *plugin,
769 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
771 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
773 struct GNUNET_MY_QueryParam params[] = {
774 GNUNET_MY_query_param_auto_from_type (slave_key),
775 GNUNET_MY_query_param_end
778 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
780 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
781 "mysql exec_prepared", stmt);
782 return GNUNET_SYSERR;
785 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
787 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
788 "mysql_stmt_reset", stmt);
789 return GNUNET_SYSERR;
797 * Store join/leave events for a PSYC channel in order to be able to answer
798 * membership test queries later.
800 * @see GNUNET_PSYCSTORE_membership_store()
802 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
805 mysql_membership_store (void *cls,
806 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
807 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
809 uint64_t announced_at,
810 uint64_t effective_since,
811 uint64_t group_generation)
813 struct Plugin *plugin = cls;
815 uint32_t idid_join = (uint32_t)did_join;
817 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
819 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
821 if (announced_at > INT64_MAX ||
822 effective_since > INT64_MAX ||
823 group_generation > INT64_MAX)
826 return GNUNET_SYSERR;
829 if (GNUNET_OK != channel_key_store (plugin, channel_key)
830 || GNUNET_OK != slave_key_store (plugin, slave_key))
831 return GNUNET_SYSERR;
833 struct GNUNET_MY_QueryParam params[] = {
834 GNUNET_MY_query_param_auto_from_type (channel_key),
835 GNUNET_MY_query_param_auto_from_type (slave_key),
836 GNUNET_MY_query_param_uint32 (&idid_join),
837 GNUNET_MY_query_param_uint64 (&announced_at),
838 GNUNET_MY_query_param_uint64 (&effective_since),
839 GNUNET_MY_query_param_uint64 (&group_generation),
840 GNUNET_MY_query_param_end
843 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
845 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
846 "mysql exec_prepared", stmt);
847 return GNUNET_SYSERR;
850 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
852 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
853 "mysql_stmt_reset", stmt);
854 return GNUNET_SYSERR;
860 * Test if a member was admitted to the channel at the given message ID.
862 * @see GNUNET_PSYCSTORE_membership_test()
864 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
865 * #GNUNET_SYSERR if there was en error.
868 membership_test (void *cls,
869 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
870 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
873 struct Plugin *plugin = cls;
875 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
877 uint32_t did_join = 0;
879 int ret = GNUNET_SYSERR;
881 struct GNUNET_MY_QueryParam params_select[] = {
882 GNUNET_MY_query_param_auto_from_type (channel_key),
883 GNUNET_MY_query_param_auto_from_type (slave_key),
884 GNUNET_MY_query_param_uint64 (&message_id),
885 GNUNET_MY_query_param_end
888 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
890 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
891 "mysql execute prepared", stmt);
892 return GNUNET_SYSERR;
895 struct GNUNET_MY_ResultSpec results_select[] = {
896 GNUNET_MY_result_spec_uint32 (&did_join),
897 GNUNET_MY_result_spec_end
900 switch(GNUNET_MY_extract_result (stmt,
910 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
911 "mysql extract_result", stmt);
912 return GNUNET_SYSERR;
915 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
917 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
918 "mysql_stmt_reset", stmt);
919 return GNUNET_SYSERR;
926 * Store a message fragment sent to a channel.
928 * @see GNUNET_PSYCSTORE_fragment_store()
930 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
933 fragment_store (void *cls,
934 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
935 const struct GNUNET_MULTICAST_MessageHeader *msg,
936 uint32_t psycstore_flags)
938 struct Plugin *plugin = cls;
940 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
942 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
944 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
946 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
947 uint64_t message_id = GNUNET_ntohll (msg->message_id);
948 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
950 uint64_t hop_counter = ntohl(msg->hop_counter);
951 uint64_t flags = ntohl(msg->flags);
953 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
954 message_id > INT64_MAX || group_generation > INT64_MAX)
956 LOG(GNUNET_ERROR_TYPE_ERROR,
957 "Tried to store fragment with a field > INT64_MAX: "
958 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
959 message_id, group_generation);
961 return GNUNET_SYSERR;
964 if (GNUNET_OK != channel_key_store (plugin, channel_key))
965 return GNUNET_SYSERR;
967 struct GNUNET_MY_QueryParam params_insert[] = {
968 GNUNET_MY_query_param_auto_from_type (channel_key),
969 GNUNET_MY_query_param_uint64 (&hop_counter),
970 GNUNET_MY_query_param_auto_from_type (&msg->signature),
971 GNUNET_MY_query_param_auto_from_type (&msg->purpose),
972 GNUNET_MY_query_param_uint64 (&fragment_id),
973 GNUNET_MY_query_param_uint64 (&fragment_offset),
974 GNUNET_MY_query_param_uint64 (&message_id),
975 GNUNET_MY_query_param_uint64 (&group_generation),
976 GNUNET_MY_query_param_uint64 (&flags),
977 GNUNET_MY_query_param_uint32 (&psycstore_flags),
978 GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
980 GNUNET_MY_query_param_end
983 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
985 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
986 "mysql execute prepared", stmt);
987 return GNUNET_SYSERR;
990 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
992 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
993 "mysql_stmt_reset", stmt);
994 return GNUNET_SYSERR;
1001 * Set additional flags for a given message.
1003 * They are OR'd with any existing flags set.
1005 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1008 message_add_flags (void *cls,
1009 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1010 uint64_t message_id,
1011 uint64_t psycstore_flags)
1013 struct Plugin *plugin = cls;
1014 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
1017 int ret = GNUNET_SYSERR;
1019 struct GNUNET_MY_QueryParam params_update[] = {
1020 GNUNET_MY_query_param_uint64 (&psycstore_flags),
1021 GNUNET_MY_query_param_auto_from_type (channel_key),
1022 GNUNET_MY_query_param_uint64 (&message_id),
1023 GNUNET_MY_query_param_end
1026 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
1034 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1035 "mysql execute prepared", stmt);
1038 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1040 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1041 "mysql_stmt_reset", stmt);
1042 return GNUNET_SYSERR;
1050 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
1051 GNUNET_PSYCSTORE_FragmentCallback cb,
1055 uint32_t hop_counter;
1056 void *signature = NULL;
1057 void *purpose = NULL;
1058 size_t signature_size;
1059 size_t purpose_size;
1061 uint64_t fragment_id;
1062 uint64_t fragment_offset;
1063 uint64_t message_id;
1064 uint64_t group_generation;
1068 int ret = GNUNET_SYSERR;
1070 struct GNUNET_MULTICAST_MessageHeader *mp;
1075 struct GNUNET_MY_ResultSpec results[] = {
1076 GNUNET_MY_result_spec_uint32 (&hop_counter),
1077 GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
1078 GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
1079 GNUNET_MY_result_spec_uint64 (&fragment_id),
1080 GNUNET_MY_result_spec_uint64 (&fragment_offset),
1081 GNUNET_MY_result_spec_uint64 (&message_id),
1082 GNUNET_MY_result_spec_uint64 (&group_generation),
1083 GNUNET_MY_result_spec_uint64 (&msg_flags),
1084 GNUNET_MY_result_spec_uint64 (&flags),
1085 GNUNET_MY_result_spec_variable_size (&buf,
1087 GNUNET_MY_result_spec_end
1090 sql_ret = GNUNET_MY_extract_result (stmt,
1095 if (ret != GNUNET_OK)
1100 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
1102 mp->header.size = htons (sizeof (*mp) + buf_size);
1103 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1104 mp->hop_counter = htonl (hop_counter);
1105 GNUNET_memcpy (&mp->signature,
1108 GNUNET_memcpy (&mp->purpose,
1111 mp->fragment_id = GNUNET_htonll (fragment_id);
1112 mp->fragment_offset = GNUNET_htonll (fragment_offset);
1113 mp->message_id = GNUNET_htonll (message_id);
1114 mp->group_generation = GNUNET_htonll (group_generation);
1115 mp->flags = htonl(msg_flags);
1117 GNUNET_memcpy (&mp[1],
1120 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1122 GNUNET_MY_cleanup_result (results);
1126 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1127 "mysql extract_result", stmt);
1135 fragment_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1136 struct GNUNET_MY_QueryParam *params,
1137 uint64_t *returned_fragments,
1138 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1140 int ret = GNUNET_SYSERR;
1144 if (NULL == plugin->mc || NULL == stmt || NULL == params)
1146 fprintf(stderr, "%p %p %p\n", plugin->mc, stmt, params);
1147 return GNUNET_SYSERR;
1150 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1154 if (ret != GNUNET_OK)
1159 ret = fragment_row (stmt, cb, cb_cls);
1160 (*returned_fragments)++;
1164 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1165 "mysql exec_prepared", stmt);
1172 * Retrieve a message fragment range by fragment ID.
1174 * @see GNUNET_PSYCSTORE_fragment_get()
1176 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1179 fragment_get (void *cls,
1180 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1181 uint64_t first_fragment_id,
1182 uint64_t last_fragment_id,
1183 uint64_t *returned_fragments,
1184 GNUNET_PSYCSTORE_FragmentCallback cb,
1187 struct Plugin *plugin = cls;
1188 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1189 int ret = GNUNET_SYSERR;
1190 *returned_fragments = 0;
1192 struct GNUNET_MY_QueryParam params_select[] = {
1193 GNUNET_MY_query_param_auto_from_type (channel_key),
1194 GNUNET_MY_query_param_uint64 (&first_fragment_id),
1195 GNUNET_MY_query_param_uint64 (&last_fragment_id),
1196 GNUNET_MY_query_param_end
1199 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1201 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1203 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1204 "mysql_stmt_reset", stmt);
1205 return GNUNET_SYSERR;
1213 * Retrieve a message fragment range by fragment ID.
1215 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1217 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1220 fragment_get_latest (void *cls,
1221 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222 uint64_t fragment_limit,
1223 uint64_t *returned_fragments,
1224 GNUNET_PSYCSTORE_FragmentCallback cb,
1227 struct Plugin *plugin = cls;
1229 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1231 int ret = GNUNET_SYSERR;
1232 *returned_fragments = 0;
1234 struct GNUNET_MY_QueryParam params_select[] = {
1235 GNUNET_MY_query_param_auto_from_type (channel_key),
1236 GNUNET_MY_query_param_uint64 (&fragment_limit),
1237 GNUNET_MY_query_param_end
1240 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1242 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1244 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1245 "mysql_stmt_reset", stmt);
1246 return GNUNET_SYSERR;
1254 * Retrieve all fragments of a message ID range.
1256 * @see GNUNET_PSYCSTORE_message_get()
1258 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1261 message_get (void *cls,
1262 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1263 uint64_t first_message_id,
1264 uint64_t last_message_id,
1265 uint64_t fragment_limit,
1266 uint64_t *returned_fragments,
1267 GNUNET_PSYCSTORE_FragmentCallback cb,
1270 struct Plugin *plugin = cls;
1272 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1274 int ret = GNUNET_SYSERR;
1275 *returned_fragments = 0;
1277 struct GNUNET_MY_QueryParam params_select[] = {
1278 GNUNET_MY_query_param_auto_from_type (channel_key),
1279 GNUNET_MY_query_param_uint64 (&first_message_id),
1280 GNUNET_MY_query_param_uint64 (&last_message_id),
1281 GNUNET_MY_query_param_uint64 (&fragment_limit),
1282 GNUNET_MY_query_param_end
1285 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1287 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1289 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1290 "mysql_stmt_reset", stmt);
1291 return GNUNET_SYSERR;
1299 * Retrieve all fragments of the latest messages.
1301 * @see GNUNET_PSYCSTORE_message_get_latest()
1303 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1306 message_get_latest (void *cls,
1307 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1308 uint64_t message_limit,
1309 uint64_t *returned_fragments,
1310 GNUNET_PSYCSTORE_FragmentCallback cb,
1313 struct Plugin *plugin = cls;
1315 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1317 int ret = GNUNET_SYSERR;
1318 *returned_fragments = 0;
1320 struct GNUNET_MY_QueryParam params_select[] = {
1321 GNUNET_MY_query_param_auto_from_type (channel_key),
1322 GNUNET_MY_query_param_auto_from_type (channel_key),
1323 GNUNET_MY_query_param_uint64 (&message_limit),
1324 GNUNET_MY_query_param_end
1327 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1329 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1331 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1332 "mysql_stmt_reset", stmt);
1333 return GNUNET_SYSERR;
1341 * Retrieve a fragment of message specified by its message ID and fragment
1344 * @see GNUNET_PSYCSTORE_message_get_fragment()
1346 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1349 message_get_fragment (void *cls,
1350 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1351 uint64_t message_id,
1352 uint64_t fragment_offset,
1353 GNUNET_PSYCSTORE_FragmentCallback cb,
1356 struct Plugin *plugin = cls;
1357 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1359 int ret = GNUNET_SYSERR;
1361 struct GNUNET_MY_QueryParam params_select[] = {
1362 GNUNET_MY_query_param_auto_from_type (channel_key),
1363 GNUNET_MY_query_param_uint64 (&message_id),
1364 GNUNET_MY_query_param_uint64 (&fragment_offset),
1365 GNUNET_MY_query_param_end
1368 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1376 ret = fragment_row (stmt, cb, cb_cls);
1380 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1381 "mysql execute prepared", stmt);
1384 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1386 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1387 "mysql_stmt_reset", stmt);
1388 return GNUNET_SYSERR;
1395 * Retrieve the max. values of message counters for a channel.
1397 * @see GNUNET_PSYCSTORE_counters_get()
1399 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1402 counters_message_get (void *cls,
1403 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1404 uint64_t *max_fragment_id,
1405 uint64_t *max_message_id,
1406 uint64_t *max_group_generation)
1408 struct Plugin *plugin = cls;
1410 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1412 int ret = GNUNET_SYSERR;
1414 struct GNUNET_MY_QueryParam params_select[] = {
1415 GNUNET_MY_query_param_auto_from_type (channel_key),
1416 GNUNET_MY_query_param_end
1419 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1421 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422 "mysql execute prepared", stmt);
1423 return GNUNET_SYSERR;
1426 struct GNUNET_MY_ResultSpec results_select[] = {
1427 GNUNET_MY_result_spec_uint64 (max_fragment_id),
1428 GNUNET_MY_result_spec_uint64 (max_message_id),
1429 GNUNET_MY_result_spec_uint64 (max_group_generation),
1430 GNUNET_MY_result_spec_end
1433 ret = GNUNET_MY_extract_result (stmt,
1436 if (GNUNET_OK != ret)
1438 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1439 "mysql extract_result", stmt);
1440 return GNUNET_SYSERR;
1443 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1445 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1446 "mysql_stmt_reset", stmt);
1447 return GNUNET_SYSERR;
1454 * Retrieve the max. values of state counters for a channel.
1456 * @see GNUNET_PSYCSTORE_counters_get()
1458 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1461 counters_state_get (void *cls,
1462 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1463 uint64_t *max_state_message_id)
1465 struct Plugin *plugin = cls;
1467 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1469 int ret = GNUNET_SYSERR;
1471 struct GNUNET_MY_QueryParam params_select[] = {
1472 GNUNET_MY_query_param_auto_from_type (channel_key),
1473 GNUNET_MY_query_param_end
1476 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1478 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1479 "mysql execute prepared", stmt);
1480 return GNUNET_SYSERR;
1483 struct GNUNET_MY_ResultSpec results_select[] = {
1484 GNUNET_MY_result_spec_uint64 (max_state_message_id),
1485 GNUNET_MY_result_spec_end
1488 ret = GNUNET_MY_extract_result (stmt,
1491 if (GNUNET_OK != ret)
1493 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1494 "mysql extract_result", stmt);
1495 return GNUNET_SYSERR;
1498 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1500 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1501 "mysql_stmt_reset", stmt);
1502 return GNUNET_SYSERR;
1510 * Assign a value to a state variable.
1512 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1515 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1516 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1517 const char *name, const void *value, size_t value_size)
1519 int ret = GNUNET_SYSERR;
1521 struct GNUNET_MY_QueryParam params[] = {
1522 GNUNET_MY_query_param_auto_from_type (channel_key),
1523 GNUNET_MY_query_param_string (name),
1524 GNUNET_MY_query_param_auto_from_type (value),
1525 GNUNET_MY_query_param_end
1528 ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1529 if (GNUNET_OK != ret)
1531 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1532 "mysql exec_prepared", stmt);
1533 return GNUNET_SYSERR;
1536 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1538 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1539 "mysql_stmt_reset", stmt);
1540 return GNUNET_SYSERR;
1548 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1549 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1550 uint64_t message_id)
1552 struct GNUNET_MY_QueryParam params[] = {
1553 GNUNET_MY_query_param_uint64 (&message_id),
1554 GNUNET_MY_query_param_auto_from_type (channel_key),
1555 GNUNET_MY_query_param_end
1558 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1562 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1563 "mysql execute prepared", stmt);
1564 return GNUNET_SYSERR;
1567 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1569 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1570 "mysql_stmt_reset", stmt);
1571 return GNUNET_SYSERR;
1579 * Begin modifying current state.
1582 state_modify_begin (void *cls,
1583 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1584 uint64_t message_id, uint64_t state_delta)
1586 struct Plugin *plugin = cls;
1588 if (state_delta > 0)
1591 * We can only apply state modifiers in the current message if modifiers in
1592 * the previous stateful message (message_id - state_delta) were already
1596 uint64_t max_state_message_id = 0;
1597 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1601 case GNUNET_NO: // no state yet
1608 if (max_state_message_id < message_id - state_delta)
1609 return GNUNET_NO; /* some stateful messages not yet applied */
1610 else if (message_id - state_delta < max_state_message_id)
1611 return GNUNET_NO; /* changes already applied */
1614 if (TRANSACTION_NONE != plugin->transaction)
1616 /** @todo FIXME: wait for other transaction to finish */
1617 return GNUNET_SYSERR;
1619 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1624 * Set the current value of state variable.
1626 * @see GNUNET_PSYCSTORE_state_modify()
1628 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1631 state_modify_op (void *cls,
1632 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1633 enum GNUNET_PSYC_Operator op,
1634 const char *name, const void *value, size_t value_size)
1636 struct Plugin *plugin = cls;
1637 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1641 case GNUNET_PSYC_OP_ASSIGN:
1642 return state_assign (plugin, plugin->insert_state_current, channel_key,
1643 name, value, value_size);
1645 default: /** @todo implement more state operations */
1647 return GNUNET_SYSERR;
1653 * End modifying current state.
1656 state_modify_end (void *cls,
1657 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1658 uint64_t message_id)
1660 struct Plugin *plugin = cls;
1661 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1664 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1665 && GNUNET_OK == update_message_id (plugin,
1666 plugin->update_max_state_message_id,
1667 channel_key, message_id)
1668 && GNUNET_OK == transaction_commit (plugin)
1669 ? GNUNET_OK : GNUNET_SYSERR;
1674 * Begin state synchronization.
1677 state_sync_begin (void *cls,
1678 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1680 struct Plugin *plugin = cls;
1681 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1686 * Assign current value of a state variable.
1688 * @see GNUNET_PSYCSTORE_state_modify()
1690 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1693 state_sync_assign (void *cls,
1694 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1695 const char *name, const void *value, size_t value_size)
1697 struct Plugin *plugin = cls;
1698 return state_assign (cls, plugin->insert_state_sync, channel_key,
1699 name, value, value_size);
1704 * End modifying current state.
1707 state_sync_end (void *cls,
1708 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1709 uint64_t max_state_message_id,
1710 uint64_t state_hash_message_id)
1712 struct Plugin *plugin = cls;
1713 int ret = GNUNET_SYSERR;
1715 if (TRANSACTION_NONE != plugin->transaction)
1717 /** @todo FIXME: wait for other transaction to finish */
1718 return GNUNET_SYSERR;
1721 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1722 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1723 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1725 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1727 && GNUNET_OK == update_message_id (plugin,
1728 plugin->update_state_hash_message_id,
1729 channel_key, state_hash_message_id)
1730 && GNUNET_OK == update_message_id (plugin,
1731 plugin->update_max_state_message_id,
1732 channel_key, max_state_message_id)
1733 && GNUNET_OK == transaction_commit (plugin)
1735 : transaction_rollback (plugin);
1741 * Delete the whole state.
1743 * @see GNUNET_PSYCSTORE_state_reset()
1745 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1748 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1750 struct Plugin *plugin = cls;
1751 return exec_channel (plugin, plugin->delete_state, channel_key);
1756 * Update signed values of state variables in the state store.
1758 * @see GNUNET_PSYCSTORE_state_hash_update()
1760 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1763 state_update_signed (void *cls,
1764 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1766 struct Plugin *plugin = cls;
1767 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1772 * Retrieve a state variable by name.
1774 * @see GNUNET_PSYCSTORE_state_get()
1776 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1779 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1780 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1782 struct Plugin *plugin = cls;
1783 int ret = GNUNET_SYSERR;
1786 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1788 struct GNUNET_MY_QueryParam params_select[] = {
1789 GNUNET_MY_query_param_auto_from_type (channel_key),
1790 GNUNET_MY_query_param_string (name),
1791 GNUNET_MY_query_param_end
1794 void *value_current = NULL;
1795 size_t value_size = 0;
1797 struct GNUNET_MY_ResultSpec results[] = {
1798 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1799 GNUNET_MY_result_spec_end
1802 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1804 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1805 "mysql exec_prepared", stmt);
1809 sql_ret = GNUNET_MY_extract_result (stmt, results);
1816 ret = cb (cb_cls, name, value_current,
1820 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1821 "mysql extract_result", stmt);
1825 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1827 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1828 "mysql_stmt_reset", stmt);
1829 return GNUNET_SYSERR;
1837 * Retrieve all state variables for a channel with the given prefix.
1839 * @see GNUNET_PSYCSTORE_state_get_prefix()
1841 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1844 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1845 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1848 struct Plugin *plugin = cls;
1849 int ret = GNUNET_SYSERR;
1851 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1853 uint32_t name_len = (uint32_t) strlen (name);
1855 struct GNUNET_MY_QueryParam params_select[] = {
1856 GNUNET_MY_query_param_auto_from_type (channel_key),
1857 GNUNET_MY_query_param_string (name),
1858 GNUNET_MY_query_param_uint32 (&name_len),
1859 GNUNET_MY_query_param_string (name),
1860 GNUNET_MY_query_param_end
1864 void *value_current = NULL;
1865 size_t value_size = 0;
1867 struct GNUNET_MY_ResultSpec results[] = {
1868 GNUNET_MY_result_spec_string (&name2),
1869 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1870 GNUNET_MY_result_spec_end
1877 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1879 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1880 "mysql exec_prepared", stmt);
1883 sql_ret = GNUNET_MY_extract_result (stmt, results);
1887 if (ret != GNUNET_OK)
1892 ret = cb (cb_cls, (const char *) name2,
1896 if (ret != GNUNET_YES)
1897 sql_ret = GNUNET_NO;
1901 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1902 "mysql extract_result", stmt);
1905 while (sql_ret == GNUNET_YES);
1907 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1909 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1910 "mysql_stmt_reset", stmt);
1911 return GNUNET_SYSERR;
1919 * Retrieve all signed state variables for a channel.
1921 * @see GNUNET_PSYCSTORE_state_get_signed()
1923 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1926 state_get_signed (void *cls,
1927 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1928 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1930 struct Plugin *plugin = cls;
1931 int ret = GNUNET_SYSERR;
1933 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1935 struct GNUNET_MY_QueryParam params_select[] = {
1936 GNUNET_MY_query_param_auto_from_type (channel_key),
1937 GNUNET_MY_query_param_end
1943 void *value_signed = NULL;
1944 size_t value_size = 0;
1946 struct GNUNET_MY_ResultSpec results[] = {
1947 GNUNET_MY_result_spec_string (&name),
1948 GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1949 GNUNET_MY_result_spec_end
1954 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1956 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1957 "mysql exec_prepared", stmt);
1960 sql_ret = GNUNET_MY_extract_result (stmt, results);
1964 if (ret != GNUNET_OK)
1968 ret = cb (cb_cls, (const char *) name,
1972 if (ret != GNUNET_YES)
1973 sql_ret = GNUNET_NO;
1976 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1977 "mysql extract_result", stmt);
1980 while (sql_ret == GNUNET_YES);
1982 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1984 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1985 "mysql_stmt_reset", stmt);
1986 return GNUNET_SYSERR;
1994 * Entry point for the plugin.
1996 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1997 * @return NULL on error, otherwise the plugin context
2000 libgnunet_plugin_psycstore_mysql_init (void *cls)
2002 static struct Plugin plugin;
2003 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
2004 struct GNUNET_PSYCSTORE_PluginFunctions *api;
2006 if (NULL != plugin.cfg)
2007 return NULL; /* can only initialize once! */
2008 memset (&plugin, 0, sizeof (struct Plugin));
2010 if (GNUNET_OK != database_setup (&plugin))
2012 database_shutdown (&plugin);
2015 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
2017 api->membership_store = &mysql_membership_store;
2018 api->membership_test = &membership_test;
2019 api->fragment_store = &fragment_store;
2020 api->message_add_flags = &message_add_flags;
2021 api->fragment_get = &fragment_get;
2022 api->fragment_get_latest = &fragment_get_latest;
2023 api->message_get = &message_get;
2024 api->message_get_latest = &message_get_latest;
2025 api->message_get_fragment = &message_get_fragment;
2026 api->counters_message_get = &counters_message_get;
2027 api->counters_state_get = &counters_state_get;
2028 api->state_modify_begin = &state_modify_begin;
2029 api->state_modify_op = &state_modify_op;
2030 api->state_modify_end = &state_modify_end;
2031 api->state_sync_begin = &state_sync_begin;
2032 api->state_sync_assign = &state_sync_assign;
2033 api->state_sync_end = &state_sync_end;
2034 api->state_reset = &state_reset;
2035 api->state_update_signed = &state_update_signed;
2036 api->state_get = &state_get;
2037 api->state_get_prefix = &state_get_prefix;
2038 api->state_get_signed = &state_get_signed;
2040 LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
2046 * Exit point from the plugin.
2048 * @param cls The plugin context (as returned by "init")
2049 * @return Always NULL
2052 libgnunet_plugin_psycstore_mysql_done (void *cls)
2054 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2055 struct Plugin *plugin = api->cls;
2057 database_shutdown (plugin);
2060 LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
2064 /* end of plugin_psycstore_mysql.c */