paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 /**
20  * @file psycstore/plugin_psycstore_sqlite.c
21  * @brief sqlite-based psycstore backend
22  * @author Gabor X Toth
23  * @author Christian Grothoff
24  */
25
26 /*
27  * FIXME: SQLite3 only supports signed 64-bit integers natively,
28  *        thus it can only store 63 bits of the uint64_t's.
29  */
30
31 #include "platform.h"
32 #include "gnunet_psycstore_plugin.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_crypto_lib.h"
36 #include "gnunet_psyc_util_lib.h"
37 #include "psycstore.h"
38 #include <sqlite3.h>
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 /**
55  * Log an error message at log-level 'level' that indicates
56  * a failure of the command 'cmd' on file 'filename'
57  * with the message given by strerror(errno).
58  */
59 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
60
61 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
62
63 enum Transactions {
64   TRANSACTION_NONE = 0,
65   TRANSACTION_STATE_MODIFY,
66   TRANSACTION_STATE_SYNC,
67 };
68
69 /**
70  * Context for all functions in this plugin.
71  */
72 struct Plugin
73 {
74
75   const struct GNUNET_CONFIGURATION_Handle *cfg;
76
77   /**
78    * Database filename.
79    */
80   char *fn;
81
82   /**
83    * Native SQLite database handle.
84    */
85   sqlite3 *dbh;
86
87   /**
88    * Current transaction.
89    */
90   enum Transactions transaction;
91
92   sqlite3_stmt *transaction_begin;
93
94   sqlite3_stmt *transaction_commit;
95
96   sqlite3_stmt *transaction_rollback;
97
98   /**
99    * Precompiled SQL for channel_key_store()
100    */
101   sqlite3_stmt *insert_channel_key;
102
103   /**
104    * Precompiled SQL for slave_key_store()
105    */
106   sqlite3_stmt *insert_slave_key;
107
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   sqlite3_stmt *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   sqlite3_stmt *select_membership;
118
119
120   /**
121    * Precompiled SQL for fragment_store()
122    */
123   sqlite3_stmt *insert_fragment;
124
125   /**
126    * Precompiled SQL for message_add_flags()
127    */
128   sqlite3_stmt *update_message_flags;
129
130   /**
131    * Precompiled SQL for fragment_get()
132    */
133   sqlite3_stmt *select_fragments;
134
135   /**
136    * Precompiled SQL for fragment_get()
137    */
138   sqlite3_stmt *select_latest_fragments;
139
140   /**
141    * Precompiled SQL for message_get()
142    */
143   sqlite3_stmt *select_messages;
144
145   /**
146    * Precompiled SQL for message_get()
147    */
148   sqlite3_stmt *select_latest_messages;
149
150   /**
151    * Precompiled SQL for message_get_fragment()
152    */
153   sqlite3_stmt *select_message_fragment;
154
155   /**
156    * Precompiled SQL for counters_get_message()
157    */
158   sqlite3_stmt *select_counters_message;
159
160   /**
161    * Precompiled SQL for counters_get_state()
162    */
163   sqlite3_stmt *select_counters_state;
164
165   /**
166    * Precompiled SQL for state_modify_end()
167    */
168   sqlite3_stmt *update_state_hash_message_id;
169
170   /**
171    * Precompiled SQL for state_sync_end()
172    */
173   sqlite3_stmt *update_max_state_message_id;
174
175   /**
176    * Precompiled SQL for state_modify_op()
177    */
178   sqlite3_stmt *insert_state_current;
179
180   /**
181    * Precompiled SQL for state_modify_end()
182    */
183   sqlite3_stmt *delete_state_empty;
184
185   /**
186    * Precompiled SQL for state_set_signed()
187    */
188   sqlite3_stmt *update_state_signed;
189
190   /**
191    * Precompiled SQL for state_sync()
192    */
193   sqlite3_stmt *insert_state_sync;
194
195   /**
196    * Precompiled SQL for state_sync()
197    */
198   sqlite3_stmt *delete_state;
199
200   /**
201    * Precompiled SQL for state_sync()
202    */
203   sqlite3_stmt *insert_state_from_sync;
204
205   /**
206    * Precompiled SQL for state_sync()
207    */
208   sqlite3_stmt *delete_state_sync;
209
210   /**
211    * Precompiled SQL for state_get_signed()
212    */
213   sqlite3_stmt *select_state_signed;
214
215   /**
216    * Precompiled SQL for state_get()
217    */
218   sqlite3_stmt *select_state_one;
219
220   /**
221    * Precompiled SQL for state_get_prefix()
222    */
223   sqlite3_stmt *select_state_prefix;
224
225 };
226
227 #if DEBUG_PSYCSTORE
228
229 static void
230 sql_trace (void *cls, const char *sql)
231 {
232   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
233 }
234
235 #endif
236
237 /**
238  * @brief Prepare a SQL statement
239  *
240  * @param dbh handle to the database
241  * @param sql SQL statement, UTF-8 encoded
242  * @param stmt set to the prepared statement
243  * @return 0 on success
244  */
245 static int
246 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
247 {
248   char *tail;
249   int result;
250
251   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
252                                (const char **) &tail);
253   LOG (GNUNET_ERROR_TYPE_DEBUG,
254        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
255   if (result != SQLITE_OK)
256     LOG (GNUNET_ERROR_TYPE_ERROR,
257          _("Error preparing SQL query: %s\n  %s\n"),
258          sqlite3_errmsg (dbh), sql);
259   return result;
260 }
261
262
263 /**
264  * @brief Prepare a SQL statement
265  *
266  * @param dbh handle to the database
267  * @param sql SQL statement, UTF-8 encoded
268  * @return 0 on success
269  */
270 static int
271 sql_exec (sqlite3 *dbh, const char *sql)
272 {
273   int result;
274
275   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
276   LOG (GNUNET_ERROR_TYPE_DEBUG,
277        "Executed `%s' / %d\n", sql, result);
278   if (result != SQLITE_OK)
279     LOG (GNUNET_ERROR_TYPE_ERROR,
280          _("Error executing SQL query: %s\n  %s\n"),
281          sqlite3_errmsg (dbh), sql);
282   return result;
283 }
284
285
286 /**
287  * Initialize the database connections and associated
288  * data structures (create tables and indices
289  * as needed as well).
290  *
291  * @param plugin the plugin context (state for this module)
292  * @return GNUNET_OK on success
293  */
294 static int
295 database_setup (struct Plugin *plugin)
296 {
297   char *filename;
298
299   if (GNUNET_OK !=
300       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
301                                                "FILENAME", &filename))
302   {
303     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
304                                "psycstore-sqlite", "FILENAME");
305     return GNUNET_SYSERR;
306   }
307   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
308   {
309     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
310     {
311       GNUNET_break (0);
312       GNUNET_free (filename);
313       return GNUNET_SYSERR;
314     }
315   }
316   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
317   plugin->fn = filename;
318
319   /* Open database and precompile statements */
320   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
321   {
322     LOG (GNUNET_ERROR_TYPE_ERROR,
323          _("Unable to initialize SQLite: %s.\n"),
324          sqlite3_errmsg (plugin->dbh));
325     return GNUNET_SYSERR;
326   }
327
328 #if DEBUG_PSYCSTORE
329   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
330 #endif
331
332   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
333   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
334   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
335   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
336   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
337 #if ! DEBUG_PSYCSTORE
338   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
339 #endif
340   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
341
342   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
343
344   /* Create tables */
345
346   sql_exec (plugin->dbh,
347             "CREATE TABLE IF NOT EXISTS channels (\n"
348             "  id INTEGER PRIMARY KEY,\n"
349             "  pub_key BLOB(32) UNIQUE,\n"
350             "  max_state_message_id INTEGER,\n" // last applied state message ID
351             "  state_hash_message_id INTEGER\n" // last message ID with a state hash
352             ");");
353
354   sql_exec (plugin->dbh,
355             "CREATE TABLE IF NOT EXISTS slaves (\n"
356             "  id INTEGER PRIMARY KEY,\n"
357             "  pub_key BLOB(32) UNIQUE\n"
358             ");");
359
360   sql_exec (plugin->dbh,
361             "CREATE TABLE IF NOT EXISTS membership (\n"
362             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
363             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
364             "  did_join INTEGER NOT NULL,\n"
365             "  announced_at INTEGER NOT NULL,\n"
366             "  effective_since INTEGER NOT NULL,\n"
367             "  group_generation INTEGER NOT NULL\n"
368             ");");
369   sql_exec (plugin->dbh,
370             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
371             "ON membership (channel_id, slave_id);");
372
373   /** @todo messages table: add method_name column */
374   sql_exec (plugin->dbh,
375             "CREATE TABLE IF NOT EXISTS messages (\n"
376             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
377             "  hop_counter INTEGER NOT NULL,\n"
378             "  signature BLOB,\n"
379             "  purpose BLOB,\n"
380             "  fragment_id INTEGER NOT NULL,\n"
381             "  fragment_offset INTEGER NOT NULL,\n"
382             "  message_id INTEGER NOT NULL,\n"
383             "  group_generation INTEGER NOT NULL,\n"
384             "  multicast_flags INTEGER NOT NULL,\n"
385             "  psycstore_flags INTEGER NOT NULL,\n"
386             "  data BLOB,\n"
387             "  PRIMARY KEY (channel_id, fragment_id),\n"
388             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
389             ");");
390
391   sql_exec (plugin->dbh,
392             "CREATE TABLE IF NOT EXISTS state (\n"
393             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
394             "  name TEXT NOT NULL,\n"
395             "  value_current BLOB,\n"
396             "  value_signed BLOB,\n"
397             "  PRIMARY KEY (channel_id, name)\n"
398             ");");
399
400   sql_exec (plugin->dbh,
401             "CREATE TABLE IF NOT EXISTS state_sync (\n"
402             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
403             "  name TEXT NOT NULL,\n"
404             "  value BLOB,\n"
405             "  PRIMARY KEY (channel_id, name)\n"
406             ");");
407
408   /* Prepare statements */
409
410   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
411
412   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
413
414   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
415
416   sql_prepare (plugin->dbh,
417                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
418                &plugin->insert_channel_key);
419
420   sql_prepare (plugin->dbh,
421                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
422                &plugin->insert_slave_key);
423
424   sql_prepare (plugin->dbh,
425                "INSERT INTO membership\n"
426                " (channel_id, slave_id, did_join, announced_at,\n"
427                "  effective_since, group_generation)\n"
428                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
429                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
430                "        ?, ?, ?, ?);",
431                &plugin->insert_membership);
432
433   sql_prepare (plugin->dbh,
434                "SELECT did_join FROM membership\n"
435                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
436                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
437                "      AND effective_since <= ? AND did_join = 1\n"
438                "ORDER BY announced_at DESC LIMIT 1;",
439                &plugin->select_membership);
440
441   sql_prepare (plugin->dbh,
442                "INSERT OR IGNORE INTO messages\n"
443                " (channel_id, hop_counter, signature, purpose,\n"
444                "  fragment_id, fragment_offset, message_id,\n"
445                "  group_generation, multicast_flags, psycstore_flags, data)\n"
446                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
447                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
448                &plugin->insert_fragment);
449
450   sql_prepare (plugin->dbh,
451                "UPDATE messages\n"
452                "SET psycstore_flags = psycstore_flags | ?\n"
453                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
454                "      AND message_id = ? AND fragment_offset = 0;",
455                &plugin->update_message_flags);
456
457   sql_prepare (plugin->dbh,
458                "SELECT hop_counter, signature, purpose, fragment_id,\n"
459                "       fragment_offset, message_id, group_generation,\n"
460                "       multicast_flags, psycstore_flags, data\n"
461                "FROM messages\n"
462                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
463                "      AND ? <= fragment_id AND fragment_id <= ?;",
464                &plugin->select_fragments);
465
466   /** @todo select_messages: add method_prefix filter */
467   sql_prepare (plugin->dbh,
468                "SELECT hop_counter, signature, purpose, fragment_id,\n"
469                "       fragment_offset, message_id, group_generation,\n"
470                "       multicast_flags, psycstore_flags, data\n"
471                "FROM messages\n"
472                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
473                "      AND ? <= message_id AND message_id <= ?"
474                "LIMIT ?;",
475                &plugin->select_messages);
476
477   sql_prepare (plugin->dbh,
478                "SELECT * FROM\n"
479                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
480                "        fragment_offset, message_id, group_generation,\n"
481                "        multicast_flags, psycstore_flags, data\n"
482                " FROM messages\n"
483                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
484                " ORDER BY fragment_id DESC\n"
485                " LIMIT ?)\n"
486                "ORDER BY fragment_id;",
487                &plugin->select_latest_fragments);
488
489   /** @todo select_latest_messages: add method_prefix filter */
490   sql_prepare (plugin->dbh,
491                "SELECT hop_counter, signature, purpose, fragment_id,\n"
492                "       fragment_offset, message_id, group_generation,\n"
493                "        multicast_flags, psycstore_flags, data\n"
494                "FROM messages\n"
495                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
496                "      AND message_id IN\n"
497                "      (SELECT message_id\n"
498                "       FROM messages\n"
499                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
500                "       GROUP BY message_id\n"
501                "       ORDER BY message_id\n"
502                "       DESC LIMIT ?)\n"
503                "ORDER BY fragment_id;",
504                &plugin->select_latest_messages);
505
506   sql_prepare (plugin->dbh,
507                "SELECT hop_counter, signature, purpose, fragment_id,\n"
508                "       fragment_offset, message_id, group_generation,\n"
509                "       multicast_flags, psycstore_flags, data\n"
510                "FROM messages\n"
511                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
512                "      AND message_id = ? AND fragment_offset = ?;",
513                &plugin->select_message_fragment);
514
515   sql_prepare (plugin->dbh,
516                "SELECT fragment_id, message_id, group_generation\n"
517                "FROM messages\n"
518                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
519                "ORDER BY fragment_id DESC LIMIT 1;",
520                &plugin->select_counters_message);
521
522   sql_prepare (plugin->dbh,
523                "SELECT max_state_message_id\n"
524                "FROM channels\n"
525                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
526                &plugin->select_counters_state);
527
528   sql_prepare (plugin->dbh,
529                "UPDATE channels\n"
530                "SET max_state_message_id = ?\n"
531                "WHERE pub_key = ?;",
532                &plugin->update_max_state_message_id);
533
534   sql_prepare (plugin->dbh,
535                "UPDATE channels\n"
536                "SET state_hash_message_id = ?\n"
537                "WHERE pub_key = ?;",
538                &plugin->update_state_hash_message_id);
539
540   sql_prepare (plugin->dbh,
541                "INSERT OR REPLACE INTO state\n"
542                "  (channel_id, name, value_current, value_signed)\n"
543                "SELECT new.channel_id, new.name,\n"
544                "       new.value_current, old.value_signed\n"
545                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
546                "             AS channel_id,\n"
547                "             ? AS name, ? AS value_current) AS new\n"
548                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
549                "           FROM state) AS old\n"
550                "ON new.channel_id = old.channel_id AND new.name = old.name;",
551                &plugin->insert_state_current);
552
553   sql_prepare (plugin->dbh,
554                "DELETE FROM state\n"
555                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
556                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
557                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
558                &plugin->delete_state_empty);
559
560   sql_prepare (plugin->dbh,
561                "UPDATE state\n"
562                "SET value_signed = value_current\n"
563                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
564                &plugin->update_state_signed);
565
566   sql_prepare (plugin->dbh,
567                "DELETE FROM state\n"
568                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
569                &plugin->delete_state);
570
571   sql_prepare (plugin->dbh,
572                "INSERT INTO state_sync (channel_id, name, value)\n"
573                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
574                &plugin->insert_state_sync);
575
576   sql_prepare (plugin->dbh,
577                "INSERT INTO state\n"
578                " (channel_id, name, value_current, value_signed)\n"
579                "SELECT channel_id, name, value, value\n"
580                "FROM state_sync\n"
581                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
582                &plugin->insert_state_from_sync);
583
584   sql_prepare (plugin->dbh,
585                "DELETE FROM state_sync\n"
586                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
587                &plugin->delete_state_sync);
588
589   sql_prepare (plugin->dbh,
590                "SELECT value_current\n"
591                "FROM state\n"
592                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
593                "      AND name = ?;",
594                &plugin->select_state_one);
595
596   sql_prepare (plugin->dbh,
597                "SELECT name, value_current\n"
598                "FROM state\n"
599                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
600                "      AND (name = ? OR substr(name, 1, ?) = ?);",
601                &plugin->select_state_prefix);
602
603   sql_prepare (plugin->dbh,
604                "SELECT name, value_signed\n"
605                "FROM state\n"
606                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
607                "      AND value_signed IS NOT NULL;",
608                &plugin->select_state_signed);
609
610   return GNUNET_OK;
611 }
612
613
614 /**
615  * Shutdown database connection and associate data
616  * structures.
617  * @param plugin the plugin context (state for this module)
618  */
619 static void
620 database_shutdown (struct Plugin *plugin)
621 {
622   int result;
623   sqlite3_stmt *stmt;
624   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
625   {
626     result = sqlite3_finalize (stmt);
627     if (SQLITE_OK != result)
628       LOG (GNUNET_ERROR_TYPE_WARNING,
629            "Failed to close statement %p: %d\n", stmt, result);
630   }
631   if (SQLITE_OK != sqlite3_close (plugin->dbh))
632     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
633
634   GNUNET_free_non_null (plugin->fn);
635 }
636
637 /**
638  * Execute a prepared statement with a @a channel_key argument.
639  *
640  * @param plugin Plugin handle.
641  * @param stmt Statement to execute.
642  * @param channel_key Public key of the channel.
643  *
644  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
645  */
646 static int
647 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
648               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
649 {
650   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
651                                       sizeof (*channel_key), SQLITE_STATIC))
652   {
653     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
654                 "sqlite3_bind");
655   }
656   else if (SQLITE_DONE != sqlite3_step (stmt))
657   {
658     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
659                 "sqlite3_step");
660   }
661
662   if (SQLITE_OK != sqlite3_reset (stmt))
663   {
664     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
665                 "sqlite3_reset");
666     return GNUNET_SYSERR;
667   }
668
669   return GNUNET_OK;
670 }
671
672 /**
673  * Begin a transaction.
674  */
675 static int
676 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
677 {
678   sqlite3_stmt *stmt = plugin->transaction_begin;
679
680   if (SQLITE_DONE != sqlite3_step (stmt))
681   {
682     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
683                 "sqlite3_step");
684   }
685   if (SQLITE_OK != sqlite3_reset (stmt))
686   {
687     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
688                 "sqlite3_reset");
689     return GNUNET_SYSERR;
690   }
691
692   plugin->transaction = transaction;
693   return GNUNET_OK;
694 }
695
696
697 /**
698  * Commit current transaction.
699  */
700 static int
701 transaction_commit (struct Plugin *plugin)
702 {
703   sqlite3_stmt *stmt = plugin->transaction_commit;
704
705   if (SQLITE_DONE != sqlite3_step (stmt))
706   {
707     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
708                 "sqlite3_step");
709   }
710   if (SQLITE_OK != sqlite3_reset (stmt))
711   {
712     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
713                 "sqlite3_reset");
714     return GNUNET_SYSERR;
715   }
716
717   plugin->transaction = TRANSACTION_NONE;
718   return GNUNET_OK;
719 }
720
721
722 /**
723  * Roll back current transaction.
724  */
725 static int
726 transaction_rollback (struct Plugin *plugin)
727 {
728   sqlite3_stmt *stmt = plugin->transaction_rollback;
729
730   if (SQLITE_DONE != sqlite3_step (stmt))
731   {
732     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
733                 "sqlite3_step");
734   }
735   if (SQLITE_OK != sqlite3_reset (stmt))
736   {
737     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
738                 "sqlite3_reset");
739     return GNUNET_SYSERR;
740   }
741   plugin->transaction = TRANSACTION_NONE;
742   return GNUNET_OK;
743 }
744
745
746 static int
747 channel_key_store (struct Plugin *plugin,
748                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
749 {
750   sqlite3_stmt *stmt = plugin->insert_channel_key;
751
752   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
753                                       sizeof (*channel_key), SQLITE_STATIC))
754   {
755     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
756                 "sqlite3_bind");
757   }
758   else if (SQLITE_DONE != sqlite3_step (stmt))
759   {
760     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
761                 "sqlite3_step");
762   }
763
764   if (SQLITE_OK != sqlite3_reset (stmt))
765   {
766     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
767                 "sqlite3_reset");
768     return GNUNET_SYSERR;
769   }
770
771   return GNUNET_OK;
772 }
773
774
775 static int
776 slave_key_store (struct Plugin *plugin,
777                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
778 {
779   sqlite3_stmt *stmt = plugin->insert_slave_key;
780
781   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
782                                       sizeof (*slave_key), SQLITE_STATIC))
783   {
784     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
785                 "sqlite3_bind");
786   }
787   else if (SQLITE_DONE != sqlite3_step (stmt))
788   {
789     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
790                 "sqlite3_step");
791   }
792
793
794   if (SQLITE_OK != sqlite3_reset (stmt))
795   {
796     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
797                 "sqlite3_reset");
798     return GNUNET_SYSERR;
799   }
800
801   return GNUNET_OK;
802 }
803
804
805 /**
806  * Store join/leave events for a PSYC channel in order to be able to answer
807  * membership test queries later.
808  *
809  * @see GNUNET_PSYCSTORE_membership_store()
810  *
811  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
812  */
813 static int
814 sqlite_membership_store (void *cls,
815                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
816                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
817                          int did_join,
818                          uint64_t announced_at,
819                          uint64_t effective_since,
820                          uint64_t group_generation)
821 {
822   struct Plugin *plugin = cls;
823   sqlite3_stmt *stmt = plugin->insert_membership;
824
825   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
826
827   if (announced_at > INT64_MAX ||
828       effective_since > INT64_MAX ||
829       group_generation > INT64_MAX)
830   {
831     GNUNET_break (0);
832     return GNUNET_SYSERR;
833   }
834
835   if (GNUNET_OK != channel_key_store (plugin, channel_key)
836       || GNUNET_OK != slave_key_store (plugin, slave_key))
837     return GNUNET_SYSERR;
838
839   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
840                                       sizeof (*channel_key), SQLITE_STATIC)
841       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
842                                          sizeof (*slave_key), SQLITE_STATIC)
843       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
844       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
845       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
846       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
847   {
848     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
849                 "sqlite3_bind");
850   }
851   else if (SQLITE_DONE != sqlite3_step (stmt))
852   {
853     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
854                 "sqlite3_step");
855   }
856
857   if (SQLITE_OK != sqlite3_reset (stmt))
858   {
859     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
860                 "sqlite3_reset");
861     return GNUNET_SYSERR;
862   }
863
864   return GNUNET_OK;
865 }
866
867 /**
868  * Test if a member was admitted to the channel at the given message ID.
869  *
870  * @see GNUNET_PSYCSTORE_membership_test()
871  *
872  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
873  *         #GNUNET_SYSERR if there was en error.
874  */
875 static int
876 membership_test (void *cls,
877                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
878                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
879                  uint64_t message_id)
880 {
881   struct Plugin *plugin = cls;
882   sqlite3_stmt *stmt = plugin->select_membership;
883   int ret = GNUNET_SYSERR;
884
885   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
886                                       sizeof (*channel_key), SQLITE_STATIC)
887       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
888                                          sizeof (*slave_key), SQLITE_STATIC)
889       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
890   {
891     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
892                 "sqlite3_bind");
893   }
894   else
895   {
896     switch (sqlite3_step (stmt))
897     {
898     case SQLITE_DONE:
899       ret = GNUNET_NO;
900       break;
901     case SQLITE_ROW:
902       ret = GNUNET_YES;
903     }
904   }
905
906   if (SQLITE_OK != sqlite3_reset (stmt))
907   {
908     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
909                 "sqlite3_reset");
910   }
911
912   return ret;
913 }
914
915 /**
916  * Store a message fragment sent to a channel.
917  *
918  * @see GNUNET_PSYCSTORE_fragment_store()
919  *
920  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
921  */
922 static int
923 fragment_store (void *cls,
924                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
925                 const struct GNUNET_MULTICAST_MessageHeader *msg,
926                 uint32_t psycstore_flags)
927 {
928   struct Plugin *plugin = cls;
929   sqlite3_stmt *stmt = plugin->insert_fragment;
930
931   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
932
933   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
934   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
935   uint64_t message_id = GNUNET_ntohll (msg->message_id);
936   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
937
938   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
939       message_id > INT64_MAX || group_generation > INT64_MAX)
940   {
941     LOG (GNUNET_ERROR_TYPE_ERROR,
942          "Tried to store fragment with a field > INT64_MAX: "
943          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
944          message_id, group_generation);
945     GNUNET_break (0);
946     return GNUNET_SYSERR;
947   }
948
949   if (GNUNET_OK != channel_key_store (plugin, channel_key))
950     return GNUNET_SYSERR;
951
952   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
953                                       sizeof (*channel_key), SQLITE_STATIC)
954       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
955       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
956                                          sizeof (msg->signature), SQLITE_STATIC)
957       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
958                                          sizeof (msg->purpose), SQLITE_STATIC)
959       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
960       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
961       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
962       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
963       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
964       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
965       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
966                                          ntohs (msg->header.size)
967                                          - sizeof (*msg), SQLITE_STATIC))
968   {
969     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
970                 "sqlite3_bind");
971   }
972   else if (SQLITE_DONE != sqlite3_step (stmt))
973   {
974     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
975                 "sqlite3_step");
976   }
977
978   if (SQLITE_OK != sqlite3_reset (stmt))
979   {
980     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
981                 "sqlite3_reset");
982     return GNUNET_SYSERR;
983   }
984
985   return GNUNET_OK;
986 }
987
988 /**
989  * Set additional flags for a given message.
990  *
991  * They are OR'd with any existing flags set.
992  *
993  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
994  */
995 static int
996 message_add_flags (void *cls,
997                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
998                    uint64_t message_id,
999                    uint32_t psycstore_flags)
1000 {
1001   struct Plugin *plugin = cls;
1002   sqlite3_stmt *stmt = plugin->update_message_flags;
1003   int ret = GNUNET_SYSERR;
1004
1005   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1006       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1007                                          sizeof (*channel_key), SQLITE_STATIC)
1008       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1009   {
1010     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1011                 "sqlite3_bind");
1012   }
1013   else
1014   {
1015     switch (sqlite3_step (stmt))
1016     {
1017     case SQLITE_DONE:
1018       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1019       break;
1020     default:
1021       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1022                   "sqlite3_step");
1023     }
1024   }
1025
1026   if (SQLITE_OK != sqlite3_reset (stmt))
1027   {
1028     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1029                 "sqlite3_reset");
1030     return GNUNET_SYSERR;
1031   }
1032
1033   return ret;
1034 }
1035
1036 static int
1037 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1038               void *cb_cls)
1039 {
1040   int data_size = sqlite3_column_bytes (stmt, 9);
1041   struct GNUNET_MULTICAST_MessageHeader *msg
1042     = GNUNET_malloc (sizeof (*msg) + data_size);
1043
1044   msg->header.size = htons (sizeof (*msg) + data_size);
1045   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1046   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1047   GNUNET_memcpy (&msg->signature,
1048           sqlite3_column_blob (stmt, 1),
1049           sqlite3_column_bytes (stmt, 1));
1050   GNUNET_memcpy (&msg->purpose,
1051           sqlite3_column_blob (stmt, 2),
1052           sqlite3_column_bytes (stmt, 2));
1053   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1054   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1055   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1056   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1057   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1058   GNUNET_memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1059
1060   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1061 }
1062
1063
1064 static int
1065 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1066                  uint64_t *returned_fragments,
1067                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1068 {
1069   int ret = GNUNET_SYSERR;
1070   int sql_ret;
1071
1072   do
1073   {
1074     sql_ret = sqlite3_step (stmt);
1075     switch (sql_ret)
1076     {
1077     case SQLITE_DONE:
1078       if (ret != GNUNET_OK)
1079         ret = GNUNET_NO;
1080       break;
1081     case SQLITE_ROW:
1082       ret = fragment_row (stmt, cb, cb_cls);
1083       (*returned_fragments)++;
1084       if (ret != GNUNET_YES)
1085         sql_ret = SQLITE_DONE;
1086       break;
1087     default:
1088       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1089                   "sqlite3_step");
1090     }
1091   }
1092   while (sql_ret == SQLITE_ROW);
1093
1094   return ret;
1095 }
1096
1097 /**
1098  * Retrieve a message fragment range by fragment ID.
1099  *
1100  * @see GNUNET_PSYCSTORE_fragment_get()
1101  *
1102  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1103  */
1104 static int
1105 fragment_get (void *cls,
1106               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1107               uint64_t first_fragment_id,
1108               uint64_t last_fragment_id,
1109               uint64_t *returned_fragments,
1110               GNUNET_PSYCSTORE_FragmentCallback cb,
1111               void *cb_cls)
1112 {
1113   struct Plugin *plugin = cls;
1114   sqlite3_stmt *stmt = plugin->select_fragments;
1115   int ret = GNUNET_SYSERR;
1116   *returned_fragments = 0;
1117
1118   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1119                                       sizeof (*channel_key),
1120                                       SQLITE_STATIC)
1121       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1122       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1123   {
1124     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1125                 "sqlite3_bind");
1126   }
1127   else
1128   {
1129     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1130   }
1131
1132   if (SQLITE_OK != sqlite3_reset (stmt))
1133   {
1134     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1135                 "sqlite3_reset");
1136   }
1137
1138   return ret;
1139 }
1140
1141
1142 /**
1143  * Retrieve a message fragment range by fragment ID.
1144  *
1145  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1146  *
1147  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1148  */
1149 static int
1150 fragment_get_latest (void *cls,
1151                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1152                      uint64_t fragment_limit,
1153                      uint64_t *returned_fragments,
1154                      GNUNET_PSYCSTORE_FragmentCallback cb,
1155                      void *cb_cls)
1156 {
1157   struct Plugin *plugin = cls;
1158   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1159   int ret = GNUNET_SYSERR;
1160   *returned_fragments = 0;
1161
1162   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1163                                       sizeof (*channel_key),
1164                                       SQLITE_STATIC)
1165       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1166   {
1167     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1168                 "sqlite3_bind");
1169   }
1170   else
1171   {
1172     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1173   }
1174
1175   if (SQLITE_OK != sqlite3_reset (stmt))
1176   {
1177     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1178                 "sqlite3_reset");
1179   }
1180
1181   return ret;
1182 }
1183
1184
1185 /**
1186  * Retrieve all fragments of a message ID range.
1187  *
1188  * @see GNUNET_PSYCSTORE_message_get()
1189  *
1190  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1191  */
1192 static int
1193 message_get (void *cls,
1194              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1195              uint64_t first_message_id,
1196              uint64_t last_message_id,
1197              uint64_t fragment_limit,
1198              uint64_t *returned_fragments,
1199              GNUNET_PSYCSTORE_FragmentCallback cb,
1200              void *cb_cls)
1201 {
1202   struct Plugin *plugin = cls;
1203   sqlite3_stmt *stmt = plugin->select_messages;
1204   int ret = GNUNET_SYSERR;
1205   *returned_fragments = 0;
1206
1207   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1208                                       sizeof (*channel_key),
1209                                       SQLITE_STATIC)
1210       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1211       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id)
1212       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1213                                           (0 != fragment_limit)
1214                                           ? fragment_limit
1215                                           : INT64_MAX))
1216   {
1217     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1218                 "sqlite3_bind");
1219   }
1220   else
1221   {
1222     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1223   }
1224
1225   if (SQLITE_OK != sqlite3_reset (stmt))
1226   {
1227     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1228                 "sqlite3_reset");
1229   }
1230
1231   return ret;
1232 }
1233
1234
1235 /**
1236  * Retrieve all fragments of the latest messages.
1237  *
1238  * @see GNUNET_PSYCSTORE_message_get_latest()
1239  *
1240  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1241  */
1242 static int
1243 message_get_latest (void *cls,
1244                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1245                     uint64_t message_limit,
1246                     uint64_t *returned_fragments,
1247                     GNUNET_PSYCSTORE_FragmentCallback cb,
1248                     void *cb_cls)
1249 {
1250   struct Plugin *plugin = cls;
1251   sqlite3_stmt *stmt = plugin->select_latest_messages;
1252   int ret = GNUNET_SYSERR;
1253   *returned_fragments = 0;
1254
1255   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1256                                       sizeof (*channel_key),
1257                                       SQLITE_STATIC)
1258       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1259                                          sizeof (*channel_key),
1260                                          SQLITE_STATIC)
1261       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1262   {
1263     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1264                 "sqlite3_bind");
1265   }
1266   else
1267   {
1268     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1269   }
1270
1271   if (SQLITE_OK != sqlite3_reset (stmt))
1272   {
1273     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1274                 "sqlite3_reset");
1275   }
1276
1277   return ret;
1278 }
1279
1280
1281 /**
1282  * Retrieve a fragment of message specified by its message ID and fragment
1283  * offset.
1284  *
1285  * @see GNUNET_PSYCSTORE_message_get_fragment()
1286  *
1287  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1288  */
1289 static int
1290 message_get_fragment (void *cls,
1291                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1292                       uint64_t message_id,
1293                       uint64_t fragment_offset,
1294                       GNUNET_PSYCSTORE_FragmentCallback cb,
1295                       void *cb_cls)
1296 {
1297   struct Plugin *plugin = cls;
1298   sqlite3_stmt *stmt = plugin->select_message_fragment;
1299   int ret = GNUNET_SYSERR;
1300
1301   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1302                                       sizeof (*channel_key),
1303                                       SQLITE_STATIC)
1304       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1305       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1306   {
1307     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1308                 "sqlite3_bind");
1309   }
1310   else
1311   {
1312     switch (sqlite3_step (stmt))
1313     {
1314     case SQLITE_DONE:
1315       ret = GNUNET_NO;
1316       break;
1317     case SQLITE_ROW:
1318       ret = fragment_row (stmt, cb, cb_cls);
1319       break;
1320     default:
1321       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1322                   "sqlite3_step");
1323     }
1324   }
1325
1326   if (SQLITE_OK != sqlite3_reset (stmt))
1327   {
1328     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1329                 "sqlite3_reset");
1330   }
1331
1332   return ret;
1333 }
1334
1335 /**
1336  * Retrieve the max. values of message counters for a channel.
1337  *
1338  * @see GNUNET_PSYCSTORE_counters_get()
1339  *
1340  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1341  */
1342 static int
1343 counters_message_get (void *cls,
1344                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1345                       uint64_t *max_fragment_id,
1346                       uint64_t *max_message_id,
1347                       uint64_t *max_group_generation)
1348 {
1349   struct Plugin *plugin = cls;
1350   sqlite3_stmt *stmt = plugin->select_counters_message;
1351   int ret = GNUNET_SYSERR;
1352
1353   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1354                                       sizeof (*channel_key),
1355                                       SQLITE_STATIC))
1356   {
1357     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1358                 "sqlite3_bind");
1359   }
1360   else
1361   {
1362     switch (sqlite3_step (stmt))
1363     {
1364     case SQLITE_DONE:
1365       ret = GNUNET_NO;
1366       break;
1367     case SQLITE_ROW:
1368       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1369       *max_message_id = sqlite3_column_int64 (stmt, 1);
1370       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1371       ret = GNUNET_OK;
1372       break;
1373     default:
1374       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1375                   "sqlite3_step");
1376     }
1377   }
1378
1379   if (SQLITE_OK != sqlite3_reset (stmt))
1380   {
1381     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1382                 "sqlite3_reset");
1383   }
1384
1385   return ret;
1386 }
1387
1388 /**
1389  * Retrieve the max. values of state counters for a channel.
1390  *
1391  * @see GNUNET_PSYCSTORE_counters_get()
1392  *
1393  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1394  */
1395 static int
1396 counters_state_get (void *cls,
1397                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1398                     uint64_t *max_state_message_id)
1399 {
1400   struct Plugin *plugin = cls;
1401   sqlite3_stmt *stmt = plugin->select_counters_state;
1402   int ret = GNUNET_SYSERR;
1403
1404   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1405                                       sizeof (*channel_key),
1406                                       SQLITE_STATIC))
1407   {
1408     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1409                 "sqlite3_bind");
1410   }
1411   else
1412   {
1413     switch (sqlite3_step (stmt))
1414     {
1415     case SQLITE_DONE:
1416       ret = GNUNET_NO;
1417       break;
1418     case SQLITE_ROW:
1419       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1420       ret = GNUNET_OK;
1421       break;
1422     default:
1423       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1424                   "sqlite3_step");
1425     }
1426   }
1427
1428   if (SQLITE_OK != sqlite3_reset (stmt))
1429   {
1430     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1431                 "sqlite3_reset");
1432   }
1433
1434   return ret;
1435 }
1436
1437
1438 /**
1439  * Assign a value to a state variable.
1440  *
1441  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1442  */
1443 static int
1444 state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
1445               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1446               const char *name, const void *value, size_t value_size)
1447 {
1448   int ret = GNUNET_SYSERR;
1449
1450   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1451                                       sizeof (*channel_key), SQLITE_STATIC)
1452       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1453       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1454                                          SQLITE_STATIC))
1455   {
1456     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1457                 "sqlite3_bind");
1458   }
1459   else
1460   {
1461     switch (sqlite3_step (stmt))
1462     {
1463     case SQLITE_DONE:
1464       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1465       break;
1466     default:
1467       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1468                   "sqlite3_step");
1469     }
1470   }
1471
1472   if (SQLITE_OK != sqlite3_reset (stmt))
1473   {
1474     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1475                 "sqlite3_reset");
1476     return GNUNET_SYSERR;
1477   }
1478
1479   return ret;
1480 }
1481
1482
1483 static int
1484 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1485                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1486                    uint64_t message_id)
1487 {
1488   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1489       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1490                                          sizeof (*channel_key), SQLITE_STATIC))
1491   {
1492     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1493                 "sqlite3_bind");
1494   }
1495   else if (SQLITE_DONE != sqlite3_step (stmt))
1496   {
1497     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1498                 "sqlite3_step");
1499   }
1500   if (SQLITE_OK != sqlite3_reset (stmt))
1501   {
1502     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1503                 "sqlite3_reset");
1504     return GNUNET_SYSERR;
1505   }
1506   return GNUNET_OK;
1507 }
1508
1509
1510 /**
1511  * Begin modifying current state.
1512  */
1513 static int
1514 state_modify_begin (void *cls,
1515                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1516                     uint64_t message_id, uint64_t state_delta)
1517 {
1518   struct Plugin *plugin = cls;
1519
1520   if (state_delta > 0)
1521   {
1522     /**
1523      * We can only apply state modifiers in the current message if modifiers in
1524      * the previous stateful message (message_id - state_delta) were already
1525      * applied.
1526      */
1527
1528     uint64_t max_state_message_id = 0;
1529     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1530     switch (ret)
1531     {
1532     case GNUNET_OK:
1533     case GNUNET_NO: // no state yet
1534       ret = GNUNET_OK;
1535       break;
1536     default:
1537       return ret;
1538     }
1539
1540     if (max_state_message_id < message_id - state_delta)
1541       return GNUNET_NO; /* some stateful messages not yet applied */
1542     else if (message_id - state_delta < max_state_message_id)
1543       return GNUNET_NO; /* changes already applied */
1544   }
1545
1546   if (TRANSACTION_NONE != plugin->transaction)
1547   {
1548     /** @todo FIXME: wait for other transaction to finish  */
1549     return GNUNET_SYSERR;
1550   }
1551   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1552 }
1553
1554
1555 /**
1556  * Set the current value of state variable.
1557  *
1558  * @see GNUNET_PSYCSTORE_state_modify()
1559  *
1560  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1561  */
1562 static int
1563 state_modify_op (void *cls,
1564                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1565                  enum GNUNET_PSYC_Operator op,
1566                  const char *name, const void *value, size_t value_size)
1567 {
1568   struct Plugin *plugin = cls;
1569   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1570
1571   switch (op)
1572   {
1573   case GNUNET_PSYC_OP_ASSIGN:
1574     return state_assign (plugin, plugin->insert_state_current, channel_key,
1575                          name, value, value_size);
1576
1577   default: /** @todo implement more state operations */
1578     GNUNET_break (0);
1579     return GNUNET_SYSERR;
1580   }
1581 }
1582
1583
1584 /**
1585  * End modifying current state.
1586  */
1587 static int
1588 state_modify_end (void *cls,
1589                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1590                   uint64_t message_id)
1591 {
1592   struct Plugin *plugin = cls;
1593   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1594
1595   return
1596     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1597     && GNUNET_OK == update_message_id (plugin,
1598                                        plugin->update_max_state_message_id,
1599                                        channel_key, message_id)
1600     && GNUNET_OK == transaction_commit (plugin)
1601     ? GNUNET_OK : GNUNET_SYSERR;
1602 }
1603
1604
1605 /**
1606  * Begin state synchronization.
1607  */
1608 static int
1609 state_sync_begin (void *cls,
1610                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1611 {
1612   struct Plugin *plugin = cls;
1613   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1614 }
1615
1616
1617 /**
1618  * Assign current value of a state variable.
1619  *
1620  * @see GNUNET_PSYCSTORE_state_modify()
1621  *
1622  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1623  */
1624 static int
1625 state_sync_assign (void *cls,
1626                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1627                 const char *name, const void *value, size_t value_size)
1628 {
1629   struct Plugin *plugin = cls;
1630   return state_assign (cls, plugin->insert_state_sync, channel_key,
1631                        name, value, value_size);
1632 }
1633
1634
1635 /**
1636  * End modifying current state.
1637  */
1638 static int
1639 state_sync_end (void *cls,
1640                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1641                 uint64_t max_state_message_id,
1642                 uint64_t state_hash_message_id)
1643 {
1644   struct Plugin *plugin = cls;
1645   int ret = GNUNET_SYSERR;
1646
1647   if (TRANSACTION_NONE != plugin->transaction)
1648   {
1649     /** @todo FIXME: wait for other transaction to finish  */
1650     return GNUNET_SYSERR;
1651   }
1652
1653   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1654     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1655     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1656                                   channel_key)
1657     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1658                                   channel_key)
1659     && GNUNET_OK == update_message_id (plugin,
1660                                        plugin->update_state_hash_message_id,
1661                                        channel_key, state_hash_message_id)
1662     && GNUNET_OK == update_message_id (plugin,
1663                                        plugin->update_max_state_message_id,
1664                                        channel_key, max_state_message_id)
1665     && GNUNET_OK == transaction_commit (plugin)
1666     ? ret = GNUNET_OK
1667     : transaction_rollback (plugin);
1668   return ret;
1669 }
1670
1671
1672 /**
1673  * Delete the whole state.
1674  *
1675  * @see GNUNET_PSYCSTORE_state_reset()
1676  *
1677  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1678  */
1679 static int
1680 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1681 {
1682   struct Plugin *plugin = cls;
1683   return exec_channel (plugin, plugin->delete_state, channel_key);
1684 }
1685
1686
1687 /**
1688  * Update signed values of state variables in the state store.
1689  *
1690  * @see GNUNET_PSYCSTORE_state_hash_update()
1691  *
1692  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1693  */
1694 static int
1695 state_update_signed (void *cls,
1696                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1697 {
1698   struct Plugin *plugin = cls;
1699   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1700 }
1701
1702
1703 /**
1704  * Retrieve a state variable by name.
1705  *
1706  * @see GNUNET_PSYCSTORE_state_get()
1707  *
1708  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1709  */
1710 static int
1711 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1712            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1713 {
1714   struct Plugin *plugin = cls;
1715   int ret = GNUNET_SYSERR;
1716
1717   sqlite3_stmt *stmt = plugin->select_state_one;
1718
1719   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1720                                       sizeof (*channel_key),
1721                                       SQLITE_STATIC)
1722       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1723   {
1724     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1725                 "sqlite3_bind");
1726   }
1727   else
1728   {
1729     switch (sqlite3_step (stmt))
1730     {
1731     case SQLITE_DONE:
1732       ret = GNUNET_NO;
1733       break;
1734     case SQLITE_ROW:
1735       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1736                 sqlite3_column_bytes (stmt, 0));
1737       break;
1738     default:
1739       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1740                   "sqlite3_step");
1741     }
1742   }
1743
1744   if (SQLITE_OK != sqlite3_reset (stmt))
1745   {
1746     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1747                 "sqlite3_reset");
1748   }
1749
1750   return ret;
1751 }
1752
1753
1754 /**
1755  * Retrieve all state variables for a channel with the given prefix.
1756  *
1757  * @see GNUNET_PSYCSTORE_state_get_prefix()
1758  *
1759  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1760  */
1761 static int
1762 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1763                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1764                   void *cb_cls)
1765 {
1766   struct Plugin *plugin = cls;
1767   int ret = GNUNET_SYSERR;
1768   sqlite3_stmt *stmt = plugin->select_state_prefix;
1769   size_t name_len = strlen (name);
1770
1771   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1772                                       sizeof (*channel_key), SQLITE_STATIC)
1773       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1774       || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len)
1775       || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC))
1776   {
1777     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1778                 "sqlite3_bind");
1779   }
1780   else
1781   {
1782     int sql_ret;
1783     do
1784     {
1785       sql_ret = sqlite3_step (stmt);
1786       switch (sql_ret)
1787       {
1788       case SQLITE_DONE:
1789         if (ret != GNUNET_OK)
1790           ret = GNUNET_NO;
1791         break;
1792       case SQLITE_ROW:
1793         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1794                   sqlite3_column_blob (stmt, 1),
1795                   sqlite3_column_bytes (stmt, 1));
1796         if (ret != GNUNET_YES)
1797           sql_ret = SQLITE_DONE;
1798         break;
1799       default:
1800         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1801                     "sqlite3_step");
1802       }
1803     }
1804     while (sql_ret == SQLITE_ROW);
1805   }
1806   if (SQLITE_OK != sqlite3_reset (stmt))
1807   {
1808     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1809                 "sqlite3_reset");
1810   }
1811   return ret;
1812 }
1813
1814
1815 /**
1816  * Retrieve all signed state variables for a channel.
1817  *
1818  * @see GNUNET_PSYCSTORE_state_get_signed()
1819  *
1820  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1821  */
1822 static int
1823 state_get_signed (void *cls,
1824                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1825                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1826 {
1827   struct Plugin *plugin = cls;
1828   int ret = GNUNET_SYSERR;
1829
1830   sqlite3_stmt *stmt = plugin->select_state_signed;
1831
1832   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1833                                       sizeof (*channel_key), SQLITE_STATIC))
1834   {
1835     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1836                 "sqlite3_bind");
1837   }
1838   else
1839   {
1840     int sql_ret;
1841     do
1842     {
1843       sql_ret = sqlite3_step (stmt);
1844       switch (sql_ret)
1845       {
1846       case SQLITE_DONE:
1847         if (ret != GNUNET_OK)
1848           ret = GNUNET_NO;
1849         break;
1850       case SQLITE_ROW:
1851         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1852                   sqlite3_column_blob (stmt, 1),
1853                   sqlite3_column_bytes (stmt, 1));
1854         if (ret != GNUNET_YES)
1855           sql_ret = SQLITE_DONE;
1856         break;
1857       default:
1858         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1859                     "sqlite3_step");
1860       }
1861     }
1862     while (sql_ret == SQLITE_ROW);
1863   }
1864
1865   if (SQLITE_OK != sqlite3_reset (stmt))
1866   {
1867     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1868                 "sqlite3_reset");
1869   }
1870
1871   return ret;
1872 }
1873
1874
1875 /**
1876  * Entry point for the plugin.
1877  *
1878  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1879  * @return NULL on error, otherwise the plugin context
1880  */
1881 void *
1882 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1883 {
1884   static struct Plugin plugin;
1885   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1886   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1887
1888   if (NULL != plugin.cfg)
1889     return NULL;                /* can only initialize once! */
1890   memset (&plugin, 0, sizeof (struct Plugin));
1891   plugin.cfg = cfg;
1892   if (GNUNET_OK != database_setup (&plugin))
1893   {
1894     database_shutdown (&plugin);
1895     return NULL;
1896   }
1897   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1898   api->cls = &plugin;
1899   api->membership_store = &sqlite_membership_store;
1900   api->membership_test = &membership_test;
1901   api->fragment_store = &fragment_store;
1902   api->message_add_flags = &message_add_flags;
1903   api->fragment_get = &fragment_get;
1904   api->fragment_get_latest = &fragment_get_latest;
1905   api->message_get = &message_get;
1906   api->message_get_latest = &message_get_latest;
1907   api->message_get_fragment = &message_get_fragment;
1908   api->counters_message_get = &counters_message_get;
1909   api->counters_state_get = &counters_state_get;
1910   api->state_modify_begin = &state_modify_begin;
1911   api->state_modify_op = &state_modify_op;
1912   api->state_modify_end = &state_modify_end;
1913   api->state_sync_begin = &state_sync_begin;
1914   api->state_sync_assign = &state_sync_assign;
1915   api->state_sync_end = &state_sync_end;
1916   api->state_reset = &state_reset;
1917   api->state_update_signed = &state_update_signed;
1918   api->state_get = &state_get;
1919   api->state_get_prefix = &state_get_prefix;
1920   api->state_get_signed = &state_get_signed;
1921
1922   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1923   return api;
1924 }
1925
1926
1927 /**
1928  * Exit point from the plugin.
1929  *
1930  * @param cls The plugin context (as returned by "init")
1931  * @return Always NULL
1932  */
1933 void *
1934 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1935 {
1936   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1937   struct Plugin *plugin = api->cls;
1938
1939   database_shutdown (plugin);
1940   plugin->cfg = NULL;
1941   GNUNET_free (api);
1942   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1943   return NULL;
1944 }
1945
1946 /* end of plugin_psycstore_sqlite.c */