2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/plugin_psycstore_sqlite.c
23 * @brief sqlite-based psycstore backend
24 * @author Gabor X Toth
25 * @author Christian Grothoff
29 * FIXME: SQLite3 only supports signed 64-bit integers natively,
30 * thus it can only store 63 bits of the uint64_t's.
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
42 * After how many ms "busy" should a DB operation fail for good? A
43 * low value makes sure that we are more responsive to requests
44 * (especially PUTs). A high value guarantees a higher success rate
45 * (SELECTs in iterate can take several seconds despite LIMIT=1).
47 * The default value of 1s should ensure that users do not experience
48 * huge latencies while at the same time allowing operations to
49 * succeed with reasonable probability.
51 #define BUSY_TIMEOUT_MS 1000
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
56 * Log an error message at log-level 'level' that indicates
57 * a failure of the command 'cmd' on file 'filename'
58 * with the message given by strerror(errno).
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
66 TRANSACTION_STATE_MODIFY
70 * Context for all functions in this plugin.
75 const struct GNUNET_CONFIGURATION_Handle *cfg;
83 * Native SQLite database handle.
88 * Current transaction.
90 enum Transactions transaction;
92 sqlite3_stmt *transaction_begin;
94 sqlite3_stmt *transaction_commit;
96 sqlite3_stmt *transaction_rollback;
99 * Precompiled SQL for channel_key_store()
101 sqlite3_stmt *insert_channel_key;
104 * Precompiled SQL for slave_key_store()
106 sqlite3_stmt *insert_slave_key;
110 * Precompiled SQL for membership_store()
112 sqlite3_stmt *insert_membership;
115 * Precompiled SQL for membership_test()
117 sqlite3_stmt *select_membership;
121 * Precompiled SQL for fragment_store()
123 sqlite3_stmt *insert_fragment;
126 * Precompiled SQL for message_add_flags()
128 sqlite3_stmt *update_message_flags;
131 * Precompiled SQL for fragment_get()
133 sqlite3_stmt *select_fragments;
136 * Precompiled SQL for fragment_get()
138 sqlite3_stmt *select_latest_fragments;
141 * Precompiled SQL for message_get()
143 sqlite3_stmt *select_messages;
146 * Precompiled SQL for message_get()
148 sqlite3_stmt *select_latest_messages;
151 * Precompiled SQL for message_get_fragment()
153 sqlite3_stmt *select_message_fragment;
156 * Precompiled SQL for counters_get_message()
158 sqlite3_stmt *select_counters_message;
161 * Precompiled SQL for counters_get_state()
163 sqlite3_stmt *select_counters_state;
166 * Precompiled SQL for state_modify_end()
168 sqlite3_stmt *update_state_hash_message_id;
171 * Precompiled SQL for state_sync_end()
173 sqlite3_stmt *update_max_state_message_id;
177 * Precompiled SQL for message_modify_begin()
179 sqlite3_stmt *select_message_state_delta;
182 * Precompiled SQL for state_modify_set()
184 sqlite3_stmt *insert_state_current;
187 * Precompiled SQL for state_modify_end()
189 sqlite3_stmt *delete_state_empty;
192 * Precompiled SQL for state_set_signed()
194 sqlite3_stmt *update_state_signed;
197 * Precompiled SQL for state_sync()
199 sqlite3_stmt *insert_state_sync;
202 * Precompiled SQL for state_sync()
204 sqlite3_stmt *delete_state;
207 * Precompiled SQL for state_sync()
209 sqlite3_stmt *insert_state_from_sync;
212 * Precompiled SQL for state_sync()
214 sqlite3_stmt *delete_state_sync;
217 * Precompiled SQL for state_get_signed()
219 sqlite3_stmt *select_state_signed;
222 * Precompiled SQL for state_get()
224 sqlite3_stmt *select_state_one;
227 * Precompiled SQL for state_get_prefix()
229 sqlite3_stmt *select_state_prefix;
236 sql_trace (void *cls, const char *sql)
238 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
244 * @brief Prepare a SQL statement
246 * @param dbh handle to the database
247 * @param sql SQL statement, UTF-8 encoded
248 * @param stmt set to the prepared statement
249 * @return 0 on success
252 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
257 result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
258 (const char **) &tail);
259 LOG (GNUNET_ERROR_TYPE_DEBUG,
260 "Prepared `%s' / %p: %d\n", sql, *stmt, result);
261 if (result != SQLITE_OK)
262 LOG (GNUNET_ERROR_TYPE_ERROR,
263 _("Error preparing SQL query: %s\n %s\n"),
264 sqlite3_errmsg (dbh), sql);
270 * @brief Prepare a SQL statement
272 * @param dbh handle to the database
273 * @param sql SQL statement, UTF-8 encoded
274 * @return 0 on success
277 sql_exec (sqlite3 *dbh, const char *sql)
281 result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
282 LOG (GNUNET_ERROR_TYPE_DEBUG,
283 "Executed `%s' / %d\n", sql, result);
284 if (result != SQLITE_OK)
285 LOG (GNUNET_ERROR_TYPE_ERROR,
286 _("Error executing SQL query: %s\n %s\n"),
287 sqlite3_errmsg (dbh), sql);
293 * Initialize the database connections and associated
294 * data structures (create tables and indices
295 * as needed as well).
297 * @param plugin the plugin context (state for this module)
298 * @return GNUNET_OK on success
301 database_setup (struct Plugin *plugin)
306 GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
307 "FILENAME", &filename))
309 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
310 "psycstore-sqlite", "FILENAME");
311 return GNUNET_SYSERR;
313 if (GNUNET_OK != GNUNET_DISK_file_test (filename))
315 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
318 GNUNET_free (filename);
319 return GNUNET_SYSERR;
322 /* filename should be UTF-8-encoded. If it isn't, it's a bug */
323 plugin->fn = filename;
325 /* Open database and precompile statements */
326 if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
328 LOG (GNUNET_ERROR_TYPE_ERROR,
329 _("Unable to initialize SQLite: %s.\n"),
330 sqlite3_errmsg (plugin->dbh));
331 return GNUNET_SYSERR;
335 sqlite3_trace (plugin->dbh, &sql_trace, NULL);
338 sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
339 sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
340 sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
341 sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
342 sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
343 #if ! DEBUG_PSYCSTORE
344 sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
346 sql_exec (plugin->dbh, "PRAGMA page_size=4096");
348 sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
352 sql_exec (plugin->dbh,
353 "CREATE TABLE IF NOT EXISTS channels (\n"
354 " id INTEGER PRIMARY KEY,\n"
355 " pub_key BLOB UNIQUE,\n"
356 " max_state_message_id INTEGER,\n"
357 " state_hash_message_id INTEGER\n"
360 sql_exec (plugin->dbh,
361 "CREATE TABLE IF NOT EXISTS slaves (\n"
362 " id INTEGER PRIMARY KEY,\n"
363 " pub_key BLOB UNIQUE\n"
366 sql_exec (plugin->dbh,
367 "CREATE TABLE IF NOT EXISTS membership (\n"
368 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
369 " slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
370 " did_join INTEGER NOT NULL,\n"
371 " announced_at INTEGER NOT NULL,\n"
372 " effective_since INTEGER NOT NULL,\n"
373 " group_generation INTEGER NOT NULL\n"
375 sql_exec (plugin->dbh,
376 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
377 "ON membership (channel_id, slave_id);");
379 /** @todo messages table: add method_name column */
380 sql_exec (plugin->dbh,
381 "CREATE TABLE IF NOT EXISTS messages (\n"
382 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
383 " hop_counter INTEGER NOT NULL,\n"
386 " fragment_id INTEGER NOT NULL,\n"
387 " fragment_offset INTEGER NOT NULL,\n"
388 " message_id INTEGER NOT NULL,\n"
389 " group_generation INTEGER NOT NULL,\n"
390 " multicast_flags INTEGER NOT NULL,\n"
391 " psycstore_flags INTEGER NOT NULL,\n"
393 " PRIMARY KEY (channel_id, fragment_id),\n"
394 " UNIQUE (channel_id, message_id, fragment_offset)\n"
397 sql_exec (plugin->dbh,
398 "CREATE TABLE IF NOT EXISTS state (\n"
399 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
400 " name TEXT NOT NULL,\n"
401 " value_current BLOB,\n"
402 " value_signed BLOB,\n"
403 " PRIMARY KEY (channel_id, name)\n"
406 sql_exec (plugin->dbh,
407 "CREATE TABLE IF NOT EXISTS state_sync (\n"
408 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
409 " name TEXT NOT NULL,\n"
411 " PRIMARY KEY (channel_id, name)\n"
414 /* Prepare statements */
416 sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
418 sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
420 sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
422 sql_prepare (plugin->dbh,
423 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
424 &plugin->insert_channel_key);
426 sql_prepare (plugin->dbh,
427 "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
428 &plugin->insert_slave_key);
430 sql_prepare (plugin->dbh,
431 "INSERT INTO membership\n"
432 " (channel_id, slave_id, did_join, announced_at,\n"
433 " effective_since, group_generation)\n"
434 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
435 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
437 &plugin->insert_membership);
439 sql_prepare (plugin->dbh,
440 "SELECT did_join FROM membership\n"
441 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
442 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
443 " AND effective_since <= ? AND did_join = 1\n"
444 "ORDER BY announced_at DESC LIMIT 1;",
445 &plugin->select_membership);
447 sql_prepare (plugin->dbh,
448 "INSERT OR IGNORE INTO messages\n"
449 " (channel_id, hop_counter, signature, purpose,\n"
450 " fragment_id, fragment_offset, message_id,\n"
451 " group_generation, multicast_flags, psycstore_flags, data)\n"
452 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
453 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
454 &plugin->insert_fragment);
456 sql_prepare (plugin->dbh,
458 "SET psycstore_flags = psycstore_flags | ?\n"
459 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
460 " AND message_id = ? AND fragment_offset = 0;",
461 &plugin->update_message_flags);
463 sql_prepare (plugin->dbh,
464 "SELECT hop_counter, signature, purpose, fragment_id,\n"
465 " fragment_offset, message_id, group_generation,\n"
466 " multicast_flags, psycstore_flags, data\n"
468 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
469 " AND ? <= fragment_id AND fragment_id <= ?;",
470 &plugin->select_fragments);
472 /** @todo select_messages: add method_prefix filter */
473 sql_prepare (plugin->dbh,
474 "SELECT hop_counter, signature, purpose, fragment_id,\n"
475 " fragment_offset, message_id, group_generation,\n"
476 " multicast_flags, psycstore_flags, data\n"
478 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
479 " AND ? <= message_id AND message_id <= ?;",
480 &plugin->select_messages);
482 sql_prepare (plugin->dbh,
484 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
485 " fragment_offset, message_id, group_generation,\n"
486 " multicast_flags, psycstore_flags, data\n"
488 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
489 " ORDER BY fragment_id DESC\n"
491 "ORDER BY fragment_id;",
492 &plugin->select_latest_fragments);
494 /** @todo select_latest_messages: add method_prefix filter */
495 sql_prepare (plugin->dbh,
496 "SELECT hop_counter, signature, purpose, fragment_id,\n"
497 " fragment_offset, message_id, group_generation,\n"
498 " multicast_flags, psycstore_flags, data\n"
500 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
501 " AND message_id IN\n"
502 " (SELECT message_id\n"
504 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
505 " GROUP BY message_id\n"
506 " ORDER BY message_id\n"
508 "ORDER BY fragment_id;",
509 &plugin->select_latest_messages);
511 sql_prepare (plugin->dbh,
512 "SELECT hop_counter, signature, purpose, fragment_id,\n"
513 " fragment_offset, message_id, group_generation,\n"
514 " multicast_flags, psycstore_flags, data\n"
516 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
517 " AND message_id = ? AND fragment_offset = ?;",
518 &plugin->select_message_fragment);
520 sql_prepare (plugin->dbh,
521 "SELECT fragment_id, message_id, group_generation\n"
523 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
524 "ORDER BY fragment_id DESC LIMIT 1;",
525 &plugin->select_counters_message);
527 sql_prepare (plugin->dbh,
528 "SELECT max_state_message_id\n"
530 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
531 &plugin->select_counters_state);
533 sql_prepare (plugin->dbh,
535 "SET max_state_message_id = ?\n"
536 "WHERE pub_key = ?;",
537 &plugin->update_max_state_message_id);
539 sql_prepare (plugin->dbh,
541 "SET state_hash_message_id = ?\n"
542 "WHERE pub_key = ?;",
543 &plugin->update_state_hash_message_id);
545 sql_prepare (plugin->dbh,
547 "FROM channels AS c\n"
548 "LEFT JOIN messages AS m\n"
549 "ON c.id = m.channel_id\n"
550 "WHERE c.pub_key = ?\n"
551 " AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
552 " OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
554 &plugin->select_message_state_delta);
556 sql_prepare (plugin->dbh,
557 "INSERT OR REPLACE INTO state\n"
558 " (channel_id, name, value_current, value_signed)\n"
559 "SELECT new.channel_id, new.name,\n"
560 " new.value_current, old.value_signed\n"
561 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
563 " ? AS name, ? AS value_current) AS new\n"
564 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
565 " FROM state) AS old\n"
566 "ON new.channel_id = old.channel_id AND new.name = old.name;",
567 &plugin->insert_state_current);
569 sql_prepare (plugin->dbh,
570 "DELETE FROM state\n"
571 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
572 " AND (value_current IS NULL OR length(value_current) = 0)\n"
573 " AND (value_signed IS NULL OR length(value_signed) = 0);",
574 &plugin->delete_state_empty);
576 sql_prepare (plugin->dbh,
578 "SET value_signed = value_current\n"
579 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
580 &plugin->update_state_signed);
582 sql_prepare (plugin->dbh,
583 "DELETE FROM state\n"
584 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
585 &plugin->delete_state);
587 sql_prepare (plugin->dbh,
588 "INSERT INTO state_sync (channel_id, name, value)\n"
589 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
590 &plugin->insert_state_sync);
592 sql_prepare (plugin->dbh,
593 "INSERT INTO state\n"
594 " (channel_id, name, value_current, value_signed)\n"
595 "SELECT channel_id, name, value, value\n"
597 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
598 &plugin->insert_state_from_sync);
600 sql_prepare (plugin->dbh,
601 "DELETE FROM state_sync\n"
602 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
603 &plugin->delete_state_sync);
605 sql_prepare (plugin->dbh,
606 "SELECT value_current\n"
608 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
610 &plugin->select_state_one);
612 sql_prepare (plugin->dbh,
613 "SELECT name, value_current\n"
615 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
616 " AND (name = ? OR name LIKE ?);",
617 &plugin->select_state_prefix);
619 sql_prepare (plugin->dbh,
620 "SELECT name, value_signed\n"
622 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
623 " AND value_signed IS NOT NULL;",
624 &plugin->select_state_signed);
631 * Shutdown database connection and associate data
633 * @param plugin the plugin context (state for this module)
636 database_shutdown (struct Plugin *plugin)
640 while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
642 result = sqlite3_finalize (stmt);
643 if (SQLITE_OK != result)
644 LOG (GNUNET_ERROR_TYPE_WARNING,
645 "Failed to close statement %p: %d\n", stmt, result);
647 if (SQLITE_OK != sqlite3_close (plugin->dbh))
648 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
650 GNUNET_free_non_null (plugin->fn);
654 * Execute a prepared statement with a @a channel_key argument.
656 * @param plugin Plugin handle.
657 * @param stmt Statement to execute.
658 * @param channel_key Public key of the channel.
660 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
663 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
664 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
666 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
667 sizeof (*channel_key), SQLITE_STATIC))
669 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
672 else if (SQLITE_DONE != sqlite3_step (stmt))
674 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
678 if (SQLITE_OK != sqlite3_reset (stmt))
680 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
682 return GNUNET_SYSERR;
689 * Begin a transaction.
692 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
694 sqlite3_stmt *stmt = plugin->transaction_begin;
696 if (SQLITE_DONE != sqlite3_step (stmt))
698 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
701 if (SQLITE_OK != sqlite3_reset (stmt))
703 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
705 return GNUNET_SYSERR;
708 plugin->transaction = transaction;
714 * Commit current transaction.
717 transaction_commit (struct Plugin *plugin)
719 sqlite3_stmt *stmt = plugin->transaction_commit;
721 if (SQLITE_DONE != sqlite3_step (stmt))
723 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
726 if (SQLITE_OK != sqlite3_reset (stmt))
728 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
730 return GNUNET_SYSERR;
733 plugin->transaction = TRANSACTION_NONE;
739 * Roll back current transaction.
742 transaction_rollback (struct Plugin *plugin)
744 sqlite3_stmt *stmt = plugin->transaction_rollback;
746 if (SQLITE_DONE != sqlite3_step (stmt))
748 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
751 if (SQLITE_OK != sqlite3_reset (stmt))
753 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
755 return GNUNET_SYSERR;
757 plugin->transaction = TRANSACTION_NONE;
763 channel_key_store (struct Plugin *plugin,
764 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
766 sqlite3_stmt *stmt = plugin->insert_channel_key;
768 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
769 sizeof (*channel_key), SQLITE_STATIC))
771 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
774 else if (SQLITE_DONE != sqlite3_step (stmt))
776 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
780 if (SQLITE_OK != sqlite3_reset (stmt))
782 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
784 return GNUNET_SYSERR;
792 slave_key_store (struct Plugin *plugin,
793 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
795 sqlite3_stmt *stmt = plugin->insert_slave_key;
797 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
798 sizeof (*slave_key), SQLITE_STATIC))
800 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
803 else if (SQLITE_DONE != sqlite3_step (stmt))
805 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
810 if (SQLITE_OK != sqlite3_reset (stmt))
812 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
814 return GNUNET_SYSERR;
822 * Store join/leave events for a PSYC channel in order to be able to answer
823 * membership test queries later.
825 * @see GNUNET_PSYCSTORE_membership_store()
827 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
830 membership_store (void *cls,
831 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
832 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
834 uint64_t announced_at,
835 uint64_t effective_since,
836 uint64_t group_generation)
838 struct Plugin *plugin = cls;
839 sqlite3_stmt *stmt = plugin->insert_membership;
841 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
843 if (announced_at > INT64_MAX ||
844 effective_since > INT64_MAX ||
845 group_generation > INT64_MAX)
848 return GNUNET_SYSERR;
851 if (GNUNET_OK != channel_key_store (plugin, channel_key)
852 || GNUNET_OK != slave_key_store (plugin, slave_key))
853 return GNUNET_SYSERR;
855 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
856 sizeof (*channel_key), SQLITE_STATIC)
857 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
858 sizeof (*slave_key), SQLITE_STATIC)
859 || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
860 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
861 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
862 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
864 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
867 else if (SQLITE_DONE != sqlite3_step (stmt))
869 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
873 if (SQLITE_OK != sqlite3_reset (stmt))
875 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
877 return GNUNET_SYSERR;
884 * Test if a member was admitted to the channel at the given message ID.
886 * @see GNUNET_PSYCSTORE_membership_test()
888 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
889 * #GNUNET_SYSERR if there was en error.
892 membership_test (void *cls,
893 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
894 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
897 struct Plugin *plugin = cls;
898 sqlite3_stmt *stmt = plugin->select_membership;
899 int ret = GNUNET_SYSERR;
901 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
902 sizeof (*channel_key), SQLITE_STATIC)
903 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
904 sizeof (*slave_key), SQLITE_STATIC)
905 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
907 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
912 switch (sqlite3_step (stmt))
922 if (SQLITE_OK != sqlite3_reset (stmt))
924 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
932 * Store a message fragment sent to a channel.
934 * @see GNUNET_PSYCSTORE_fragment_store()
936 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
939 fragment_store (void *cls,
940 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
941 const struct GNUNET_MULTICAST_MessageHeader *msg,
942 uint32_t psycstore_flags)
944 struct Plugin *plugin = cls;
945 sqlite3_stmt *stmt = plugin->insert_fragment;
947 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
949 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
950 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
951 uint64_t message_id = GNUNET_ntohll (msg->message_id);
952 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
954 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
955 message_id > INT64_MAX || group_generation > INT64_MAX)
957 LOG (GNUNET_ERROR_TYPE_ERROR,
958 "Tried to store fragment with a field > INT64_MAX: "
959 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
960 message_id, group_generation);
962 return GNUNET_SYSERR;
965 if (GNUNET_OK != channel_key_store (plugin, channel_key))
966 return GNUNET_SYSERR;
968 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
969 sizeof (*channel_key), SQLITE_STATIC)
970 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
971 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
972 sizeof (msg->signature), SQLITE_STATIC)
973 || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
974 sizeof (msg->purpose), SQLITE_STATIC)
975 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
976 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
977 || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
978 || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
979 || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
980 || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
981 || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
982 ntohs (msg->header.size)
983 - sizeof (*msg), SQLITE_STATIC))
985 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
988 else if (SQLITE_DONE != sqlite3_step (stmt))
990 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
994 if (SQLITE_OK != sqlite3_reset (stmt))
996 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
998 return GNUNET_SYSERR;
1005 * Set additional flags for a given message.
1007 * They are OR'd with any existing flags set.
1009 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1012 message_add_flags (void *cls,
1013 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1014 uint64_t message_id,
1015 uint64_t psycstore_flags)
1017 struct Plugin *plugin = cls;
1018 sqlite3_stmt *stmt = plugin->update_message_flags;
1019 int ret = GNUNET_SYSERR;
1021 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1022 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1023 sizeof (*channel_key), SQLITE_STATIC)
1024 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1026 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1031 switch (sqlite3_step (stmt))
1034 ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1037 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1042 if (SQLITE_OK != sqlite3_reset (stmt))
1044 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1046 return GNUNET_SYSERR;
1053 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1056 int data_size = sqlite3_column_bytes (stmt, 9);
1057 struct GNUNET_MULTICAST_MessageHeader *msg
1058 = GNUNET_malloc (sizeof (*msg) + data_size);
1060 msg->header.size = htons (sizeof (*msg) + data_size);
1061 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1062 msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1063 memcpy (&msg->signature,
1064 sqlite3_column_blob (stmt, 1),
1065 sqlite3_column_bytes (stmt, 1));
1066 memcpy (&msg->purpose,
1067 sqlite3_column_blob (stmt, 2),
1068 sqlite3_column_bytes (stmt, 2));
1069 msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1070 msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1071 msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1072 msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1073 msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1074 memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1076 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1081 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1082 uint64_t *returned_fragments,
1083 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1085 int ret = GNUNET_SYSERR;
1090 sql_ret = sqlite3_step (stmt);
1094 if (ret != GNUNET_OK)
1098 ret = fragment_row (stmt, cb, cb_cls);
1099 (*returned_fragments)++;
1100 if (ret != GNUNET_YES)
1101 sql_ret = SQLITE_DONE;
1104 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1108 while (sql_ret == SQLITE_ROW);
1114 * Retrieve a message fragment range by fragment ID.
1116 * @see GNUNET_PSYCSTORE_fragment_get()
1118 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1121 fragment_get (void *cls,
1122 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1123 uint64_t first_fragment_id,
1124 uint64_t last_fragment_id,
1125 uint64_t *returned_fragments,
1126 GNUNET_PSYCSTORE_FragmentCallback cb,
1129 struct Plugin *plugin = cls;
1130 sqlite3_stmt *stmt = plugin->select_fragments;
1131 int ret = GNUNET_SYSERR;
1132 *returned_fragments = 0;
1134 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1135 sizeof (*channel_key),
1137 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1138 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1140 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1145 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1148 if (SQLITE_OK != sqlite3_reset (stmt))
1150 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1159 * Retrieve a message fragment range by fragment ID.
1161 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1163 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1166 fragment_get_latest (void *cls,
1167 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1168 uint64_t fragment_limit,
1169 uint64_t *returned_fragments,
1170 GNUNET_PSYCSTORE_FragmentCallback cb,
1173 struct Plugin *plugin = cls;
1174 sqlite3_stmt *stmt = plugin->select_latest_fragments;
1175 int ret = GNUNET_SYSERR;
1176 *returned_fragments = 0;
1178 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1179 sizeof (*channel_key),
1181 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1183 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1188 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1191 if (SQLITE_OK != sqlite3_reset (stmt))
1193 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1202 * Retrieve all fragments of a message ID range.
1204 * @see GNUNET_PSYCSTORE_message_get()
1206 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1209 message_get (void *cls,
1210 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1211 uint64_t first_message_id,
1212 uint64_t last_message_id,
1213 uint64_t *returned_fragments,
1214 GNUNET_PSYCSTORE_FragmentCallback cb,
1217 struct Plugin *plugin = cls;
1218 sqlite3_stmt *stmt = plugin->select_messages;
1219 int ret = GNUNET_SYSERR;
1220 *returned_fragments = 0;
1222 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1223 sizeof (*channel_key),
1225 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1226 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1228 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1233 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1236 if (SQLITE_OK != sqlite3_reset (stmt))
1238 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1247 * Retrieve all fragments of the latest messages.
1249 * @see GNUNET_PSYCSTORE_message_get_latest()
1251 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1254 message_get_latest (void *cls,
1255 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1256 uint64_t message_limit,
1257 uint64_t *returned_fragments,
1258 GNUNET_PSYCSTORE_FragmentCallback cb,
1261 struct Plugin *plugin = cls;
1262 sqlite3_stmt *stmt = plugin->select_latest_messages;
1263 int ret = GNUNET_SYSERR;
1264 *returned_fragments = 0;
1266 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1267 sizeof (*channel_key),
1269 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1270 sizeof (*channel_key),
1272 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1274 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1279 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1282 if (SQLITE_OK != sqlite3_reset (stmt))
1284 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1293 * Retrieve a fragment of message specified by its message ID and fragment
1296 * @see GNUNET_PSYCSTORE_message_get_fragment()
1298 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1301 message_get_fragment (void *cls,
1302 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1303 uint64_t message_id,
1304 uint64_t fragment_offset,
1305 GNUNET_PSYCSTORE_FragmentCallback cb,
1308 struct Plugin *plugin = cls;
1309 sqlite3_stmt *stmt = plugin->select_message_fragment;
1310 int ret = GNUNET_SYSERR;
1312 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1313 sizeof (*channel_key),
1315 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1316 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1318 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1323 switch (sqlite3_step (stmt))
1329 ret = fragment_row (stmt, cb, cb_cls);
1332 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1337 if (SQLITE_OK != sqlite3_reset (stmt))
1339 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1347 * Retrieve the max. values of message counters for a channel.
1349 * @see GNUNET_PSYCSTORE_counters_get()
1351 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1354 counters_message_get (void *cls,
1355 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1356 uint64_t *max_fragment_id,
1357 uint64_t *max_message_id,
1358 uint64_t *max_group_generation)
1360 struct Plugin *plugin = cls;
1361 sqlite3_stmt *stmt = plugin->select_counters_message;
1362 int ret = GNUNET_SYSERR;
1364 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1365 sizeof (*channel_key),
1368 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1373 switch (sqlite3_step (stmt))
1379 *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1380 *max_message_id = sqlite3_column_int64 (stmt, 1);
1381 *max_group_generation = sqlite3_column_int64 (stmt, 2);
1385 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1390 if (SQLITE_OK != sqlite3_reset (stmt))
1392 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1400 * Retrieve the max. values of state counters for a channel.
1402 * @see GNUNET_PSYCSTORE_counters_get()
1404 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1407 counters_state_get (void *cls,
1408 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1409 uint64_t *max_state_message_id)
1411 struct Plugin *plugin = cls;
1412 sqlite3_stmt *stmt = plugin->select_counters_state;
1413 int ret = GNUNET_SYSERR;
1415 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1416 sizeof (*channel_key),
1419 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1424 switch (sqlite3_step (stmt))
1430 *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1434 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1439 if (SQLITE_OK != sqlite3_reset (stmt))
1441 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1450 * Set a state variable to the given value.
1452 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1455 state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1456 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1457 const char *name, const void *value, size_t value_size)
1459 int ret = GNUNET_SYSERR;
1461 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1462 sizeof (*channel_key), SQLITE_STATIC)
1463 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1464 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1467 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1472 switch (sqlite3_step (stmt))
1475 ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1478 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1483 if (SQLITE_OK != sqlite3_reset (stmt))
1485 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1487 return GNUNET_SYSERR;
1495 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1496 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1497 uint64_t message_id)
1499 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1500 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1501 sizeof (*channel_key), SQLITE_STATIC))
1503 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1506 else if (SQLITE_DONE != sqlite3_step (stmt))
1508 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1511 if (SQLITE_OK != sqlite3_reset (stmt))
1513 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1515 return GNUNET_SYSERR;
1522 * Begin modifying current state.
1525 state_modify_begin (void *cls,
1526 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1527 uint64_t message_id, uint64_t state_delta)
1529 struct Plugin *plugin = cls;
1530 sqlite3_stmt *stmt = plugin->select_message_state_delta;
1532 if (state_delta > 0)
1534 int ret = GNUNET_SYSERR;
1535 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1536 sizeof (*channel_key), SQLITE_STATIC)
1537 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1538 message_id - state_delta)
1539 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1541 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1542 message_id - state_delta)
1543 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1544 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1546 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1551 switch (sqlite3_step (stmt))
1560 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1564 if (SQLITE_OK != sqlite3_reset (stmt))
1566 ret = GNUNET_SYSERR;
1567 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1570 if (GNUNET_OK != ret)
1574 if (TRANSACTION_NONE != plugin->transaction)
1575 if (GNUNET_OK != transaction_rollback (plugin))
1576 return GNUNET_SYSERR;
1578 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1583 * Set the current value of state variable.
1585 * @see GNUNET_PSYCSTORE_state_modify()
1587 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1590 state_modify_set (void *cls,
1591 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1592 const char *name, const void *value, size_t value_size)
1594 struct Plugin *plugin = cls;
1595 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1597 return state_set (plugin, plugin->insert_state_current, channel_key,
1598 name, value, value_size);
1604 * End modifying current state.
1607 state_modify_end (void *cls,
1608 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1609 uint64_t message_id)
1611 struct Plugin *plugin = cls;
1612 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1615 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1616 && GNUNET_OK == update_message_id (plugin,
1617 plugin->update_max_state_message_id,
1618 channel_key, message_id)
1619 && GNUNET_OK == transaction_commit (plugin)
1620 ? GNUNET_OK : GNUNET_SYSERR;
1625 * Begin state synchronization.
1628 state_sync_begin (void *cls,
1629 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1631 struct Plugin *plugin = cls;
1632 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1637 * Set the current value of state variable.
1639 * @see GNUNET_PSYCSTORE_state_modify()
1641 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1644 state_sync_set (void *cls,
1645 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1646 const char *name, const void *value, size_t value_size)
1648 struct Plugin *plugin = cls;
1649 return state_set (cls, plugin->insert_state_sync, channel_key,
1650 name, value, value_size);
1655 * End modifying current state.
1658 state_sync_end (void *cls,
1659 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1660 uint64_t message_id)
1662 struct Plugin *plugin = cls;
1663 int ret = GNUNET_SYSERR;
1665 GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1666 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1667 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1669 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1671 && GNUNET_OK == update_message_id (plugin,
1672 plugin->update_state_hash_message_id,
1673 channel_key, message_id)
1674 && GNUNET_OK == transaction_commit (plugin)
1676 : transaction_rollback (plugin);
1682 * Reset the state of a channel.
1684 * @see GNUNET_PSYCSTORE_state_reset()
1686 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1689 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1691 struct Plugin *plugin = cls;
1692 return exec_channel (plugin, plugin->delete_state, channel_key);
1697 * Update signed values of state variables in the state store.
1699 * @see GNUNET_PSYCSTORE_state_hash_update()
1701 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1704 state_update_signed (void *cls,
1705 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1707 struct Plugin *plugin = cls;
1708 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1713 * Retrieve a state variable by name.
1715 * @see GNUNET_PSYCSTORE_state_get()
1717 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1720 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1721 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1723 struct Plugin *plugin = cls;
1724 int ret = GNUNET_SYSERR;
1726 sqlite3_stmt *stmt = plugin->select_state_one;
1728 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1729 sizeof (*channel_key),
1731 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1733 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1738 switch (sqlite3_step (stmt))
1744 ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1745 sqlite3_column_bytes (stmt, 0));
1748 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1753 if (SQLITE_OK != sqlite3_reset (stmt))
1755 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1764 * Retrieve all state variables for a channel with the given prefix.
1766 * @see GNUNET_PSYCSTORE_state_get_prefix()
1768 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1771 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1772 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1775 struct Plugin *plugin = cls;
1776 int ret = GNUNET_SYSERR;
1777 sqlite3_stmt *stmt = plugin->select_state_prefix;
1778 size_t name_len = strlen (name);
1781 GNUNET_asprintf (&name_prefix,
1784 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1785 sizeof (*channel_key), SQLITE_STATIC)
1786 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1787 || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1790 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1798 sql_ret = sqlite3_step (stmt);
1802 if (ret != GNUNET_OK)
1806 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1807 sqlite3_column_blob (stmt, 1),
1808 sqlite3_column_bytes (stmt, 1));
1809 if (ret != GNUNET_YES)
1810 sql_ret = SQLITE_DONE;
1813 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1817 while (sql_ret == SQLITE_ROW);
1819 if (SQLITE_OK != sqlite3_reset (stmt))
1821 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1824 GNUNET_free (name_prefix);
1830 * Retrieve all signed state variables for a channel.
1832 * @see GNUNET_PSYCSTORE_state_get_signed()
1834 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1837 state_get_signed (void *cls,
1838 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1839 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1841 struct Plugin *plugin = cls;
1842 int ret = GNUNET_SYSERR;
1844 sqlite3_stmt *stmt = plugin->select_state_signed;
1846 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1847 sizeof (*channel_key), SQLITE_STATIC))
1849 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1857 sql_ret = sqlite3_step (stmt);
1861 if (ret != GNUNET_OK)
1865 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1866 sqlite3_column_blob (stmt, 1),
1867 sqlite3_column_bytes (stmt, 1));
1868 if (ret != GNUNET_YES)
1869 sql_ret = SQLITE_DONE;
1872 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1876 while (sql_ret == SQLITE_ROW);
1879 if (SQLITE_OK != sqlite3_reset (stmt))
1881 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1890 * Entry point for the plugin.
1892 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1893 * @return NULL on error, otherwise the plugin context
1896 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1898 static struct Plugin plugin;
1899 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1900 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1902 if (NULL != plugin.cfg)
1903 return NULL; /* can only initialize once! */
1904 memset (&plugin, 0, sizeof (struct Plugin));
1906 if (GNUNET_OK != database_setup (&plugin))
1908 database_shutdown (&plugin);
1911 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1913 api->membership_store = &membership_store;
1914 api->membership_test = &membership_test;
1915 api->fragment_store = &fragment_store;
1916 api->message_add_flags = &message_add_flags;
1917 api->fragment_get = &fragment_get;
1918 api->fragment_get_latest = &fragment_get_latest;
1919 api->message_get = &message_get;
1920 api->message_get_latest = &message_get_latest;
1921 api->message_get_fragment = &message_get_fragment;
1922 api->counters_message_get = &counters_message_get;
1923 api->counters_state_get = &counters_state_get;
1924 api->state_modify_begin = &state_modify_begin;
1925 api->state_modify_set = &state_modify_set;
1926 api->state_modify_end = &state_modify_end;
1927 api->state_sync_begin = &state_sync_begin;
1928 api->state_sync_set = &state_sync_set;
1929 api->state_sync_end = &state_sync_end;
1930 api->state_reset = &state_reset;
1931 api->state_update_signed = &state_update_signed;
1932 api->state_get = &state_get;
1933 api->state_get_prefix = &state_get_prefix;
1934 api->state_get_signed = &state_get_signed;
1936 LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1942 * Exit point from the plugin.
1944 * @param cls The plugin context (as returned by "init")
1945 * @return Always NULL
1948 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1950 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1951 struct Plugin *plugin = api->cls;
1953 database_shutdown (plugin);
1956 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1960 /* end of plugin_psycstore_sqlite.c */