2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/plugin_psycstore_mysql.c
23 * @brief mysql-based psycstore backend
24 * @author Gabor X Toth
25 * @author Christian Grothoff
26 * @author Christophe Genevey
30 #include "gnunet_psycstore_plugin.h"
31 #include "gnunet_psycstore_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_crypto_lib.h"
34 #include "gnunet_psyc_util_lib.h"
35 #include "psycstore.h"
36 #include "gnunet_my_lib.h"
37 #include "gnunet_mysql_lib.h"
38 #include <mysql/mysql.h>
41 * After how many ms "busy" should a DB operation fail for good? A
42 * low value makes sure that we are more responsive to requests
43 * (especially PUTs). A high value guarantees a higher success rate
44 * (SELECTs in iterate can take several seconds despite LIMIT=1).
46 * The default value of 1s should ensure that users do not experience
47 * huge latencies while at the same time allowing operations to
48 * succeed with reasonable probability.
50 #define BUSY_TIMEOUT_MS 1000
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
55 * Log an error message at log-level 'level' that indicates
56 * a failure of the command 'cmd' on file 'filename'
57 * with the message given by strerror(errno).
59 #define LOG_MYSQL(db, level, cmd, stmt) do { GNUNET_log_from (level, "psycstore-mysql", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); } while(0)
61 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
65 TRANSACTION_STATE_MODIFY,
66 TRANSACTION_STATE_SYNC,
70 * Context for all functions in this plugin.
75 const struct GNUNET_CONFIGURATION_Handle *cfg;
83 *Handle to talk to Mysql
85 struct GNUNET_MYSQL_Context *mc;
88 * Current transaction.
90 enum Transactions transaction;
92 struct GNUNET_MYSQL_StatementHandle *transaction_begin;
94 struct GNUNET_MYSQL_StatementHandle *transaction_commit;
96 struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
99 * Precompiled SQL for channel_key_store()
101 struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
105 * Precompiled SQL for slave_key_store()
107 struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
110 * Precompiled SQL for membership_store()
112 struct GNUNET_MYSQL_StatementHandle *insert_membership;
115 * Precompiled SQL for membership_test()
117 struct GNUNET_MYSQL_StatementHandle *select_membership;
120 * Precompiled SQL for fragment_store()
122 struct GNUNET_MYSQL_StatementHandle *insert_fragment;
125 * Precompiled SQL for message_add_flags()
127 struct GNUNET_MYSQL_StatementHandle *update_message_flags;
130 * Precompiled SQL for fragment_get()
132 struct GNUNET_MYSQL_StatementHandle *select_fragments;
135 * Precompiled SQL for fragment_get()
137 struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
140 * Precompiled SQL for message_get()
142 struct GNUNET_MYSQL_StatementHandle *select_messages;
145 * Precompiled SQL for message_get()
147 struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
150 * Precompiled SQL for message_get_fragment()
152 struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
155 * Precompiled SQL for counters_get_message()
157 struct GNUNET_MYSQL_StatementHandle *select_counters_message;
160 * Precompiled SQL for counters_get_state()
162 struct GNUNET_MYSQL_StatementHandle *select_counters_state;
165 * Precompiled SQL for state_modify_end()
167 struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
170 * Precompiled SQL for state_sync_end()
172 struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
175 * Precompiled SQL for state_modify_op()
177 struct GNUNET_MYSQL_StatementHandle *insert_state_current;
180 * Precompiled SQL for state_modify_end()
182 struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
185 * Precompiled SQL for state_set_signed()
187 struct GNUNET_MYSQL_StatementHandle *update_state_signed;
190 * Precompiled SQL for state_sync()
192 struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
195 * Precompiled SQL for state_sync()
197 struct GNUNET_MYSQL_StatementHandle *delete_state;
200 * Precompiled SQL for state_sync()
202 struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
205 * Precompiled SQL for state_sync()
207 struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
210 * Precompiled SQL for state_get_signed()
212 struct GNUNET_MYSQL_StatementHandle *select_state_signed;
215 * Precompiled SQL for state_get()
217 struct GNUNET_MYSQL_StatementHandle *select_state_one;
220 * Precompiled SQL for state_get_prefix()
222 struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
229 mysql_trace (void *cls, const char *sql)
231 LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
238 * @brief Prepare a SQL statement
240 * @param dbh handle to the database
241 * @param sql SQL statement, UTF-8 encoded
242 * @param stmt set to the prepared statement
243 * @return 0 on success
246 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
248 struct GNUNET_MYSQL_StatementHandle **stmt)
250 *stmt = GNUNET_MYSQL_statement_prepare (mc,
253 LOG(GNUNET_ERROR_TYPE_DEBUG,
254 "Prepared `%s' / %p\n", sql, stmt);
256 LOG(GNUNET_ERROR_TYPE_ERROR,
257 _("Error preparing SQL query: %s\n %s\n"),
258 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), sql);
265 * Initialize the database connections and associated
266 * data structures (create tables and indices
267 * as needed as well).
269 * @param plugin the plugin context (state for this module)
270 * @return GNUNET_OK on success
273 database_setup (struct Plugin *plugin)
278 GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
279 "FILENAME", &filename))
281 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
282 "psycstore-mysql", "FILENAME");
283 return GNUNET_SYSERR;
286 if (GNUNET_OK != GNUNET_DISK_file_test (filename))
288 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
291 GNUNET_free (filename);
292 return GNUNET_SYSERR;
295 /* filename should be UTF-8-encoded. If it isn't, it's a bug */
296 plugin->fn = filename;
298 /* Open database and precompile statements */
299 plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
301 if (NULL == plugin->mc)
303 LOG(GNUNET_ERROR_TYPE_ERROR,
304 _("Unable to initialize Mysql.\n"));
305 return GNUNET_SYSERR;
310 GNUNET_MYSQL_statement_run (plugin->mc,
311 "CREATE TABLE IF NOT EXISTS channels (\n"
312 " id INT AUTO_INCREMENT,\n"
314 " max_state_message_id INT,\n"
315 " state_hash_message_id INT,\n"
316 " PRIMARY KEY(id),\n"
317 " UNIQUE KEY(pub_key(5))\n"
320 GNUNET_MYSQL_statement_run (plugin->mc,
321 "CREATE TABLE IF NOT EXISTS slaves (\n"
322 " id INT AUTO_INCREMENT,\n"
324 " PRIMARY KEY(id),\n"
325 " UNIQUE KEY(pub_key(5))\n"
328 GNUNET_MYSQL_statement_run (plugin->mc,
329 "CREATE TABLE IF NOT EXISTS membership (\n"
330 " channel_id INT NOT NULL REFERENCES channels(id),\n"
331 " slave_id INT NOT NULL REFERENCES slaves(id),\n"
332 " did_join INT NOT NULL,\n"
333 " announced_at BIGINT UNSIGNED NOT NULL,\n"
334 " effective_since BIGINT UNSIGNED NOT NULL,\n"
335 " group_generation BIGINT UNSIGNED NOT NULL\n"
338 /*** FIX because IF NOT EXISTS doesn't work ***/
339 GNUNET_MYSQL_statement_run (plugin->mc,
340 "CREATE INDEX idx_membership_channel_id_slave_id "
341 "ON membership (channel_id, slave_id);");
343 /** @todo messages table: add method_name column */
344 GNUNET_MYSQL_statement_run (plugin->mc,
345 "CREATE TABLE IF NOT EXISTS messages (\n"
346 " channel_id INT NOT NULL REFERENCES channels(id),\n"
347 " hop_counter BIGINT UNSIGNED NOT NULL,\n"
350 " fragment_id BIGINT UNSIGNED NOT NULL,\n"
351 " fragment_offset BIGINT UNSIGNED NOT NULL,\n"
352 " message_id BIGINT UNSIGNED NOT NULL,\n"
353 " group_generation BIGINT UNSIGNED NOT NULL,\n"
354 " multicast_flags BIGINT UNSIGNED NOT NULL,\n"
355 " psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
357 " PRIMARY KEY (channel_id, fragment_id),\n"
358 " UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
361 GNUNET_MYSQL_statement_run (plugin->mc,
362 "CREATE TABLE IF NOT EXISTS state (\n"
363 " channel_id INT NOT NULL REFERENCES channels(id),\n"
364 " name TEXT NOT NULL,\n"
365 " value_current BLOB,\n"
366 " value_signed BLOB,\n"
367 " PRIMARY KEY (channel_id, name(5))\n"
370 GNUNET_MYSQL_statement_run (plugin->mc,
371 "CREATE TABLE IF NOT EXISTS state_sync (\n"
372 " channel_id INT NOT NULL REFERENCES channels(id),\n"
373 " name TEXT NOT NULL,\n"
375 " PRIMARY KEY (channel_id, name(5))\n"
378 /* Prepare statements */
379 mysql_prepare (plugin->mc,
381 &plugin->transaction_begin);
383 mysql_prepare (plugin->mc,
385 &plugin->transaction_commit);
387 mysql_prepare (plugin->mc,
389 &plugin->transaction_rollback);
391 mysql_prepare (plugin->mc,
392 "INSERT IGNORE INTO channels (pub_key) VALUES (?);",
393 &plugin->insert_channel_key);
395 mysql_prepare (plugin->mc,
396 "INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
397 &plugin->insert_slave_key);
399 mysql_prepare (plugin->mc,
400 "INSERT INTO membership\n"
401 " (channel_id, slave_id, did_join, announced_at,\n"
402 " effective_since, group_generation)\n"
403 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
404 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
406 &plugin->insert_membership);
408 mysql_prepare (plugin->mc,
409 "SELECT did_join FROM membership\n"
410 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
411 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
412 " AND effective_since <= ? AND did_join = 1\n"
413 "ORDER BY announced_at DESC LIMIT 1;",
414 &plugin->select_membership);
416 mysql_prepare (plugin->mc,
417 "INSERT IGNORE INTO messages\n"
418 " (channel_id, hop_counter, signature, purpose,\n"
419 " fragment_id, fragment_offset, message_id,\n"
420 " group_generation, multicast_flags, psycstore_flags, data)\n"
421 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
422 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
423 &plugin->insert_fragment);
425 mysql_prepare (plugin->mc,
427 "SET psycstore_flags = psycstore_flags | ?\n"
428 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
429 " AND message_id = ? AND fragment_offset = 0;",
430 &plugin->update_message_flags);
432 mysql_prepare (plugin->mc,
433 "SELECT hop_counter, signature, purpose, fragment_id,\n"
434 " fragment_offset, message_id, group_generation,\n"
435 " multicast_flags, psycstore_flags, data\n"
437 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
438 " AND ? <= fragment_id AND fragment_id <= ?;",
439 &plugin->select_fragments);
441 /** @todo select_messages: add method_prefix filter */
442 mysql_prepare (plugin->mc,
443 "SELECT hop_counter, signature, purpose, fragment_id,\n"
444 " fragment_offset, message_id, group_generation,\n"
445 " multicast_flags, psycstore_flags, data\n"
447 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
448 " AND ? <= message_id AND message_id <= ?"
450 &plugin->select_messages);
452 mysql_prepare (plugin->mc,
454 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
455 " fragment_offset, message_id, group_generation,\n"
456 " multicast_flags, psycstore_flags, data\n"
458 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
459 " ORDER BY fragment_id DESC\n"
461 "ORDER BY fragment_id;",
462 &plugin->select_latest_fragments);
464 /** @todo select_latest_messages: add method_prefix filter */
465 mysql_prepare (plugin->mc,
466 "SELECT hop_counter, signature, purpose, fragment_id,\n"
467 " fragment_offset, message_id, group_generation,\n"
468 " multicast_flags, psycstore_flags, data\n"
470 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
471 " AND message_id IN\n"
472 " (SELECT message_id\n"
474 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
475 " GROUP BY message_id\n"
476 " ORDER BY message_id\n"
478 "ORDER BY fragment_id;",
479 &plugin->select_latest_messages);
481 mysql_prepare (plugin->mc,
482 "SELECT hop_counter, signature, purpose, fragment_id,\n"
483 " fragment_offset, message_id, group_generation,\n"
484 " multicast_flags, psycstore_flags, data\n"
486 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
487 " AND message_id = ? AND fragment_offset = ?;",
488 &plugin->select_message_fragment);
490 mysql_prepare (plugin->mc,
491 "SELECT fragment_id, message_id, group_generation\n"
493 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
494 "ORDER BY fragment_id DESC LIMIT 1;",
495 &plugin->select_counters_message);
497 mysql_prepare (plugin->mc,
498 "SELECT max_state_message_id\n"
500 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
501 &plugin->select_counters_state);
503 mysql_prepare (plugin->mc,
505 "SET max_state_message_id = ?\n"
506 "WHERE pub_key = ?;",
507 &plugin->update_max_state_message_id);
509 mysql_prepare (plugin->mc,
511 "SET state_hash_message_id = ?\n"
512 "WHERE pub_key = ?;",
513 &plugin->update_state_hash_message_id);
515 mysql_prepare (plugin->mc,
516 "REPLACE INTO state\n"
517 " (channel_id, name, value_current, value_signed)\n"
518 "SELECT new.channel_id, new.name,\n"
519 " new.value_current, old.value_signed\n"
520 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
522 " ? AS name, ? AS value_current) AS new\n"
523 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
524 " FROM state) AS old\n"
525 "ON new.channel_id = old.channel_id AND new.name = old.name;",
526 &plugin->insert_state_current);
528 mysql_prepare (plugin->mc,
529 "DELETE FROM state\n"
530 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
531 " AND (value_current IS NULL OR length(value_current) = 0)\n"
532 " AND (value_signed IS NULL OR length(value_signed) = 0);",
533 &plugin->delete_state_empty);
535 mysql_prepare (plugin->mc,
537 "SET value_signed = value_current\n"
538 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
539 &plugin->update_state_signed);
541 mysql_prepare (plugin->mc,
542 "DELETE FROM state\n"
543 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
544 &plugin->delete_state);
546 mysql_prepare (plugin->mc,
547 "INSERT INTO state_sync (channel_id, name, value)\n"
548 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
549 &plugin->insert_state_sync);
551 mysql_prepare (plugin->mc,
552 "INSERT INTO state\n"
553 " (channel_id, name, value_current, value_signed)\n"
554 "SELECT channel_id, name, value, value\n"
556 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
557 &plugin->insert_state_from_sync);
559 mysql_prepare (plugin->mc,
560 "DELETE FROM state_sync\n"
561 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
562 &plugin->delete_state_sync);
564 mysql_prepare (plugin->mc,
565 "SELECT value_current\n"
567 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
569 &plugin->select_state_one);
571 mysql_prepare (plugin->mc,
572 "SELECT name, value_current\n"
574 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
575 " AND (name = ? OR substr(name, 1, ?) = ? || '_');",
576 &plugin->select_state_prefix);
578 mysql_prepare (plugin->mc,
579 "SELECT name, value_signed\n"
581 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
582 " AND value_signed IS NOT NULL;",
583 &plugin->select_state_signed);
590 * Shutdown database connection and associate data
592 * @param plugin the plugin context (state for this module)
595 database_shutdown (struct Plugin *plugin)
597 GNUNET_MYSQL_context_destroy (plugin->mc);
599 GNUNET_free_non_null (plugin->fn);
605 * Execute a prepared statement with a @a channel_key argument.
607 * @param plugin Plugin handle.
608 * @param stmt Statement to execute.
609 * @param channel_key Public key of the channel.
611 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
614 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
615 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
617 struct GNUNET_MY_QueryParam params[] = {
618 GNUNET_MY_query_param_auto_from_type (channel_key),
619 GNUNET_MY_query_param_end
622 if(GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
626 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
627 "mysql exec_channel", stmt);
630 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
632 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
633 "mysql_stmt_reset", stmt);
634 return GNUNET_SYSERR;
642 * Begin a transaction.
645 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
647 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
649 struct GNUNET_MY_QueryParam params[] = {
650 GNUNET_MY_query_param_end
653 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
657 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
658 "mysql extract_result", stmt);
659 return GNUNET_SYSERR;
662 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
664 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
665 "mysql_stmt_reset", stmt);
666 return GNUNET_SYSERR;
669 plugin->transaction = transaction;
675 * Commit current transaction.
678 transaction_commit (struct Plugin *plugin)
680 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
682 struct GNUNET_MY_QueryParam params[] = {
683 GNUNET_MY_query_param_end
686 if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
690 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
691 "mysql extract_result", stmt);
692 return GNUNET_SYSERR;
695 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
697 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
698 "mysql_stmt_reset", stmt);
699 return GNUNET_SYSERR;
702 plugin->transaction = TRANSACTION_NONE;
708 * Roll back current transaction.
711 transaction_rollback (struct Plugin *plugin)
713 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
715 struct GNUNET_MY_QueryParam params[] = {
716 GNUNET_MY_query_param_end
719 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
723 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
724 "mysql extract_result", stmt);
725 return GNUNET_SYSERR;
728 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
730 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
731 "mysql_stmt_reset", stmt);
732 return GNUNET_SYSERR;
735 plugin->transaction = TRANSACTION_NONE;
741 channel_key_store (struct Plugin *plugin,
742 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
744 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
746 struct GNUNET_MY_QueryParam params[] = {
747 GNUNET_MY_query_param_auto_from_type (channel_key),
748 GNUNET_MY_query_param_end
751 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
755 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
756 "mysql exec_prepared", stmt);
757 return GNUNET_SYSERR;
760 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
762 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
763 "mysql_stmt_reset", stmt);
764 return GNUNET_SYSERR;
772 slave_key_store (struct Plugin *plugin,
773 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
775 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
777 struct GNUNET_MY_QueryParam params[] = {
778 GNUNET_MY_query_param_auto_from_type (slave_key),
779 GNUNET_MY_query_param_end
782 if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
786 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
787 "mysql exec_prepared", stmt);
788 return GNUNET_SYSERR;
791 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
793 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
794 "mysql_stmt_reset", stmt);
795 return GNUNET_SYSERR;
803 * Store join/leave events for a PSYC channel in order to be able to answer
804 * membership test queries later.
806 * @see GNUNET_PSYCSTORE_membership_store()
808 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
811 mysql_membership_store (void *cls,
812 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
813 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
815 uint64_t announced_at,
816 uint64_t effective_since,
817 uint64_t group_generation)
819 struct Plugin *plugin = cls;
821 uint32_t idid_join = (uint32_t)did_join;
823 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
825 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
827 if (announced_at > INT64_MAX ||
828 effective_since > INT64_MAX ||
829 group_generation > INT64_MAX)
832 return GNUNET_SYSERR;
835 if (GNUNET_OK != channel_key_store (plugin, channel_key)
836 || GNUNET_OK != slave_key_store (plugin, slave_key))
837 return GNUNET_SYSERR;
839 struct GNUNET_MY_QueryParam params[] = {
840 GNUNET_MY_query_param_auto_from_type (channel_key),
841 GNUNET_MY_query_param_auto_from_type (slave_key),
842 GNUNET_MY_query_param_uint32 (&idid_join),
843 GNUNET_MY_query_param_uint64 (&announced_at),
844 GNUNET_MY_query_param_uint64 (&effective_since),
845 GNUNET_MY_query_param_uint64 (&group_generation),
846 GNUNET_MY_query_param_end
849 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
853 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
854 "mysql exec_prepared", stmt);
855 return GNUNET_SYSERR;
858 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
860 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
861 "mysql_stmt_reset", stmt);
862 return GNUNET_SYSERR;
868 * Test if a member was admitted to the channel at the given message ID.
870 * @see GNUNET_PSYCSTORE_membership_test()
872 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
873 * #GNUNET_SYSERR if there was en error.
876 membership_test (void *cls,
877 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
878 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
881 struct Plugin *plugin = cls;
883 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
885 uint32_t did_join = 0;
887 int ret = GNUNET_SYSERR;
889 struct GNUNET_MY_QueryParam params_select[] = {
890 GNUNET_MY_query_param_auto_from_type (channel_key),
891 GNUNET_MY_query_param_auto_from_type (slave_key),
892 GNUNET_MY_query_param_uint64 (&message_id),
893 GNUNET_MY_query_param_end
896 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
900 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
901 "mysql execute prepared", stmt);
902 return GNUNET_SYSERR;
905 struct GNUNET_MY_ResultSpec results_select[] = {
906 GNUNET_MY_result_spec_uint32 (&did_join),
907 GNUNET_MY_result_spec_end
910 switch(GNUNET_MY_extract_result (stmt,
920 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
921 "mysql extract_result", stmt);
922 return GNUNET_SYSERR;
925 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
927 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
928 "mysql_stmt_reset", stmt);
929 return GNUNET_SYSERR;
936 * Store a message fragment sent to a channel.
938 * @see GNUNET_PSYCSTORE_fragment_store()
940 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
943 fragment_store (void *cls,
944 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
945 const struct GNUNET_MULTICAST_MessageHeader *msg,
946 uint32_t psycstore_flags)
948 struct Plugin *plugin = cls;
950 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
952 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
954 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
956 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
957 uint64_t message_id = GNUNET_ntohll (msg->message_id);
958 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
960 uint64_t hop_counter = ntohl(msg->hop_counter);
961 uint64_t flags = ntohl(msg->flags);
963 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
964 message_id > INT64_MAX || group_generation > INT64_MAX)
966 LOG(GNUNET_ERROR_TYPE_ERROR,
967 "Tried to store fragment with a field > INT64_MAX: "
968 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
969 message_id, group_generation);
971 return GNUNET_SYSERR;
974 if (GNUNET_OK != channel_key_store (plugin, channel_key))
975 return GNUNET_SYSERR;
977 struct GNUNET_MY_QueryParam params_insert[] = {
978 GNUNET_MY_query_param_auto_from_type (channel_key),
979 GNUNET_MY_query_param_uint64 (&hop_counter),
980 GNUNET_MY_query_param_auto_from_type (&msg->signature),
981 GNUNET_MY_query_param_auto_from_type (&msg->purpose),
982 GNUNET_MY_query_param_uint64 (&fragment_id),
983 GNUNET_MY_query_param_uint64 (&fragment_offset),
984 GNUNET_MY_query_param_uint64 (&message_id),
985 GNUNET_MY_query_param_uint64 (&group_generation),
986 GNUNET_MY_query_param_uint64 (&flags),
987 GNUNET_MY_query_param_uint32 (&psycstore_flags),
988 GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
990 GNUNET_MY_query_param_end
993 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
997 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
998 "mysql execute prepared", stmt);
999 return GNUNET_SYSERR;
1002 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1004 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1005 "mysql_stmt_reset", stmt);
1006 return GNUNET_SYSERR;
1013 * Set additional flags for a given message.
1015 * They are OR'd with any existing flags set.
1017 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1020 message_add_flags (void *cls,
1021 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1022 uint64_t message_id,
1023 uint64_t psycstore_flags)
1025 struct Plugin *plugin = cls;
1026 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
1029 int ret = GNUNET_SYSERR;
1031 struct GNUNET_MY_QueryParam params_update[] = {
1032 GNUNET_MY_query_param_uint64 (&psycstore_flags),
1033 GNUNET_MY_query_param_auto_from_type (channel_key),
1034 GNUNET_MY_query_param_uint64 (&message_id),
1035 GNUNET_MY_query_param_end
1038 sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1047 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1048 "mysql execute prepared", stmt);
1051 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1053 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1054 "mysql_stmt_reset", stmt);
1055 return GNUNET_SYSERR;
1063 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
1064 GNUNET_PSYCSTORE_FragmentCallback cb,
1068 uint32_t hop_counter;
1069 void *signature = NULL;
1070 void *purpose = NULL;
1071 size_t signature_size;
1072 size_t purpose_size;
1074 uint64_t fragment_id;
1075 uint64_t fragment_offset;
1076 uint64_t message_id;
1077 uint64_t group_generation;
1081 int ret = GNUNET_SYSERR;
1083 struct GNUNET_MULTICAST_MessageHeader *mp;
1088 struct GNUNET_MY_ResultSpec results[] = {
1089 GNUNET_MY_result_spec_uint32 (&hop_counter),
1090 GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
1091 GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
1092 GNUNET_MY_result_spec_uint64 (&fragment_id),
1093 GNUNET_MY_result_spec_uint64 (&fragment_offset),
1094 GNUNET_MY_result_spec_uint64 (&message_id),
1095 GNUNET_MY_result_spec_uint64 (&group_generation),
1096 GNUNET_MY_result_spec_uint64 (&msg_flags),
1097 GNUNET_MY_result_spec_uint64 (&flags),
1098 GNUNET_MY_result_spec_variable_size (&buf,
1100 GNUNET_MY_result_spec_end
1103 sql_ret = GNUNET_MY_extract_result (stmt,
1108 if (ret != GNUNET_OK)
1113 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
1115 mp->header.size = htons (sizeof (*mp) + buf_size);
1116 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1117 mp->hop_counter = htonl (hop_counter);
1118 GNUNET_memcpy (&mp->signature,
1121 GNUNET_memcpy (&mp->purpose,
1124 mp->fragment_id = GNUNET_htonll (fragment_id);
1125 mp->fragment_offset = GNUNET_htonll (fragment_offset);
1126 mp->message_id = GNUNET_htonll (message_id);
1127 mp->group_generation = GNUNET_htonll (group_generation);
1128 mp->flags = htonl(msg_flags);
1130 GNUNET_memcpy (&mp[1],
1135 (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1137 GNUNET_MY_cleanup_result (results);
1141 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1142 "mysql extract_result", stmt);
1150 fragment_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1151 struct GNUNET_MY_QueryParam *params,
1152 uint64_t *returned_fragments,
1153 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1155 int ret = GNUNET_SYSERR;
1158 if(NULL == plugin->mc)
1160 fprintf(stderr, "bla\n");
1165 fprintf(stderr, "blo\n" );
1170 fprintf(stderr, "toot\n" );
1173 sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1179 if (ret != GNUNET_OK)
1183 ret = fragment_row (stmt, cb, cb_cls);
1184 (*returned_fragments)++;
1187 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1188 "mysql exec_prepared", stmt);
1195 * Retrieve a message fragment range by fragment ID.
1197 * @see GNUNET_PSYCSTORE_fragment_get()
1199 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1202 fragment_get (void *cls,
1203 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1204 uint64_t first_fragment_id,
1205 uint64_t last_fragment_id,
1206 uint64_t *returned_fragments,
1207 GNUNET_PSYCSTORE_FragmentCallback cb,
1210 struct Plugin *plugin = cls;
1211 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1212 int ret = GNUNET_SYSERR;
1213 *returned_fragments = 0;
1215 struct GNUNET_MY_QueryParam params_select[] = {
1216 GNUNET_MY_query_param_auto_from_type (channel_key),
1217 GNUNET_MY_query_param_uint64 (&first_fragment_id),
1218 GNUNET_MY_query_param_uint64 (&last_fragment_id),
1219 GNUNET_MY_query_param_end
1222 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1224 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1226 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1227 "mysql_stmt_reset", stmt);
1228 return GNUNET_SYSERR;
1236 * Retrieve a message fragment range by fragment ID.
1238 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1240 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1243 fragment_get_latest (void *cls,
1244 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1245 uint64_t fragment_limit,
1246 uint64_t *returned_fragments,
1247 GNUNET_PSYCSTORE_FragmentCallback cb,
1250 struct Plugin *plugin = cls;
1252 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1254 int ret = GNUNET_SYSERR;
1255 *returned_fragments = 0;
1257 struct GNUNET_MY_QueryParam params_select[] = {
1258 GNUNET_MY_query_param_auto_from_type (channel_key),
1259 GNUNET_MY_query_param_uint64 (&fragment_limit),
1260 GNUNET_MY_query_param_end
1263 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1265 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1267 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1268 "mysql_stmt_reset", stmt);
1269 return GNUNET_SYSERR;
1277 * Retrieve all fragments of a message ID range.
1279 * @see GNUNET_PSYCSTORE_message_get()
1281 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1284 message_get (void *cls,
1285 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1286 uint64_t first_message_id,
1287 uint64_t last_message_id,
1288 uint64_t fragment_limit,
1289 uint64_t *returned_fragments,
1290 GNUNET_PSYCSTORE_FragmentCallback cb,
1293 struct Plugin *plugin = cls;
1295 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1297 int ret = GNUNET_SYSERR;
1298 *returned_fragments = 0;
1300 struct GNUNET_MY_QueryParam params_select[] = {
1301 GNUNET_MY_query_param_auto_from_type (channel_key),
1302 GNUNET_MY_query_param_uint64 (&first_message_id),
1303 GNUNET_MY_query_param_uint64 (&last_message_id),
1304 GNUNET_MY_query_param_uint64 (&fragment_limit),
1305 GNUNET_MY_query_param_end
1308 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1310 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1312 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1313 "mysql_stmt_reset", stmt);
1314 return GNUNET_SYSERR;
1322 * Retrieve all fragments of the latest messages.
1324 * @see GNUNET_PSYCSTORE_message_get_latest()
1326 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1329 message_get_latest (void *cls,
1330 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1331 uint64_t message_limit,
1332 uint64_t *returned_fragments,
1333 GNUNET_PSYCSTORE_FragmentCallback cb,
1336 struct Plugin *plugin = cls;
1338 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1340 int ret = GNUNET_SYSERR;
1341 *returned_fragments = 0;
1343 struct GNUNET_MY_QueryParam params_select[] = {
1344 GNUNET_MY_query_param_auto_from_type (channel_key),
1345 GNUNET_MY_query_param_auto_from_type (channel_key),
1346 GNUNET_MY_query_param_uint64 (&message_limit),
1347 GNUNET_MY_query_param_end
1350 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1352 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1354 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1355 "mysql_stmt_reset", stmt);
1356 return GNUNET_SYSERR;
1364 * Retrieve a fragment of message specified by its message ID and fragment
1367 * @see GNUNET_PSYCSTORE_message_get_fragment()
1369 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1372 message_get_fragment (void *cls,
1373 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1374 uint64_t message_id,
1375 uint64_t fragment_offset,
1376 GNUNET_PSYCSTORE_FragmentCallback cb,
1379 struct Plugin *plugin = cls;
1380 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1382 int ret = GNUNET_SYSERR;
1384 struct GNUNET_MY_QueryParam params_select[] = {
1385 GNUNET_MY_query_param_auto_from_type (channel_key),
1386 GNUNET_MY_query_param_uint64 (&message_id),
1387 GNUNET_MY_query_param_uint64 (&fragment_offset),
1388 GNUNET_MY_query_param_end
1391 sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1400 ret = fragment_row (stmt, cb, cb_cls);
1403 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1404 "mysql execute prepared", stmt);
1407 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1409 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1410 "mysql_stmt_reset", stmt);
1411 return GNUNET_SYSERR;
1418 * Retrieve the max. values of message counters for a channel.
1420 * @see GNUNET_PSYCSTORE_counters_get()
1422 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1425 counters_message_get (void *cls,
1426 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1427 uint64_t *max_fragment_id,
1428 uint64_t *max_message_id,
1429 uint64_t *max_group_generation)
1431 struct Plugin *plugin = cls;
1433 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1435 int ret = GNUNET_SYSERR;
1437 struct GNUNET_MY_QueryParam params_select[] = {
1438 GNUNET_MY_query_param_auto_from_type (channel_key),
1439 GNUNET_MY_query_param_end
1442 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1446 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1447 "mysql execute prepared", stmt);
1448 return GNUNET_SYSERR;
1451 struct GNUNET_MY_ResultSpec results_select[] = {
1452 GNUNET_MY_result_spec_uint64 (max_fragment_id),
1453 GNUNET_MY_result_spec_uint64 (max_message_id),
1454 GNUNET_MY_result_spec_uint64 (max_group_generation),
1455 GNUNET_MY_result_spec_end
1458 ret = GNUNET_MY_extract_result (stmt,
1461 if (GNUNET_OK != ret)
1463 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1464 "mysql extract_result", stmt);
1465 return GNUNET_SYSERR;
1468 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1470 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1471 "mysql_stmt_reset", stmt);
1472 return GNUNET_SYSERR;
1479 * Retrieve the max. values of state counters for a channel.
1481 * @see GNUNET_PSYCSTORE_counters_get()
1483 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1486 counters_state_get (void *cls,
1487 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1488 uint64_t *max_state_message_id)
1490 struct Plugin *plugin = cls;
1492 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1494 int ret = GNUNET_SYSERR;
1496 struct GNUNET_MY_QueryParam params_select[] = {
1497 GNUNET_MY_query_param_auto_from_type (channel_key),
1498 GNUNET_MY_query_param_end
1501 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1505 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1506 "mysql execute prepared", stmt);
1507 return GNUNET_SYSERR;
1510 struct GNUNET_MY_ResultSpec results_select[] = {
1511 GNUNET_MY_result_spec_uint64 (max_state_message_id),
1512 GNUNET_MY_result_spec_end
1515 ret = GNUNET_MY_extract_result (stmt,
1518 if (GNUNET_OK != ret)
1520 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1521 "mysql extract_result", stmt);
1522 return GNUNET_SYSERR;
1525 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1527 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1528 "mysql_stmt_reset", stmt);
1529 return GNUNET_SYSERR;
1537 * Assign a value to a state variable.
1539 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1542 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1543 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1544 const char *name, const void *value, size_t value_size)
1546 int ret = GNUNET_SYSERR;
1548 struct GNUNET_MY_QueryParam params[] = {
1549 GNUNET_MY_query_param_auto_from_type (channel_key),
1550 GNUNET_MY_query_param_string (name),
1551 GNUNET_MY_query_param_auto_from_type (value),
1552 GNUNET_MY_query_param_end
1555 ret = GNUNET_MY_exec_prepared (plugin->mc,
1559 if (GNUNET_OK != ret)
1561 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1562 "mysql execute prepared", stmt);
1563 return GNUNET_SYSERR;
1566 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1568 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1569 "mysql_stmt_reset", stmt);
1570 return GNUNET_SYSERR;
1578 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1579 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1580 uint64_t message_id)
1582 struct GNUNET_MY_QueryParam params[] = {
1583 GNUNET_MY_query_param_uint64 (&message_id),
1584 GNUNET_MY_query_param_auto_from_type (channel_key),
1585 GNUNET_MY_query_param_end
1588 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1592 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1593 "mysql execute prepared", stmt);
1594 return GNUNET_SYSERR;
1597 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1599 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1600 "mysql_stmt_reset", stmt);
1601 return GNUNET_SYSERR;
1609 * Begin modifying current state.
1612 state_modify_begin (void *cls,
1613 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1614 uint64_t message_id, uint64_t state_delta)
1616 struct Plugin *plugin = cls;
1618 if (state_delta > 0)
1621 * We can only apply state modifiers in the current message if modifiers in
1622 * the previous stateful message (message_id - state_delta) were already
1626 uint64_t max_state_message_id = 0;
1627 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1631 case GNUNET_NO: // no state yet
1638 if (max_state_message_id < message_id - state_delta)
1639 return GNUNET_NO; /* some stateful messages not yet applied */
1640 else if (message_id - state_delta < max_state_message_id)
1641 return GNUNET_NO; /* changes already applied */
1644 if (TRANSACTION_NONE != plugin->transaction)
1646 /** @todo FIXME: wait for other transaction to finish */
1647 return GNUNET_SYSERR;
1649 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1654 * Set the current value of state variable.
1656 * @see GNUNET_PSYCSTORE_state_modify()
1658 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1661 state_modify_op (void *cls,
1662 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1663 enum GNUNET_PSYC_Operator op,
1664 const char *name, const void *value, size_t value_size)
1666 struct Plugin *plugin = cls;
1667 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1671 case GNUNET_PSYC_OP_ASSIGN:
1672 return state_assign (plugin, plugin->insert_state_current, channel_key,
1673 name, value, value_size);
1675 default: /** @todo implement more state operations */
1677 return GNUNET_SYSERR;
1683 * End modifying current state.
1686 state_modify_end (void *cls,
1687 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1688 uint64_t message_id)
1690 struct Plugin *plugin = cls;
1691 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1694 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1695 && GNUNET_OK == update_message_id (plugin,
1696 plugin->update_max_state_message_id,
1697 channel_key, message_id)
1698 && GNUNET_OK == transaction_commit (plugin)
1699 ? GNUNET_OK : GNUNET_SYSERR;
1704 * Begin state synchronization.
1707 state_sync_begin (void *cls,
1708 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1710 struct Plugin *plugin = cls;
1711 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1716 * Assign current value of a state variable.
1718 * @see GNUNET_PSYCSTORE_state_modify()
1720 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1723 state_sync_assign (void *cls,
1724 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1725 const char *name, const void *value, size_t value_size)
1727 struct Plugin *plugin = cls;
1728 return state_assign (cls, plugin->insert_state_sync, channel_key,
1729 name, value, value_size);
1734 * End modifying current state.
1737 state_sync_end (void *cls,
1738 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1739 uint64_t max_state_message_id,
1740 uint64_t state_hash_message_id)
1742 struct Plugin *plugin = cls;
1743 int ret = GNUNET_SYSERR;
1745 if (TRANSACTION_NONE != plugin->transaction)
1747 /** @todo FIXME: wait for other transaction to finish */
1748 return GNUNET_SYSERR;
1751 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1752 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1753 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1755 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1757 && GNUNET_OK == update_message_id (plugin,
1758 plugin->update_state_hash_message_id,
1759 channel_key, state_hash_message_id)
1760 && GNUNET_OK == update_message_id (plugin,
1761 plugin->update_max_state_message_id,
1762 channel_key, max_state_message_id)
1763 && GNUNET_OK == transaction_commit (plugin)
1765 : transaction_rollback (plugin);
1771 * Delete the whole state.
1773 * @see GNUNET_PSYCSTORE_state_reset()
1775 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1778 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1780 struct Plugin *plugin = cls;
1781 return exec_channel (plugin, plugin->delete_state, channel_key);
1786 * Update signed values of state variables in the state store.
1788 * @see GNUNET_PSYCSTORE_state_hash_update()
1790 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1793 state_update_signed (void *cls,
1794 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1796 struct Plugin *plugin = cls;
1797 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1802 * Retrieve a state variable by name.
1804 * @see GNUNET_PSYCSTORE_state_get()
1806 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1809 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1810 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1812 struct Plugin *plugin = cls;
1813 int ret = GNUNET_SYSERR;
1816 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1818 struct GNUNET_MY_QueryParam params_select[] = {
1819 GNUNET_MY_query_param_auto_from_type (channel_key),
1820 GNUNET_MY_query_param_string (name),
1821 GNUNET_MY_query_param_end
1824 void *value_current = NULL;
1825 size_t value_size = 0;
1827 struct GNUNET_MY_ResultSpec results[] = {
1828 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1829 GNUNET_MY_result_spec_end
1832 GNUNET_MY_exec_prepared (plugin->mc,
1837 sql_ret = GNUNET_MY_extract_result (stmt,
1846 ret = cb (cb_cls, name, value_current,
1850 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1851 "mysql extract_result", stmt);
1854 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1856 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1857 "mysql_stmt_reset", stmt);
1858 return GNUNET_SYSERR;
1866 * Retrieve all state variables for a channel with the given prefix.
1868 * @see GNUNET_PSYCSTORE_state_get_prefix()
1870 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1873 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1874 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1877 struct Plugin *plugin = cls;
1878 int ret = GNUNET_SYSERR;
1880 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1882 uint32_t name_len = (uint32_t) strlen (name);
1884 struct GNUNET_MY_QueryParam params_select[] = {
1885 GNUNET_MY_query_param_auto_from_type (channel_key),
1886 GNUNET_MY_query_param_string (name),
1887 GNUNET_MY_query_param_uint32 (&name_len),
1888 GNUNET_MY_query_param_string (name),
1889 GNUNET_MY_query_param_end
1893 void *value_current = NULL;
1894 size_t value_size = 0;
1896 struct GNUNET_MY_ResultSpec results[] = {
1897 GNUNET_MY_result_spec_string (&name2),
1898 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1899 GNUNET_MY_result_spec_end
1906 GNUNET_MY_exec_prepared (plugin->mc,
1909 sql_ret = GNUNET_MY_extract_result (stmt,
1914 if (ret != GNUNET_OK)
1918 ret = cb (cb_cls, (const char *) name2,
1922 if (ret != GNUNET_YES)
1923 sql_ret = GNUNET_NO;
1926 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1927 "mysql extract_result", stmt);
1930 while (sql_ret == GNUNET_YES);
1932 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1934 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1935 "mysql_stmt_reset", stmt);
1936 return GNUNET_SYSERR;
1944 * Retrieve all signed state variables for a channel.
1946 * @see GNUNET_PSYCSTORE_state_get_signed()
1948 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1951 state_get_signed (void *cls,
1952 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1953 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1955 struct Plugin *plugin = cls;
1956 int ret = GNUNET_SYSERR;
1958 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1960 struct GNUNET_MY_QueryParam params_select[] = {
1961 GNUNET_MY_query_param_auto_from_type (channel_key),
1962 GNUNET_MY_query_param_end
1968 void *value_signed = NULL;
1969 size_t value_size = 0;
1971 struct GNUNET_MY_ResultSpec results[] = {
1972 GNUNET_MY_result_spec_string (&name),
1973 GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1974 GNUNET_MY_result_spec_end
1979 GNUNET_MY_exec_prepared (plugin->mc,
1982 sql_ret = GNUNET_MY_extract_result (stmt,
1988 if (ret != GNUNET_OK)
1992 ret = cb (cb_cls, (const char *) name,
1996 if (ret != GNUNET_YES)
1997 sql_ret = GNUNET_NO;
2000 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2001 "mysql extract_result", stmt);
2004 while (sql_ret == GNUNET_YES);
2006 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
2008 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2009 "mysql_stmt_reset", stmt);
2010 return GNUNET_SYSERR;
2018 * Entry point for the plugin.
2020 * @param cls The struct GNUNET_CONFIGURATION_Handle.
2021 * @return NULL on error, otherwise the plugin context
2024 libgnunet_plugin_psycstore_mysql_init (void *cls)
2026 static struct Plugin plugin;
2027 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
2028 struct GNUNET_PSYCSTORE_PluginFunctions *api;
2030 if (NULL != plugin.cfg)
2031 return NULL; /* can only initialize once! */
2032 memset (&plugin, 0, sizeof (struct Plugin));
2034 if (GNUNET_OK != database_setup (&plugin))
2036 database_shutdown (&plugin);
2039 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
2041 api->membership_store = &mysql_membership_store;
2042 api->membership_test = &membership_test;
2043 api->fragment_store = &fragment_store;
2044 api->message_add_flags = &message_add_flags;
2045 api->fragment_get = &fragment_get;
2046 api->fragment_get_latest = &fragment_get_latest;
2047 api->message_get = &message_get;
2048 api->message_get_latest = &message_get_latest;
2049 api->message_get_fragment = &message_get_fragment;
2050 api->counters_message_get = &counters_message_get;
2051 api->counters_state_get = &counters_state_get;
2052 api->state_modify_begin = &state_modify_begin;
2053 api->state_modify_op = &state_modify_op;
2054 api->state_modify_end = &state_modify_end;
2055 api->state_sync_begin = &state_sync_begin;
2056 api->state_sync_assign = &state_sync_assign;
2057 api->state_sync_end = &state_sync_end;
2058 api->state_reset = &state_reset;
2059 api->state_update_signed = &state_update_signed;
2060 api->state_get = &state_get;
2061 api->state_get_prefix = &state_get_prefix;
2062 api->state_get_signed = &state_get_signed;
2064 LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
2070 * Exit point from the plugin.
2072 * @param cls The plugin context (as returned by "init")
2073 * @return Always NULL
2076 libgnunet_plugin_psycstore_mysql_done (void *cls)
2078 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2079 struct Plugin *plugin = api->cls;
2081 database_shutdown (plugin);
2084 LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
2088 /* end of plugin_psycstore_mysql.c */