2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
17 * @file psycstore/plugin_psycstore_mysql.c
18 * @brief mysql-based psycstore backend
19 * @author Gabor X Toth
20 * @author Christian Grothoff
21 * @author Christophe Genevey
25 #include "gnunet_psycstore_plugin.h"
26 #include "gnunet_psycstore_service.h"
27 #include "gnunet_multicast_service.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_psyc_util_lib.h"
30 #include "psycstore.h"
31 #include "gnunet_my_lib.h"
32 #include "gnunet_mysql_lib.h"
33 #include <mysql/mysql.h>
36 * After how many ms "busy" should a DB operation fail for good? A
37 * low value makes sure that we are more responsive to requests
38 * (especially PUTs). A high value guarantees a higher success rate
39 * (SELECTs in iterate can take several seconds despite LIMIT=1).
41 * The default value of 1s should ensure that users do not experience
42 * huge latencies while at the same time allowing operations to
43 * succeed with reasonable probability.
45 #define BUSY_TIMEOUT_MS 1000
47 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
50 * Log an error message at log-level 'level' that indicates
51 * a failure of the command 'cmd' on file 'filename'
52 * with the message given by strerror(errno).
54 #define LOG_MYSQL(db, level, cmd, stmt) \
56 GNUNET_log_from (level, "psycstore-mysql", \
57 _("`%s' failed at %s:%d with error: %s\n"), \
58 cmd, __FILE__, __LINE__, \
59 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
66 TRANSACTION_STATE_MODIFY,
67 TRANSACTION_STATE_SYNC,
71 * Context for all functions in this plugin.
76 const struct GNUNET_CONFIGURATION_Handle *cfg;
81 struct GNUNET_MYSQL_Context *mc;
84 * Current transaction.
86 enum Transactions transaction;
89 * Precompiled SQL for channel_key_store()
91 struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
94 * Precompiled SQL for slave_key_store()
96 struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
99 * Precompiled SQL for membership_store()
101 struct GNUNET_MYSQL_StatementHandle *insert_membership;
104 * Precompiled SQL for membership_test()
106 struct GNUNET_MYSQL_StatementHandle *select_membership;
109 * Precompiled SQL for fragment_store()
111 struct GNUNET_MYSQL_StatementHandle *insert_fragment;
114 * Precompiled SQL for message_add_flags()
116 struct GNUNET_MYSQL_StatementHandle *update_message_flags;
119 * Precompiled SQL for fragment_get()
121 struct GNUNET_MYSQL_StatementHandle *select_fragments;
124 * Precompiled SQL for fragment_get()
126 struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
129 * Precompiled SQL for message_get()
131 struct GNUNET_MYSQL_StatementHandle *select_messages;
134 * Precompiled SQL for message_get()
136 struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
139 * Precompiled SQL for message_get_fragment()
141 struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
144 * Precompiled SQL for counters_get_message()
146 struct GNUNET_MYSQL_StatementHandle *select_counters_message;
149 * Precompiled SQL for counters_get_state()
151 struct GNUNET_MYSQL_StatementHandle *select_counters_state;
154 * Precompiled SQL for state_modify_end()
156 struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
159 * Precompiled SQL for state_sync_end()
161 struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
164 * Precompiled SQL for state_modify_op()
166 struct GNUNET_MYSQL_StatementHandle *insert_state_current;
169 * Precompiled SQL for state_modify_end()
171 struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
174 * Precompiled SQL for state_set_signed()
176 struct GNUNET_MYSQL_StatementHandle *update_state_signed;
179 * Precompiled SQL for state_sync()
181 struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
184 * Precompiled SQL for state_sync()
186 struct GNUNET_MYSQL_StatementHandle *delete_state;
189 * Precompiled SQL for state_sync()
191 struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
194 * Precompiled SQL for state_sync()
196 struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
199 * Precompiled SQL for state_get_signed()
201 struct GNUNET_MYSQL_StatementHandle *select_state_signed;
204 * Precompiled SQL for state_get()
206 struct GNUNET_MYSQL_StatementHandle *select_state_one;
209 * Precompiled SQL for state_get_prefix()
211 struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
218 mysql_trace (void *cls, const char *sql)
220 LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
227 * @brief Prepare a SQL statement
229 * @param dbh handle to the database
230 * @param sql SQL statement, UTF-8 encoded
231 * @param stmt set to the prepared statement
232 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
235 mysql_prepare (struct GNUNET_MYSQL_Context *mc,
237 struct GNUNET_MYSQL_StatementHandle **stmt)
239 *stmt = GNUNET_MYSQL_statement_prepare (mc,
244 LOG (GNUNET_ERROR_TYPE_ERROR,
245 _("Error preparing SQL query: %s\n %s\n"),
246 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
248 return GNUNET_SYSERR;
250 LOG (GNUNET_ERROR_TYPE_DEBUG,
251 "Prepared `%s' / %p\n",
259 * Initialize the database connections and associated
260 * data structures (create tables and indices
261 * as needed as well).
263 * @param plugin the plugin context (state for this module)
264 * @return #GNUNET_OK on success
267 database_setup (struct Plugin *plugin)
269 /* Open database and precompile statements */
270 plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
273 if (NULL == plugin->mc)
275 LOG (GNUNET_ERROR_TYPE_ERROR,
276 _("Unable to initialize Mysql.\n"));
277 return GNUNET_SYSERR;
280 #define STMT_RUN(sql) \
282 GNUNET_MYSQL_statement_run (plugin->mc, \
285 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
286 _("Failed to run SQL statement `%s'\n"), \
288 return GNUNET_SYSERR; \
292 STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
293 " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
294 " pub_key BLOB(32),\n"
295 " max_state_message_id BIGINT UNSIGNED,\n"
296 " state_hash_message_id BIGINT UNSIGNED,\n"
297 " PRIMARY KEY(id),\n"
298 " UNIQUE KEY(pub_key(32))\n"
301 STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
302 " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
303 " pub_key BLOB(32),\n"
304 " PRIMARY KEY(id),\n"
305 " UNIQUE KEY(pub_key(32))\n"
308 STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
309 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
310 " slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
311 " did_join TINYINT NOT NULL,\n"
312 " announced_at BIGINT UNSIGNED NOT NULL,\n"
313 " effective_since BIGINT UNSIGNED NOT NULL,\n"
314 " group_generation BIGINT UNSIGNED NOT NULL\n"
317 /*** FIX because IF NOT EXISTS doesn't work ***/
318 GNUNET_MYSQL_statement_run (plugin->mc,
319 "CREATE INDEX idx_membership_channel_id_slave_id "
320 "ON membership (channel_id, slave_id);");
322 /** @todo messages table: add method_name column */
323 STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
324 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
325 " hop_counter BIGINT UNSIGNED NOT NULL,\n"
328 " fragment_id BIGINT UNSIGNED NOT NULL,\n"
329 " fragment_offset BIGINT UNSIGNED NOT NULL,\n"
330 " message_id BIGINT UNSIGNED NOT NULL,\n"
331 " group_generation BIGINT UNSIGNED NOT NULL,\n"
332 " multicast_flags BIGINT UNSIGNED NOT NULL,\n"
333 " psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
335 " PRIMARY KEY (channel_id, fragment_id),\n"
336 " UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
339 STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
340 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
341 " name TEXT NOT NULL,\n"
342 " value_current BLOB,\n"
343 " value_signed BLOB\n"
344 //" PRIMARY KEY (channel_id, name(255))\n"
347 STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
348 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
349 " name TEXT NOT NULL,\n"
351 //" PRIMARY KEY (channel_id, name(255))\n"
355 /* Prepare statements */
356 #define PREP(stmt,handle) \
357 if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
360 return GNUNET_SYSERR; \
362 PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
363 &plugin->insert_channel_key);
364 PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
365 &plugin->insert_slave_key);
366 PREP ("INSERT INTO membership\n"
367 " (channel_id, slave_id, did_join, announced_at,\n"
368 " effective_since, group_generation)\n"
369 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
370 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
372 &plugin->insert_membership);
373 PREP ("SELECT did_join FROM membership\n"
374 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
375 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
376 " AND effective_since <= ? AND did_join = 1\n"
377 "ORDER BY announced_at DESC LIMIT 1;",
378 &plugin->select_membership);
380 PREP ("INSERT IGNORE INTO messages\n"
381 " (channel_id, hop_counter, signature, purpose,\n"
382 " fragment_id, fragment_offset, message_id,\n"
383 " group_generation, multicast_flags, psycstore_flags, data)\n"
384 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
385 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
386 &plugin->insert_fragment);
388 PREP ("UPDATE messages\n"
389 "SET psycstore_flags = psycstore_flags | ?\n"
390 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
391 " AND message_id = ? AND fragment_offset = 0;",
392 &plugin->update_message_flags);
394 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
395 " fragment_offset, message_id, group_generation,\n"
396 " multicast_flags, psycstore_flags, data\n"
398 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
399 " AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
400 &plugin->select_fragments);
402 /** @todo select_messages: add method_prefix filter */
403 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
404 " fragment_offset, message_id, group_generation,\n"
405 " multicast_flags, psycstore_flags, data\n"
407 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
408 " AND ? <= message_id AND message_id <= ?\n"
410 &plugin->select_messages);
412 PREP ("SELECT * FROM\n"
413 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
414 " fragment_offset, message_id, group_generation,\n"
415 " multicast_flags, psycstore_flags, data\n"
417 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
418 " ORDER BY fragment_id DESC\n"
420 "ORDER BY fragment_id;",
421 &plugin->select_latest_fragments);
423 /** @todo select_latest_messages: add method_prefix filter */
424 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
425 " fragment_offset, message_id, group_generation,\n"
426 " multicast_flags, psycstore_flags, data\n"
428 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
429 " AND message_id IN\n"
430 " (SELECT message_id\n"
432 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
433 " GROUP BY message_id\n"
434 " ORDER BY message_id\n"
436 "ORDER BY fragment_id;",
437 &plugin->select_latest_messages);
439 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
440 " fragment_offset, message_id, group_generation,\n"
441 " multicast_flags, psycstore_flags, data\n"
443 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
444 " AND message_id = ? AND fragment_offset = ?;",
445 &plugin->select_message_fragment);
447 PREP ("SELECT fragment_id, message_id, group_generation\n"
449 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
450 "ORDER BY fragment_id DESC LIMIT 1;",
451 &plugin->select_counters_message);
453 PREP ("SELECT max_state_message_id\n"
455 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
456 &plugin->select_counters_state);
458 PREP ("UPDATE channels\n"
459 "SET max_state_message_id = ?\n"
460 "WHERE pub_key = ?;",
461 &plugin->update_max_state_message_id);
463 PREP ("UPDATE channels\n"
464 "SET state_hash_message_id = ?\n"
465 "WHERE pub_key = ?;",
466 &plugin->update_state_hash_message_id);
468 PREP ("REPLACE INTO state\n"
469 " (channel_id, name, value_current, value_signed)\n"
470 "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
471 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
472 " (SELECT ?) AS name,\n"
473 " (SELECT ?) AS value_current\n"
475 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
476 " FROM state) AS old\n"
477 "ON new.channel_id = old.channel_id AND new.name = old.name;",
478 &plugin->insert_state_current);
480 PREP ("DELETE FROM state\n"
481 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
482 " AND (value_current IS NULL OR length(value_current) = 0)\n"
483 " AND (value_signed IS NULL OR length(value_signed) = 0);",
484 &plugin->delete_state_empty);
486 PREP ("UPDATE state\n"
487 "SET value_signed = value_current\n"
488 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
489 &plugin->update_state_signed);
491 PREP ("DELETE FROM state\n"
492 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
493 &plugin->delete_state);
495 PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
496 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
497 &plugin->insert_state_sync);
499 PREP ("INSERT INTO state\n"
500 " (channel_id, name, value_current, value_signed)\n"
501 "SELECT channel_id, name, value, value\n"
503 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
504 &plugin->insert_state_from_sync);
506 PREP ("DELETE FROM state_sync\n"
507 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
508 &plugin->delete_state_sync);
510 PREP ("SELECT value_current\n"
512 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
514 &plugin->select_state_one);
516 PREP ("SELECT name, value_current\n"
518 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
519 " AND (name = ? OR substr(name, 1, ?) = ?);",
520 &plugin->select_state_prefix);
522 PREP ("SELECT name, value_signed\n"
524 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
525 " AND value_signed IS NOT NULL;",
526 &plugin->select_state_signed);
534 * Shutdown database connection and associate data
536 * @param plugin the plugin context (state for this module)
539 database_shutdown (struct Plugin *plugin)
541 GNUNET_MYSQL_context_destroy (plugin->mc);
546 * Execute a prepared statement with a @a channel_key argument.
548 * @param plugin Plugin handle.
549 * @param stmt Statement to execute.
550 * @param channel_key Public key of the channel.
552 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
555 exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
556 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
558 struct GNUNET_MY_QueryParam params[] = {
559 GNUNET_MY_query_param_auto_from_type (channel_key),
560 GNUNET_MY_query_param_end
563 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
565 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
566 "mysql exec_channel", stmt);
569 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
571 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
572 "mysql_stmt_reset", stmt);
573 return GNUNET_SYSERR;
581 * Begin a transaction.
584 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
586 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
588 LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
589 return GNUNET_SYSERR;
592 plugin->transaction = transaction;
598 * Commit current transaction.
601 transaction_commit (struct Plugin *plugin)
603 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
605 LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
606 return GNUNET_SYSERR;
609 plugin->transaction = TRANSACTION_NONE;
615 * Roll back current transaction.
618 transaction_rollback (struct Plugin *plugin)
620 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
622 LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
623 return GNUNET_SYSERR;
626 plugin->transaction = TRANSACTION_NONE;
632 channel_key_store (struct Plugin *plugin,
633 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
635 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
637 struct GNUNET_MY_QueryParam params[] = {
638 GNUNET_MY_query_param_auto_from_type (channel_key),
639 GNUNET_MY_query_param_end
642 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
644 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
645 "mysql exec_prepared", stmt);
646 return GNUNET_SYSERR;
649 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
651 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
652 "mysql_stmt_reset", stmt);
653 return GNUNET_SYSERR;
661 slave_key_store (struct Plugin *plugin,
662 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
664 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
666 struct GNUNET_MY_QueryParam params[] = {
667 GNUNET_MY_query_param_auto_from_type (slave_key),
668 GNUNET_MY_query_param_end
671 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
673 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
674 "mysql exec_prepared", stmt);
675 return GNUNET_SYSERR;
678 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
680 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
681 "mysql_stmt_reset", stmt);
682 return GNUNET_SYSERR;
690 * Store join/leave events for a PSYC channel in order to be able to answer
691 * membership test queries later.
693 * @see GNUNET_PSYCSTORE_membership_store()
695 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
698 mysql_membership_store (void *cls,
699 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
700 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
702 uint64_t announced_at,
703 uint64_t effective_since,
704 uint64_t group_generation)
706 struct Plugin *plugin = cls;
708 uint32_t idid_join = (uint32_t)did_join;
710 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
712 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
714 if (announced_at > INT64_MAX ||
715 effective_since > INT64_MAX ||
716 group_generation > INT64_MAX)
719 return GNUNET_SYSERR;
722 if (GNUNET_OK != channel_key_store (plugin, channel_key)
723 || GNUNET_OK != slave_key_store (plugin, slave_key))
724 return GNUNET_SYSERR;
726 struct GNUNET_MY_QueryParam params[] = {
727 GNUNET_MY_query_param_auto_from_type (channel_key),
728 GNUNET_MY_query_param_auto_from_type (slave_key),
729 GNUNET_MY_query_param_uint32 (&idid_join),
730 GNUNET_MY_query_param_uint64 (&announced_at),
731 GNUNET_MY_query_param_uint64 (&effective_since),
732 GNUNET_MY_query_param_uint64 (&group_generation),
733 GNUNET_MY_query_param_end
736 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
738 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
739 "mysql exec_prepared", stmt);
740 return GNUNET_SYSERR;
743 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
745 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
746 "mysql_stmt_reset", stmt);
747 return GNUNET_SYSERR;
753 * Test if a member was admitted to the channel at the given message ID.
755 * @see GNUNET_PSYCSTORE_membership_test()
757 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
758 * #GNUNET_SYSERR if there was en error.
761 membership_test (void *cls,
762 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
763 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
766 struct Plugin *plugin = cls;
768 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
770 uint32_t did_join = 0;
772 int ret = GNUNET_SYSERR;
774 struct GNUNET_MY_QueryParam params_select[] = {
775 GNUNET_MY_query_param_auto_from_type (channel_key),
776 GNUNET_MY_query_param_auto_from_type (slave_key),
777 GNUNET_MY_query_param_uint64 (&message_id),
778 GNUNET_MY_query_param_end
781 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
783 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
784 "mysql execute prepared", stmt);
785 return GNUNET_SYSERR;
788 struct GNUNET_MY_ResultSpec results_select[] = {
789 GNUNET_MY_result_spec_uint32 (&did_join),
790 GNUNET_MY_result_spec_end
793 switch (GNUNET_MY_extract_result (stmt, results_select))
802 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
803 "mysql extract_result", stmt);
804 return GNUNET_SYSERR;
807 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
809 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
810 "mysql_stmt_reset", stmt);
811 return GNUNET_SYSERR;
818 * Store a message fragment sent to a channel.
820 * @see GNUNET_PSYCSTORE_fragment_store()
822 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
825 fragment_store (void *cls,
826 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
827 const struct GNUNET_MULTICAST_MessageHeader *msg,
828 uint32_t psycstore_flags)
830 struct Plugin *plugin = cls;
832 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
834 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
836 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
838 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
839 uint64_t message_id = GNUNET_ntohll (msg->message_id);
840 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
842 uint64_t hop_counter = ntohl(msg->hop_counter);
843 uint64_t flags = ntohl(msg->flags);
845 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
846 message_id > INT64_MAX || group_generation > INT64_MAX)
848 LOG(GNUNET_ERROR_TYPE_ERROR,
849 "Tried to store fragment with a field > INT64_MAX: "
850 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
851 message_id, group_generation);
853 return GNUNET_SYSERR;
856 if (GNUNET_OK != channel_key_store (plugin, channel_key))
857 return GNUNET_SYSERR;
859 struct GNUNET_MY_QueryParam params_insert[] = {
860 GNUNET_MY_query_param_auto_from_type (channel_key),
861 GNUNET_MY_query_param_uint64 (&hop_counter),
862 GNUNET_MY_query_param_auto_from_type (&msg->signature),
863 GNUNET_MY_query_param_auto_from_type (&msg->purpose),
864 GNUNET_MY_query_param_uint64 (&fragment_id),
865 GNUNET_MY_query_param_uint64 (&fragment_offset),
866 GNUNET_MY_query_param_uint64 (&message_id),
867 GNUNET_MY_query_param_uint64 (&group_generation),
868 GNUNET_MY_query_param_uint64 (&flags),
869 GNUNET_MY_query_param_uint32 (&psycstore_flags),
870 GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
872 GNUNET_MY_query_param_end
875 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
877 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
878 "mysql execute prepared", stmt);
879 return GNUNET_SYSERR;
882 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
884 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
885 "mysql_stmt_reset", stmt);
886 return GNUNET_SYSERR;
893 * Set additional flags for a given message.
895 * They are OR'd with any existing flags set.
897 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
900 message_add_flags (void *cls,
901 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
903 uint32_t psycstore_flags)
905 struct Plugin *plugin = cls;
906 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
909 int ret = GNUNET_SYSERR;
911 struct GNUNET_MY_QueryParam params_update[] = {
912 GNUNET_MY_query_param_uint32 (&psycstore_flags),
913 GNUNET_MY_query_param_auto_from_type (channel_key),
914 GNUNET_MY_query_param_uint64 (&message_id),
915 GNUNET_MY_query_param_end
918 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
926 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
927 "mysql execute prepared", stmt);
930 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
932 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
933 "mysql_stmt_reset", stmt);
934 return GNUNET_SYSERR;
942 fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
943 GNUNET_PSYCSTORE_FragmentCallback cb,
945 uint64_t *returned_fragments)
948 uint32_t hop_counter;
949 void *signature = NULL;
950 void *purpose = NULL;
951 size_t signature_size;
953 uint64_t fragment_id;
954 uint64_t fragment_offset;
956 uint64_t group_generation;
960 int ret = GNUNET_SYSERR;
962 struct GNUNET_MULTICAST_MessageHeader *mp;
964 struct GNUNET_MY_ResultSpec results[] = {
965 GNUNET_MY_result_spec_uint32 (&hop_counter),
966 GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
967 GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
968 GNUNET_MY_result_spec_uint64 (&fragment_id),
969 GNUNET_MY_result_spec_uint64 (&fragment_offset),
970 GNUNET_MY_result_spec_uint64 (&message_id),
971 GNUNET_MY_result_spec_uint64 (&group_generation),
972 GNUNET_MY_result_spec_uint64 (&msg_flags),
973 GNUNET_MY_result_spec_uint64 (&flags),
974 GNUNET_MY_result_spec_variable_size (&buf,
976 GNUNET_MY_result_spec_end
981 sql_ret = GNUNET_MY_extract_result (stmt, results);
985 if (ret != GNUNET_YES)
990 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
992 mp->header.size = htons (sizeof (*mp) + buf_size);
993 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
994 mp->hop_counter = htonl (hop_counter);
995 GNUNET_memcpy (&mp->signature,
998 GNUNET_memcpy (&mp->purpose,
1001 mp->fragment_id = GNUNET_htonll (fragment_id);
1002 mp->fragment_offset = GNUNET_htonll (fragment_offset);
1003 mp->message_id = GNUNET_htonll (message_id);
1004 mp->group_generation = GNUNET_htonll (group_generation);
1005 mp->flags = htonl(msg_flags);
1007 GNUNET_memcpy (&mp[1],
1010 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1011 if (NULL != returned_fragments)
1012 (*returned_fragments)++;
1013 GNUNET_MY_cleanup_result (results);
1017 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1018 "mysql extract_result", stmt);
1021 while (GNUNET_YES == sql_ret);
1024 if (GNUNET_NO == ret)
1025 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1026 "Empty result set\n");
1033 fragment_select (struct Plugin *plugin,
1034 struct GNUNET_MYSQL_StatementHandle *stmt,
1035 struct GNUNET_MY_QueryParam *params,
1036 uint64_t *returned_fragments,
1037 GNUNET_PSYCSTORE_FragmentCallback cb,
1040 int ret = GNUNET_SYSERR;
1043 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1047 if (ret != GNUNET_YES)
1052 ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
1056 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1057 "mysql exec_prepared", stmt);
1064 * Retrieve a message fragment range by fragment ID.
1066 * @see GNUNET_PSYCSTORE_fragment_get()
1068 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1071 fragment_get (void *cls,
1072 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1073 uint64_t first_fragment_id,
1074 uint64_t last_fragment_id,
1075 uint64_t *returned_fragments,
1076 GNUNET_PSYCSTORE_FragmentCallback cb,
1079 struct Plugin *plugin = cls;
1080 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1081 int ret = GNUNET_SYSERR;
1082 struct GNUNET_MY_QueryParam params_select[] = {
1083 GNUNET_MY_query_param_auto_from_type (channel_key),
1084 GNUNET_MY_query_param_uint64 (&first_fragment_id),
1085 GNUNET_MY_query_param_uint64 (&last_fragment_id),
1086 GNUNET_MY_query_param_end
1089 *returned_fragments = 0;
1090 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1092 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1094 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1095 "mysql_stmt_reset", stmt);
1096 return GNUNET_SYSERR;
1104 * Retrieve a message fragment range by fragment ID.
1106 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1108 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1111 fragment_get_latest (void *cls,
1112 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1113 uint64_t fragment_limit,
1114 uint64_t *returned_fragments,
1115 GNUNET_PSYCSTORE_FragmentCallback cb,
1118 struct Plugin *plugin = cls;
1120 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1122 int ret = GNUNET_SYSERR;
1123 *returned_fragments = 0;
1125 struct GNUNET_MY_QueryParam params_select[] = {
1126 GNUNET_MY_query_param_auto_from_type (channel_key),
1127 GNUNET_MY_query_param_uint64 (&fragment_limit),
1128 GNUNET_MY_query_param_end
1131 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1133 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1135 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1136 "mysql_stmt_reset", stmt);
1137 return GNUNET_SYSERR;
1145 * Retrieve all fragments of a message ID range.
1147 * @see GNUNET_PSYCSTORE_message_get()
1149 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1152 message_get (void *cls,
1153 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1154 uint64_t first_message_id,
1155 uint64_t last_message_id,
1156 uint64_t fragment_limit,
1157 uint64_t *returned_fragments,
1158 GNUNET_PSYCSTORE_FragmentCallback cb,
1161 struct Plugin *plugin = cls;
1162 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1165 if (0 == fragment_limit)
1166 fragment_limit = UINT64_MAX;
1168 struct GNUNET_MY_QueryParam params_select[] = {
1169 GNUNET_MY_query_param_auto_from_type (channel_key),
1170 GNUNET_MY_query_param_uint64 (&first_message_id),
1171 GNUNET_MY_query_param_uint64 (&last_message_id),
1172 GNUNET_MY_query_param_uint64 (&fragment_limit),
1173 GNUNET_MY_query_param_end
1176 *returned_fragments = 0;
1177 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1179 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1181 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1182 "mysql_stmt_reset", stmt);
1183 return GNUNET_SYSERR;
1191 * Retrieve all fragments of the latest messages.
1193 * @see GNUNET_PSYCSTORE_message_get_latest()
1195 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1198 message_get_latest (void *cls,
1199 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1200 uint64_t message_limit,
1201 uint64_t *returned_fragments,
1202 GNUNET_PSYCSTORE_FragmentCallback cb,
1205 struct Plugin *plugin = cls;
1207 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1209 int ret = GNUNET_SYSERR;
1210 *returned_fragments = 0;
1212 struct GNUNET_MY_QueryParam params_select[] = {
1213 GNUNET_MY_query_param_auto_from_type (channel_key),
1214 GNUNET_MY_query_param_auto_from_type (channel_key),
1215 GNUNET_MY_query_param_uint64 (&message_limit),
1216 GNUNET_MY_query_param_end
1219 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1221 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1223 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1224 "mysql_stmt_reset", stmt);
1225 return GNUNET_SYSERR;
1233 * Retrieve a fragment of message specified by its message ID and fragment
1236 * @see GNUNET_PSYCSTORE_message_get_fragment()
1238 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1241 message_get_fragment (void *cls,
1242 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1243 uint64_t message_id,
1244 uint64_t fragment_offset,
1245 GNUNET_PSYCSTORE_FragmentCallback cb,
1248 struct Plugin *plugin = cls;
1249 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1251 int ret = GNUNET_SYSERR;
1253 struct GNUNET_MY_QueryParam params_select[] = {
1254 GNUNET_MY_query_param_auto_from_type (channel_key),
1255 GNUNET_MY_query_param_uint64 (&message_id),
1256 GNUNET_MY_query_param_uint64 (&fragment_offset),
1257 GNUNET_MY_query_param_end
1260 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1268 ret = fragment_row (stmt, cb, cb_cls, NULL);
1272 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1273 "mysql execute prepared", stmt);
1276 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1278 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1279 "mysql_stmt_reset", stmt);
1280 return GNUNET_SYSERR;
1287 * Retrieve the max. values of message counters for a channel.
1289 * @see GNUNET_PSYCSTORE_counters_get()
1291 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1294 counters_message_get (void *cls,
1295 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1296 uint64_t *max_fragment_id,
1297 uint64_t *max_message_id,
1298 uint64_t *max_group_generation)
1300 struct Plugin *plugin = cls;
1302 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1304 int ret = GNUNET_SYSERR;
1306 struct GNUNET_MY_QueryParam params_select[] = {
1307 GNUNET_MY_query_param_auto_from_type (channel_key),
1308 GNUNET_MY_query_param_end
1311 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1313 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1314 "mysql execute prepared", stmt);
1315 return GNUNET_SYSERR;
1318 struct GNUNET_MY_ResultSpec results_select[] = {
1319 GNUNET_MY_result_spec_uint64 (max_fragment_id),
1320 GNUNET_MY_result_spec_uint64 (max_message_id),
1321 GNUNET_MY_result_spec_uint64 (max_group_generation),
1322 GNUNET_MY_result_spec_end
1325 ret = GNUNET_MY_extract_result (stmt, results_select);
1327 if (GNUNET_OK != ret)
1329 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1330 "mysql extract_result", stmt);
1331 return GNUNET_SYSERR;
1334 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1336 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1337 "mysql_stmt_reset", stmt);
1338 return GNUNET_SYSERR;
1345 * Retrieve the max. values of state counters for a channel.
1347 * @see GNUNET_PSYCSTORE_counters_get()
1349 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1352 counters_state_get (void *cls,
1353 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1354 uint64_t *max_state_message_id)
1356 struct Plugin *plugin = cls;
1358 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1360 int ret = GNUNET_SYSERR;
1362 struct GNUNET_MY_QueryParam params_select[] = {
1363 GNUNET_MY_query_param_auto_from_type (channel_key),
1364 GNUNET_MY_query_param_end
1367 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1369 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1370 "mysql execute prepared", stmt);
1371 return GNUNET_SYSERR;
1374 struct GNUNET_MY_ResultSpec results_select[] = {
1375 GNUNET_MY_result_spec_uint64 (max_state_message_id),
1376 GNUNET_MY_result_spec_end
1379 ret = GNUNET_MY_extract_result (stmt, results_select);
1381 if (GNUNET_OK != ret)
1383 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1384 "mysql extract_result", stmt);
1385 return GNUNET_SYSERR;
1388 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1390 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1391 "mysql_stmt_reset", stmt);
1392 return GNUNET_SYSERR;
1400 * Assign a value to a state variable.
1402 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1405 state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1406 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1407 const char *name, const void *value, size_t value_size)
1409 int ret = GNUNET_SYSERR;
1411 struct GNUNET_MY_QueryParam params[] = {
1412 GNUNET_MY_query_param_auto_from_type (channel_key),
1413 GNUNET_MY_query_param_string (name),
1414 GNUNET_MY_query_param_fixed_size(value, value_size),
1415 GNUNET_MY_query_param_end
1418 ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1419 if (GNUNET_OK != ret)
1421 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422 "mysql exec_prepared", stmt);
1423 return GNUNET_SYSERR;
1426 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1428 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1429 "mysql_stmt_reset", stmt);
1430 return GNUNET_SYSERR;
1438 update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1439 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1440 uint64_t message_id)
1442 struct GNUNET_MY_QueryParam params[] = {
1443 GNUNET_MY_query_param_uint64 (&message_id),
1444 GNUNET_MY_query_param_auto_from_type (channel_key),
1445 GNUNET_MY_query_param_end
1448 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1452 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1453 "mysql execute prepared", stmt);
1454 return GNUNET_SYSERR;
1457 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1459 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1460 "mysql_stmt_reset", stmt);
1461 return GNUNET_SYSERR;
1469 * Begin modifying current state.
1472 state_modify_begin (void *cls,
1473 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1474 uint64_t message_id, uint64_t state_delta)
1476 struct Plugin *plugin = cls;
1478 if (state_delta > 0)
1481 * We can only apply state modifiers in the current message if modifiers in
1482 * the previous stateful message (message_id - state_delta) were already
1486 uint64_t max_state_message_id = 0;
1487 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1491 case GNUNET_NO: // no state yet
1498 if (max_state_message_id < message_id - state_delta)
1499 return GNUNET_NO; /* some stateful messages not yet applied */
1500 else if (message_id - state_delta < max_state_message_id)
1501 return GNUNET_NO; /* changes already applied */
1504 if (TRANSACTION_NONE != plugin->transaction)
1506 /** @todo FIXME: wait for other transaction to finish */
1507 return GNUNET_SYSERR;
1509 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1514 * Set the current value of state variable.
1516 * @see GNUNET_PSYCSTORE_state_modify()
1518 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1521 state_modify_op (void *cls,
1522 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1523 enum GNUNET_PSYC_Operator op,
1524 const char *name, const void *value, size_t value_size)
1526 struct Plugin *plugin = cls;
1527 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1531 case GNUNET_PSYC_OP_ASSIGN:
1532 return state_assign (plugin, plugin->insert_state_current,
1533 channel_key, name, value, value_size);
1535 default: /** @todo implement more state operations */
1537 return GNUNET_SYSERR;
1543 * End modifying current state.
1546 state_modify_end (void *cls,
1547 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1548 uint64_t message_id)
1550 struct Plugin *plugin = cls;
1551 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1554 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1555 && GNUNET_OK == update_message_id (plugin,
1556 plugin->update_max_state_message_id,
1557 channel_key, message_id)
1558 && GNUNET_OK == transaction_commit (plugin)
1559 ? GNUNET_OK : GNUNET_SYSERR;
1564 * Begin state synchronization.
1567 state_sync_begin (void *cls,
1568 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1570 struct Plugin *plugin = cls;
1571 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1576 * Assign current value of a state variable.
1578 * @see GNUNET_PSYCSTORE_state_modify()
1580 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1583 state_sync_assign (void *cls,
1584 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1585 const char *name, const void *value, size_t value_size)
1587 struct Plugin *plugin = cls;
1588 return state_assign (cls, plugin->insert_state_sync,
1589 channel_key, name, value, value_size);
1594 * End modifying current state.
1597 state_sync_end (void *cls,
1598 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1599 uint64_t max_state_message_id,
1600 uint64_t state_hash_message_id)
1602 struct Plugin *plugin = cls;
1603 int ret = GNUNET_SYSERR;
1605 if (TRANSACTION_NONE != plugin->transaction)
1607 /** @todo FIXME: wait for other transaction to finish */
1608 return GNUNET_SYSERR;
1611 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1612 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1613 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1615 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1617 && GNUNET_OK == update_message_id (plugin,
1618 plugin->update_state_hash_message_id,
1619 channel_key, state_hash_message_id)
1620 && GNUNET_OK == update_message_id (plugin,
1621 plugin->update_max_state_message_id,
1622 channel_key, max_state_message_id)
1623 && GNUNET_OK == transaction_commit (plugin)
1625 : transaction_rollback (plugin);
1631 * Delete the whole state.
1633 * @see GNUNET_PSYCSTORE_state_reset()
1635 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1638 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1640 struct Plugin *plugin = cls;
1641 return exec_channel (plugin, plugin->delete_state, channel_key);
1646 * Update signed values of state variables in the state store.
1648 * @see GNUNET_PSYCSTORE_state_hash_update()
1650 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1653 state_update_signed (void *cls,
1654 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1656 struct Plugin *plugin = cls;
1657 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1662 * Retrieve a state variable by name.
1664 * @see GNUNET_PSYCSTORE_state_get()
1666 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1669 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1670 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1672 struct Plugin *plugin = cls;
1673 int ret = GNUNET_SYSERR;
1676 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1678 struct GNUNET_MY_QueryParam params_select[] = {
1679 GNUNET_MY_query_param_auto_from_type (channel_key),
1680 GNUNET_MY_query_param_string (name),
1681 GNUNET_MY_query_param_end
1684 void *value_current = NULL;
1685 size_t value_size = 0;
1687 struct GNUNET_MY_ResultSpec results[] = {
1688 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1689 GNUNET_MY_result_spec_end
1692 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1694 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1695 "mysql exec_prepared", stmt);
1699 sql_ret = GNUNET_MY_extract_result (stmt, results);
1707 ret = cb (cb_cls, name, value_current, value_size);
1711 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1712 "mysql extract_result", stmt);
1716 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1718 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1719 "mysql_stmt_reset", stmt);
1720 return GNUNET_SYSERR;
1728 * Retrieve all state variables for a channel with the given prefix.
1730 * @see GNUNET_PSYCSTORE_state_get_prefix()
1732 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1735 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1736 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1739 struct Plugin *plugin = cls;
1740 int ret = GNUNET_SYSERR;
1742 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1744 uint32_t name_len = (uint32_t) strlen (name);
1746 struct GNUNET_MY_QueryParam params_select[] = {
1747 GNUNET_MY_query_param_auto_from_type (channel_key),
1748 GNUNET_MY_query_param_string (name),
1749 GNUNET_MY_query_param_uint32 (&name_len),
1750 GNUNET_MY_query_param_string (name),
1751 GNUNET_MY_query_param_end
1755 void *value_current = NULL;
1756 size_t value_size = 0;
1758 struct GNUNET_MY_ResultSpec results[] = {
1759 GNUNET_MY_result_spec_string (&name2),
1760 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1761 GNUNET_MY_result_spec_end
1766 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1768 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1769 "mysql exec_prepared", stmt);
1770 return GNUNET_SYSERR;
1775 sql_ret = GNUNET_MY_extract_result (stmt, results);
1779 if (ret != GNUNET_YES)
1784 ret = cb (cb_cls, (const char *) name2, value_current, value_size);
1786 if (ret != GNUNET_YES)
1787 sql_ret = GNUNET_NO;
1791 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1792 "mysql extract_result", stmt);
1795 while (sql_ret == GNUNET_YES);
1797 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1799 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1800 "mysql_stmt_reset", stmt);
1801 return GNUNET_SYSERR;
1809 * Retrieve all signed state variables for a channel.
1811 * @see GNUNET_PSYCSTORE_state_get_signed()
1813 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1816 state_get_signed (void *cls,
1817 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1818 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1820 struct Plugin *plugin = cls;
1821 int ret = GNUNET_SYSERR;
1823 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1825 struct GNUNET_MY_QueryParam params_select[] = {
1826 GNUNET_MY_query_param_auto_from_type (channel_key),
1827 GNUNET_MY_query_param_end
1833 void *value_signed = NULL;
1834 size_t value_size = 0;
1836 struct GNUNET_MY_ResultSpec results[] = {
1837 GNUNET_MY_result_spec_string (&name),
1838 GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1839 GNUNET_MY_result_spec_end
1842 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1844 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1845 "mysql exec_prepared", stmt);
1846 return GNUNET_SYSERR;
1851 sql_ret = GNUNET_MY_extract_result (stmt, results);
1855 if (ret != GNUNET_YES)
1860 ret = cb (cb_cls, (const char *) name, value_signed, value_size);
1862 if (ret != GNUNET_YES)
1863 sql_ret = GNUNET_NO;
1867 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1868 "mysql extract_result", stmt);
1871 while (sql_ret == GNUNET_YES);
1873 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1875 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1876 "mysql_stmt_reset", stmt);
1877 return GNUNET_SYSERR;
1885 * Entry point for the plugin.
1887 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1888 * @return NULL on error, otherwise the plugin context
1891 libgnunet_plugin_psycstore_mysql_init (void *cls)
1893 static struct Plugin plugin;
1894 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1895 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1897 if (NULL != plugin.cfg)
1898 return NULL; /* can only initialize once! */
1899 memset (&plugin, 0, sizeof (struct Plugin));
1901 if (GNUNET_OK != database_setup (&plugin))
1903 database_shutdown (&plugin);
1906 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1908 api->membership_store = &mysql_membership_store;
1909 api->membership_test = &membership_test;
1910 api->fragment_store = &fragment_store;
1911 api->message_add_flags = &message_add_flags;
1912 api->fragment_get = &fragment_get;
1913 api->fragment_get_latest = &fragment_get_latest;
1914 api->message_get = &message_get;
1915 api->message_get_latest = &message_get_latest;
1916 api->message_get_fragment = &message_get_fragment;
1917 api->counters_message_get = &counters_message_get;
1918 api->counters_state_get = &counters_state_get;
1919 api->state_modify_begin = &state_modify_begin;
1920 api->state_modify_op = &state_modify_op;
1921 api->state_modify_end = &state_modify_end;
1922 api->state_sync_begin = &state_sync_begin;
1923 api->state_sync_assign = &state_sync_assign;
1924 api->state_sync_end = &state_sync_end;
1925 api->state_reset = &state_reset;
1926 api->state_update_signed = &state_update_signed;
1927 api->state_get = &state_get;
1928 api->state_get_prefix = &state_get_prefix;
1929 api->state_get_signed = &state_get_signed;
1931 LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
1937 * Exit point from the plugin.
1939 * @param cls The plugin context (as returned by "init")
1940 * @return Always NULL
1943 libgnunet_plugin_psycstore_mysql_done (void *cls)
1945 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1946 struct Plugin *plugin = api->cls;
1948 database_shutdown (plugin);
1951 LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
1955 /* end of plugin_psycstore_mysql.c */