2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
17 * @file psycstore/plugin_psycstore_sqlite.c
18 * @brief sqlite-based psycstore backend
19 * @author Gabor X Toth
20 * @author Christian Grothoff
24 * FIXME: SQLite3 only supports signed 64-bit integers natively,
25 * thus it can only store 63 bits of the uint64_t's.
29 #include "gnunet_psycstore_plugin.h"
30 #include "gnunet_psycstore_service.h"
31 #include "gnunet_multicast_service.h"
32 #include "gnunet_crypto_lib.h"
33 #include "gnunet_psyc_util_lib.h"
34 #include "psycstore.h"
/**
 * After how many ms "busy" should a DB operation fail for good?  A
 * low value makes sure that we are more responsive to requests
 * (especially PUTs).  A high value guarantees a higher success rate
 * (SELECTs in iterate can take several seconds despite LIMIT=1).
 *
 * The default value of 1s should ensure that users do not experience
 * huge latencies while at the same time allowing operations to
 * succeed with reasonable probability.
 */
#define BUSY_TIMEOUT_MS 1000

/**
 * Debug switch; when it is non-zero the database is not put into
 * exclusive locking mode.
 */
#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING

/**
 * Log an error message at log-level @a level that indicates a failure
 * of the SQLite call named @a cmd.  The message contains the source
 * location plus the error text and error code currently reported by
 * the database handle of @a db (a pointer to a struct with a `dbh` member).
 */
#define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg((db)->dbh), sqlite3_errcode((db)->dbh)); } while(0)

/**
 * Log a message from the "psycstore-sqlite" component.
 */
#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
/**
 * Kind of transaction currently open on the database handle.
 */
enum Transactions
{
  /** No transaction is in progress. */
  TRANSACTION_NONE = 0,
  TRANSACTION_STATE_MODIFY,
  TRANSACTION_STATE_SYNC,
};
67 * Context for all functions in this plugin.
72 const struct GNUNET_CONFIGURATION_Handle *cfg;
80 * Native SQLite database handle.
85 * Current transaction.
87 enum Transactions transaction;
89 sqlite3_stmt *transaction_begin;
91 sqlite3_stmt *transaction_commit;
93 sqlite3_stmt *transaction_rollback;
96 * Precompiled SQL for channel_key_store()
98 sqlite3_stmt *insert_channel_key;
101 * Precompiled SQL for slave_key_store()
103 sqlite3_stmt *insert_slave_key;
107 * Precompiled SQL for membership_store()
109 sqlite3_stmt *insert_membership;
112 * Precompiled SQL for membership_test()
114 sqlite3_stmt *select_membership;
118 * Precompiled SQL for fragment_store()
120 sqlite3_stmt *insert_fragment;
123 * Precompiled SQL for message_add_flags()
125 sqlite3_stmt *update_message_flags;
128 * Precompiled SQL for fragment_get()
130 sqlite3_stmt *select_fragments;
133 * Precompiled SQL for fragment_get()
135 sqlite3_stmt *select_latest_fragments;
138 * Precompiled SQL for message_get()
140 sqlite3_stmt *select_messages;
143 * Precompiled SQL for message_get()
145 sqlite3_stmt *select_latest_messages;
148 * Precompiled SQL for message_get_fragment()
150 sqlite3_stmt *select_message_fragment;
153 * Precompiled SQL for counters_get_message()
155 sqlite3_stmt *select_counters_message;
158 * Precompiled SQL for counters_get_state()
160 sqlite3_stmt *select_counters_state;
163 * Precompiled SQL for state_modify_end()
165 sqlite3_stmt *update_state_hash_message_id;
168 * Precompiled SQL for state_sync_end()
170 sqlite3_stmt *update_max_state_message_id;
173 * Precompiled SQL for state_modify_op()
175 sqlite3_stmt *insert_state_current;
178 * Precompiled SQL for state_modify_end()
180 sqlite3_stmt *delete_state_empty;
183 * Precompiled SQL for state_set_signed()
185 sqlite3_stmt *update_state_signed;
188 * Precompiled SQL for state_sync()
190 sqlite3_stmt *insert_state_sync;
193 * Precompiled SQL for state_sync()
195 sqlite3_stmt *delete_state;
198 * Precompiled SQL for state_sync()
200 sqlite3_stmt *insert_state_from_sync;
203 * Precompiled SQL for state_sync()
205 sqlite3_stmt *delete_state_sync;
208 * Precompiled SQL for state_get_signed()
210 sqlite3_stmt *select_state_signed;
213 * Precompiled SQL for state_get()
215 sqlite3_stmt *select_state_one;
218 * Precompiled SQL for state_get_prefix()
220 sqlite3_stmt *select_state_prefix;
227 sql_trace (void *cls, const char *sql)
229 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
235 * @brief Prepare a SQL statement
237 * @param dbh handle to the database
238 * @param sql SQL statement, UTF-8 encoded
239 * @param stmt set to the prepared statement
240 * @return 0 on success
243 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
248 result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
249 (const char **) &tail);
250 LOG (GNUNET_ERROR_TYPE_DEBUG,
251 "Prepared `%s' / %p: %d\n", sql, *stmt, result);
252 if (result != SQLITE_OK)
253 LOG (GNUNET_ERROR_TYPE_ERROR,
254 _("Error preparing SQL query: %s\n %s\n"),
255 sqlite3_errmsg (dbh), sql);
261 * @brief Prepare a SQL statement
263 * @param dbh handle to the database
264 * @param sql SQL statement, UTF-8 encoded
265 * @return 0 on success
268 sql_exec (sqlite3 *dbh, const char *sql)
272 result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
273 LOG (GNUNET_ERROR_TYPE_DEBUG,
274 "Executed `%s' / %d\n", sql, result);
275 if (result != SQLITE_OK)
276 LOG (GNUNET_ERROR_TYPE_ERROR,
277 _("Error executing SQL query: %s\n %s\n"),
278 sqlite3_errmsg (dbh), sql);
284 * Initialize the database connections and associated
285 * data structures (create tables and indices
286 * as needed as well).
288 * @param plugin the plugin context (state for this module)
289 * @return GNUNET_OK on success
292 database_setup (struct Plugin *plugin)
297 GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
298 "FILENAME", &filename))
300 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
301 "psycstore-sqlite", "FILENAME");
302 return GNUNET_SYSERR;
304 if (GNUNET_OK != GNUNET_DISK_file_test (filename))
306 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
309 GNUNET_free (filename);
310 return GNUNET_SYSERR;
313 /* filename should be UTF-8-encoded. If it isn't, it's a bug */
314 plugin->fn = filename;
316 /* Open database and precompile statements */
317 if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
319 LOG (GNUNET_ERROR_TYPE_ERROR,
320 _("Unable to initialize SQLite: %s.\n"),
321 sqlite3_errmsg (plugin->dbh));
322 return GNUNET_SYSERR;
326 sqlite3_trace (plugin->dbh, &sql_trace, NULL);
329 sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
330 sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
331 sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
332 sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
333 sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
334 #if ! DEBUG_PSYCSTORE
335 sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
337 sql_exec (plugin->dbh, "PRAGMA page_size=4096");
339 sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
343 sql_exec (plugin->dbh,
344 "CREATE TABLE IF NOT EXISTS channels (\n"
345 " id INTEGER PRIMARY KEY,\n"
346 " pub_key BLOB(32) UNIQUE,\n"
347 " max_state_message_id INTEGER,\n" // last applied state message ID
348 " state_hash_message_id INTEGER\n" // last message ID with a state hash
351 sql_exec (plugin->dbh,
352 "CREATE TABLE IF NOT EXISTS slaves (\n"
353 " id INTEGER PRIMARY KEY,\n"
354 " pub_key BLOB(32) UNIQUE\n"
357 sql_exec (plugin->dbh,
358 "CREATE TABLE IF NOT EXISTS membership (\n"
359 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
360 " slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
361 " did_join INTEGER NOT NULL,\n"
362 " announced_at INTEGER NOT NULL,\n"
363 " effective_since INTEGER NOT NULL,\n"
364 " group_generation INTEGER NOT NULL\n"
366 sql_exec (plugin->dbh,
367 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
368 "ON membership (channel_id, slave_id);");
370 /** @todo messages table: add method_name column */
371 sql_exec (plugin->dbh,
372 "CREATE TABLE IF NOT EXISTS messages (\n"
373 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
374 " hop_counter INTEGER NOT NULL,\n"
377 " fragment_id INTEGER NOT NULL,\n"
378 " fragment_offset INTEGER NOT NULL,\n"
379 " message_id INTEGER NOT NULL,\n"
380 " group_generation INTEGER NOT NULL,\n"
381 " multicast_flags INTEGER NOT NULL,\n"
382 " psycstore_flags INTEGER NOT NULL,\n"
384 " PRIMARY KEY (channel_id, fragment_id),\n"
385 " UNIQUE (channel_id, message_id, fragment_offset)\n"
388 sql_exec (plugin->dbh,
389 "CREATE TABLE IF NOT EXISTS state (\n"
390 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
391 " name TEXT NOT NULL,\n"
392 " value_current BLOB,\n"
393 " value_signed BLOB,\n"
394 " PRIMARY KEY (channel_id, name)\n"
397 sql_exec (plugin->dbh,
398 "CREATE TABLE IF NOT EXISTS state_sync (\n"
399 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
400 " name TEXT NOT NULL,\n"
402 " PRIMARY KEY (channel_id, name)\n"
405 /* Prepare statements */
407 sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
409 sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
411 sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
413 sql_prepare (plugin->dbh,
414 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
415 &plugin->insert_channel_key);
417 sql_prepare (plugin->dbh,
418 "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
419 &plugin->insert_slave_key);
421 sql_prepare (plugin->dbh,
422 "INSERT INTO membership\n"
423 " (channel_id, slave_id, did_join, announced_at,\n"
424 " effective_since, group_generation)\n"
425 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
426 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
428 &plugin->insert_membership);
430 sql_prepare (plugin->dbh,
431 "SELECT did_join FROM membership\n"
432 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
433 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
434 " AND effective_since <= ? AND did_join = 1\n"
435 "ORDER BY announced_at DESC LIMIT 1;",
436 &plugin->select_membership);
438 sql_prepare (plugin->dbh,
439 "INSERT OR IGNORE INTO messages\n"
440 " (channel_id, hop_counter, signature, purpose,\n"
441 " fragment_id, fragment_offset, message_id,\n"
442 " group_generation, multicast_flags, psycstore_flags, data)\n"
443 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
444 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
445 &plugin->insert_fragment);
447 sql_prepare (plugin->dbh,
449 "SET psycstore_flags = psycstore_flags | ?\n"
450 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
451 " AND message_id = ? AND fragment_offset = 0;",
452 &plugin->update_message_flags);
454 sql_prepare (plugin->dbh,
455 "SELECT hop_counter, signature, purpose, fragment_id,\n"
456 " fragment_offset, message_id, group_generation,\n"
457 " multicast_flags, psycstore_flags, data\n"
459 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
460 " AND ? <= fragment_id AND fragment_id <= ?;",
461 &plugin->select_fragments);
463 /** @todo select_messages: add method_prefix filter */
464 sql_prepare (plugin->dbh,
465 "SELECT hop_counter, signature, purpose, fragment_id,\n"
466 " fragment_offset, message_id, group_generation,\n"
467 " multicast_flags, psycstore_flags, data\n"
469 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
470 " AND ? <= message_id AND message_id <= ?"
472 &plugin->select_messages);
474 sql_prepare (plugin->dbh,
476 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
477 " fragment_offset, message_id, group_generation,\n"
478 " multicast_flags, psycstore_flags, data\n"
480 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
481 " ORDER BY fragment_id DESC\n"
483 "ORDER BY fragment_id;",
484 &plugin->select_latest_fragments);
486 /** @todo select_latest_messages: add method_prefix filter */
487 sql_prepare (plugin->dbh,
488 "SELECT hop_counter, signature, purpose, fragment_id,\n"
489 " fragment_offset, message_id, group_generation,\n"
490 " multicast_flags, psycstore_flags, data\n"
492 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
493 " AND message_id IN\n"
494 " (SELECT message_id\n"
496 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
497 " GROUP BY message_id\n"
498 " ORDER BY message_id\n"
500 "ORDER BY fragment_id;",
501 &plugin->select_latest_messages);
503 sql_prepare (plugin->dbh,
504 "SELECT hop_counter, signature, purpose, fragment_id,\n"
505 " fragment_offset, message_id, group_generation,\n"
506 " multicast_flags, psycstore_flags, data\n"
508 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
509 " AND message_id = ? AND fragment_offset = ?;",
510 &plugin->select_message_fragment);
512 sql_prepare (plugin->dbh,
513 "SELECT fragment_id, message_id, group_generation\n"
515 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
516 "ORDER BY fragment_id DESC LIMIT 1;",
517 &plugin->select_counters_message);
519 sql_prepare (plugin->dbh,
520 "SELECT max_state_message_id\n"
522 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
523 &plugin->select_counters_state);
525 sql_prepare (plugin->dbh,
527 "SET max_state_message_id = ?\n"
528 "WHERE pub_key = ?;",
529 &plugin->update_max_state_message_id);
531 sql_prepare (plugin->dbh,
533 "SET state_hash_message_id = ?\n"
534 "WHERE pub_key = ?;",
535 &plugin->update_state_hash_message_id);
537 sql_prepare (plugin->dbh,
538 "INSERT OR REPLACE INTO state\n"
539 " (channel_id, name, value_current, value_signed)\n"
540 "SELECT new.channel_id, new.name,\n"
541 " new.value_current, old.value_signed\n"
542 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
544 " ? AS name, ? AS value_current) AS new\n"
545 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
546 " FROM state) AS old\n"
547 "ON new.channel_id = old.channel_id AND new.name = old.name;",
548 &plugin->insert_state_current);
550 sql_prepare (plugin->dbh,
551 "DELETE FROM state\n"
552 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
553 " AND (value_current IS NULL OR length(value_current) = 0)\n"
554 " AND (value_signed IS NULL OR length(value_signed) = 0);",
555 &plugin->delete_state_empty);
557 sql_prepare (plugin->dbh,
559 "SET value_signed = value_current\n"
560 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
561 &plugin->update_state_signed);
563 sql_prepare (plugin->dbh,
564 "DELETE FROM state\n"
565 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
566 &plugin->delete_state);
568 sql_prepare (plugin->dbh,
569 "INSERT INTO state_sync (channel_id, name, value)\n"
570 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
571 &plugin->insert_state_sync);
573 sql_prepare (plugin->dbh,
574 "INSERT INTO state\n"
575 " (channel_id, name, value_current, value_signed)\n"
576 "SELECT channel_id, name, value, value\n"
578 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
579 &plugin->insert_state_from_sync);
581 sql_prepare (plugin->dbh,
582 "DELETE FROM state_sync\n"
583 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
584 &plugin->delete_state_sync);
586 sql_prepare (plugin->dbh,
587 "SELECT value_current\n"
589 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
591 &plugin->select_state_one);
593 sql_prepare (plugin->dbh,
594 "SELECT name, value_current\n"
596 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
597 " AND (name = ? OR substr(name, 1, ?) = ?);",
598 &plugin->select_state_prefix);
600 sql_prepare (plugin->dbh,
601 "SELECT name, value_signed\n"
603 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
604 " AND value_signed IS NOT NULL;",
605 &plugin->select_state_signed);
612 * Shutdown database connection and associate data
614 * @param plugin the plugin context (state for this module)
617 database_shutdown (struct Plugin *plugin)
621 while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
623 result = sqlite3_finalize (stmt);
624 if (SQLITE_OK != result)
625 LOG (GNUNET_ERROR_TYPE_WARNING,
626 "Failed to close statement %p: %d\n", stmt, result);
628 if (SQLITE_OK != sqlite3_close (plugin->dbh))
629 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
631 GNUNET_free_non_null (plugin->fn);
635 * Execute a prepared statement with a @a channel_key argument.
637 * @param plugin Plugin handle.
638 * @param stmt Statement to execute.
639 * @param channel_key Public key of the channel.
641 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
644 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
645 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
647 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
648 sizeof (*channel_key), SQLITE_STATIC))
650 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
653 else if (SQLITE_DONE != sqlite3_step (stmt))
655 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
659 if (SQLITE_OK != sqlite3_reset (stmt))
661 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
663 return GNUNET_SYSERR;
670 * Begin a transaction.
673 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
675 sqlite3_stmt *stmt = plugin->transaction_begin;
677 if (SQLITE_DONE != sqlite3_step (stmt))
679 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
682 if (SQLITE_OK != sqlite3_reset (stmt))
684 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
686 return GNUNET_SYSERR;
689 plugin->transaction = transaction;
695 * Commit current transaction.
698 transaction_commit (struct Plugin *plugin)
700 sqlite3_stmt *stmt = plugin->transaction_commit;
702 if (SQLITE_DONE != sqlite3_step (stmt))
704 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
707 if (SQLITE_OK != sqlite3_reset (stmt))
709 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
711 return GNUNET_SYSERR;
714 plugin->transaction = TRANSACTION_NONE;
720 * Roll back current transaction.
723 transaction_rollback (struct Plugin *plugin)
725 sqlite3_stmt *stmt = plugin->transaction_rollback;
727 if (SQLITE_DONE != sqlite3_step (stmt))
729 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
732 if (SQLITE_OK != sqlite3_reset (stmt))
734 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
736 return GNUNET_SYSERR;
738 plugin->transaction = TRANSACTION_NONE;
744 channel_key_store (struct Plugin *plugin,
745 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
747 sqlite3_stmt *stmt = plugin->insert_channel_key;
749 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
750 sizeof (*channel_key), SQLITE_STATIC))
752 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
755 else if (SQLITE_DONE != sqlite3_step (stmt))
757 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
761 if (SQLITE_OK != sqlite3_reset (stmt))
763 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
765 return GNUNET_SYSERR;
773 slave_key_store (struct Plugin *plugin,
774 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
776 sqlite3_stmt *stmt = plugin->insert_slave_key;
778 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
779 sizeof (*slave_key), SQLITE_STATIC))
781 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
784 else if (SQLITE_DONE != sqlite3_step (stmt))
786 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
791 if (SQLITE_OK != sqlite3_reset (stmt))
793 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
795 return GNUNET_SYSERR;
803 * Store join/leave events for a PSYC channel in order to be able to answer
804 * membership test queries later.
806 * @see GNUNET_PSYCSTORE_membership_store()
808 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
811 sqlite_membership_store (void *cls,
812 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
813 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
815 uint64_t announced_at,
816 uint64_t effective_since,
817 uint64_t group_generation)
819 struct Plugin *plugin = cls;
820 sqlite3_stmt *stmt = plugin->insert_membership;
822 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
824 if (announced_at > INT64_MAX ||
825 effective_since > INT64_MAX ||
826 group_generation > INT64_MAX)
829 return GNUNET_SYSERR;
832 if (GNUNET_OK != channel_key_store (plugin, channel_key)
833 || GNUNET_OK != slave_key_store (plugin, slave_key))
834 return GNUNET_SYSERR;
836 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
837 sizeof (*channel_key), SQLITE_STATIC)
838 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
839 sizeof (*slave_key), SQLITE_STATIC)
840 || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
841 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
842 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
843 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
845 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
848 else if (SQLITE_DONE != sqlite3_step (stmt))
850 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
854 if (SQLITE_OK != sqlite3_reset (stmt))
856 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
858 return GNUNET_SYSERR;
865 * Test if a member was admitted to the channel at the given message ID.
867 * @see GNUNET_PSYCSTORE_membership_test()
869 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
870 * #GNUNET_SYSERR if there was en error.
873 membership_test (void *cls,
874 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
875 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
878 struct Plugin *plugin = cls;
879 sqlite3_stmt *stmt = plugin->select_membership;
880 int ret = GNUNET_SYSERR;
882 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
883 sizeof (*channel_key), SQLITE_STATIC)
884 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
885 sizeof (*slave_key), SQLITE_STATIC)
886 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
888 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
893 switch (sqlite3_step (stmt))
903 if (SQLITE_OK != sqlite3_reset (stmt))
905 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
913 * Store a message fragment sent to a channel.
915 * @see GNUNET_PSYCSTORE_fragment_store()
917 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
920 fragment_store (void *cls,
921 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
922 const struct GNUNET_MULTICAST_MessageHeader *msg,
923 uint32_t psycstore_flags)
925 struct Plugin *plugin = cls;
926 sqlite3_stmt *stmt = plugin->insert_fragment;
928 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
930 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
931 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
932 uint64_t message_id = GNUNET_ntohll (msg->message_id);
933 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
935 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
936 message_id > INT64_MAX || group_generation > INT64_MAX)
938 LOG (GNUNET_ERROR_TYPE_ERROR,
939 "Tried to store fragment with a field > INT64_MAX: "
940 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
941 message_id, group_generation);
943 return GNUNET_SYSERR;
946 if (GNUNET_OK != channel_key_store (plugin, channel_key))
947 return GNUNET_SYSERR;
949 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
950 sizeof (*channel_key), SQLITE_STATIC)
951 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
952 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
953 sizeof (msg->signature), SQLITE_STATIC)
954 || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
955 sizeof (msg->purpose), SQLITE_STATIC)
956 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
957 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
958 || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
959 || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
960 || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
961 || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
962 || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
963 ntohs (msg->header.size)
964 - sizeof (*msg), SQLITE_STATIC))
966 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
969 else if (SQLITE_DONE != sqlite3_step (stmt))
971 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
975 if (SQLITE_OK != sqlite3_reset (stmt))
977 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
979 return GNUNET_SYSERR;
986 * Set additional flags for a given message.
988 * They are OR'd with any existing flags set.
990 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
993 message_add_flags (void *cls,
994 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
996 uint32_t psycstore_flags)
998 struct Plugin *plugin = cls;
999 sqlite3_stmt *stmt = plugin->update_message_flags;
1000 int ret = GNUNET_SYSERR;
1002 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1003 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1004 sizeof (*channel_key), SQLITE_STATIC)
1005 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1007 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1012 switch (sqlite3_step (stmt))
1015 ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1018 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1023 if (SQLITE_OK != sqlite3_reset (stmt))
1025 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1027 return GNUNET_SYSERR;
1034 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1037 int data_size = sqlite3_column_bytes (stmt, 9);
1038 struct GNUNET_MULTICAST_MessageHeader *msg
1039 = GNUNET_malloc (sizeof (*msg) + data_size);
1041 msg->header.size = htons (sizeof (*msg) + data_size);
1042 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1043 msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1044 GNUNET_memcpy (&msg->signature,
1045 sqlite3_column_blob (stmt, 1),
1046 sqlite3_column_bytes (stmt, 1));
1047 GNUNET_memcpy (&msg->purpose,
1048 sqlite3_column_blob (stmt, 2),
1049 sqlite3_column_bytes (stmt, 2));
1050 msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1051 msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1052 msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1053 msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1054 msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1055 GNUNET_memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1057 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1062 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1063 uint64_t *returned_fragments,
1064 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1066 int ret = GNUNET_SYSERR;
1071 sql_ret = sqlite3_step (stmt);
1075 if (ret != GNUNET_OK)
1079 ret = fragment_row (stmt, cb, cb_cls);
1080 (*returned_fragments)++;
1081 if (ret != GNUNET_YES)
1082 sql_ret = SQLITE_DONE;
1085 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1089 while (sql_ret == SQLITE_ROW);
1095 * Retrieve a message fragment range by fragment ID.
1097 * @see GNUNET_PSYCSTORE_fragment_get()
1099 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1102 fragment_get (void *cls,
1103 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1104 uint64_t first_fragment_id,
1105 uint64_t last_fragment_id,
1106 uint64_t *returned_fragments,
1107 GNUNET_PSYCSTORE_FragmentCallback cb,
1110 struct Plugin *plugin = cls;
1111 sqlite3_stmt *stmt = plugin->select_fragments;
1112 int ret = GNUNET_SYSERR;
1113 *returned_fragments = 0;
1115 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1116 sizeof (*channel_key),
1118 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1119 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1121 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1126 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1129 if (SQLITE_OK != sqlite3_reset (stmt))
1131 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1140 * Retrieve a message fragment range by fragment ID.
1142 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1144 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1147 fragment_get_latest (void *cls,
1148 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1149 uint64_t fragment_limit,
1150 uint64_t *returned_fragments,
1151 GNUNET_PSYCSTORE_FragmentCallback cb,
1154 struct Plugin *plugin = cls;
1155 sqlite3_stmt *stmt = plugin->select_latest_fragments;
1156 int ret = GNUNET_SYSERR;
1157 *returned_fragments = 0;
1159 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1160 sizeof (*channel_key),
1162 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1164 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1169 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1172 if (SQLITE_OK != sqlite3_reset (stmt))
1174 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1183 * Retrieve all fragments of a message ID range.
1185 * @see GNUNET_PSYCSTORE_message_get()
1187 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1190 message_get (void *cls,
1191 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1192 uint64_t first_message_id,
1193 uint64_t last_message_id,
1194 uint64_t fragment_limit,
1195 uint64_t *returned_fragments,
1196 GNUNET_PSYCSTORE_FragmentCallback cb,
1199 struct Plugin *plugin = cls;
1200 sqlite3_stmt *stmt = plugin->select_messages;
1201 int ret = GNUNET_SYSERR;
1202 *returned_fragments = 0;
1204 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1205 sizeof (*channel_key),
1207 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1208 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id)
1209 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1210 (0 != fragment_limit)
1214 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1219 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1222 if (SQLITE_OK != sqlite3_reset (stmt))
1224 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1233 * Retrieve all fragments of the latest messages.
1235 * @see GNUNET_PSYCSTORE_message_get_latest()
1237 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1240 message_get_latest (void *cls,
1241 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1242 uint64_t message_limit,
1243 uint64_t *returned_fragments,
1244 GNUNET_PSYCSTORE_FragmentCallback cb,
1247 struct Plugin *plugin = cls;
1248 sqlite3_stmt *stmt = plugin->select_latest_messages;
1249 int ret = GNUNET_SYSERR;
1250 *returned_fragments = 0;
1252 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1253 sizeof (*channel_key),
1255 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1256 sizeof (*channel_key),
1258 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1260 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1265 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1268 if (SQLITE_OK != sqlite3_reset (stmt))
1270 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1279 * Retrieve a fragment of message specified by its message ID and fragment
1282 * @see GNUNET_PSYCSTORE_message_get_fragment()
1284 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1287 message_get_fragment (void *cls,
1288 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1289 uint64_t message_id,
1290 uint64_t fragment_offset,
1291 GNUNET_PSYCSTORE_FragmentCallback cb,
1294 struct Plugin *plugin = cls;
1295 sqlite3_stmt *stmt = plugin->select_message_fragment;
1296 int ret = GNUNET_SYSERR;
1298 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1299 sizeof (*channel_key),
1301 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1302 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1304 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1309 switch (sqlite3_step (stmt))
1315 ret = fragment_row (stmt, cb, cb_cls);
1318 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1323 if (SQLITE_OK != sqlite3_reset (stmt))
1325 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1333 * Retrieve the max. values of message counters for a channel.
1335 * @see GNUNET_PSYCSTORE_counters_get()
1337 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1340 counters_message_get (void *cls,
1341 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1342 uint64_t *max_fragment_id,
1343 uint64_t *max_message_id,
1344 uint64_t *max_group_generation)
1346 struct Plugin *plugin = cls;
1347 sqlite3_stmt *stmt = plugin->select_counters_message;
1348 int ret = GNUNET_SYSERR;
1350 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1351 sizeof (*channel_key),
1354 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1359 switch (sqlite3_step (stmt))
1365 *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1366 *max_message_id = sqlite3_column_int64 (stmt, 1);
1367 *max_group_generation = sqlite3_column_int64 (stmt, 2);
1371 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1376 if (SQLITE_OK != sqlite3_reset (stmt))
1378 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1386 * Retrieve the max. values of state counters for a channel.
1388 * @see GNUNET_PSYCSTORE_counters_get()
1390 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1393 counters_state_get (void *cls,
1394 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1395 uint64_t *max_state_message_id)
1397 struct Plugin *plugin = cls;
1398 sqlite3_stmt *stmt = plugin->select_counters_state;
1399 int ret = GNUNET_SYSERR;
1401 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1402 sizeof (*channel_key),
1405 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1410 switch (sqlite3_step (stmt))
1416 *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1420 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1425 if (SQLITE_OK != sqlite3_reset (stmt))
1427 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1436 * Assign a value to a state variable.
1438 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1441 state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
1442 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1443 const char *name, const void *value, size_t value_size)
1445 int ret = GNUNET_SYSERR;
1447 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1448 sizeof (*channel_key), SQLITE_STATIC)
1449 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1450 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1453 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1458 switch (sqlite3_step (stmt))
1461 ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1464 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1469 if (SQLITE_OK != sqlite3_reset (stmt))
1471 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1473 return GNUNET_SYSERR;
1481 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1482 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1483 uint64_t message_id)
1485 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1486 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1487 sizeof (*channel_key), SQLITE_STATIC))
1489 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1492 else if (SQLITE_DONE != sqlite3_step (stmt))
1494 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1497 if (SQLITE_OK != sqlite3_reset (stmt))
1499 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1501 return GNUNET_SYSERR;
1508 * Begin modifying current state.
1511 state_modify_begin (void *cls,
1512 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1513 uint64_t message_id, uint64_t state_delta)
1515 struct Plugin *plugin = cls;
1517 if (state_delta > 0)
1520 * We can only apply state modifiers in the current message if modifiers in
1521 * the previous stateful message (message_id - state_delta) were already
1525 uint64_t max_state_message_id = 0;
1526 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1530 case GNUNET_NO: // no state yet
1537 if (max_state_message_id < message_id - state_delta)
1538 return GNUNET_NO; /* some stateful messages not yet applied */
1539 else if (message_id - state_delta < max_state_message_id)
1540 return GNUNET_NO; /* changes already applied */
1543 if (TRANSACTION_NONE != plugin->transaction)
1545 /** @todo FIXME: wait for other transaction to finish */
1546 return GNUNET_SYSERR;
1548 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1553 * Set the current value of state variable.
1555 * @see GNUNET_PSYCSTORE_state_modify()
1557 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1560 state_modify_op (void *cls,
1561 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1562 enum GNUNET_PSYC_Operator op,
1563 const char *name, const void *value, size_t value_size)
1565 struct Plugin *plugin = cls;
1566 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1570 case GNUNET_PSYC_OP_ASSIGN:
1571 return state_assign (plugin, plugin->insert_state_current, channel_key,
1572 name, value, value_size);
1574 default: /** @todo implement more state operations */
1576 return GNUNET_SYSERR;
1582 * End modifying current state.
1585 state_modify_end (void *cls,
1586 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1587 uint64_t message_id)
1589 struct Plugin *plugin = cls;
1590 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1593 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1594 && GNUNET_OK == update_message_id (plugin,
1595 plugin->update_max_state_message_id,
1596 channel_key, message_id)
1597 && GNUNET_OK == transaction_commit (plugin)
1598 ? GNUNET_OK : GNUNET_SYSERR;
1603 * Begin state synchronization.
1606 state_sync_begin (void *cls,
1607 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1609 struct Plugin *plugin = cls;
1610 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1615 * Assign current value of a state variable.
1617 * @see GNUNET_PSYCSTORE_state_modify()
1619 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1622 state_sync_assign (void *cls,
1623 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1624 const char *name, const void *value, size_t value_size)
1626 struct Plugin *plugin = cls;
1627 return state_assign (cls, plugin->insert_state_sync, channel_key,
1628 name, value, value_size);
1633 * End modifying current state.
1636 state_sync_end (void *cls,
1637 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1638 uint64_t max_state_message_id,
1639 uint64_t state_hash_message_id)
1641 struct Plugin *plugin = cls;
1642 int ret = GNUNET_SYSERR;
1644 if (TRANSACTION_NONE != plugin->transaction)
1646 /** @todo FIXME: wait for other transaction to finish */
1647 return GNUNET_SYSERR;
1650 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1651 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1652 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1654 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1656 && GNUNET_OK == update_message_id (plugin,
1657 plugin->update_state_hash_message_id,
1658 channel_key, state_hash_message_id)
1659 && GNUNET_OK == update_message_id (plugin,
1660 plugin->update_max_state_message_id,
1661 channel_key, max_state_message_id)
1662 && GNUNET_OK == transaction_commit (plugin)
1664 : transaction_rollback (plugin);
1670 * Delete the whole state.
1672 * @see GNUNET_PSYCSTORE_state_reset()
1674 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1677 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1679 struct Plugin *plugin = cls;
1680 return exec_channel (plugin, plugin->delete_state, channel_key);
1685 * Update signed values of state variables in the state store.
1687 * @see GNUNET_PSYCSTORE_state_hash_update()
1689 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1692 state_update_signed (void *cls,
1693 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1695 struct Plugin *plugin = cls;
1696 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1701 * Retrieve a state variable by name.
1703 * @see GNUNET_PSYCSTORE_state_get()
1705 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1708 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1709 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1711 struct Plugin *plugin = cls;
1712 int ret = GNUNET_SYSERR;
1714 sqlite3_stmt *stmt = plugin->select_state_one;
1716 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1717 sizeof (*channel_key),
1719 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1721 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1726 switch (sqlite3_step (stmt))
1732 ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1733 sqlite3_column_bytes (stmt, 0));
1736 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1741 if (SQLITE_OK != sqlite3_reset (stmt))
1743 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1752 * Retrieve all state variables for a channel with the given prefix.
1754 * @see GNUNET_PSYCSTORE_state_get_prefix()
1756 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1759 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1760 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1763 struct Plugin *plugin = cls;
1764 int ret = GNUNET_SYSERR;
1765 sqlite3_stmt *stmt = plugin->select_state_prefix;
1766 size_t name_len = strlen (name);
1768 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1769 sizeof (*channel_key), SQLITE_STATIC)
1770 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1771 || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len)
1772 || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC))
1774 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1782 sql_ret = sqlite3_step (stmt);
1786 if (ret != GNUNET_OK)
1790 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1791 sqlite3_column_blob (stmt, 1),
1792 sqlite3_column_bytes (stmt, 1));
1793 if (ret != GNUNET_YES)
1794 sql_ret = SQLITE_DONE;
1797 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1801 while (sql_ret == SQLITE_ROW);
1803 if (SQLITE_OK != sqlite3_reset (stmt))
1805 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1813 * Retrieve all signed state variables for a channel.
1815 * @see GNUNET_PSYCSTORE_state_get_signed()
1817 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1820 state_get_signed (void *cls,
1821 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1822 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1824 struct Plugin *plugin = cls;
1825 int ret = GNUNET_SYSERR;
1827 sqlite3_stmt *stmt = plugin->select_state_signed;
1829 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1830 sizeof (*channel_key), SQLITE_STATIC))
1832 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1840 sql_ret = sqlite3_step (stmt);
1844 if (ret != GNUNET_OK)
1848 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1849 sqlite3_column_blob (stmt, 1),
1850 sqlite3_column_bytes (stmt, 1));
1851 if (ret != GNUNET_YES)
1852 sql_ret = SQLITE_DONE;
1855 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1859 while (sql_ret == SQLITE_ROW);
1862 if (SQLITE_OK != sqlite3_reset (stmt))
1864 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1873 * Entry point for the plugin.
1875 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1876 * @return NULL on error, otherwise the plugin context
1879 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1881 static struct Plugin plugin;
1882 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1883 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1885 if (NULL != plugin.cfg)
1886 return NULL; /* can only initialize once! */
1887 memset (&plugin, 0, sizeof (struct Plugin));
1889 if (GNUNET_OK != database_setup (&plugin))
1891 database_shutdown (&plugin);
1894 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1896 api->membership_store = &sqlite_membership_store;
1897 api->membership_test = &membership_test;
1898 api->fragment_store = &fragment_store;
1899 api->message_add_flags = &message_add_flags;
1900 api->fragment_get = &fragment_get;
1901 api->fragment_get_latest = &fragment_get_latest;
1902 api->message_get = &message_get;
1903 api->message_get_latest = &message_get_latest;
1904 api->message_get_fragment = &message_get_fragment;
1905 api->counters_message_get = &counters_message_get;
1906 api->counters_state_get = &counters_state_get;
1907 api->state_modify_begin = &state_modify_begin;
1908 api->state_modify_op = &state_modify_op;
1909 api->state_modify_end = &state_modify_end;
1910 api->state_sync_begin = &state_sync_begin;
1911 api->state_sync_assign = &state_sync_assign;
1912 api->state_sync_end = &state_sync_end;
1913 api->state_reset = &state_reset;
1914 api->state_update_signed = &state_update_signed;
1915 api->state_get = &state_get;
1916 api->state_get_prefix = &state_get_prefix;
1917 api->state_get_signed = &state_get_signed;
1919 LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1925 * Exit point from the plugin.
1927 * @param cls The plugin context (as returned by "init")
1928 * @return Always NULL
1931 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1933 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1934 struct Plugin *plugin = api->cls;
1936 database_shutdown (plugin);
1939 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1943 /* end of plugin_psycstore_sqlite.c */