1bf14644b1c5a9f4ee007a26ee5c5b475793c298
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_env_lib.h"
39 #include "psycstore.h"
40 #include <sqlite3.h>
41
42 /**
43  * After how many ms "busy" should a DB operation fail for good?  A
44  * low value makes sure that we are more responsive to requests
45  * (especially PUTs).  A high value guarantees a higher success rate
46  * (SELECTs in iterate can take several seconds despite LIMIT=1).
47  *
48  * The default value of 1s should ensure that users do not experience
49  * huge latencies while at the same time allowing operations to
50  * succeed with reasonable probability.
51  */
52 #define BUSY_TIMEOUT_MS 1000
53
54 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
55
56 /**
57  * Log an error message at log-level 'level' that indicates
58  * a failure of the command 'cmd' on file 'filename'
59  * with the message given by strerror(errno).
60  */
61 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
62
63 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
64
65 enum Transactions {
66   TRANSACTION_NONE = 0,
67   TRANSACTION_STATE_MODIFY,
68   TRANSACTION_STATE_SYNC,
69 };
70
71 /**
72  * Context for all functions in this plugin.
73  */
74 struct Plugin
75 {
76
77   const struct GNUNET_CONFIGURATION_Handle *cfg;
78
79   /**
80    * Database filename.
81    */
82   char *fn;
83
84   /**
85    * Native SQLite database handle.
86    */
87   sqlite3 *dbh;
88
89   /**
90    * Current transaction.
91    */
92   enum Transactions transaction;
93
94   sqlite3_stmt *transaction_begin;
95
96   sqlite3_stmt *transaction_commit;
97
98   sqlite3_stmt *transaction_rollback;
99
100   /**
101    * Precompiled SQL for channel_key_store()
102    */
103   sqlite3_stmt *insert_channel_key;
104
105   /**
106    * Precompiled SQL for slave_key_store()
107    */
108   sqlite3_stmt *insert_slave_key;
109
110
111   /**
112    * Precompiled SQL for membership_store()
113    */
114   sqlite3_stmt *insert_membership;
115
116   /**
117    * Precompiled SQL for membership_test()
118    */
119   sqlite3_stmt *select_membership;
120
121
122   /**
123    * Precompiled SQL for fragment_store()
124    */
125   sqlite3_stmt *insert_fragment;
126
127   /**
128    * Precompiled SQL for message_add_flags()
129    */
130   sqlite3_stmt *update_message_flags;
131
132   /**
133    * Precompiled SQL for fragment_get()
134    */
135   sqlite3_stmt *select_fragments;
136
137   /**
138    * Precompiled SQL for fragment_get()
139    */
140   sqlite3_stmt *select_latest_fragments;
141
142   /**
143    * Precompiled SQL for message_get()
144    */
145   sqlite3_stmt *select_messages;
146
147   /**
148    * Precompiled SQL for message_get()
149    */
150   sqlite3_stmt *select_latest_messages;
151
152   /**
153    * Precompiled SQL for message_get_fragment()
154    */
155   sqlite3_stmt *select_message_fragment;
156
157   /**
158    * Precompiled SQL for counters_get_message()
159    */
160   sqlite3_stmt *select_counters_message;
161
162   /**
163    * Precompiled SQL for counters_get_state()
164    */
165   sqlite3_stmt *select_counters_state;
166
167   /**
168    * Precompiled SQL for state_modify_end()
169    */
170   sqlite3_stmt *update_state_hash_message_id;
171
172   /**
173    * Precompiled SQL for state_sync_end()
174    */
175   sqlite3_stmt *update_max_state_message_id;
176
177   /**
178    * Precompiled SQL for state_modify_op()
179    */
180   sqlite3_stmt *insert_state_current;
181
182   /**
183    * Precompiled SQL for state_modify_end()
184    */
185   sqlite3_stmt *delete_state_empty;
186
187   /**
188    * Precompiled SQL for state_set_signed()
189    */
190   sqlite3_stmt *update_state_signed;
191
192   /**
193    * Precompiled SQL for state_sync()
194    */
195   sqlite3_stmt *insert_state_sync;
196
197   /**
198    * Precompiled SQL for state_sync()
199    */
200   sqlite3_stmt *delete_state;
201
202   /**
203    * Precompiled SQL for state_sync()
204    */
205   sqlite3_stmt *insert_state_from_sync;
206
207   /**
208    * Precompiled SQL for state_sync()
209    */
210   sqlite3_stmt *delete_state_sync;
211
212   /**
213    * Precompiled SQL for state_get_signed()
214    */
215   sqlite3_stmt *select_state_signed;
216
217   /**
218    * Precompiled SQL for state_get()
219    */
220   sqlite3_stmt *select_state_one;
221
222   /**
223    * Precompiled SQL for state_get_prefix()
224    */
225   sqlite3_stmt *select_state_prefix;
226
227 };
228
229 #if DEBUG_PSYCSTORE
230
231 static void
232 sql_trace (void *cls, const char *sql)
233 {
234   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
235 }
236
237 #endif
238
239 /**
240  * @brief Prepare a SQL statement
241  *
242  * @param dbh handle to the database
243  * @param sql SQL statement, UTF-8 encoded
244  * @param stmt set to the prepared statement
245  * @return 0 on success
246  */
247 static int
248 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
249 {
250   char *tail;
251   int result;
252
253   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
254                                (const char **) &tail);
255   LOG (GNUNET_ERROR_TYPE_DEBUG,
256        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
257   if (result != SQLITE_OK)
258     LOG (GNUNET_ERROR_TYPE_ERROR,
259          _("Error preparing SQL query: %s\n  %s\n"),
260          sqlite3_errmsg (dbh), sql);
261   return result;
262 }
263
264
265 /**
266  * @brief Prepare a SQL statement
267  *
268  * @param dbh handle to the database
269  * @param sql SQL statement, UTF-8 encoded
270  * @return 0 on success
271  */
272 static int
273 sql_exec (sqlite3 *dbh, const char *sql)
274 {
275   int result;
276
277   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
278   LOG (GNUNET_ERROR_TYPE_DEBUG,
279        "Executed `%s' / %d\n", sql, result);
280   if (result != SQLITE_OK)
281     LOG (GNUNET_ERROR_TYPE_ERROR,
282          _("Error executing SQL query: %s\n  %s\n"),
283          sqlite3_errmsg (dbh), sql);
284   return result;
285 }
286
287
288 /**
289  * Initialize the database connections and associated
290  * data structures (create tables and indices
291  * as needed as well).
292  *
293  * @param plugin the plugin context (state for this module)
294  * @return GNUNET_OK on success
295  */
296 static int
297 database_setup (struct Plugin *plugin)
298 {
299   char *filename;
300
301   if (GNUNET_OK !=
302       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
303                                                "FILENAME", &filename))
304   {
305     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
306                                "psycstore-sqlite", "FILENAME");
307     return GNUNET_SYSERR;
308   }
309   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
310   {
311     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
312     {
313       GNUNET_break (0);
314       GNUNET_free (filename);
315       return GNUNET_SYSERR;
316     }
317   }
318   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
319   plugin->fn = filename;
320
321   /* Open database and precompile statements */
322   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
323   {
324     LOG (GNUNET_ERROR_TYPE_ERROR,
325          _("Unable to initialize SQLite: %s.\n"),
326          sqlite3_errmsg (plugin->dbh));
327     return GNUNET_SYSERR;
328   }
329
330 #if DEBUG_PSYCSTORE
331   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
332 #endif
333
334   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
335   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
336   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
337   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
338   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
339 #if ! DEBUG_PSYCSTORE
340   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
341 #endif
342   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
343
344   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
345
346   /* Create tables */
347
348   sql_exec (plugin->dbh,
349             "CREATE TABLE IF NOT EXISTS channels (\n"
350             "  id INTEGER PRIMARY KEY,\n"
351             "  pub_key BLOB UNIQUE,\n"
352             "  max_state_message_id INTEGER,\n" // last applied state message ID
353             "  state_hash_message_id INTEGER\n" // last message ID with a state hash
354             ");");
355
356   sql_exec (plugin->dbh,
357             "CREATE TABLE IF NOT EXISTS slaves (\n"
358             "  id INTEGER PRIMARY KEY,\n"
359             "  pub_key BLOB UNIQUE\n"
360             ");");
361
362   sql_exec (plugin->dbh,
363             "CREATE TABLE IF NOT EXISTS membership (\n"
364             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
365             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
366             "  did_join INTEGER NOT NULL,\n"
367             "  announced_at INTEGER NOT NULL,\n"
368             "  effective_since INTEGER NOT NULL,\n"
369             "  group_generation INTEGER NOT NULL\n"
370             ");");
371   sql_exec (plugin->dbh,
372             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
373             "ON membership (channel_id, slave_id);");
374
375   /** @todo messages table: add method_name column */
376   sql_exec (plugin->dbh,
377             "CREATE TABLE IF NOT EXISTS messages (\n"
378             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
379             "  hop_counter INTEGER NOT NULL,\n"
380             "  signature BLOB,\n"
381             "  purpose BLOB,\n"
382             "  fragment_id INTEGER NOT NULL,\n"
383             "  fragment_offset INTEGER NOT NULL,\n"
384             "  message_id INTEGER NOT NULL,\n"
385             "  group_generation INTEGER NOT NULL,\n"
386             "  multicast_flags INTEGER NOT NULL,\n"
387             "  psycstore_flags INTEGER NOT NULL,\n"
388             "  data BLOB,\n"
389             "  PRIMARY KEY (channel_id, fragment_id),\n"
390             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
391             ");");
392
393   sql_exec (plugin->dbh,
394             "CREATE TABLE IF NOT EXISTS state (\n"
395             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
396             "  name TEXT NOT NULL,\n"
397             "  value_current BLOB,\n"
398             "  value_signed BLOB,\n"
399             "  PRIMARY KEY (channel_id, name)\n"
400             ");");
401
402   sql_exec (plugin->dbh,
403             "CREATE TABLE IF NOT EXISTS state_sync (\n"
404             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
405             "  name TEXT NOT NULL,\n"
406             "  value BLOB,\n"
407             "  PRIMARY KEY (channel_id, name)\n"
408             ");");
409
410   /* Prepare statements */
411
412   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
413
414   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
415
416   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
417
418   sql_prepare (plugin->dbh,
419                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
420                &plugin->insert_channel_key);
421
422   sql_prepare (plugin->dbh,
423                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
424                &plugin->insert_slave_key);
425
426   sql_prepare (plugin->dbh,
427                "INSERT INTO membership\n"
428                " (channel_id, slave_id, did_join, announced_at,\n"
429                "  effective_since, group_generation)\n"
430                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
431                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
432                "        ?, ?, ?, ?);",
433                &plugin->insert_membership);
434
435   sql_prepare (plugin->dbh,
436                "SELECT did_join FROM membership\n"
437                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
438                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
439                "      AND effective_since <= ? AND did_join = 1\n"
440                "ORDER BY announced_at DESC LIMIT 1;",
441                &plugin->select_membership);
442
443   sql_prepare (plugin->dbh,
444                "INSERT OR IGNORE INTO messages\n"
445                " (channel_id, hop_counter, signature, purpose,\n"
446                "  fragment_id, fragment_offset, message_id,\n"
447                "  group_generation, multicast_flags, psycstore_flags, data)\n"
448                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
449                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
450                &plugin->insert_fragment);
451
452   sql_prepare (plugin->dbh,
453                "UPDATE messages\n"
454                "SET psycstore_flags = psycstore_flags | ?\n"
455                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
456                "      AND message_id = ? AND fragment_offset = 0;",
457                &plugin->update_message_flags);
458
459   sql_prepare (plugin->dbh,
460                "SELECT hop_counter, signature, purpose, fragment_id,\n"
461                "       fragment_offset, message_id, group_generation,\n"
462                "       multicast_flags, psycstore_flags, data\n"
463                "FROM messages\n"
464                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
465                "      AND ? <= fragment_id AND fragment_id <= ?;",
466                &plugin->select_fragments);
467
468   /** @todo select_messages: add method_prefix filter */
469   sql_prepare (plugin->dbh,
470                "SELECT hop_counter, signature, purpose, fragment_id,\n"
471                "       fragment_offset, message_id, group_generation,\n"
472                "       multicast_flags, psycstore_flags, data\n"
473                "FROM messages\n"
474                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
475                "      AND ? <= message_id AND message_id <= ?;",
476                &plugin->select_messages);
477
478   sql_prepare (plugin->dbh,
479                "SELECT * FROM\n"
480                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
481                "        fragment_offset, message_id, group_generation,\n"
482                "        multicast_flags, psycstore_flags, data\n"
483                " FROM messages\n"
484                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
485                " ORDER BY fragment_id DESC\n"
486                " LIMIT ?)\n"
487                "ORDER BY fragment_id;",
488                &plugin->select_latest_fragments);
489
490   /** @todo select_latest_messages: add method_prefix filter */
491   sql_prepare (plugin->dbh,
492                "SELECT hop_counter, signature, purpose, fragment_id,\n"
493                "       fragment_offset, message_id, group_generation,\n"
494                "        multicast_flags, psycstore_flags, data\n"
495                "FROM messages\n"
496                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
497                "      AND message_id IN\n"
498                "      (SELECT message_id\n"
499                "       FROM messages\n"
500                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
501                "       GROUP BY message_id\n"
502                "       ORDER BY message_id\n"
503                "       DESC LIMIT ?)\n"
504                "ORDER BY fragment_id;",
505                &plugin->select_latest_messages);
506
507   sql_prepare (plugin->dbh,
508                "SELECT hop_counter, signature, purpose, fragment_id,\n"
509                "       fragment_offset, message_id, group_generation,\n"
510                "       multicast_flags, psycstore_flags, data\n"
511                "FROM messages\n"
512                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
513                "      AND message_id = ? AND fragment_offset = ?;",
514                &plugin->select_message_fragment);
515
516   sql_prepare (plugin->dbh,
517                "SELECT fragment_id, message_id, group_generation\n"
518                "FROM messages\n"
519                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
520                "ORDER BY fragment_id DESC LIMIT 1;",
521                &plugin->select_counters_message);
522
523   sql_prepare (plugin->dbh,
524                "SELECT max_state_message_id\n"
525                "FROM channels\n"
526                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
527                &plugin->select_counters_state);
528
529   sql_prepare (plugin->dbh,
530                "UPDATE channels\n"
531                "SET max_state_message_id = ?\n"
532                "WHERE pub_key = ?;",
533                &plugin->update_max_state_message_id);
534
535   sql_prepare (plugin->dbh,
536                "UPDATE channels\n"
537                "SET state_hash_message_id = ?\n"
538                "WHERE pub_key = ?;",
539                &plugin->update_state_hash_message_id);
540
541   sql_prepare (plugin->dbh,
542                "INSERT OR REPLACE INTO state\n"
543                "  (channel_id, name, value_current, value_signed)\n"
544                "SELECT new.channel_id, new.name,\n"
545                "       new.value_current, old.value_signed\n"
546                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
547                "             AS channel_id,\n"
548                "             ? AS name, ? AS value_current) AS new\n"
549                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
550                "           FROM state) AS old\n"
551                "ON new.channel_id = old.channel_id AND new.name = old.name;",
552                &plugin->insert_state_current);
553
554   sql_prepare (plugin->dbh,
555                "DELETE FROM state\n"
556                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
557                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
558                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
559                &plugin->delete_state_empty);
560
561   sql_prepare (plugin->dbh,
562                "UPDATE state\n"
563                "SET value_signed = value_current\n"
564                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
565                &plugin->update_state_signed);
566
567   sql_prepare (plugin->dbh,
568                "DELETE FROM state\n"
569                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
570                &plugin->delete_state);
571
572   sql_prepare (plugin->dbh,
573                "INSERT INTO state_sync (channel_id, name, value)\n"
574                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
575                &plugin->insert_state_sync);
576
577   sql_prepare (plugin->dbh,
578                "INSERT INTO state\n"
579                " (channel_id, name, value_current, value_signed)\n"
580                "SELECT channel_id, name, value, value\n"
581                "FROM state_sync\n"
582                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
583                &plugin->insert_state_from_sync);
584
585   sql_prepare (plugin->dbh,
586                "DELETE FROM state_sync\n"
587                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
588                &plugin->delete_state_sync);
589
590   sql_prepare (plugin->dbh,
591                "SELECT value_current\n"
592                "FROM state\n"
593                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
594                "      AND name = ?;",
595                &plugin->select_state_one);
596
597   sql_prepare (plugin->dbh,
598                "SELECT name, value_current\n"
599                "FROM state\n"
600                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
601                "      AND (name = ? OR name LIKE ?);",
602                &plugin->select_state_prefix);
603
604   sql_prepare (plugin->dbh,
605                "SELECT name, value_signed\n"
606                "FROM state\n"
607                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
608                "      AND value_signed IS NOT NULL;",
609                &plugin->select_state_signed);
610
611   return GNUNET_OK;
612 }
613
614
615 /**
616  * Shutdown database connection and associate data
617  * structures.
618  * @param plugin the plugin context (state for this module)
619  */
620 static void
621 database_shutdown (struct Plugin *plugin)
622 {
623   int result;
624   sqlite3_stmt *stmt;
625   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
626   {
627     result = sqlite3_finalize (stmt);
628     if (SQLITE_OK != result)
629       LOG (GNUNET_ERROR_TYPE_WARNING,
630            "Failed to close statement %p: %d\n", stmt, result);
631   }
632   if (SQLITE_OK != sqlite3_close (plugin->dbh))
633     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
634
635   GNUNET_free_non_null (plugin->fn);
636 }
637
638 /**
639  * Execute a prepared statement with a @a channel_key argument.
640  *
641  * @param plugin Plugin handle.
642  * @param stmt Statement to execute.
643  * @param channel_key Public key of the channel.
644  *
645  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
646  */
647 static int
648 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
649               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
650 {
651   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
652                                       sizeof (*channel_key), SQLITE_STATIC))
653   {
654     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
655                 "sqlite3_bind");
656   }
657   else if (SQLITE_DONE != sqlite3_step (stmt))
658   {
659     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
660                 "sqlite3_step");
661   }
662
663   if (SQLITE_OK != sqlite3_reset (stmt))
664   {
665     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
666                 "sqlite3_reset");
667     return GNUNET_SYSERR;
668   }
669
670   return GNUNET_OK;
671 }
672
673 /**
674  * Begin a transaction.
675  */
676 static int
677 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
678 {
679   sqlite3_stmt *stmt = plugin->transaction_begin;
680
681   if (SQLITE_DONE != sqlite3_step (stmt))
682   {
683     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
684                 "sqlite3_step");
685   }
686   if (SQLITE_OK != sqlite3_reset (stmt))
687   {
688     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
689                 "sqlite3_reset");
690     return GNUNET_SYSERR;
691   }
692
693   plugin->transaction = transaction;
694   return GNUNET_OK;
695 }
696
697
698 /**
699  * Commit current transaction.
700  */
701 static int
702 transaction_commit (struct Plugin *plugin)
703 {
704   sqlite3_stmt *stmt = plugin->transaction_commit;
705
706   if (SQLITE_DONE != sqlite3_step (stmt))
707   {
708     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
709                 "sqlite3_step");
710   }
711   if (SQLITE_OK != sqlite3_reset (stmt))
712   {
713     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
714                 "sqlite3_reset");
715     return GNUNET_SYSERR;
716   }
717
718   plugin->transaction = TRANSACTION_NONE;
719   return GNUNET_OK;
720 }
721
722
723 /**
724  * Roll back current transaction.
725  */
726 static int
727 transaction_rollback (struct Plugin *plugin)
728 {
729   sqlite3_stmt *stmt = plugin->transaction_rollback;
730
731   if (SQLITE_DONE != sqlite3_step (stmt))
732   {
733     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
734                 "sqlite3_step");
735   }
736   if (SQLITE_OK != sqlite3_reset (stmt))
737   {
738     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
739                 "sqlite3_reset");
740     return GNUNET_SYSERR;
741   }
742   plugin->transaction = TRANSACTION_NONE;
743   return GNUNET_OK;
744 }
745
746
747 static int
748 channel_key_store (struct Plugin *plugin,
749                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
750 {
751   sqlite3_stmt *stmt = plugin->insert_channel_key;
752
753   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
754                                       sizeof (*channel_key), SQLITE_STATIC))
755   {
756     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
757                 "sqlite3_bind");
758   }
759   else if (SQLITE_DONE != sqlite3_step (stmt))
760   {
761     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
762                 "sqlite3_step");
763   }
764
765   if (SQLITE_OK != sqlite3_reset (stmt))
766   {
767     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
768                 "sqlite3_reset");
769     return GNUNET_SYSERR;
770   }
771
772   return GNUNET_OK;
773 }
774
775
776 static int
777 slave_key_store (struct Plugin *plugin,
778                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
779 {
780   sqlite3_stmt *stmt = plugin->insert_slave_key;
781
782   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
783                                       sizeof (*slave_key), SQLITE_STATIC))
784   {
785     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
786                 "sqlite3_bind");
787   }
788   else if (SQLITE_DONE != sqlite3_step (stmt))
789   {
790     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
791                 "sqlite3_step");
792   }
793
794
795   if (SQLITE_OK != sqlite3_reset (stmt))
796   {
797     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
798                 "sqlite3_reset");
799     return GNUNET_SYSERR;
800   }
801
802   return GNUNET_OK;
803 }
804
805
806 /**
807  * Store join/leave events for a PSYC channel in order to be able to answer
808  * membership test queries later.
809  *
810  * @see GNUNET_PSYCSTORE_membership_store()
811  *
812  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
813  */
814 static int
815 membership_store (void *cls,
816                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
817                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
818                   int did_join,
819                   uint64_t announced_at,
820                   uint64_t effective_since,
821                   uint64_t group_generation)
822 {
823   struct Plugin *plugin = cls;
824   sqlite3_stmt *stmt = plugin->insert_membership;
825
826   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
827
828   if (announced_at > INT64_MAX ||
829       effective_since > INT64_MAX ||
830       group_generation > INT64_MAX)
831   {
832     GNUNET_break (0);
833     return GNUNET_SYSERR;
834   }
835
836   if (GNUNET_OK != channel_key_store (plugin, channel_key)
837       || GNUNET_OK != slave_key_store (plugin, slave_key))
838     return GNUNET_SYSERR;
839
840   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
841                                       sizeof (*channel_key), SQLITE_STATIC)
842       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
843                                          sizeof (*slave_key), SQLITE_STATIC)
844       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
845       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
846       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
847       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
848   {
849     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
850                 "sqlite3_bind");
851   }
852   else if (SQLITE_DONE != sqlite3_step (stmt))
853   {
854     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
855                 "sqlite3_step");
856   }
857
858   if (SQLITE_OK != sqlite3_reset (stmt))
859   {
860     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
861                 "sqlite3_reset");
862     return GNUNET_SYSERR;
863   }
864
865   return GNUNET_OK;
866 }
867
868 /**
869  * Test if a member was admitted to the channel at the given message ID.
870  *
871  * @see GNUNET_PSYCSTORE_membership_test()
872  *
873  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
874  *         #GNUNET_SYSERR if there was en error.
875  */
876 static int
877 membership_test (void *cls,
878                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
879                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
880                  uint64_t message_id)
881 {
882   struct Plugin *plugin = cls;
883   sqlite3_stmt *stmt = plugin->select_membership;
884   int ret = GNUNET_SYSERR;
885
886   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
887                                       sizeof (*channel_key), SQLITE_STATIC)
888       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
889                                          sizeof (*slave_key), SQLITE_STATIC)
890       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
891   {
892     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
893                 "sqlite3_bind");
894   }
895   else
896   {
897     switch (sqlite3_step (stmt))
898     {
899     case SQLITE_DONE:
900       ret = GNUNET_NO;
901       break;
902     case SQLITE_ROW:
903       ret = GNUNET_YES;
904     }
905   }
906
907   if (SQLITE_OK != sqlite3_reset (stmt))
908   {
909     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
910                 "sqlite3_reset");
911   }
912
913   return ret;
914 }
915
916 /**
917  * Store a message fragment sent to a channel.
918  *
919  * @see GNUNET_PSYCSTORE_fragment_store()
920  *
921  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
922  */
923 static int
924 fragment_store (void *cls,
925                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
926                 const struct GNUNET_MULTICAST_MessageHeader *msg,
927                 uint32_t psycstore_flags)
928 {
929   struct Plugin *plugin = cls;
930   sqlite3_stmt *stmt = plugin->insert_fragment;
931
932   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
933
934   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
935   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
936   uint64_t message_id = GNUNET_ntohll (msg->message_id);
937   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
938
939   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
940       message_id > INT64_MAX || group_generation > INT64_MAX)
941   {
942     LOG (GNUNET_ERROR_TYPE_ERROR,
943          "Tried to store fragment with a field > INT64_MAX: "
944          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
945          message_id, group_generation);
946     GNUNET_break (0);
947     return GNUNET_SYSERR;
948   }
949
950   if (GNUNET_OK != channel_key_store (plugin, channel_key))
951     return GNUNET_SYSERR;
952
953   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
954                                       sizeof (*channel_key), SQLITE_STATIC)
955       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
956       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
957                                          sizeof (msg->signature), SQLITE_STATIC)
958       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
959                                          sizeof (msg->purpose), SQLITE_STATIC)
960       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
961       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
962       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
963       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
964       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
965       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
966       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
967                                          ntohs (msg->header.size)
968                                          - sizeof (*msg), SQLITE_STATIC))
969   {
970     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
971                 "sqlite3_bind");
972   }
973   else if (SQLITE_DONE != sqlite3_step (stmt))
974   {
975     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
976                 "sqlite3_step");
977   }
978
979   if (SQLITE_OK != sqlite3_reset (stmt))
980   {
981     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
982                 "sqlite3_reset");
983     return GNUNET_SYSERR;
984   }
985
986   return GNUNET_OK;
987 }
988
989 /**
990  * Set additional flags for a given message.
991  *
992  * They are OR'd with any existing flags set.
993  *
994  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
995  */
996 static int
997 message_add_flags (void *cls,
998                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
999                    uint64_t message_id,
1000                    uint64_t psycstore_flags)
1001 {
1002   struct Plugin *plugin = cls;
1003   sqlite3_stmt *stmt = plugin->update_message_flags;
1004   int ret = GNUNET_SYSERR;
1005
1006   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1007       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1008                                          sizeof (*channel_key), SQLITE_STATIC)
1009       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1010   {
1011     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1012                 "sqlite3_bind");
1013   }
1014   else
1015   {
1016     switch (sqlite3_step (stmt))
1017     {
1018     case SQLITE_DONE:
1019       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1020       break;
1021     default:
1022       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1023                   "sqlite3_step");
1024     }
1025   }
1026
1027   if (SQLITE_OK != sqlite3_reset (stmt))
1028   {
1029     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1030                 "sqlite3_reset");
1031     return GNUNET_SYSERR;
1032   }
1033
1034   return ret;
1035 }
1036
1037 static int
1038 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1039               void *cb_cls)
1040 {
1041   int data_size = sqlite3_column_bytes (stmt, 9);
1042   struct GNUNET_MULTICAST_MessageHeader *msg
1043     = GNUNET_malloc (sizeof (*msg) + data_size);
1044
1045   msg->header.size = htons (sizeof (*msg) + data_size);
1046   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1047   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1048   memcpy (&msg->signature,
1049           sqlite3_column_blob (stmt, 1),
1050           sqlite3_column_bytes (stmt, 1));
1051   memcpy (&msg->purpose,
1052           sqlite3_column_blob (stmt, 2),
1053           sqlite3_column_bytes (stmt, 2));
1054   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1055   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1056   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1057   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1058   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1059   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1060
1061   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1062 }
1063
1064
1065 static int
1066 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1067                  uint64_t *returned_fragments,
1068                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1069 {
1070   int ret = GNUNET_SYSERR;
1071   int sql_ret;
1072
1073   do
1074   {
1075     sql_ret = sqlite3_step (stmt);
1076     switch (sql_ret)
1077     {
1078     case SQLITE_DONE:
1079       if (ret != GNUNET_OK)
1080         ret = GNUNET_NO;
1081       break;
1082     case SQLITE_ROW:
1083       ret = fragment_row (stmt, cb, cb_cls);
1084       (*returned_fragments)++;
1085       if (ret != GNUNET_YES)
1086         sql_ret = SQLITE_DONE;
1087       break;
1088     default:
1089       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1090                   "sqlite3_step");
1091     }
1092   }
1093   while (sql_ret == SQLITE_ROW);
1094
1095   return ret;
1096 }
1097
1098 /**
1099  * Retrieve a message fragment range by fragment ID.
1100  *
1101  * @see GNUNET_PSYCSTORE_fragment_get()
1102  *
1103  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1104  */
1105 static int
1106 fragment_get (void *cls,
1107               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1108               uint64_t first_fragment_id,
1109               uint64_t last_fragment_id,
1110               uint64_t *returned_fragments,
1111               GNUNET_PSYCSTORE_FragmentCallback cb,
1112               void *cb_cls)
1113 {
1114   struct Plugin *plugin = cls;
1115   sqlite3_stmt *stmt = plugin->select_fragments;
1116   int ret = GNUNET_SYSERR;
1117   *returned_fragments = 0;
1118
1119   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1120                                       sizeof (*channel_key),
1121                                       SQLITE_STATIC)
1122       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1123       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1124   {
1125     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1126                 "sqlite3_bind");
1127   }
1128   else
1129   {
1130     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1131   }
1132
1133   if (SQLITE_OK != sqlite3_reset (stmt))
1134   {
1135     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1136                 "sqlite3_reset");
1137   }
1138
1139   return ret;
1140 }
1141
1142
1143 /**
1144  * Retrieve a message fragment range by fragment ID.
1145  *
1146  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1147  *
1148  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1149  */
1150 static int
1151 fragment_get_latest (void *cls,
1152                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1153                      uint64_t fragment_limit,
1154                      uint64_t *returned_fragments,
1155                      GNUNET_PSYCSTORE_FragmentCallback cb,
1156                      void *cb_cls)
1157 {
1158   struct Plugin *plugin = cls;
1159   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1160   int ret = GNUNET_SYSERR;
1161   *returned_fragments = 0;
1162
1163   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1164                                       sizeof (*channel_key),
1165                                       SQLITE_STATIC)
1166       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1167   {
1168     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1169                 "sqlite3_bind");
1170   }
1171   else
1172   {
1173     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1174   }
1175
1176   if (SQLITE_OK != sqlite3_reset (stmt))
1177   {
1178     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1179                 "sqlite3_reset");
1180   }
1181
1182   return ret;
1183 }
1184
1185
1186 /**
1187  * Retrieve all fragments of a message ID range.
1188  *
1189  * @see GNUNET_PSYCSTORE_message_get()
1190  *
1191  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1192  */
1193 static int
1194 message_get (void *cls,
1195              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1196              uint64_t first_message_id,
1197              uint64_t last_message_id,
1198              uint64_t *returned_fragments,
1199              GNUNET_PSYCSTORE_FragmentCallback cb,
1200              void *cb_cls)
1201 {
1202   struct Plugin *plugin = cls;
1203   sqlite3_stmt *stmt = plugin->select_messages;
1204   int ret = GNUNET_SYSERR;
1205   *returned_fragments = 0;
1206
1207   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1208                                       sizeof (*channel_key),
1209                                       SQLITE_STATIC)
1210       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1211       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1212   {
1213     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1214                 "sqlite3_bind");
1215   }
1216   else
1217   {
1218     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1219   }
1220
1221   if (SQLITE_OK != sqlite3_reset (stmt))
1222   {
1223     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1224                 "sqlite3_reset");
1225   }
1226
1227   return ret;
1228 }
1229
1230
1231 /**
1232  * Retrieve all fragments of the latest messages.
1233  *
1234  * @see GNUNET_PSYCSTORE_message_get_latest()
1235  *
1236  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1237  */
1238 static int
1239 message_get_latest (void *cls,
1240                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1241                     uint64_t message_limit,
1242                     uint64_t *returned_fragments,
1243                     GNUNET_PSYCSTORE_FragmentCallback cb,
1244                     void *cb_cls)
1245 {
1246   struct Plugin *plugin = cls;
1247   sqlite3_stmt *stmt = plugin->select_latest_messages;
1248   int ret = GNUNET_SYSERR;
1249   *returned_fragments = 0;
1250
1251   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1252                                       sizeof (*channel_key),
1253                                       SQLITE_STATIC)
1254       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1255                                          sizeof (*channel_key),
1256                                          SQLITE_STATIC)
1257       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1258   {
1259     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1260                 "sqlite3_bind");
1261   }
1262   else
1263   {
1264     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1265   }
1266
1267   if (SQLITE_OK != sqlite3_reset (stmt))
1268   {
1269     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1270                 "sqlite3_reset");
1271   }
1272
1273   return ret;
1274 }
1275
1276
1277 /**
1278  * Retrieve a fragment of message specified by its message ID and fragment
1279  * offset.
1280  *
1281  * @see GNUNET_PSYCSTORE_message_get_fragment()
1282  *
1283  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1284  */
1285 static int
1286 message_get_fragment (void *cls,
1287                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1288                       uint64_t message_id,
1289                       uint64_t fragment_offset,
1290                       GNUNET_PSYCSTORE_FragmentCallback cb,
1291                       void *cb_cls)
1292 {
1293   struct Plugin *plugin = cls;
1294   sqlite3_stmt *stmt = plugin->select_message_fragment;
1295   int ret = GNUNET_SYSERR;
1296
1297   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1298                                       sizeof (*channel_key),
1299                                       SQLITE_STATIC)
1300       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1301       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1302   {
1303     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1304                 "sqlite3_bind");
1305   }
1306   else
1307   {
1308     switch (sqlite3_step (stmt))
1309     {
1310     case SQLITE_DONE:
1311       ret = GNUNET_NO;
1312       break;
1313     case SQLITE_ROW:
1314       ret = fragment_row (stmt, cb, cb_cls);
1315       break;
1316     default:
1317       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1318                   "sqlite3_step");
1319     }
1320   }
1321
1322   if (SQLITE_OK != sqlite3_reset (stmt))
1323   {
1324     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1325                 "sqlite3_reset");
1326   }
1327
1328   return ret;
1329 }
1330
1331 /**
1332  * Retrieve the max. values of message counters for a channel.
1333  *
1334  * @see GNUNET_PSYCSTORE_counters_get()
1335  *
1336  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1337  */
1338 static int
1339 counters_message_get (void *cls,
1340                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1341                       uint64_t *max_fragment_id,
1342                       uint64_t *max_message_id,
1343                       uint64_t *max_group_generation)
1344 {
1345   struct Plugin *plugin = cls;
1346   sqlite3_stmt *stmt = plugin->select_counters_message;
1347   int ret = GNUNET_SYSERR;
1348
1349   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1350                                       sizeof (*channel_key),
1351                                       SQLITE_STATIC))
1352   {
1353     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1354                 "sqlite3_bind");
1355   }
1356   else
1357   {
1358     switch (sqlite3_step (stmt))
1359     {
1360     case SQLITE_DONE:
1361       ret = GNUNET_NO;
1362       break;
1363     case SQLITE_ROW:
1364       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1365       *max_message_id = sqlite3_column_int64 (stmt, 1);
1366       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1367       ret = GNUNET_OK;
1368       break;
1369     default:
1370       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1371                   "sqlite3_step");
1372     }
1373   }
1374
1375   if (SQLITE_OK != sqlite3_reset (stmt))
1376   {
1377     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1378                 "sqlite3_reset");
1379   }
1380
1381   return ret;
1382 }
1383
1384 /**
1385  * Retrieve the max. values of state counters for a channel.
1386  *
1387  * @see GNUNET_PSYCSTORE_counters_get()
1388  *
1389  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1390  */
1391 static int
1392 counters_state_get (void *cls,
1393                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1394                     uint64_t *max_state_message_id)
1395 {
1396   struct Plugin *plugin = cls;
1397   sqlite3_stmt *stmt = plugin->select_counters_state;
1398   int ret = GNUNET_SYSERR;
1399
1400   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1401                                       sizeof (*channel_key),
1402                                       SQLITE_STATIC))
1403   {
1404     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1405                 "sqlite3_bind");
1406   }
1407   else
1408   {
1409     switch (sqlite3_step (stmt))
1410     {
1411     case SQLITE_DONE:
1412       ret = GNUNET_NO;
1413       break;
1414     case SQLITE_ROW:
1415       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1416       ret = GNUNET_OK;
1417       break;
1418     default:
1419       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1420                   "sqlite3_step");
1421     }
1422   }
1423
1424   if (SQLITE_OK != sqlite3_reset (stmt))
1425   {
1426     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1427                 "sqlite3_reset");
1428   }
1429
1430   return ret;
1431 }
1432
1433
1434 /**
1435  * Assign a value to a state variable.
1436  *
1437  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1438  */
1439 static int
1440 state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
1441               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1442               const char *name, const void *value, size_t value_size)
1443 {
1444   int ret = GNUNET_SYSERR;
1445
1446   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1447                                       sizeof (*channel_key), SQLITE_STATIC)
1448       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1449       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1450                                          SQLITE_STATIC))
1451   {
1452     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1453                 "sqlite3_bind");
1454   }
1455   else
1456   {
1457     switch (sqlite3_step (stmt))
1458     {
1459     case SQLITE_DONE:
1460       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1461       break;
1462     default:
1463       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1464                   "sqlite3_step");
1465     }
1466   }
1467
1468   if (SQLITE_OK != sqlite3_reset (stmt))
1469   {
1470     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1471                 "sqlite3_reset");
1472     return GNUNET_SYSERR;
1473   }
1474
1475   return ret;
1476 }
1477
1478
1479 static int
1480 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1481                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1482                    uint64_t message_id)
1483 {
1484   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1485       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1486                                          sizeof (*channel_key), SQLITE_STATIC))
1487   {
1488     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1489                 "sqlite3_bind");
1490   }
1491   else if (SQLITE_DONE != sqlite3_step (stmt))
1492   {
1493     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1494                 "sqlite3_step");
1495   }
1496   if (SQLITE_OK != sqlite3_reset (stmt))
1497   {
1498     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1499                 "sqlite3_reset");
1500     return GNUNET_SYSERR;
1501   }
1502   return GNUNET_OK;
1503 }
1504
1505
1506 /**
1507  * Begin modifying current state.
1508  */
1509 static int
1510 state_modify_begin (void *cls,
1511                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1512                     uint64_t message_id, uint64_t state_delta)
1513 {
1514   struct Plugin *plugin = cls;
1515
1516   if (state_delta > 0)
1517   {
1518     /**
1519      * We can only apply state modifiers in the current message if modifiers in
1520      * the previous stateful message (message_id - state_delta) were already
1521      * applied.
1522      */
1523
1524     uint64_t max_state_message_id = 0;
1525     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1526     switch (ret)
1527     {
1528     case GNUNET_OK:
1529     case GNUNET_NO: // no state yet
1530       ret = GNUNET_OK;
1531       break;
1532     default:
1533       return ret;
1534     }
1535
1536     if (max_state_message_id < message_id - state_delta)
1537       return GNUNET_NO; /* some stateful messages not yet applied */
1538     else if (message_id - state_delta < max_state_message_id)
1539       return GNUNET_NO; /* changes already applied */
1540   }
1541
1542   if (TRANSACTION_NONE != plugin->transaction)
1543   {
1544     /** @todo FIXME: wait for other transaction to finish  */
1545     return GNUNET_SYSERR;
1546   }
1547   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1548 }
1549
1550
1551 /**
1552  * Set the current value of state variable.
1553  *
1554  * @see GNUNET_PSYCSTORE_state_modify()
1555  *
1556  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1557  */
1558 static int
1559 state_modify_op (void *cls,
1560                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1561                  enum GNUNET_ENV_Operator op,
1562                  const char *name, const void *value, size_t value_size)
1563 {
1564   struct Plugin *plugin = cls;
1565   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1566
1567   switch (op)
1568   {
1569   case GNUNET_ENV_OP_ASSIGN:
1570     return state_assign (plugin, plugin->insert_state_current, channel_key,
1571                          name, value, value_size);
1572
1573   default: /** @todo implement more state operations */
1574     GNUNET_break (0);
1575     return GNUNET_SYSERR;
1576   }
1577 }
1578
1579
1580 /**
1581  * End modifying current state.
1582  */
1583 static int
1584 state_modify_end (void *cls,
1585                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1586                   uint64_t message_id)
1587 {
1588   struct Plugin *plugin = cls;
1589   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1590
1591   return
1592     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1593     && GNUNET_OK == update_message_id (plugin,
1594                                        plugin->update_max_state_message_id,
1595                                        channel_key, message_id)
1596     && GNUNET_OK == transaction_commit (plugin)
1597     ? GNUNET_OK : GNUNET_SYSERR;
1598 }
1599
1600
1601 /**
1602  * Begin state synchronization.
1603  */
1604 static int
1605 state_sync_begin (void *cls,
1606                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1607 {
1608   struct Plugin *plugin = cls;
1609   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1610 }
1611
1612
1613 /**
1614  * Assign current value of a state variable.
1615  *
1616  * @see GNUNET_PSYCSTORE_state_modify()
1617  *
1618  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1619  */
1620 static int
1621 state_sync_assign (void *cls,
1622                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1623                 const char *name, const void *value, size_t value_size)
1624 {
1625   struct Plugin *plugin = cls;
1626   return state_assign (cls, plugin->insert_state_sync, channel_key,
1627                        name, value, value_size);
1628 }
1629
1630
1631 /**
1632  * End modifying current state.
1633  */
1634 static int
1635 state_sync_end (void *cls,
1636                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1637                 uint64_t max_state_message_id,
1638                 uint64_t state_hash_message_id)
1639 {
1640   struct Plugin *plugin = cls;
1641   int ret = GNUNET_SYSERR;
1642
1643   if (TRANSACTION_NONE != plugin->transaction)
1644   {
1645     /** @todo FIXME: wait for other transaction to finish  */
1646     return GNUNET_SYSERR;
1647   }
1648
1649   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1650     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1651     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1652                                   channel_key)
1653     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1654                                   channel_key)
1655     && GNUNET_OK == update_message_id (plugin,
1656                                        plugin->update_state_hash_message_id,
1657                                        channel_key, state_hash_message_id)
1658     && GNUNET_OK == update_message_id (plugin,
1659                                        plugin->update_max_state_message_id,
1660                                        channel_key, max_state_message_id)
1661     && GNUNET_OK == transaction_commit (plugin)
1662     ? ret = GNUNET_OK
1663     : transaction_rollback (plugin);
1664   return ret;
1665 }
1666
1667
1668 /**
1669  * Delete the whole state.
1670  *
1671  * @see GNUNET_PSYCSTORE_state_reset()
1672  *
1673  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1674  */
1675 static int
1676 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1677 {
1678   struct Plugin *plugin = cls;
1679   return exec_channel (plugin, plugin->delete_state, channel_key);
1680 }
1681
1682
1683 /**
1684  * Update signed values of state variables in the state store.
1685  *
1686  * @see GNUNET_PSYCSTORE_state_hash_update()
1687  *
1688  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1689  */
1690 static int
1691 state_update_signed (void *cls,
1692                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1693 {
1694   struct Plugin *plugin = cls;
1695   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1696 }
1697
1698
1699 /**
1700  * Retrieve a state variable by name.
1701  *
1702  * @see GNUNET_PSYCSTORE_state_get()
1703  *
1704  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1705  */
1706 static int
1707 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1708            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1709 {
1710   struct Plugin *plugin = cls;
1711   int ret = GNUNET_SYSERR;
1712
1713   sqlite3_stmt *stmt = plugin->select_state_one;
1714
1715   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1716                                       sizeof (*channel_key),
1717                                       SQLITE_STATIC)
1718       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1719   {
1720     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1721                 "sqlite3_bind");
1722   }
1723   else
1724   {
1725     switch (sqlite3_step (stmt))
1726     {
1727     case SQLITE_DONE:
1728       ret = GNUNET_NO;
1729       break;
1730     case SQLITE_ROW:
1731       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1732                 sqlite3_column_bytes (stmt, 0));
1733       break;
1734     default:
1735       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1736                   "sqlite3_step");
1737     }
1738   }
1739
1740   if (SQLITE_OK != sqlite3_reset (stmt))
1741   {
1742     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1743                 "sqlite3_reset");
1744   }
1745
1746   return ret;
1747 }
1748
1749
1750 /**
1751  * Retrieve all state variables for a channel with the given prefix.
1752  *
1753  * @see GNUNET_PSYCSTORE_state_get_prefix()
1754  *
1755  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1756  */
1757 static int
1758 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1759                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1760                   void *cb_cls)
1761 {
1762   struct Plugin *plugin = cls;
1763   int ret = GNUNET_SYSERR;
1764   sqlite3_stmt *stmt = plugin->select_state_prefix;
1765   size_t name_len = strlen (name);
1766   char *name_prefix;
1767
1768   GNUNET_asprintf (&name_prefix,
1769                    "%s_%%",
1770                    name);
1771   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1772                                       sizeof (*channel_key), SQLITE_STATIC)
1773       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1774       || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1775                                          SQLITE_STATIC))
1776   {
1777     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1778                 "sqlite3_bind");
1779   }
1780   else
1781   {
1782     int sql_ret;
1783     do
1784     {
1785       sql_ret = sqlite3_step (stmt);
1786       switch (sql_ret)
1787       {
1788       case SQLITE_DONE:
1789         if (ret != GNUNET_OK)
1790           ret = GNUNET_NO;
1791         break;
1792       case SQLITE_ROW:
1793         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1794                   sqlite3_column_blob (stmt, 1),
1795                   sqlite3_column_bytes (stmt, 1));
1796         if (ret != GNUNET_YES)
1797           sql_ret = SQLITE_DONE;
1798         break;
1799       default:
1800         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1801                     "sqlite3_step");
1802       }
1803     }
1804     while (sql_ret == SQLITE_ROW);
1805   }
1806   if (SQLITE_OK != sqlite3_reset (stmt))
1807   {
1808     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1809                 "sqlite3_reset");
1810   }
1811   GNUNET_free (name_prefix);
1812   return ret;
1813 }
1814
1815
1816 /**
1817  * Retrieve all signed state variables for a channel.
1818  *
1819  * @see GNUNET_PSYCSTORE_state_get_signed()
1820  *
1821  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1822  */
1823 static int
1824 state_get_signed (void *cls,
1825                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1826                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1827 {
1828   struct Plugin *plugin = cls;
1829   int ret = GNUNET_SYSERR;
1830
1831   sqlite3_stmt *stmt = plugin->select_state_signed;
1832
1833   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1834                                       sizeof (*channel_key), SQLITE_STATIC))
1835   {
1836     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1837                 "sqlite3_bind");
1838   }
1839   else
1840   {
1841     int sql_ret;
1842     do
1843     {
1844       sql_ret = sqlite3_step (stmt);
1845       switch (sql_ret)
1846       {
1847       case SQLITE_DONE:
1848         if (ret != GNUNET_OK)
1849           ret = GNUNET_NO;
1850         break;
1851       case SQLITE_ROW:
1852         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1853                   sqlite3_column_blob (stmt, 1),
1854                   sqlite3_column_bytes (stmt, 1));
1855         if (ret != GNUNET_YES)
1856           sql_ret = SQLITE_DONE;
1857         break;
1858       default:
1859         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1860                     "sqlite3_step");
1861       }
1862     }
1863     while (sql_ret == SQLITE_ROW);
1864   }
1865
1866   if (SQLITE_OK != sqlite3_reset (stmt))
1867   {
1868     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1869                 "sqlite3_reset");
1870   }
1871
1872   return ret;
1873 }
1874
1875
1876 /**
1877  * Entry point for the plugin.
1878  *
1879  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1880  * @return NULL on error, otherwise the plugin context
1881  */
1882 void *
1883 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1884 {
1885   static struct Plugin plugin;
1886   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1887   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1888
1889   if (NULL != plugin.cfg)
1890     return NULL;                /* can only initialize once! */
1891   memset (&plugin, 0, sizeof (struct Plugin));
1892   plugin.cfg = cfg;
1893   if (GNUNET_OK != database_setup (&plugin))
1894   {
1895     database_shutdown (&plugin);
1896     return NULL;
1897   }
1898   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1899   api->cls = &plugin;
1900   api->membership_store = &membership_store;
1901   api->membership_test = &membership_test;
1902   api->fragment_store = &fragment_store;
1903   api->message_add_flags = &message_add_flags;
1904   api->fragment_get = &fragment_get;
1905   api->fragment_get_latest = &fragment_get_latest;
1906   api->message_get = &message_get;
1907   api->message_get_latest = &message_get_latest;
1908   api->message_get_fragment = &message_get_fragment;
1909   api->counters_message_get = &counters_message_get;
1910   api->counters_state_get = &counters_state_get;
1911   api->state_modify_begin = &state_modify_begin;
1912   api->state_modify_op = &state_modify_op;
1913   api->state_modify_end = &state_modify_end;
1914   api->state_sync_begin = &state_sync_begin;
1915   api->state_sync_assign = &state_sync_assign;
1916   api->state_sync_end = &state_sync_end;
1917   api->state_reset = &state_reset;
1918   api->state_update_signed = &state_update_signed;
1919   api->state_get = &state_get;
1920   api->state_get_prefix = &state_get_prefix;
1921   api->state_get_signed = &state_get_signed;
1922
1923   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1924   return api;
1925 }
1926
1927
1928 /**
1929  * Exit point from the plugin.
1930  *
1931  * @param cls The plugin context (as returned by "init")
1932  * @return Always NULL
1933  */
1934 void *
1935 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1936 {
1937   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1938   struct Plugin *plugin = api->cls;
1939
1940   database_shutdown (plugin);
1941   plugin->cfg = NULL;
1942   GNUNET_free (api);
1943   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1944   return NULL;
1945 }
1946
1947 /* end of plugin_psycstore_sqlite.c */