2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psycstore/plugin_psycstore_sqlite.c
23 * @brief sqlite-based psycstore backend
24 * @author Gabor X Toth
25 * @author Christian Grothoff
29 * FIXME: SQLite3 only supports signed 64-bit integers natively,
30 * thus it can only store 63 bits of the uint64_t's.
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
42 * After how many ms "busy" should a DB operation fail for good? A
43 * low value makes sure that we are more responsive to requests
44 * (especially PUTs). A high value guarantees a higher success rate
45 * (SELECTs in iterate can take several seconds despite LIMIT=1).
47 * The default value of 1s should ensure that users do not experience
48 * huge latencies while at the same time allowing operations to
49 * succeed with reasonable probability.
51 #define BUSY_TIMEOUT_MS 1000
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
56 * Log an error message at log-level 'level' that indicates
57 * a failure of the command 'cmd' on file 'filename'
58 * with the message given by strerror(errno).
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
66 TRANSACTION_STATE_MODIFY
70 * Context for all functions in this plugin.
75 const struct GNUNET_CONFIGURATION_Handle *cfg;
83 * Native SQLite database handle.
88 * Current transaction.
90 enum Transactions transaction;
92 sqlite3_stmt *transaction_begin;
94 sqlite3_stmt *transaction_commit;
96 sqlite3_stmt *transaction_rollback;
99 * Precompiled SQL for channel_key_store()
101 sqlite3_stmt *insert_channel_key;
104 * Precompiled SQL for slave_key_store()
106 sqlite3_stmt *insert_slave_key;
110 * Precompiled SQL for membership_store()
112 sqlite3_stmt *insert_membership;
115 * Precompiled SQL for membership_test()
117 sqlite3_stmt *select_membership;
121 * Precompiled SQL for fragment_store()
123 sqlite3_stmt *insert_fragment;
126 * Precompiled SQL for message_add_flags()
128 sqlite3_stmt *update_message_flags;
131 * Precompiled SQL for fragment_get()
133 sqlite3_stmt *select_fragments;
136 * Precompiled SQL for fragment_get()
138 sqlite3_stmt *select_latest_fragments;
141 * Precompiled SQL for message_get()
143 sqlite3_stmt *select_messages;
146 * Precompiled SQL for message_get()
148 sqlite3_stmt *select_latest_messages;
151 * Precompiled SQL for message_get_fragment()
153 sqlite3_stmt *select_message_fragment;
156 * Precompiled SQL for counters_get_message()
158 sqlite3_stmt *select_counters_message;
161 * Precompiled SQL for counters_get_state()
163 sqlite3_stmt *select_counters_state;
166 * Precompiled SQL for state_modify_end()
168 sqlite3_stmt *update_state_hash_message_id;
171 * Precompiled SQL for state_sync_end()
173 sqlite3_stmt *update_max_state_message_id;
177 * Precompiled SQL for message_modify_begin()
179 sqlite3_stmt *select_message_state_delta;
182 * Precompiled SQL for state_modify_set()
184 sqlite3_stmt *insert_state_current;
187 * Precompiled SQL for state_modify_end()
189 sqlite3_stmt *delete_state_empty;
192 * Precompiled SQL for state_set_signed()
194 sqlite3_stmt *update_state_signed;
197 * Precompiled SQL for state_sync()
199 sqlite3_stmt *insert_state_sync;
202 * Precompiled SQL for state_sync()
204 sqlite3_stmt *delete_state;
207 * Precompiled SQL for state_sync()
209 sqlite3_stmt *insert_state_from_sync;
212 * Precompiled SQL for state_sync()
214 sqlite3_stmt *delete_state_sync;
217 * Precompiled SQL for state_get_signed()
219 sqlite3_stmt *select_state_signed;
222 * Precompiled SQL for state_get()
224 sqlite3_stmt *select_state_one;
227 * Precompiled SQL for state_get_prefix()
229 sqlite3_stmt *select_state_prefix;
236 sql_trace (void *cls, const char *sql)
238 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
244 * @brief Prepare a SQL statement
246 * @param dbh handle to the database
247 * @param sql SQL statement, UTF-8 encoded
248 * @param stmt set to the prepared statement
249 * @return 0 on success
252 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
257 result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
258 (const char **) &tail);
259 LOG (GNUNET_ERROR_TYPE_DEBUG,
260 "Prepared `%s' / %p: %d\n", sql, *stmt, result);
261 if (result != SQLITE_OK)
262 LOG (GNUNET_ERROR_TYPE_ERROR,
263 _("Error preparing SQL query: %s\n %s\n"),
264 sqlite3_errmsg (dbh), sql);
270 * @brief Prepare a SQL statement
272 * @param dbh handle to the database
273 * @param sql SQL statement, UTF-8 encoded
274 * @return 0 on success
277 sql_exec (sqlite3 *dbh, const char *sql)
281 result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
282 LOG (GNUNET_ERROR_TYPE_DEBUG,
283 "Executed `%s' / %d\n", sql, result);
284 if (result != SQLITE_OK)
285 LOG (GNUNET_ERROR_TYPE_ERROR,
286 _("Error executing SQL query: %s\n %s\n"),
287 sqlite3_errmsg (dbh), sql);
293 * Initialize the database connections and associated
294 * data structures (create tables and indices
295 * as needed as well).
297 * @param plugin the plugin context (state for this module)
298 * @return GNUNET_OK on success
301 database_setup (struct Plugin *plugin)
306 GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
307 "FILENAME", &filename))
309 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
310 "psycstore-sqlite", "FILENAME");
311 return GNUNET_SYSERR;
313 if (GNUNET_OK != GNUNET_DISK_file_test (filename))
315 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
318 GNUNET_free (filename);
319 return GNUNET_SYSERR;
322 /* filename should be UTF-8-encoded. If it isn't, it's a bug */
323 plugin->fn = filename;
325 /* Open database and precompile statements */
326 if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
328 LOG (GNUNET_ERROR_TYPE_ERROR,
329 _("Unable to initialize SQLite: %s.\n"),
330 sqlite3_errmsg (plugin->dbh));
331 return GNUNET_SYSERR;
335 sqlite3_trace (plugin->dbh, &sql_trace, NULL);
338 sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
339 sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
340 sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
341 sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
342 sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
343 #if ! DEBUG_PSYCSTORE
344 sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
346 sql_exec (plugin->dbh, "PRAGMA count_changes=OFF");
347 sql_exec (plugin->dbh, "PRAGMA page_size=4096");
349 sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
353 sql_exec (plugin->dbh,
354 "CREATE TABLE IF NOT EXISTS channels (\n"
355 " id INTEGER PRIMARY KEY,\n"
356 " pub_key BLOB UNIQUE,\n"
357 " max_state_message_id INTEGER,\n"
358 " state_hash_message_id INTEGER\n"
361 sql_exec (plugin->dbh,
362 "CREATE TABLE IF NOT EXISTS slaves (\n"
363 " id INTEGER PRIMARY KEY,\n"
364 " pub_key BLOB UNIQUE\n"
367 sql_exec (plugin->dbh,
368 "CREATE TABLE IF NOT EXISTS membership (\n"
369 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
370 " slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
371 " did_join INTEGER NOT NULL,\n"
372 " announced_at INTEGER NOT NULL,\n"
373 " effective_since INTEGER NOT NULL,\n"
374 " group_generation INTEGER NOT NULL\n"
376 sql_exec (plugin->dbh,
377 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
378 "ON membership (channel_id, slave_id);");
380 sql_exec (plugin->dbh,
381 "CREATE TABLE IF NOT EXISTS messages (\n"
382 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
383 " hop_counter INTEGER NOT NULL,\n"
386 " fragment_id INTEGER NOT NULL,\n"
387 " fragment_offset INTEGER NOT NULL,\n"
388 " message_id INTEGER NOT NULL,\n"
389 " group_generation INTEGER NOT NULL,\n"
390 " multicast_flags INTEGER NOT NULL,\n"
391 " psycstore_flags INTEGER NOT NULL,\n"
393 " PRIMARY KEY (channel_id, fragment_id),\n"
394 " UNIQUE (channel_id, message_id, fragment_offset)\n"
397 sql_exec (plugin->dbh,
398 "CREATE TABLE IF NOT EXISTS state (\n"
399 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
400 " name TEXT NOT NULL,\n"
401 " value_current BLOB,\n"
402 " value_signed BLOB,\n"
403 " PRIMARY KEY (channel_id, name)\n"
406 sql_exec (plugin->dbh,
407 "CREATE TABLE IF NOT EXISTS state_sync (\n"
408 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
409 " name TEXT NOT NULL,\n"
411 " PRIMARY KEY (channel_id, name)\n"
414 /* Prepare statements */
416 sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
418 sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
420 sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
422 sql_prepare (plugin->dbh,
423 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
424 &plugin->insert_channel_key);
426 sql_prepare (plugin->dbh,
427 "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
428 &plugin->insert_slave_key);
430 sql_prepare (plugin->dbh,
431 "INSERT INTO membership\n"
432 " (channel_id, slave_id, did_join, announced_at,\n"
433 " effective_since, group_generation)\n"
434 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
435 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
437 &plugin->insert_membership);
439 sql_prepare (plugin->dbh,
440 "SELECT did_join FROM membership\n"
441 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
442 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
443 " AND effective_since <= ? AND did_join = 1\n"
444 "ORDER BY announced_at DESC LIMIT 1;",
445 &plugin->select_membership);
447 sql_prepare (plugin->dbh,
448 "INSERT OR IGNORE INTO messages\n"
449 " (channel_id, hop_counter, signature, purpose,\n"
450 " fragment_id, fragment_offset, message_id,\n"
451 " group_generation, multicast_flags, psycstore_flags, data)\n"
452 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
453 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
454 &plugin->insert_fragment);
456 sql_prepare (plugin->dbh,
458 "SET psycstore_flags = psycstore_flags | ?\n"
459 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
460 " AND message_id = ? AND fragment_offset = 0;",
461 &plugin->update_message_flags);
463 sql_prepare (plugin->dbh,
464 "SELECT hop_counter, signature, purpose, fragment_id,\n"
465 " fragment_offset, message_id, group_generation,\n"
466 " multicast_flags, psycstore_flags, data\n"
468 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
469 " AND ? <= fragment_id AND fragment_id <= ?;",
470 &plugin->select_fragments);
472 sql_prepare (plugin->dbh,
473 "SELECT hop_counter, signature, purpose, fragment_id,\n"
474 " fragment_offset, message_id, group_generation,\n"
475 " multicast_flags, psycstore_flags, data\n"
477 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
478 " AND ? <= message_id AND message_id <= ?;",
479 &plugin->select_messages);
481 sql_prepare (plugin->dbh,
483 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
484 " fragment_offset, message_id, group_generation,\n"
485 " multicast_flags, psycstore_flags, data\n"
487 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
488 " ORDER BY fragment_id DESC\n"
490 "ORDER BY fragment_id;",
491 &plugin->select_latest_fragments);
493 sql_prepare (plugin->dbh,
494 "SELECT hop_counter, signature, purpose, fragment_id,\n"
495 " fragment_offset, message_id, group_generation,\n"
496 " multicast_flags, psycstore_flags, data\n"
498 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
499 " AND message_id IN\n"
500 " (SELECT message_id\n"
502 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
503 " ORDER BY message_id\n"
505 "ORDER BY fragment_id;",
506 &plugin->select_latest_messages);
508 sql_prepare (plugin->dbh,
509 "SELECT hop_counter, signature, purpose, fragment_id,\n"
510 " fragment_offset, message_id, group_generation,\n"
511 " multicast_flags, psycstore_flags, data\n"
513 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
514 " AND message_id = ? AND fragment_offset = ?;",
515 &plugin->select_message_fragment);
517 sql_prepare (plugin->dbh,
518 "SELECT fragment_id, message_id, group_generation\n"
520 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
521 "ORDER BY fragment_id DESC LIMIT 1;",
522 &plugin->select_counters_message);
524 sql_prepare (plugin->dbh,
525 "SELECT max_state_message_id\n"
527 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
528 &plugin->select_counters_state);
530 sql_prepare (plugin->dbh,
532 "SET max_state_message_id = ?\n"
533 "WHERE pub_key = ?;",
534 &plugin->update_max_state_message_id);
536 sql_prepare (plugin->dbh,
538 "SET state_hash_message_id = ?\n"
539 "WHERE pub_key = ?;",
540 &plugin->update_state_hash_message_id);
542 sql_prepare (plugin->dbh,
544 "FROM channels AS c\n"
545 "LEFT JOIN messages AS m\n"
546 "ON c.id = m.channel_id\n"
547 "WHERE c.pub_key = ?\n"
548 " AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
549 " OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
551 &plugin->select_message_state_delta);
553 sql_prepare (plugin->dbh,
554 "INSERT OR REPLACE INTO state\n"
555 " (channel_id, name, value_current, value_signed)\n"
556 "SELECT new.channel_id, new.name,\n"
557 " new.value_current, old.value_signed\n"
558 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
560 " ? AS name, ? AS value_current) AS new\n"
561 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
562 " FROM state) AS old\n"
563 "ON new.channel_id = old.channel_id AND new.name = old.name;",
564 &plugin->insert_state_current);
566 sql_prepare (plugin->dbh,
567 "DELETE FROM state\n"
568 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
569 " AND (value_current IS NULL OR length(value_current) = 0)\n"
570 " AND (value_signed IS NULL OR length(value_signed) = 0);",
571 &plugin->delete_state_empty);
573 sql_prepare (plugin->dbh,
575 "SET value_signed = value_current\n"
576 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
577 &plugin->update_state_signed);
579 sql_prepare (plugin->dbh,
580 "DELETE FROM state\n"
581 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
582 &plugin->delete_state);
584 sql_prepare (plugin->dbh,
585 "INSERT INTO state_sync (channel_id, name, value)\n"
586 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
587 &plugin->insert_state_sync);
589 sql_prepare (plugin->dbh,
590 "INSERT INTO state\n"
591 " (channel_id, name, value_current, value_signed)\n"
592 "SELECT channel_id, name, value, value\n"
594 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
595 &plugin->insert_state_from_sync);
597 sql_prepare (plugin->dbh,
598 "DELETE FROM state_sync\n"
599 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
600 &plugin->delete_state_sync);
602 sql_prepare (plugin->dbh,
603 "SELECT value_current\n"
605 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
607 &plugin->select_state_one);
609 sql_prepare (plugin->dbh,
610 "SELECT name, value_current\n"
612 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
613 " AND (name = ? OR name LIKE ?);",
614 &plugin->select_state_prefix);
616 sql_prepare (plugin->dbh,
617 "SELECT name, value_signed\n"
619 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
620 " AND value_signed IS NOT NULL;",
621 &plugin->select_state_signed);
628 * Shutdown database connection and associate data
630 * @param plugin the plugin context (state for this module)
633 database_shutdown (struct Plugin *plugin)
637 while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
639 result = sqlite3_finalize (stmt);
640 if (SQLITE_OK != result)
641 LOG (GNUNET_ERROR_TYPE_WARNING,
642 "Failed to close statement %p: %d\n", stmt, result);
644 if (SQLITE_OK != sqlite3_close (plugin->dbh))
645 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
647 GNUNET_free_non_null (plugin->fn);
651 * Execute a prepared statement with a @a channel_key argument.
653 * @param plugin Plugin handle.
654 * @param stmt Statement to execute.
655 * @param channel_key Public key of the channel.
657 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
660 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
661 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
663 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
664 sizeof (*channel_key), SQLITE_STATIC))
666 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
669 else if (SQLITE_DONE != sqlite3_step (stmt))
671 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
675 if (SQLITE_OK != sqlite3_reset (stmt))
677 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
679 return GNUNET_SYSERR;
686 * Begin a transaction.
689 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
691 sqlite3_stmt *stmt = plugin->transaction_begin;
693 if (SQLITE_DONE != sqlite3_step (stmt))
695 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
698 if (SQLITE_OK != sqlite3_reset (stmt))
700 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
702 return GNUNET_SYSERR;
705 plugin->transaction = transaction;
711 * Commit current transaction.
714 transaction_commit (struct Plugin *plugin)
716 sqlite3_stmt *stmt = plugin->transaction_commit;
718 if (SQLITE_DONE != sqlite3_step (stmt))
720 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
723 if (SQLITE_OK != sqlite3_reset (stmt))
725 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
727 return GNUNET_SYSERR;
730 plugin->transaction = TRANSACTION_NONE;
736 * Roll back current transaction.
739 transaction_rollback (struct Plugin *plugin)
741 sqlite3_stmt *stmt = plugin->transaction_rollback;
743 if (SQLITE_DONE != sqlite3_step (stmt))
745 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
748 if (SQLITE_OK != sqlite3_reset (stmt))
750 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
752 return GNUNET_SYSERR;
754 plugin->transaction = TRANSACTION_NONE;
760 channel_key_store (struct Plugin *plugin,
761 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
763 sqlite3_stmt *stmt = plugin->insert_channel_key;
765 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
766 sizeof (*channel_key), SQLITE_STATIC))
768 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
771 else if (SQLITE_DONE != sqlite3_step (stmt))
773 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
777 if (SQLITE_OK != sqlite3_reset (stmt))
779 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
781 return GNUNET_SYSERR;
789 slave_key_store (struct Plugin *plugin,
790 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
792 sqlite3_stmt *stmt = plugin->insert_slave_key;
794 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
795 sizeof (*slave_key), SQLITE_STATIC))
797 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
800 else if (SQLITE_DONE != sqlite3_step (stmt))
802 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
807 if (SQLITE_OK != sqlite3_reset (stmt))
809 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
811 return GNUNET_SYSERR;
819 * Store join/leave events for a PSYC channel in order to be able to answer
820 * membership test queries later.
822 * @see GNUNET_PSYCSTORE_membership_store()
824 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
827 membership_store (void *cls,
828 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
829 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
831 uint64_t announced_at,
832 uint64_t effective_since,
833 uint64_t group_generation)
835 struct Plugin *plugin = cls;
836 sqlite3_stmt *stmt = plugin->insert_membership;
838 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
840 if (announced_at > INT64_MAX ||
841 effective_since > INT64_MAX ||
842 group_generation > INT64_MAX)
845 return GNUNET_SYSERR;
848 if (GNUNET_OK != channel_key_store (plugin, channel_key)
849 || GNUNET_OK != slave_key_store (plugin, slave_key))
850 return GNUNET_SYSERR;
852 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
853 sizeof (*channel_key), SQLITE_STATIC)
854 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
855 sizeof (*slave_key), SQLITE_STATIC)
856 || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
857 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
858 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
859 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
861 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
864 else if (SQLITE_DONE != sqlite3_step (stmt))
866 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
870 if (SQLITE_OK != sqlite3_reset (stmt))
872 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
874 return GNUNET_SYSERR;
881 * Test if a member was admitted to the channel at the given message ID.
883 * @see GNUNET_PSYCSTORE_membership_test()
885 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
886 * #GNUNET_SYSERR if there was en error.
889 membership_test (void *cls,
890 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
891 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
894 struct Plugin *plugin = cls;
895 sqlite3_stmt *stmt = plugin->select_membership;
896 int ret = GNUNET_SYSERR;
898 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
899 sizeof (*channel_key), SQLITE_STATIC)
900 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
901 sizeof (*slave_key), SQLITE_STATIC)
902 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
904 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
909 switch (sqlite3_step (stmt))
919 if (SQLITE_OK != sqlite3_reset (stmt))
921 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
929 * Store a message fragment sent to a channel.
931 * @see GNUNET_PSYCSTORE_fragment_store()
933 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
936 fragment_store (void *cls,
937 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
938 const struct GNUNET_MULTICAST_MessageHeader *msg,
939 uint32_t psycstore_flags)
941 struct Plugin *plugin = cls;
942 sqlite3_stmt *stmt = plugin->insert_fragment;
944 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
946 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
947 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
948 uint64_t message_id = GNUNET_ntohll (msg->message_id);
949 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
951 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
952 message_id > INT64_MAX || group_generation > INT64_MAX)
954 LOG (GNUNET_ERROR_TYPE_ERROR,
955 "Tried to store fragment with a field > INT64_MAX: "
956 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
957 message_id, group_generation);
959 return GNUNET_SYSERR;
962 if (GNUNET_OK != channel_key_store (plugin, channel_key))
963 return GNUNET_SYSERR;
965 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
966 sizeof (*channel_key), SQLITE_STATIC)
967 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
968 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
969 sizeof (msg->signature), SQLITE_STATIC)
970 || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
971 sizeof (msg->purpose), SQLITE_STATIC)
972 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
973 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
974 || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
975 || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
976 || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
977 || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
978 || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
979 ntohs (msg->header.size)
980 - sizeof (*msg), SQLITE_STATIC))
982 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
985 else if (SQLITE_DONE != sqlite3_step (stmt))
987 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
991 if (SQLITE_OK != sqlite3_reset (stmt))
993 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
995 return GNUNET_SYSERR;
1002 * Set additional flags for a given message.
1004 * They are OR'd with any existing flags set.
1006 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1009 message_add_flags (void *cls,
1010 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1011 uint64_t message_id,
1012 uint64_t psycstore_flags)
1014 struct Plugin *plugin = cls;
1015 sqlite3_stmt *stmt = plugin->update_message_flags;
1016 int ret = GNUNET_SYSERR;
1018 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1019 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1020 sizeof (*channel_key), SQLITE_STATIC)
1021 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1023 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1028 switch (sqlite3_step (stmt))
1031 ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1034 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1039 if (SQLITE_OK != sqlite3_reset (stmt))
1041 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1043 return GNUNET_SYSERR;
1050 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1053 int data_size = sqlite3_column_bytes (stmt, 9);
1054 struct GNUNET_MULTICAST_MessageHeader *msg
1055 = GNUNET_malloc (sizeof (*msg) + data_size);
1057 msg->header.size = htons (sizeof (*msg) + data_size);
1058 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1059 msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1060 memcpy (&msg->signature,
1061 sqlite3_column_blob (stmt, 1),
1062 sqlite3_column_bytes (stmt, 1));
1063 memcpy (&msg->purpose,
1064 sqlite3_column_blob (stmt, 2),
1065 sqlite3_column_bytes (stmt, 2));
1066 msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1067 msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1068 msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1069 msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1070 msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1071 memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1073 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1078 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1079 uint64_t *returned_fragments,
1080 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1082 int ret = GNUNET_SYSERR;
1087 sql_ret = sqlite3_step (stmt);
1091 if (ret != GNUNET_OK)
1095 ret = fragment_row (stmt, cb, cb_cls);
1096 (*returned_fragments)++;
1097 if (ret != GNUNET_YES)
1098 sql_ret = SQLITE_DONE;
1101 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1105 while (sql_ret == SQLITE_ROW);
1111 * Retrieve a message fragment range by fragment ID.
1113 * @see GNUNET_PSYCSTORE_fragment_get()
1115 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1118 fragment_get (void *cls,
1119 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1120 uint64_t first_fragment_id,
1121 uint64_t last_fragment_id,
1122 uint64_t *returned_fragments,
1123 GNUNET_PSYCSTORE_FragmentCallback cb,
1126 struct Plugin *plugin = cls;
1127 sqlite3_stmt *stmt = plugin->select_fragments;
1128 int ret = GNUNET_SYSERR;
1129 *returned_fragments = 0;
1131 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1132 sizeof (*channel_key),
1134 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1135 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1137 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1142 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1145 if (SQLITE_OK != sqlite3_reset (stmt))
1147 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1156 * Retrieve a message fragment range by fragment ID.
1158 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1160 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1163 fragment_get_latest (void *cls,
1164 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1165 uint64_t fragment_limit,
1166 uint64_t *returned_fragments,
1167 GNUNET_PSYCSTORE_FragmentCallback cb,
1170 struct Plugin *plugin = cls;
1171 sqlite3_stmt *stmt = plugin->select_latest_fragments;
1172 int ret = GNUNET_SYSERR;
1173 *returned_fragments = 0;
1175 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1176 sizeof (*channel_key),
1178 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1180 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1185 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1188 if (SQLITE_OK != sqlite3_reset (stmt))
1190 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1199 * Retrieve all fragments of a message ID range.
1201 * @see GNUNET_PSYCSTORE_message_get()
1203 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1206 message_get (void *cls,
1207 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1208 uint64_t first_message_id,
1209 uint64_t last_message_id,
1210 uint64_t *returned_fragments,
1211 GNUNET_PSYCSTORE_FragmentCallback cb,
1214 struct Plugin *plugin = cls;
1215 sqlite3_stmt *stmt = plugin->select_messages;
1216 int ret = GNUNET_SYSERR;
1217 *returned_fragments = 0;
1219 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1220 sizeof (*channel_key),
1222 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1223 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1225 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1230 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1233 if (SQLITE_OK != sqlite3_reset (stmt))
1235 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1244 * Retrieve all fragments of the latest messages.
1246 * @see GNUNET_PSYCSTORE_message_get_latest()
1248 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1251 message_get_latest (void *cls,
1252 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1253 uint64_t message_limit,
1254 uint64_t *returned_fragments,
1255 GNUNET_PSYCSTORE_FragmentCallback cb,
1258 struct Plugin *plugin = cls;
1259 sqlite3_stmt *stmt = plugin->select_latest_messages;
1260 int ret = GNUNET_SYSERR;
1261 *returned_fragments = 0;
1263 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1264 sizeof (*channel_key),
1266 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1267 sizeof (*channel_key),
1269 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1271 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1276 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1279 if (SQLITE_OK != sqlite3_reset (stmt))
1281 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1290 * Retrieve a fragment of message specified by its message ID and fragment
1293 * @see GNUNET_PSYCSTORE_message_get_fragment()
1295 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1298 message_get_fragment (void *cls,
1299 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1300 uint64_t message_id,
1301 uint64_t fragment_offset,
1302 GNUNET_PSYCSTORE_FragmentCallback cb,
1305 struct Plugin *plugin = cls;
1306 sqlite3_stmt *stmt = plugin->select_message_fragment;
1307 int ret = GNUNET_SYSERR;
1309 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1310 sizeof (*channel_key),
1312 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1313 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1315 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1320 switch (sqlite3_step (stmt))
1326 ret = fragment_row (stmt, cb, cb_cls);
1329 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1334 if (SQLITE_OK != sqlite3_reset (stmt))
1336 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1344 * Retrieve the max. values of message counters for a channel.
1346 * @see GNUNET_PSYCSTORE_counters_get()
1348 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1351 counters_message_get (void *cls,
1352 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1353 uint64_t *max_fragment_id,
1354 uint64_t *max_message_id,
1355 uint64_t *max_group_generation)
1357 struct Plugin *plugin = cls;
1358 sqlite3_stmt *stmt = plugin->select_counters_message;
1359 int ret = GNUNET_SYSERR;
1361 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1362 sizeof (*channel_key),
1365 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1370 switch (sqlite3_step (stmt))
1376 *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1377 *max_message_id = sqlite3_column_int64 (stmt, 1);
1378 *max_group_generation = sqlite3_column_int64 (stmt, 2);
1382 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1387 if (SQLITE_OK != sqlite3_reset (stmt))
1389 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1397 * Retrieve the max. values of state counters for a channel.
1399 * @see GNUNET_PSYCSTORE_counters_get()
1401 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1404 counters_state_get (void *cls,
1405 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1406 uint64_t *max_state_message_id)
1408 struct Plugin *plugin = cls;
1409 sqlite3_stmt *stmt = plugin->select_counters_state;
1410 int ret = GNUNET_SYSERR;
1412 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1413 sizeof (*channel_key),
1416 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1421 switch (sqlite3_step (stmt))
1427 *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1431 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1436 if (SQLITE_OK != sqlite3_reset (stmt))
1438 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1447 * Set a state variable to the given value.
1449 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1452 state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1453 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1454 const char *name, const void *value, size_t value_size)
1456 int ret = GNUNET_SYSERR;
1458 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1459 sizeof (*channel_key), SQLITE_STATIC)
1460 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1461 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1464 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1469 switch (sqlite3_step (stmt))
1472 ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1475 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1480 if (SQLITE_OK != sqlite3_reset (stmt))
1482 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1484 return GNUNET_SYSERR;
1492 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1493 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1494 uint64_t message_id)
1496 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1497 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1498 sizeof (*channel_key), SQLITE_STATIC))
1500 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1503 else if (SQLITE_DONE != sqlite3_step (stmt))
1505 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1508 if (SQLITE_OK != sqlite3_reset (stmt))
1510 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1512 return GNUNET_SYSERR;
1519 * Begin modifying current state.
1522 state_modify_begin (void *cls,
1523 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1524 uint64_t message_id, uint64_t state_delta)
1526 struct Plugin *plugin = cls;
1527 sqlite3_stmt *stmt = plugin->select_message_state_delta;
1529 if (state_delta > 0)
1531 int ret = GNUNET_SYSERR;
1532 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1533 sizeof (*channel_key), SQLITE_STATIC)
1534 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1535 message_id - state_delta)
1536 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1538 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1539 message_id - state_delta)
1540 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1541 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1543 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1548 switch (sqlite3_step (stmt))
1557 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1561 if (SQLITE_OK != sqlite3_reset (stmt))
1563 ret = GNUNET_SYSERR;
1564 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1567 if (GNUNET_OK != ret)
1571 if (TRANSACTION_NONE != plugin->transaction)
1572 if (GNUNET_OK != transaction_rollback (plugin))
1573 return GNUNET_SYSERR;
1575 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1580 * Set the current value of state variable.
1582 * @see GNUNET_PSYCSTORE_state_modify()
1584 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1587 state_modify_set (void *cls,
1588 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1589 const char *name, const void *value, size_t value_size)
1591 struct Plugin *plugin = cls;
1592 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1594 return state_set (plugin, plugin->insert_state_current, channel_key,
1595 name, value, value_size);
1601 * End modifying current state.
1604 state_modify_end (void *cls,
1605 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1606 uint64_t message_id)
1608 struct Plugin *plugin = cls;
1609 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1612 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1613 && GNUNET_OK == update_message_id (plugin,
1614 plugin->update_max_state_message_id,
1615 channel_key, message_id)
1616 && GNUNET_OK == transaction_commit (plugin)
1617 ? GNUNET_OK : GNUNET_SYSERR;
1622 * Begin state synchronization.
1625 state_sync_begin (void *cls,
1626 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1628 struct Plugin *plugin = cls;
1629 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1634 * Set the current value of state variable.
1636 * @see GNUNET_PSYCSTORE_state_modify()
1638 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1641 state_sync_set (void *cls,
1642 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1643 const char *name, const void *value, size_t value_size)
1645 struct Plugin *plugin = cls;
1646 return state_set (cls, plugin->insert_state_sync, channel_key,
1647 name, value, value_size);
1652 * End modifying current state.
1655 state_sync_end (void *cls,
1656 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1657 uint64_t message_id)
1659 struct Plugin *plugin = cls;
1660 int ret = GNUNET_SYSERR;
1662 GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1663 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1664 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1666 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1668 && GNUNET_OK == update_message_id (plugin,
1669 plugin->update_state_hash_message_id,
1670 channel_key, message_id)
1671 && GNUNET_OK == transaction_commit (plugin)
1673 : transaction_rollback (plugin);
1679 * Reset the state of a channel.
1681 * @see GNUNET_PSYCSTORE_state_reset()
1683 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1686 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1688 struct Plugin *plugin = cls;
1689 return exec_channel (plugin, plugin->delete_state, channel_key);
1694 * Update signed values of state variables in the state store.
1696 * @see GNUNET_PSYCSTORE_state_hash_update()
1698 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1701 state_update_signed (void *cls,
1702 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1704 struct Plugin *plugin = cls;
1705 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1710 * Retrieve a state variable by name.
1712 * @see GNUNET_PSYCSTORE_state_get()
1714 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1717 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1718 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1720 struct Plugin *plugin = cls;
1721 int ret = GNUNET_SYSERR;
1723 sqlite3_stmt *stmt = plugin->select_state_one;
1725 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1726 sizeof (*channel_key),
1728 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1730 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1735 switch (sqlite3_step (stmt))
1741 ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1742 sqlite3_column_bytes (stmt, 0));
1745 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1750 if (SQLITE_OK != sqlite3_reset (stmt))
1752 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1761 * Retrieve all state variables for a channel with the given prefix.
1763 * @see GNUNET_PSYCSTORE_state_get_prefix()
1765 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1768 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1769 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1772 struct Plugin *plugin = cls;
1773 int ret = GNUNET_SYSERR;
1775 sqlite3_stmt *stmt = plugin->select_state_prefix;
1776 size_t name_len = strlen (name);
1777 char *name_prefix = GNUNET_malloc (name_len + 2);
1778 memcpy (name_prefix, name, name_len);
1779 memcpy (name_prefix + name_len, "_%", 2);
1781 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1782 sizeof (*channel_key), SQLITE_STATIC)
1783 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1784 || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1787 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1795 sql_ret = sqlite3_step (stmt);
1799 if (ret != GNUNET_OK)
1803 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1804 sqlite3_column_blob (stmt, 1),
1805 sqlite3_column_bytes (stmt, 1));
1806 if (ret != GNUNET_YES)
1807 sql_ret = SQLITE_DONE;
1810 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1814 while (sql_ret == SQLITE_ROW);
1817 if (SQLITE_OK != sqlite3_reset (stmt))
1819 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1828 * Retrieve all signed state variables for a channel.
1830 * @see GNUNET_PSYCSTORE_state_get_signed()
1832 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1835 state_get_signed (void *cls,
1836 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1837 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1839 struct Plugin *plugin = cls;
1840 int ret = GNUNET_SYSERR;
1842 sqlite3_stmt *stmt = plugin->select_state_signed;
1844 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1845 sizeof (*channel_key), SQLITE_STATIC))
1847 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1855 sql_ret = sqlite3_step (stmt);
1859 if (ret != GNUNET_OK)
1863 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1864 sqlite3_column_blob (stmt, 1),
1865 sqlite3_column_bytes (stmt, 1));
1866 if (ret != GNUNET_YES)
1867 sql_ret = SQLITE_DONE;
1870 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1874 while (sql_ret == SQLITE_ROW);
1877 if (SQLITE_OK != sqlite3_reset (stmt))
1879 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1888 * Entry point for the plugin.
1890 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1891 * @return NULL on error, otherwise the plugin context
1894 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1896 static struct Plugin plugin;
1897 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1898 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1900 if (NULL != plugin.cfg)
1901 return NULL; /* can only initialize once! */
1902 memset (&plugin, 0, sizeof (struct Plugin));
1904 if (GNUNET_OK != database_setup (&plugin))
1906 database_shutdown (&plugin);
1909 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1911 api->membership_store = &membership_store;
1912 api->membership_test = &membership_test;
1913 api->fragment_store = &fragment_store;
1914 api->message_add_flags = &message_add_flags;
1915 api->fragment_get = &fragment_get;
1916 api->fragment_get_latest = &fragment_get_latest;
1917 api->message_get = &message_get;
1918 api->message_get_latest = &message_get_latest;
1919 api->message_get_fragment = &message_get_fragment;
1920 api->counters_message_get = &counters_message_get;
1921 api->counters_state_get = &counters_state_get;
1922 api->state_modify_begin = &state_modify_begin;
1923 api->state_modify_set = &state_modify_set;
1924 api->state_modify_end = &state_modify_end;
1925 api->state_sync_begin = &state_sync_begin;
1926 api->state_sync_set = &state_sync_set;
1927 api->state_sync_end = &state_sync_end;
1928 api->state_reset = &state_reset;
1929 api->state_update_signed = &state_update_signed;
1930 api->state_get = &state_get;
1931 api->state_get_prefix = &state_get_prefix;
1932 api->state_get_signed = &state_get_signed;
1934 LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1940 * Exit point from the plugin.
1942 * @param cls The plugin context (as returned by "init")
1943 * @return Always NULL
1946 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1948 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1949 struct Plugin *plugin = api->cls;
1951 database_shutdown (plugin);
1954 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1958 /* end of plugin_psycstore_sqlite.c */