psycstore: add option to perform membership test when retrieving fragment or message
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
39 #include <sqlite3.h>
40
41 /**
42  * After how many ms "busy" should a DB operation fail for good?  A
43  * low value makes sure that we are more responsive to requests
44  * (especially PUTs).  A high value guarantees a higher success rate
45  * (SELECTs in iterate can take several seconds despite LIMIT=1).
46  *
47  * The default value of 1s should ensure that users do not experience
48  * huge latencies while at the same time allowing operations to
49  * succeed with reasonable probability.
50  */
51 #define BUSY_TIMEOUT_MS 1000
52
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54
55 /**
56  * Log an error message at log-level 'level' that indicates
57  * a failure of the command 'cmd' on file 'filename'
58  * with the message given by strerror(errno).
59  */
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
61
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
63
64 enum Transactions {
65   TRANSACTION_NONE = 0,
66   TRANSACTION_STATE_MODIFY
67 };
68
69 /**
70  * Context for all functions in this plugin.
71  */
72 struct Plugin
73 {
74
75   const struct GNUNET_CONFIGURATION_Handle *cfg;
76
77   /**
78    * Database filename.
79    */
80   char *fn;
81
82   /**
83    * Native SQLite database handle.
84    */
85   sqlite3 *dbh;
86
87   /**
88    * Current transaction.
89    */
90   enum Transactions transaction;
91
92   sqlite3_stmt *transaction_begin;
93
94   sqlite3_stmt *transaction_commit;
95
96   sqlite3_stmt *transaction_rollback;
97
98   /**
99    * Precompiled SQL for channel_key_store()
100    */
101   sqlite3_stmt *insert_channel_key;
102
103   /**
104    * Precompiled SQL for slave_key_store()
105    */
106   sqlite3_stmt *insert_slave_key;
107
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   sqlite3_stmt *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   sqlite3_stmt *select_membership;
118
119
120   /**
121    * Precompiled SQL for fragment_store()
122    */
123   sqlite3_stmt *insert_fragment;
124
125   /**
126    * Precompiled SQL for message_add_flags()
127    */
128   sqlite3_stmt *update_message_flags;
129
130   /**
131    * Precompiled SQL for fragment_get()
132    */
133   sqlite3_stmt *select_fragment;
134
135   /**
136    * Precompiled SQL for message_get()
137    */
138   sqlite3_stmt *select_message;
139
140   /**
141    * Precompiled SQL for message_get_fragment()
142    */
143   sqlite3_stmt *select_message_fragment;
144
145   /**
146    * Precompiled SQL for counters_get_message()
147    */
148   sqlite3_stmt *select_counters_message;
149
150   /**
151    * Precompiled SQL for counters_get_state()
152    */
153   sqlite3_stmt *select_counters_state;
154
155   /**
156    * Precompiled SQL for state_modify_end()
157    */
158   sqlite3_stmt *update_state_hash_message_id;
159
160   /**
161    * Precompiled SQL for state_sync_end()
162    */
163   sqlite3_stmt *update_max_state_message_id;
164
165
166   /**
167    * Precompiled SQL for message_modify_begin()
168    */
169   sqlite3_stmt *select_message_state_delta;
170
171   /**
172    * Precompiled SQL for state_modify_set()
173    */
174   sqlite3_stmt *insert_state_current;
175
176   /**
177    * Precompiled SQL for state_modify_end()
178    */
179   sqlite3_stmt *delete_state_empty;
180
181   /**
182    * Precompiled SQL for state_set_signed()
183    */
184   sqlite3_stmt *update_state_signed;
185
186   /**
187    * Precompiled SQL for state_sync()
188    */
189   sqlite3_stmt *insert_state_sync;
190
191   /**
192    * Precompiled SQL for state_sync()
193    */
194   sqlite3_stmt *delete_state;
195
196   /**
197    * Precompiled SQL for state_sync()
198    */
199   sqlite3_stmt *insert_state_from_sync;
200
201   /**
202    * Precompiled SQL for state_sync()
203    */
204   sqlite3_stmt *delete_state_sync;
205
206   /**
207    * Precompiled SQL for state_get_signed()
208    */
209   sqlite3_stmt *select_state_signed;
210
211   /**
212    * Precompiled SQL for state_get()
213    */
214   sqlite3_stmt *select_state_one;
215
216   /**
217    * Precompiled SQL for state_get_prefix()
218    */
219   sqlite3_stmt *select_state_prefix;
220
221 };
222
223 #if DEBUG_PSYCSTORE
224
225 static void
226 sql_trace (void *cls, const char *sql)
227 {
228   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
229 }
230
231 #endif
232
233 /**
234  * @brief Prepare a SQL statement
235  *
236  * @param dbh handle to the database
237  * @param sql SQL statement, UTF-8 encoded
238  * @param stmt set to the prepared statement
239  * @return 0 on success
240  */
241 static int
242 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
243 {
244   char *tail;
245   int result;
246
247   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
248                                (const char **) &tail);
249   LOG (GNUNET_ERROR_TYPE_DEBUG,
250        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
251   if (result != SQLITE_OK)
252     LOG (GNUNET_ERROR_TYPE_ERROR,
253          _("Error preparing SQL query: %s\n  %s\n"),
254          sqlite3_errmsg (dbh), sql);
255   return result;
256 }
257
258
259 /**
260  * @brief Prepare a SQL statement
261  *
262  * @param dbh handle to the database
263  * @param sql SQL statement, UTF-8 encoded
264  * @return 0 on success
265  */
266 static int
267 sql_exec (sqlite3 *dbh, const char *sql)
268 {
269   int result;
270
271   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
272   LOG (GNUNET_ERROR_TYPE_DEBUG,
273        "Executed `%s' / %d\n", sql, result);
274   if (result != SQLITE_OK)
275     LOG (GNUNET_ERROR_TYPE_ERROR,
276          _("Error executing SQL query: %s\n  %s\n"),
277          sqlite3_errmsg (dbh), sql);
278   return result;
279 }
280
281
282 /**
283  * Initialize the database connections and associated
284  * data structures (create tables and indices
285  * as needed as well).
286  *
287  * @param plugin the plugin context (state for this module)
288  * @return GNUNET_OK on success
289  */
290 static int
291 database_setup (struct Plugin *plugin)
292 {
293   char *filename;
294
295   if (GNUNET_OK !=
296       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
297                                                "FILENAME", &filename))
298   {
299     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
300                                "psycstore-sqlite", "FILENAME");
301     return GNUNET_SYSERR;
302   }
303   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
304   {
305     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
306     {
307       GNUNET_break (0);
308       GNUNET_free (filename);
309       return GNUNET_SYSERR;
310     }
311   }
312   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
313   plugin->fn = filename;
314
315   /* Open database and precompile statements */
316   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
317   {
318     LOG (GNUNET_ERROR_TYPE_ERROR,
319          _("Unable to initialize SQLite: %s.\n"),
320          sqlite3_errmsg (plugin->dbh));
321     return GNUNET_SYSERR;
322   }
323
324 #if DEBUG_PSYCSTORE
325   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
326 #endif
327
328   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
329   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
330   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
331   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
332   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
333 #if ! DEBUG_PSYCSTORE
334   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
335 #endif
336   sql_exec (plugin->dbh, "PRAGMA count_changes=OFF");
337   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
338
339   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
340
341   /* Create tables */
342
343   sql_exec (plugin->dbh,
344             "CREATE TABLE IF NOT EXISTS channels (\n"
345             "  id INTEGER PRIMARY KEY,\n"
346             "  pub_key BLOB UNIQUE,\n"
347             "  max_state_message_id INTEGER,\n"
348             "  state_hash_message_id INTEGER\n"
349             ");");
350
351   sql_exec (plugin->dbh,
352             "CREATE TABLE IF NOT EXISTS slaves (\n"
353             "  id INTEGER PRIMARY KEY,\n"
354             "  pub_key BLOB UNIQUE\n"
355             ");");
356
357   sql_exec (plugin->dbh,
358             "CREATE TABLE IF NOT EXISTS membership (\n"
359             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
360             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
361             "  did_join INTEGER NOT NULL,\n"
362             "  announced_at INTEGER NOT NULL,\n"
363             "  effective_since INTEGER NOT NULL,\n"
364             "  group_generation INTEGER NOT NULL\n"
365             ");");
366   sql_exec (plugin->dbh,
367             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
368             "ON membership (channel_id, slave_id);");
369
370   sql_exec (plugin->dbh,
371             "CREATE TABLE IF NOT EXISTS messages (\n"
372             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
373             "  hop_counter INTEGER NOT NULL,\n"
374             "  signature BLOB,\n"
375             "  purpose BLOB,\n"
376             "  fragment_id INTEGER NOT NULL,\n"
377             "  fragment_offset INTEGER NOT NULL,\n"
378             "  message_id INTEGER NOT NULL,\n"
379             "  group_generation INTEGER NOT NULL,\n"
380             "  multicast_flags INTEGER NOT NULL,\n"
381             "  psycstore_flags INTEGER NOT NULL,\n"
382             "  data BLOB,\n"
383             "  PRIMARY KEY (channel_id, fragment_id),\n"
384             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
385             ");");
386
387   sql_exec (plugin->dbh,
388             "CREATE TABLE IF NOT EXISTS state (\n"
389             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
390             "  name TEXT NOT NULL,\n"
391             "  value_current BLOB,\n"
392             "  value_signed BLOB,\n"
393             "  PRIMARY KEY (channel_id, name)\n"
394             ");");
395
396   sql_exec (plugin->dbh,
397             "CREATE TABLE IF NOT EXISTS state_sync (\n"
398             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
399             "  name TEXT NOT NULL,\n"
400             "  value BLOB,\n"
401             "  PRIMARY KEY (channel_id, name)\n"
402             ");");
403
404   /* Prepare statements */
405
406   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
407
408   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
409
410   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
411
412   sql_prepare (plugin->dbh,
413                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
414                &plugin->insert_channel_key);
415
416   sql_prepare (plugin->dbh,
417                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
418                &plugin->insert_slave_key);
419
420   sql_prepare (plugin->dbh,
421                "INSERT INTO membership\n"
422                " (channel_id, slave_id, did_join, announced_at,\n"
423                "  effective_since, group_generation)\n"
424                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
425                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
426                "        ?, ?, ?, ?);",
427                &plugin->insert_membership);
428
429   sql_prepare (plugin->dbh,
430                "SELECT did_join FROM membership\n"
431                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
432                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
433                "      AND effective_since <= ? AND did_join = 1\n"
434                "ORDER BY announced_at DESC LIMIT 1;",
435                &plugin->select_membership);
436
437   sql_prepare (plugin->dbh,
438                "INSERT OR IGNORE INTO messages\n"
439                " (channel_id, hop_counter, signature, purpose,\n"
440                "  fragment_id, fragment_offset, message_id,\n"
441                "  group_generation, multicast_flags, psycstore_flags, data)\n"
442                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
443                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
444                &plugin->insert_fragment);
445
446   sql_prepare (plugin->dbh,
447                "UPDATE messages\n"
448                "SET psycstore_flags = psycstore_flags | ?\n"
449                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
450                "      AND message_id = ? AND fragment_offset = 0;",
451                &plugin->update_message_flags);
452
453   sql_prepare (plugin->dbh,
454                "SELECT hop_counter, signature, purpose, fragment_id,\n"
455                "       fragment_offset, message_id, group_generation,\n"
456                "       multicast_flags, psycstore_flags, data\n"
457                "FROM messages\n"
458                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
459                "      AND fragment_id = ?;",
460                &plugin->select_fragment);
461
462   sql_prepare (plugin->dbh,
463                "SELECT hop_counter, signature, purpose, fragment_id,\n"
464                "       fragment_offset, message_id, group_generation,\n"
465                "       multicast_flags, psycstore_flags, data\n"
466                "FROM messages\n"
467                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
468                "      AND message_id = ? AND fragment_offset = ?;",
469                &plugin->select_message_fragment);
470
471   sql_prepare (plugin->dbh,
472                "SELECT hop_counter, signature, purpose, fragment_id,\n"
473                "       fragment_offset, message_id, group_generation,\n"
474                "       multicast_flags, psycstore_flags, data\n"
475                "FROM messages\n"
476                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477                "      AND message_id = ?;",
478                &plugin->select_message);
479
480   sql_prepare (plugin->dbh,
481                "SELECT fragment_id, message_id, group_generation\n"
482                "FROM messages\n"
483                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
484                "ORDER BY fragment_id DESC LIMIT 1;",
485                &plugin->select_counters_message);
486
487   sql_prepare (plugin->dbh,
488                "SELECT max_state_message_id\n"
489                "FROM channels\n"
490                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
491                &plugin->select_counters_state);
492
493   sql_prepare (plugin->dbh,
494                "UPDATE channels\n"
495                "SET max_state_message_id = ?\n"
496                "WHERE pub_key = ?;",
497                &plugin->update_max_state_message_id);
498
499   sql_prepare (plugin->dbh,
500                "UPDATE channels\n"
501                "SET state_hash_message_id = ?\n"
502                "WHERE pub_key = ?;",
503                &plugin->update_state_hash_message_id);
504
505   sql_prepare (plugin->dbh,
506                "SELECT 1\n"
507                "FROM channels AS c\n"
508                "LEFT JOIN messages AS m\n"
509                "ON c.id = m.channel_id\n"
510                "WHERE c.pub_key = ?\n"
511                "      AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
512                "           OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
513                "LIMIT 1;",
514                &plugin->select_message_state_delta);
515
516   sql_prepare (plugin->dbh,
517                "INSERT OR REPLACE INTO state\n"
518                "  (channel_id, name, value_current, value_signed)\n"
519                "SELECT new.channel_id, new.name,\n"
520                "       new.value_current, old.value_signed\n"
521                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
522                "             AS channel_id,\n"
523                "             ? AS name, ? AS value_current) AS new\n"
524                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
525                "           FROM state) AS old\n"
526                "ON new.channel_id = old.channel_id AND new.name = old.name;",
527                &plugin->insert_state_current);
528
529   sql_prepare (plugin->dbh,
530                "DELETE FROM state\n"
531                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
532                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
533                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
534                &plugin->delete_state_empty);
535
536   sql_prepare (plugin->dbh,
537                "UPDATE state\n"
538                "SET value_signed = value_current\n"
539                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
540                &plugin->update_state_signed);
541
542   sql_prepare (plugin->dbh,
543                "DELETE FROM state\n"
544                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
545                &plugin->delete_state);
546
547   sql_prepare (plugin->dbh,
548                "INSERT INTO state_sync (channel_id, name, value)\n"
549                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
550                &plugin->insert_state_sync);
551
552   sql_prepare (plugin->dbh,
553                "INSERT INTO state\n"
554                " (channel_id, name, value_current, value_signed)\n"
555                "SELECT channel_id, name, value, value\n"
556                "FROM state_sync\n"
557                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
558                &plugin->insert_state_from_sync);
559
560   sql_prepare (plugin->dbh,
561                "DELETE FROM state_sync\n"
562                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
563                &plugin->delete_state_sync);
564
565   sql_prepare (plugin->dbh,
566                "SELECT value_current\n"
567                "FROM state\n"
568                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
569                "      AND name = ?;",
570                &plugin->select_state_one);
571
572   sql_prepare (plugin->dbh,
573                "SELECT name, value_current\n"
574                "FROM state\n"
575                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
576                "      AND (name = ? OR name LIKE ?);",
577                &plugin->select_state_prefix);
578
579   sql_prepare (plugin->dbh,
580                "SELECT name, value_signed\n"
581                "FROM state\n"
582                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
583                "      AND value_signed IS NOT NULL;",
584                &plugin->select_state_signed);
585
586   return GNUNET_OK;
587 }
588
589
590 /**
591  * Shutdown database connection and associate data
592  * structures.
593  * @param plugin the plugin context (state for this module)
594  */
595 static void
596 database_shutdown (struct Plugin *plugin)
597 {
598   int result;
599   sqlite3_stmt *stmt;
600   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
601   {
602     result = sqlite3_finalize (stmt);
603     if (SQLITE_OK != result)
604       LOG (GNUNET_ERROR_TYPE_WARNING,
605            "Failed to close statement %p: %d\n", stmt, result);
606   }
607   if (SQLITE_OK != sqlite3_close (plugin->dbh))
608     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
609
610   GNUNET_free_non_null (plugin->fn);
611 }
612
613 /**
614  * Execute a prepared statement with a @a channel_key argument.
615  *
616  * @param plugin Plugin handle.
617  * @param stmt Statement to execute.
618  * @param channel_key Public key of the channel.
619  *
620  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
621  */
622 static int
623 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
624               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
625 {
626   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
627                                       sizeof (*channel_key), SQLITE_STATIC))
628   {
629     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
630                 "sqlite3_bind");
631   }
632   else if (SQLITE_DONE != sqlite3_step (stmt))
633   {
634     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
635                 "sqlite3_step");
636   }
637
638   if (SQLITE_OK != sqlite3_reset (stmt))
639   {
640     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
641                 "sqlite3_reset");
642     return GNUNET_SYSERR;
643   }
644
645   return GNUNET_OK;
646 }
647
648 /**
649  * Begin a transaction.
650  */
651 static int
652 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
653 {
654   sqlite3_stmt *stmt = plugin->transaction_begin;
655
656   if (SQLITE_DONE != sqlite3_step (stmt))
657   {
658     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
659                 "sqlite3_step");
660   }
661   if (SQLITE_OK != sqlite3_reset (stmt))
662   {
663     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
664                 "sqlite3_reset");
665     return GNUNET_SYSERR;
666   }
667
668   plugin->transaction = transaction;
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Commit current transaction.
675  */
676 static int
677 transaction_commit (struct Plugin *plugin)
678 {
679   sqlite3_stmt *stmt = plugin->transaction_commit;
680
681   if (SQLITE_DONE != sqlite3_step (stmt))
682   {
683     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
684                 "sqlite3_step");
685   }
686   if (SQLITE_OK != sqlite3_reset (stmt))
687   {
688     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
689                 "sqlite3_reset");
690     return GNUNET_SYSERR;
691   }
692
693   plugin->transaction = TRANSACTION_NONE;
694   return GNUNET_OK;
695 }
696
697
698 /**
699  * Roll back current transaction.
700  */
701 static int
702 transaction_rollback (struct Plugin *plugin)
703 {
704   sqlite3_stmt *stmt = plugin->transaction_rollback;
705
706   if (SQLITE_DONE != sqlite3_step (stmt))
707   {
708     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
709                 "sqlite3_step");
710   }
711   if (SQLITE_OK != sqlite3_reset (stmt))
712   {
713     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
714                 "sqlite3_reset");
715     return GNUNET_SYSERR;
716   }
717   plugin->transaction = TRANSACTION_NONE;
718   return GNUNET_OK;
719 }
720
721
722 static int
723 channel_key_store (struct Plugin *plugin,
724                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
725 {
726   sqlite3_stmt *stmt = plugin->insert_channel_key;
727
728   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
729                                       sizeof (*channel_key), SQLITE_STATIC))
730   {
731     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
732                 "sqlite3_bind");
733   }
734   else if (SQLITE_DONE != sqlite3_step (stmt))
735   {
736     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
737                 "sqlite3_step");
738   }
739
740   if (SQLITE_OK != sqlite3_reset (stmt))
741   {
742     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
743                 "sqlite3_reset");
744     return GNUNET_SYSERR;
745   }
746
747   return GNUNET_OK;
748 }
749
750
751 static int
752 slave_key_store (struct Plugin *plugin,
753                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
754 {
755   sqlite3_stmt *stmt = plugin->insert_slave_key;
756
757   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
758                                       sizeof (*slave_key), SQLITE_STATIC))
759   {
760     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
761                 "sqlite3_bind");
762   }
763   else if (SQLITE_DONE != sqlite3_step (stmt))
764   {
765     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
766                 "sqlite3_step");
767   }
768
769
770   if (SQLITE_OK != sqlite3_reset (stmt))
771   {
772     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
773                 "sqlite3_reset");
774     return GNUNET_SYSERR;
775   }
776
777   return GNUNET_OK;
778 }
779
780
781 /**
782  * Store join/leave events for a PSYC channel in order to be able to answer
783  * membership test queries later.
784  *
785  * @see GNUNET_PSYCSTORE_membership_store()
786  *
787  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
788  */
789 static int
790 membership_store (void *cls,
791                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
792                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
793                   int did_join,
794                   uint64_t announced_at,
795                   uint64_t effective_since,
796                   uint64_t group_generation)
797 {
798   struct Plugin *plugin = cls;
799   sqlite3_stmt *stmt = plugin->insert_membership;
800
801   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
802
803   if (announced_at > INT64_MAX ||
804       effective_since > INT64_MAX ||
805       group_generation > INT64_MAX)
806   {
807     GNUNET_break (0);
808     return GNUNET_SYSERR;
809   }
810
811   if (GNUNET_OK != channel_key_store (plugin, channel_key)
812       || GNUNET_OK != slave_key_store (plugin, slave_key))
813     return GNUNET_SYSERR;
814
815   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
816                                       sizeof (*channel_key), SQLITE_STATIC)
817       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
818                                          sizeof (*slave_key), SQLITE_STATIC)
819       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
820       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
821       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
822       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
823   {
824     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
825                 "sqlite3_bind");
826   }
827   else if (SQLITE_DONE != sqlite3_step (stmt))
828   {
829     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
830                 "sqlite3_step");
831   }
832
833   if (SQLITE_OK != sqlite3_reset (stmt))
834   {
835     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
836                 "sqlite3_reset");
837     return GNUNET_SYSERR;
838   }
839
840   return GNUNET_OK;
841 }
842
843 /**
844  * Test if a member was admitted to the channel at the given message ID.
845  *
846  * @see GNUNET_PSYCSTORE_membership_test()
847  *
848  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
849  *         #GNUNET_SYSERR if there was en error.
850  */
851 static int
852 membership_test (void *cls,
853                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
854                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
855                  uint64_t message_id)
856 {
857   struct Plugin *plugin = cls;
858   sqlite3_stmt *stmt = plugin->select_membership;
859   int ret = GNUNET_SYSERR;
860
861   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
862                                       sizeof (*channel_key), SQLITE_STATIC)
863       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
864                                          sizeof (*slave_key), SQLITE_STATIC)
865       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
866   {
867     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
868                 "sqlite3_bind");
869   }
870   else
871   {
872     switch (sqlite3_step (stmt))
873     {
874     case SQLITE_DONE:
875       ret = GNUNET_NO;
876       break;
877     case SQLITE_ROW:
878       ret = GNUNET_YES;
879     }
880   }
881
882   if (SQLITE_OK != sqlite3_reset (stmt))
883   {
884     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
885                 "sqlite3_reset");
886   }
887
888   return ret;
889 }
890
891 /**
892  * Store a message fragment sent to a channel.
893  *
894  * @see GNUNET_PSYCSTORE_fragment_store()
895  *
896  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
897  */
898 static int
899 fragment_store (void *cls,
900                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
901                 const struct GNUNET_MULTICAST_MessageHeader *msg,
902                 uint32_t psycstore_flags)
903 {
904   struct Plugin *plugin = cls;
905   sqlite3_stmt *stmt = plugin->insert_fragment;
906
907   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
908
909   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
910   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
911   uint64_t message_id = GNUNET_ntohll (msg->message_id);
912   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
913
914   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
915       message_id > INT64_MAX || group_generation > INT64_MAX)
916   {
917     LOG (GNUNET_ERROR_TYPE_ERROR,
918          "Tried to store fragment with a field > INT64_MAX: "
919          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
920          message_id, group_generation);
921     GNUNET_break (0);
922     return GNUNET_SYSERR;
923   }
924
925   if (GNUNET_OK != channel_key_store (plugin, channel_key))
926     return GNUNET_SYSERR;
927
928   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
929                                       sizeof (*channel_key), SQLITE_STATIC)
930       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
931       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
932                                          sizeof (msg->signature), SQLITE_STATIC)
933       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
934                                          sizeof (msg->purpose), SQLITE_STATIC)
935       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
936       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
937       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
938       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
939       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
940       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
941       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
942                                          ntohs (msg->header.size)
943                                          - sizeof (*msg), SQLITE_STATIC))
944   {
945     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
946                 "sqlite3_bind");
947   }
948   else if (SQLITE_DONE != sqlite3_step (stmt))
949   {
950     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
951                 "sqlite3_step");
952   }
953
954   if (SQLITE_OK != sqlite3_reset (stmt))
955   {
956     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
957                 "sqlite3_reset");
958     return GNUNET_SYSERR;
959   }
960
961   return GNUNET_OK;
962 }
963
964 /**
965  * Set additional flags for a given message.
966  *
967  * They are OR'd with any existing flags set.
968  *
969  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
970  */
971 static int
972 message_add_flags (void *cls,
973                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
974                    uint64_t message_id,
975                    uint64_t psycstore_flags)
976 {
977   struct Plugin *plugin = cls;
978   sqlite3_stmt *stmt = plugin->update_message_flags;
979   int ret = GNUNET_SYSERR;
980
981   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
982       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
983                                          sizeof (*channel_key), SQLITE_STATIC)
984       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
985   {
986     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
987                 "sqlite3_bind");
988   }
989   else
990   {
991     switch (sqlite3_step (stmt))
992     {
993     case SQLITE_DONE:
994       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
995       break;
996     default:
997       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
998                   "sqlite3_step");
999     }
1000   }
1001
1002   if (SQLITE_OK != sqlite3_reset (stmt))
1003   {
1004     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1005                 "sqlite3_reset");
1006     return GNUNET_SYSERR;
1007   }
1008
1009   return ret;
1010 }
1011
1012 static int
1013 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1014               void *cb_cls)
1015 {
1016   int data_size = sqlite3_column_bytes (stmt, 9);
1017   struct GNUNET_MULTICAST_MessageHeader *msg
1018     = GNUNET_malloc (sizeof (*msg) + data_size);
1019
1020   msg->header.size = htons (sizeof (*msg) + data_size);
1021   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1022   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1023   memcpy (&msg->signature,
1024           sqlite3_column_blob (stmt, 1),
1025           sqlite3_column_bytes (stmt, 1));
1026   memcpy (&msg->purpose,
1027           sqlite3_column_blob (stmt, 2),
1028           sqlite3_column_bytes (stmt, 2));
1029   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1030   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1031   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1032   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1033   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1034   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1035
1036   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1037 }
1038
1039 /**
1040  * Retrieve a message fragment by fragment ID.
1041  *
1042  * @see GNUNET_PSYCSTORE_fragment_get()
1043  *
1044  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1045  */
1046 static int
1047 fragment_get (void *cls,
1048               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1049               uint64_t fragment_id,
1050               GNUNET_PSYCSTORE_FragmentCallback cb,
1051               void *cb_cls)
1052 {
1053   struct Plugin *plugin = cls;
1054   sqlite3_stmt *stmt = plugin->select_fragment;
1055   int ret = GNUNET_SYSERR;
1056
1057   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1058                                       sizeof (*channel_key),
1059                                       SQLITE_STATIC)
1060       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id))
1061   {
1062     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1063                 "sqlite3_bind");
1064   }
1065   else
1066   {
1067     switch (sqlite3_step (stmt))
1068     {
1069     case SQLITE_DONE:
1070       ret = GNUNET_NO;
1071       break;
1072     case SQLITE_ROW:
1073       ret = fragment_row (stmt, cb, cb_cls);
1074       break;
1075     default:
1076       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1077                   "sqlite3_step");
1078     }
1079   }
1080
1081   if (SQLITE_OK != sqlite3_reset (stmt))
1082   {
1083     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1084                 "sqlite3_reset");
1085   }
1086
1087   return ret;
1088 }
1089
1090 /**
1091  * Retrieve all fragments of a message.
1092  *
1093  * @see GNUNET_PSYCSTORE_message_get()
1094  *
1095  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1096  */
1097 static int
1098 message_get (void *cls,
1099              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1100              uint64_t message_id,
1101              uint64_t *returned_fragments,
1102              GNUNET_PSYCSTORE_FragmentCallback cb,
1103              void *cb_cls)
1104 {
1105   struct Plugin *plugin = cls;
1106   sqlite3_stmt *stmt = plugin->select_message;
1107   int ret = GNUNET_SYSERR;
1108   *returned_fragments = 0;
1109
1110   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1111                                       sizeof (*channel_key),
1112                                       SQLITE_STATIC)
1113       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id))
1114   {
1115     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1116                 "sqlite3_bind");
1117   }
1118   else
1119   {
1120     int sql_ret;
1121     do
1122     {
1123       sql_ret = sqlite3_step (stmt);
1124       switch (sql_ret)
1125       {
1126       case SQLITE_DONE:
1127         if (ret != GNUNET_OK)
1128           ret = GNUNET_NO;
1129         break;
1130       case SQLITE_ROW:
1131         ret = fragment_row (stmt, cb, cb_cls);
1132         (*returned_fragments)++;
1133         if (ret != GNUNET_YES)
1134           sql_ret = SQLITE_DONE;
1135         break;
1136       default:
1137         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1138                     "sqlite3_step");
1139       }
1140     }
1141     while (sql_ret == SQLITE_ROW);
1142   }
1143
1144   if (SQLITE_OK != sqlite3_reset (stmt))
1145   {
1146     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1147                 "sqlite3_reset");
1148   }
1149
1150   return ret;
1151 }
1152
1153 /**
1154  * Retrieve a fragment of message specified by its message ID and fragment
1155  * offset.
1156  *
1157  * @see GNUNET_PSYCSTORE_message_get_fragment()
1158  *
1159  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1160  */
1161 static int
1162 message_get_fragment (void *cls,
1163                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1164                       uint64_t message_id,
1165                       uint64_t fragment_offset,
1166                       GNUNET_PSYCSTORE_FragmentCallback cb,
1167                       void *cb_cls)
1168 {
1169   struct Plugin *plugin = cls;
1170   sqlite3_stmt *stmt = plugin->select_message_fragment;
1171   int ret = GNUNET_SYSERR;
1172
1173   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1174                                       sizeof (*channel_key),
1175                                       SQLITE_STATIC)
1176       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1177       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1178   {
1179     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1180                 "sqlite3_bind");
1181   }
1182   else
1183   {
1184     switch (sqlite3_step (stmt))
1185     {
1186     case SQLITE_DONE:
1187       ret = GNUNET_NO;
1188       break;
1189     case SQLITE_ROW:
1190       ret = fragment_row (stmt, cb, cb_cls);
1191       break;
1192     default:
1193       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1194                   "sqlite3_step");
1195     }
1196   }
1197
1198   if (SQLITE_OK != sqlite3_reset (stmt))
1199   {
1200     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1201                 "sqlite3_reset");
1202   }
1203
1204   return ret;
1205 }
1206
1207 /**
1208  * Retrieve the max. values of message counters for a channel.
1209  *
1210  * @see GNUNET_PSYCSTORE_counters_get()
1211  *
1212  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1213  */
1214 static int
1215 counters_message_get (void *cls,
1216                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1217                       uint64_t *max_fragment_id,
1218                       uint64_t *max_message_id,
1219                       uint64_t *max_group_generation)
1220 {
1221   struct Plugin *plugin = cls;
1222   sqlite3_stmt *stmt = plugin->select_counters_message;
1223   int ret = GNUNET_SYSERR;
1224
1225   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1226                                       sizeof (*channel_key),
1227                                       SQLITE_STATIC))
1228   {
1229     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1230                 "sqlite3_bind");
1231   }
1232   else
1233   {
1234     switch (sqlite3_step (stmt))
1235     {
1236     case SQLITE_DONE:
1237       ret = GNUNET_NO;
1238       break;
1239     case SQLITE_ROW:
1240       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1241       *max_message_id = sqlite3_column_int64 (stmt, 1);
1242       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1243       ret = GNUNET_OK;
1244       break;
1245     default:
1246       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1247                   "sqlite3_step");
1248     }
1249   }
1250
1251   if (SQLITE_OK != sqlite3_reset (stmt))
1252   {
1253     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1254                 "sqlite3_reset");
1255   }
1256
1257   return ret;
1258 }
1259
1260 /**
1261  * Retrieve the max. values of state counters for a channel.
1262  *
1263  * @see GNUNET_PSYCSTORE_counters_get()
1264  *
1265  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1266  */
1267 static int
1268 counters_state_get (void *cls,
1269                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1270                     uint64_t *max_state_message_id)
1271 {
1272   struct Plugin *plugin = cls;
1273   sqlite3_stmt *stmt = plugin->select_counters_state;
1274   int ret = GNUNET_SYSERR;
1275
1276   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1277                                       sizeof (*channel_key),
1278                                       SQLITE_STATIC))
1279   {
1280     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1281                 "sqlite3_bind");
1282   }
1283   else
1284   {
1285     switch (sqlite3_step (stmt))
1286     {
1287     case SQLITE_DONE:
1288       ret = GNUNET_NO;
1289       break;
1290     case SQLITE_ROW:
1291       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1292       ret = GNUNET_OK;
1293       break;
1294     default:
1295       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1296                   "sqlite3_step");
1297     }
1298   }
1299
1300   if (SQLITE_OK != sqlite3_reset (stmt))
1301   {
1302     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1303                 "sqlite3_reset");
1304   }
1305
1306   return ret;
1307 }
1308
1309
1310 /**
1311  * Set a state variable to the given value.
1312  *
1313  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1314  */
1315 static int
1316 state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1317            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1318            const char *name, const void *value, size_t value_size)
1319 {
1320   int ret = GNUNET_SYSERR;
1321
1322   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1323                                       sizeof (*channel_key), SQLITE_STATIC)
1324       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1325       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1326                                          SQLITE_STATIC))
1327   {
1328     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1329                 "sqlite3_bind");
1330   }
1331   else
1332   {
1333     switch (sqlite3_step (stmt))
1334     {
1335     case SQLITE_DONE:
1336       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1337       break;
1338     default:
1339       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1340                   "sqlite3_step");
1341     }
1342   }
1343
1344   if (SQLITE_OK != sqlite3_reset (stmt))
1345   {
1346     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1347                 "sqlite3_reset");
1348     return GNUNET_SYSERR;
1349   }
1350
1351   return ret;
1352 }
1353
1354
1355 static int
1356 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1357                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1358                    uint64_t message_id)
1359 {
1360   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1361       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1362                                          sizeof (*channel_key), SQLITE_STATIC))
1363   {
1364     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1365                 "sqlite3_bind");
1366   }
1367   else if (SQLITE_DONE != sqlite3_step (stmt))
1368   {
1369     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1370                 "sqlite3_step");
1371   }
1372   if (SQLITE_OK != sqlite3_reset (stmt))
1373   {
1374     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1375                 "sqlite3_reset");
1376     return GNUNET_SYSERR;
1377   }
1378   return GNUNET_OK;
1379 }
1380
1381
1382 /**
1383  * Begin modifying current state.
1384  */
1385 static int
1386 state_modify_begin (void *cls,
1387                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1388                     uint64_t message_id, uint64_t state_delta)
1389 {
1390   struct Plugin *plugin = cls;
1391   sqlite3_stmt *stmt = plugin->select_message_state_delta;
1392
1393   if (state_delta > 0)
1394   {
1395     int ret = GNUNET_SYSERR;
1396     if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1397                                         sizeof (*channel_key), SQLITE_STATIC)
1398         || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1399                                             message_id - state_delta)
1400         || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1401                                             message_id)
1402         || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1403                                             message_id - state_delta)
1404         || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1405                                             GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1406     {
1407       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1408                   "sqlite3_bind");
1409     }
1410     else
1411     {
1412       switch (sqlite3_step (stmt))
1413       {
1414       case SQLITE_DONE:
1415         ret = GNUNET_NO;
1416         break;
1417       case SQLITE_ROW:
1418         ret = GNUNET_OK;
1419         break;
1420       default:
1421         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422                     "sqlite3_step");
1423       }
1424     }
1425     if (SQLITE_OK != sqlite3_reset (stmt))
1426     {
1427       ret = GNUNET_SYSERR;
1428       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1429                   "sqlite3_reset");
1430      }
1431     if (GNUNET_OK != ret)
1432       return ret;
1433   }
1434
1435   if (TRANSACTION_NONE != plugin->transaction)
1436       if (GNUNET_OK != transaction_rollback (plugin))
1437           return GNUNET_SYSERR;
1438
1439   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1440 }
1441
1442
1443 /**
1444  * Set the current value of state variable.
1445  *
1446  * @see GNUNET_PSYCSTORE_state_modify()
1447  *
1448  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1449  */
1450 static int
1451 state_modify_set (void *cls,
1452                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1453                   const char *name, const void *value, size_t value_size)
1454 {
1455   struct Plugin *plugin = cls;
1456   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1457
1458   return state_set (plugin, plugin->insert_state_current, channel_key,
1459                     name, value, value_size);
1460
1461 }
1462
1463
1464 /**
1465  * End modifying current state.
1466  */
1467 static int
1468 state_modify_end (void *cls,
1469                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1470                   uint64_t message_id)
1471 {
1472   struct Plugin *plugin = cls;
1473   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1474
1475   return
1476     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1477     && GNUNET_OK == update_message_id (plugin,
1478                                        plugin->update_max_state_message_id,
1479                                        channel_key, message_id)
1480     && GNUNET_OK == transaction_commit (plugin)
1481     ? GNUNET_OK : GNUNET_SYSERR;
1482 }
1483
1484
1485 /**
1486  * Begin state synchronization.
1487  */
1488 static int
1489 state_sync_begin (void *cls,
1490                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1491 {
1492   struct Plugin *plugin = cls;
1493   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1494 }
1495
1496
1497 /**
1498  * Set the current value of state variable.
1499  *
1500  * @see GNUNET_PSYCSTORE_state_modify()
1501  *
1502  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1503  */
1504 static int
1505 state_sync_set (void *cls,
1506                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1507                 const char *name, const void *value, size_t value_size)
1508 {
1509   struct Plugin *plugin = cls;
1510   return state_set (cls, plugin->insert_state_sync, channel_key,
1511                     name, value, value_size);
1512 }
1513
1514
1515 /**
1516  * End modifying current state.
1517  */
1518 static int
1519 state_sync_end (void *cls,
1520                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1521                 uint64_t message_id)
1522 {
1523   struct Plugin *plugin = cls;
1524   int ret = GNUNET_SYSERR;
1525
1526   GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1527     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1528     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1529                                   channel_key)
1530     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1531                                   channel_key)
1532     && GNUNET_OK == update_message_id (plugin,
1533                                        plugin->update_state_hash_message_id,
1534                                        channel_key, message_id)
1535     && GNUNET_OK == transaction_commit (plugin)
1536     ? ret = GNUNET_OK
1537     : transaction_rollback (plugin);
1538   return ret;
1539 }
1540
1541
1542 /**
1543  * Reset the state of a channel.
1544  *
1545  * @see GNUNET_PSYCSTORE_state_reset()
1546  *
1547  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1548  */
1549 static int
1550 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1551 {
1552   struct Plugin *plugin = cls;
1553   return exec_channel (plugin, plugin->delete_state, channel_key);
1554 }
1555
1556
1557 /**
1558  * Update signed values of state variables in the state store.
1559  *
1560  * @see GNUNET_PSYCSTORE_state_hash_update()
1561  *
1562  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1563  */
1564 static int
1565 state_update_signed (void *cls,
1566                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1567 {
1568   struct Plugin *plugin = cls;
1569   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1570 }
1571
1572
1573 /**
1574  * Retrieve a state variable by name.
1575  *
1576  * @see GNUNET_PSYCSTORE_state_get()
1577  *
1578  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1579  */
1580 static int
1581 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1582            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1583 {
1584   struct Plugin *plugin = cls;
1585   int ret = GNUNET_SYSERR;
1586
1587   sqlite3_stmt *stmt = plugin->select_state_one;
1588
1589   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1590                                       sizeof (*channel_key),
1591                                       SQLITE_STATIC)
1592       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1593   {
1594     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1595                 "sqlite3_bind");
1596   }
1597   else
1598   {
1599     switch (sqlite3_step (stmt))
1600     {
1601     case SQLITE_DONE:
1602       ret = GNUNET_NO;
1603       break;
1604     case SQLITE_ROW:
1605       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1606                 sqlite3_column_bytes (stmt, 0));
1607       break;
1608     default:
1609       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1610                   "sqlite3_step");
1611     }
1612   }
1613
1614   if (SQLITE_OK != sqlite3_reset (stmt))
1615   {
1616     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1617                 "sqlite3_reset");
1618   }
1619
1620   return ret;
1621 }
1622
1623
1624 /**
1625  * Retrieve all state variables for a channel with the given prefix.
1626  *
1627  * @see GNUNET_PSYCSTORE_state_get_prefix()
1628  *
1629  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1630  */
1631 static int
1632 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1633                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1634                   void *cb_cls)
1635 {
1636   struct Plugin *plugin = cls;
1637   int ret = GNUNET_SYSERR;
1638
1639   sqlite3_stmt *stmt = plugin->select_state_prefix;
1640   size_t name_len = strlen (name);
1641   char *name_prefix = GNUNET_malloc (name_len + 2);
1642   memcpy (name_prefix, name, name_len);
1643   memcpy (name_prefix + name_len, "_%", 2);
1644
1645   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1646                                       sizeof (*channel_key), SQLITE_STATIC)
1647       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1648       || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1649                                          SQLITE_STATIC))
1650   {
1651     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1652                 "sqlite3_bind");
1653   }
1654   else
1655   {
1656     int sql_ret;
1657     do
1658     {
1659       sql_ret = sqlite3_step (stmt);
1660       switch (sql_ret)
1661       {
1662       case SQLITE_DONE:
1663         if (ret != GNUNET_OK)
1664           ret = GNUNET_NO;
1665         break;
1666       case SQLITE_ROW:
1667         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1668                   sqlite3_column_blob (stmt, 1),
1669                   sqlite3_column_bytes (stmt, 1));
1670         if (ret != GNUNET_YES)
1671           sql_ret = SQLITE_DONE;
1672         break;
1673       default:
1674         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1675                     "sqlite3_step");
1676       }
1677     }
1678     while (sql_ret == SQLITE_ROW);
1679   }
1680
1681   if (SQLITE_OK != sqlite3_reset (stmt))
1682   {
1683     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1684                 "sqlite3_reset");
1685   }
1686
1687   return ret;
1688 }
1689
1690
1691 /**
1692  * Retrieve all signed state variables for a channel.
1693  *
1694  * @see GNUNET_PSYCSTORE_state_get_signed()
1695  *
1696  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1697  */
1698 static int
1699 state_get_signed (void *cls,
1700                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1701                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1702 {
1703   struct Plugin *plugin = cls;
1704   int ret = GNUNET_SYSERR;
1705
1706   sqlite3_stmt *stmt = plugin->select_state_signed;
1707
1708   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1709                                       sizeof (*channel_key), SQLITE_STATIC))
1710   {
1711     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1712                 "sqlite3_bind");
1713   }
1714   else
1715   {
1716     int sql_ret;
1717     do
1718     {
1719       sql_ret = sqlite3_step (stmt);
1720       switch (sql_ret)
1721       {
1722       case SQLITE_DONE:
1723         if (ret != GNUNET_OK)
1724           ret = GNUNET_NO;
1725         break;
1726       case SQLITE_ROW:
1727         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1728                   sqlite3_column_blob (stmt, 1),
1729                   sqlite3_column_bytes (stmt, 1));
1730         if (ret != GNUNET_YES)
1731           sql_ret = SQLITE_DONE;
1732         break;
1733       default:
1734         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1735                     "sqlite3_step");
1736       }
1737     }
1738     while (sql_ret == SQLITE_ROW);
1739   }
1740
1741   if (SQLITE_OK != sqlite3_reset (stmt))
1742   {
1743     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1744                 "sqlite3_reset");
1745   }
1746
1747   return ret;
1748 }
1749
1750
1751 /**
1752  * Entry point for the plugin.
1753  *
1754  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1755  * @return NULL on error, otherwise the plugin context
1756  */
1757 void *
1758 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1759 {
1760   static struct Plugin plugin;
1761   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1762   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1763
1764   if (NULL != plugin.cfg)
1765     return NULL;                /* can only initialize once! */
1766   memset (&plugin, 0, sizeof (struct Plugin));
1767   plugin.cfg = cfg;
1768   if (GNUNET_OK != database_setup (&plugin))
1769   {
1770     database_shutdown (&plugin);
1771     return NULL;
1772   }
1773   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1774   api->cls = &plugin;
1775   api->membership_store = &membership_store;
1776   api->membership_test = &membership_test;
1777   api->fragment_store = &fragment_store;
1778   api->message_add_flags = &message_add_flags;
1779   api->fragment_get = &fragment_get;
1780   api->message_get = &message_get;
1781   api->message_get_fragment = &message_get_fragment;
1782   api->counters_message_get = &counters_message_get;
1783   api->counters_state_get = &counters_state_get;
1784   api->state_modify_begin = &state_modify_begin;
1785   api->state_modify_set = &state_modify_set;
1786   api->state_modify_end = &state_modify_end;
1787   api->state_sync_begin = &state_sync_begin;
1788   api->state_sync_set = &state_sync_set;
1789   api->state_sync_end = &state_sync_end;
1790   api->state_reset = &state_reset;
1791   api->state_update_signed = &state_update_signed;
1792   api->state_get = &state_get;
1793   api->state_get_prefix = &state_get_prefix;
1794   api->state_get_signed = &state_get_signed;
1795
1796   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1797   return api;
1798 }
1799
1800
1801 /**
1802  * Exit point from the plugin.
1803  *
1804  * @param cls The plugin context (as returned by "init")
1805  * @return Always NULL
1806  */
1807 void *
1808 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1809 {
1810   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1811   struct Plugin *plugin = api->cls;
1812
1813   database_shutdown (plugin);
1814   plugin->cfg = NULL;
1815   GNUNET_free (api);
1816   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1817   return NULL;
1818 }
1819
1820 /* end of plugin_psycstore_sqlite.c */