- refactor to check messages from both enc systems
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
39 #include <sqlite3.h>
40
41 /**
42  * After how many ms "busy" should a DB operation fail for good?  A
43  * low value makes sure that we are more responsive to requests
44  * (especially PUTs).  A high value guarantees a higher success rate
45  * (SELECTs in iterate can take several seconds despite LIMIT=1).
46  *
47  * The default value of 1s should ensure that users do not experience
48  * huge latencies while at the same time allowing operations to
49  * succeed with reasonable probability.
50  */
51 #define BUSY_TIMEOUT_MS 1000
52
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54
55 /**
56  * Log an error message at log-level 'level' that indicates
57  * a failure of the command 'cmd' on file 'filename'
58  * with the message given by strerror(errno).
59  */
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
61
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
63
64 enum Transactions {
65   TRANSACTION_NONE = 0,
66   TRANSACTION_STATE_MODIFY
67 };
68
69 /**
70  * Context for all functions in this plugin.
71  */
72 struct Plugin
73 {
74
75   const struct GNUNET_CONFIGURATION_Handle *cfg;
76
77   /**
78    * Database filename.
79    */
80   char *fn;
81
82   /**
83    * Native SQLite database handle.
84    */
85   sqlite3 *dbh;
86
87   /**
88    * Current transaction.
89    */
90   enum Transactions transaction;
91
92   sqlite3_stmt *transaction_begin;
93
94   sqlite3_stmt *transaction_commit;
95
96   sqlite3_stmt *transaction_rollback;
97
98   /**
99    * Precompiled SQL for channel_key_store()
100    */
101   sqlite3_stmt *insert_channel_key;
102
103   /**
104    * Precompiled SQL for slave_key_store()
105    */
106   sqlite3_stmt *insert_slave_key;
107
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   sqlite3_stmt *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   sqlite3_stmt *select_membership;
118
119
120   /**
121    * Precompiled SQL for fragment_store()
122    */
123   sqlite3_stmt *insert_fragment;
124
125   /**
126    * Precompiled SQL for message_add_flags()
127    */
128   sqlite3_stmt *update_message_flags;
129
130   /**
131    * Precompiled SQL for fragment_get()
132    */
133   sqlite3_stmt *select_fragments;
134
135   /**
136    * Precompiled SQL for fragment_get()
137    */
138   sqlite3_stmt *select_latest_fragments;
139
140   /**
141    * Precompiled SQL for message_get()
142    */
143   sqlite3_stmt *select_messages;
144
145   /**
146    * Precompiled SQL for message_get()
147    */
148   sqlite3_stmt *select_latest_messages;
149
150   /**
151    * Precompiled SQL for message_get_fragment()
152    */
153   sqlite3_stmt *select_message_fragment;
154
155   /**
156    * Precompiled SQL for counters_get_message()
157    */
158   sqlite3_stmt *select_counters_message;
159
160   /**
161    * Precompiled SQL for counters_get_state()
162    */
163   sqlite3_stmt *select_counters_state;
164
165   /**
166    * Precompiled SQL for state_modify_end()
167    */
168   sqlite3_stmt *update_state_hash_message_id;
169
170   /**
171    * Precompiled SQL for state_sync_end()
172    */
173   sqlite3_stmt *update_max_state_message_id;
174
175
176   /**
177    * Precompiled SQL for message_modify_begin()
178    */
179   sqlite3_stmt *select_message_state_delta;
180
181   /**
182    * Precompiled SQL for state_modify_set()
183    */
184   sqlite3_stmt *insert_state_current;
185
186   /**
187    * Precompiled SQL for state_modify_end()
188    */
189   sqlite3_stmt *delete_state_empty;
190
191   /**
192    * Precompiled SQL for state_set_signed()
193    */
194   sqlite3_stmt *update_state_signed;
195
196   /**
197    * Precompiled SQL for state_sync()
198    */
199   sqlite3_stmt *insert_state_sync;
200
201   /**
202    * Precompiled SQL for state_sync()
203    */
204   sqlite3_stmt *delete_state;
205
206   /**
207    * Precompiled SQL for state_sync()
208    */
209   sqlite3_stmt *insert_state_from_sync;
210
211   /**
212    * Precompiled SQL for state_sync()
213    */
214   sqlite3_stmt *delete_state_sync;
215
216   /**
217    * Precompiled SQL for state_get_signed()
218    */
219   sqlite3_stmt *select_state_signed;
220
221   /**
222    * Precompiled SQL for state_get()
223    */
224   sqlite3_stmt *select_state_one;
225
226   /**
227    * Precompiled SQL for state_get_prefix()
228    */
229   sqlite3_stmt *select_state_prefix;
230
231 };
232
233 #if DEBUG_PSYCSTORE
234
235 static void
236 sql_trace (void *cls, const char *sql)
237 {
238   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
239 }
240
241 #endif
242
243 /**
244  * @brief Prepare a SQL statement
245  *
246  * @param dbh handle to the database
247  * @param sql SQL statement, UTF-8 encoded
248  * @param stmt set to the prepared statement
249  * @return 0 on success
250  */
251 static int
252 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
253 {
254   char *tail;
255   int result;
256
257   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
258                                (const char **) &tail);
259   LOG (GNUNET_ERROR_TYPE_DEBUG,
260        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
261   if (result != SQLITE_OK)
262     LOG (GNUNET_ERROR_TYPE_ERROR,
263          _("Error preparing SQL query: %s\n  %s\n"),
264          sqlite3_errmsg (dbh), sql);
265   return result;
266 }
267
268
269 /**
270  * @brief Prepare a SQL statement
271  *
272  * @param dbh handle to the database
273  * @param sql SQL statement, UTF-8 encoded
274  * @return 0 on success
275  */
276 static int
277 sql_exec (sqlite3 *dbh, const char *sql)
278 {
279   int result;
280
281   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
282   LOG (GNUNET_ERROR_TYPE_DEBUG,
283        "Executed `%s' / %d\n", sql, result);
284   if (result != SQLITE_OK)
285     LOG (GNUNET_ERROR_TYPE_ERROR,
286          _("Error executing SQL query: %s\n  %s\n"),
287          sqlite3_errmsg (dbh), sql);
288   return result;
289 }
290
291
292 /**
293  * Initialize the database connections and associated
294  * data structures (create tables and indices
295  * as needed as well).
296  *
297  * @param plugin the plugin context (state for this module)
298  * @return GNUNET_OK on success
299  */
300 static int
301 database_setup (struct Plugin *plugin)
302 {
303   char *filename;
304
305   if (GNUNET_OK !=
306       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
307                                                "FILENAME", &filename))
308   {
309     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
310                                "psycstore-sqlite", "FILENAME");
311     return GNUNET_SYSERR;
312   }
313   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
314   {
315     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
316     {
317       GNUNET_break (0);
318       GNUNET_free (filename);
319       return GNUNET_SYSERR;
320     }
321   }
322   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
323   plugin->fn = filename;
324
325   /* Open database and precompile statements */
326   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
327   {
328     LOG (GNUNET_ERROR_TYPE_ERROR,
329          _("Unable to initialize SQLite: %s.\n"),
330          sqlite3_errmsg (plugin->dbh));
331     return GNUNET_SYSERR;
332   }
333
334 #if DEBUG_PSYCSTORE
335   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
336 #endif
337
338   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
339   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
340   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
341   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
342   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
343 #if ! DEBUG_PSYCSTORE
344   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
345 #endif
346   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
347
348   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
349
350   /* Create tables */
351
352   sql_exec (plugin->dbh,
353             "CREATE TABLE IF NOT EXISTS channels (\n"
354             "  id INTEGER PRIMARY KEY,\n"
355             "  pub_key BLOB UNIQUE,\n"
356             "  max_state_message_id INTEGER,\n"
357             "  state_hash_message_id INTEGER\n"
358             ");");
359
360   sql_exec (plugin->dbh,
361             "CREATE TABLE IF NOT EXISTS slaves (\n"
362             "  id INTEGER PRIMARY KEY,\n"
363             "  pub_key BLOB UNIQUE\n"
364             ");");
365
366   sql_exec (plugin->dbh,
367             "CREATE TABLE IF NOT EXISTS membership (\n"
368             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
369             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
370             "  did_join INTEGER NOT NULL,\n"
371             "  announced_at INTEGER NOT NULL,\n"
372             "  effective_since INTEGER NOT NULL,\n"
373             "  group_generation INTEGER NOT NULL\n"
374             ");");
375   sql_exec (plugin->dbh,
376             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
377             "ON membership (channel_id, slave_id);");
378
379   sql_exec (plugin->dbh,
380             "CREATE TABLE IF NOT EXISTS messages (\n"
381             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
382             "  hop_counter INTEGER NOT NULL,\n"
383             "  signature BLOB,\n"
384             "  purpose BLOB,\n"
385             "  fragment_id INTEGER NOT NULL,\n"
386             "  fragment_offset INTEGER NOT NULL,\n"
387             "  message_id INTEGER NOT NULL,\n"
388             "  group_generation INTEGER NOT NULL,\n"
389             "  multicast_flags INTEGER NOT NULL,\n"
390             "  psycstore_flags INTEGER NOT NULL,\n"
391             "  data BLOB,\n"
392             "  PRIMARY KEY (channel_id, fragment_id),\n"
393             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
394             ");");
395
396   sql_exec (plugin->dbh,
397             "CREATE TABLE IF NOT EXISTS state (\n"
398             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
399             "  name TEXT NOT NULL,\n"
400             "  value_current BLOB,\n"
401             "  value_signed BLOB,\n"
402             "  PRIMARY KEY (channel_id, name)\n"
403             ");");
404
405   sql_exec (plugin->dbh,
406             "CREATE TABLE IF NOT EXISTS state_sync (\n"
407             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
408             "  name TEXT NOT NULL,\n"
409             "  value BLOB,\n"
410             "  PRIMARY KEY (channel_id, name)\n"
411             ");");
412
413   /* Prepare statements */
414
415   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
416
417   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
418
419   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
420
421   sql_prepare (plugin->dbh,
422                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
423                &plugin->insert_channel_key);
424
425   sql_prepare (plugin->dbh,
426                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
427                &plugin->insert_slave_key);
428
429   sql_prepare (plugin->dbh,
430                "INSERT INTO membership\n"
431                " (channel_id, slave_id, did_join, announced_at,\n"
432                "  effective_since, group_generation)\n"
433                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
434                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
435                "        ?, ?, ?, ?);",
436                &plugin->insert_membership);
437
438   sql_prepare (plugin->dbh,
439                "SELECT did_join FROM membership\n"
440                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
441                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
442                "      AND effective_since <= ? AND did_join = 1\n"
443                "ORDER BY announced_at DESC LIMIT 1;",
444                &plugin->select_membership);
445
446   sql_prepare (plugin->dbh,
447                "INSERT OR IGNORE INTO messages\n"
448                " (channel_id, hop_counter, signature, purpose,\n"
449                "  fragment_id, fragment_offset, message_id,\n"
450                "  group_generation, multicast_flags, psycstore_flags, data)\n"
451                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
452                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
453                &plugin->insert_fragment);
454
455   sql_prepare (plugin->dbh,
456                "UPDATE messages\n"
457                "SET psycstore_flags = psycstore_flags | ?\n"
458                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
459                "      AND message_id = ? AND fragment_offset = 0;",
460                &plugin->update_message_flags);
461
462   sql_prepare (plugin->dbh,
463                "SELECT hop_counter, signature, purpose, fragment_id,\n"
464                "       fragment_offset, message_id, group_generation,\n"
465                "       multicast_flags, psycstore_flags, data\n"
466                "FROM messages\n"
467                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
468                "      AND ? <= fragment_id AND fragment_id <= ?;",
469                &plugin->select_fragments);
470
471   sql_prepare (plugin->dbh,
472                "SELECT hop_counter, signature, purpose, fragment_id,\n"
473                "       fragment_offset, message_id, group_generation,\n"
474                "       multicast_flags, psycstore_flags, data\n"
475                "FROM messages\n"
476                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477                "      AND ? <= message_id AND message_id <= ?;",
478                &plugin->select_messages);
479
480   sql_prepare (plugin->dbh,
481                "SELECT * FROM\n"
482                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
483                "        fragment_offset, message_id, group_generation,\n"
484                "        multicast_flags, psycstore_flags, data\n"
485                " FROM messages\n"
486                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
487                " ORDER BY fragment_id DESC\n"
488                " LIMIT ?)\n"
489                "ORDER BY fragment_id;",
490                &plugin->select_latest_fragments);
491
492   sql_prepare (plugin->dbh,
493                "SELECT hop_counter, signature, purpose, fragment_id,\n"
494                "       fragment_offset, message_id, group_generation,\n"
495                "        multicast_flags, psycstore_flags, data\n"
496                "FROM messages\n"
497                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
498                "      AND message_id IN\n"
499                "      (SELECT message_id\n"
500                "       FROM messages\n"
501                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
502                "       ORDER BY message_id\n"
503                "       DESC LIMIT ?)\n"
504                "ORDER BY fragment_id;",
505                &plugin->select_latest_messages);
506
507   sql_prepare (plugin->dbh,
508                "SELECT hop_counter, signature, purpose, fragment_id,\n"
509                "       fragment_offset, message_id, group_generation,\n"
510                "       multicast_flags, psycstore_flags, data\n"
511                "FROM messages\n"
512                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
513                "      AND message_id = ? AND fragment_offset = ?;",
514                &plugin->select_message_fragment);
515
516   sql_prepare (plugin->dbh,
517                "SELECT fragment_id, message_id, group_generation\n"
518                "FROM messages\n"
519                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
520                "ORDER BY fragment_id DESC LIMIT 1;",
521                &plugin->select_counters_message);
522
523   sql_prepare (plugin->dbh,
524                "SELECT max_state_message_id\n"
525                "FROM channels\n"
526                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
527                &plugin->select_counters_state);
528
529   sql_prepare (plugin->dbh,
530                "UPDATE channels\n"
531                "SET max_state_message_id = ?\n"
532                "WHERE pub_key = ?;",
533                &plugin->update_max_state_message_id);
534
535   sql_prepare (plugin->dbh,
536                "UPDATE channels\n"
537                "SET state_hash_message_id = ?\n"
538                "WHERE pub_key = ?;",
539                &plugin->update_state_hash_message_id);
540
541   sql_prepare (plugin->dbh,
542                "SELECT 1\n"
543                "FROM channels AS c\n"
544                "LEFT JOIN messages AS m\n"
545                "ON c.id = m.channel_id\n"
546                "WHERE c.pub_key = ?\n"
547                "      AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
548                "           OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
549                "LIMIT 1;",
550                &plugin->select_message_state_delta);
551
552   sql_prepare (plugin->dbh,
553                "INSERT OR REPLACE INTO state\n"
554                "  (channel_id, name, value_current, value_signed)\n"
555                "SELECT new.channel_id, new.name,\n"
556                "       new.value_current, old.value_signed\n"
557                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
558                "             AS channel_id,\n"
559                "             ? AS name, ? AS value_current) AS new\n"
560                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
561                "           FROM state) AS old\n"
562                "ON new.channel_id = old.channel_id AND new.name = old.name;",
563                &plugin->insert_state_current);
564
565   sql_prepare (plugin->dbh,
566                "DELETE FROM state\n"
567                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
568                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
569                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
570                &plugin->delete_state_empty);
571
572   sql_prepare (plugin->dbh,
573                "UPDATE state\n"
574                "SET value_signed = value_current\n"
575                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
576                &plugin->update_state_signed);
577
578   sql_prepare (plugin->dbh,
579                "DELETE FROM state\n"
580                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
581                &plugin->delete_state);
582
583   sql_prepare (plugin->dbh,
584                "INSERT INTO state_sync (channel_id, name, value)\n"
585                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
586                &plugin->insert_state_sync);
587
588   sql_prepare (plugin->dbh,
589                "INSERT INTO state\n"
590                " (channel_id, name, value_current, value_signed)\n"
591                "SELECT channel_id, name, value, value\n"
592                "FROM state_sync\n"
593                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
594                &plugin->insert_state_from_sync);
595
596   sql_prepare (plugin->dbh,
597                "DELETE FROM state_sync\n"
598                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
599                &plugin->delete_state_sync);
600
601   sql_prepare (plugin->dbh,
602                "SELECT value_current\n"
603                "FROM state\n"
604                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
605                "      AND name = ?;",
606                &plugin->select_state_one);
607
608   sql_prepare (plugin->dbh,
609                "SELECT name, value_current\n"
610                "FROM state\n"
611                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
612                "      AND (name = ? OR name LIKE ?);",
613                &plugin->select_state_prefix);
614
615   sql_prepare (plugin->dbh,
616                "SELECT name, value_signed\n"
617                "FROM state\n"
618                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
619                "      AND value_signed IS NOT NULL;",
620                &plugin->select_state_signed);
621
622   return GNUNET_OK;
623 }
624
625
626 /**
627  * Shutdown database connection and associate data
628  * structures.
629  * @param plugin the plugin context (state for this module)
630  */
631 static void
632 database_shutdown (struct Plugin *plugin)
633 {
634   int result;
635   sqlite3_stmt *stmt;
636   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
637   {
638     result = sqlite3_finalize (stmt);
639     if (SQLITE_OK != result)
640       LOG (GNUNET_ERROR_TYPE_WARNING,
641            "Failed to close statement %p: %d\n", stmt, result);
642   }
643   if (SQLITE_OK != sqlite3_close (plugin->dbh))
644     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
645
646   GNUNET_free_non_null (plugin->fn);
647 }
648
649 /**
650  * Execute a prepared statement with a @a channel_key argument.
651  *
652  * @param plugin Plugin handle.
653  * @param stmt Statement to execute.
654  * @param channel_key Public key of the channel.
655  *
656  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
657  */
658 static int
659 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
660               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
661 {
662   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
663                                       sizeof (*channel_key), SQLITE_STATIC))
664   {
665     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
666                 "sqlite3_bind");
667   }
668   else if (SQLITE_DONE != sqlite3_step (stmt))
669   {
670     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
671                 "sqlite3_step");
672   }
673
674   if (SQLITE_OK != sqlite3_reset (stmt))
675   {
676     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
677                 "sqlite3_reset");
678     return GNUNET_SYSERR;
679   }
680
681   return GNUNET_OK;
682 }
683
684 /**
685  * Begin a transaction.
686  */
687 static int
688 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
689 {
690   sqlite3_stmt *stmt = plugin->transaction_begin;
691
692   if (SQLITE_DONE != sqlite3_step (stmt))
693   {
694     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
695                 "sqlite3_step");
696   }
697   if (SQLITE_OK != sqlite3_reset (stmt))
698   {
699     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
700                 "sqlite3_reset");
701     return GNUNET_SYSERR;
702   }
703
704   plugin->transaction = transaction;
705   return GNUNET_OK;
706 }
707
708
709 /**
710  * Commit current transaction.
711  */
712 static int
713 transaction_commit (struct Plugin *plugin)
714 {
715   sqlite3_stmt *stmt = plugin->transaction_commit;
716
717   if (SQLITE_DONE != sqlite3_step (stmt))
718   {
719     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
720                 "sqlite3_step");
721   }
722   if (SQLITE_OK != sqlite3_reset (stmt))
723   {
724     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
725                 "sqlite3_reset");
726     return GNUNET_SYSERR;
727   }
728
729   plugin->transaction = TRANSACTION_NONE;
730   return GNUNET_OK;
731 }
732
733
734 /**
735  * Roll back current transaction.
736  */
737 static int
738 transaction_rollback (struct Plugin *plugin)
739 {
740   sqlite3_stmt *stmt = plugin->transaction_rollback;
741
742   if (SQLITE_DONE != sqlite3_step (stmt))
743   {
744     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
745                 "sqlite3_step");
746   }
747   if (SQLITE_OK != sqlite3_reset (stmt))
748   {
749     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
750                 "sqlite3_reset");
751     return GNUNET_SYSERR;
752   }
753   plugin->transaction = TRANSACTION_NONE;
754   return GNUNET_OK;
755 }
756
757
758 static int
759 channel_key_store (struct Plugin *plugin,
760                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
761 {
762   sqlite3_stmt *stmt = plugin->insert_channel_key;
763
764   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
765                                       sizeof (*channel_key), SQLITE_STATIC))
766   {
767     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
768                 "sqlite3_bind");
769   }
770   else if (SQLITE_DONE != sqlite3_step (stmt))
771   {
772     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
773                 "sqlite3_step");
774   }
775
776   if (SQLITE_OK != sqlite3_reset (stmt))
777   {
778     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
779                 "sqlite3_reset");
780     return GNUNET_SYSERR;
781   }
782
783   return GNUNET_OK;
784 }
785
786
787 static int
788 slave_key_store (struct Plugin *plugin,
789                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
790 {
791   sqlite3_stmt *stmt = plugin->insert_slave_key;
792
793   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
794                                       sizeof (*slave_key), SQLITE_STATIC))
795   {
796     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
797                 "sqlite3_bind");
798   }
799   else if (SQLITE_DONE != sqlite3_step (stmt))
800   {
801     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
802                 "sqlite3_step");
803   }
804
805
806   if (SQLITE_OK != sqlite3_reset (stmt))
807   {
808     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
809                 "sqlite3_reset");
810     return GNUNET_SYSERR;
811   }
812
813   return GNUNET_OK;
814 }
815
816
817 /**
818  * Store join/leave events for a PSYC channel in order to be able to answer
819  * membership test queries later.
820  *
821  * @see GNUNET_PSYCSTORE_membership_store()
822  *
823  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
824  */
825 static int
826 membership_store (void *cls,
827                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
828                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
829                   int did_join,
830                   uint64_t announced_at,
831                   uint64_t effective_since,
832                   uint64_t group_generation)
833 {
834   struct Plugin *plugin = cls;
835   sqlite3_stmt *stmt = plugin->insert_membership;
836
837   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
838
839   if (announced_at > INT64_MAX ||
840       effective_since > INT64_MAX ||
841       group_generation > INT64_MAX)
842   {
843     GNUNET_break (0);
844     return GNUNET_SYSERR;
845   }
846
847   if (GNUNET_OK != channel_key_store (plugin, channel_key)
848       || GNUNET_OK != slave_key_store (plugin, slave_key))
849     return GNUNET_SYSERR;
850
851   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
852                                       sizeof (*channel_key), SQLITE_STATIC)
853       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
854                                          sizeof (*slave_key), SQLITE_STATIC)
855       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
856       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
857       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
858       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
859   {
860     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
861                 "sqlite3_bind");
862   }
863   else if (SQLITE_DONE != sqlite3_step (stmt))
864   {
865     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
866                 "sqlite3_step");
867   }
868
869   if (SQLITE_OK != sqlite3_reset (stmt))
870   {
871     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
872                 "sqlite3_reset");
873     return GNUNET_SYSERR;
874   }
875
876   return GNUNET_OK;
877 }
878
879 /**
880  * Test if a member was admitted to the channel at the given message ID.
881  *
882  * @see GNUNET_PSYCSTORE_membership_test()
883  *
884  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
885  *         #GNUNET_SYSERR if there was en error.
886  */
887 static int
888 membership_test (void *cls,
889                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
890                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
891                  uint64_t message_id)
892 {
893   struct Plugin *plugin = cls;
894   sqlite3_stmt *stmt = plugin->select_membership;
895   int ret = GNUNET_SYSERR;
896
897   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
898                                       sizeof (*channel_key), SQLITE_STATIC)
899       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
900                                          sizeof (*slave_key), SQLITE_STATIC)
901       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
902   {
903     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
904                 "sqlite3_bind");
905   }
906   else
907   {
908     switch (sqlite3_step (stmt))
909     {
910     case SQLITE_DONE:
911       ret = GNUNET_NO;
912       break;
913     case SQLITE_ROW:
914       ret = GNUNET_YES;
915     }
916   }
917
918   if (SQLITE_OK != sqlite3_reset (stmt))
919   {
920     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
921                 "sqlite3_reset");
922   }
923
924   return ret;
925 }
926
927 /**
928  * Store a message fragment sent to a channel.
929  *
930  * @see GNUNET_PSYCSTORE_fragment_store()
931  *
932  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
933  */
934 static int
935 fragment_store (void *cls,
936                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
937                 const struct GNUNET_MULTICAST_MessageHeader *msg,
938                 uint32_t psycstore_flags)
939 {
940   struct Plugin *plugin = cls;
941   sqlite3_stmt *stmt = plugin->insert_fragment;
942
943   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
944
945   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
946   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
947   uint64_t message_id = GNUNET_ntohll (msg->message_id);
948   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
949
950   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
951       message_id > INT64_MAX || group_generation > INT64_MAX)
952   {
953     LOG (GNUNET_ERROR_TYPE_ERROR,
954          "Tried to store fragment with a field > INT64_MAX: "
955          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
956          message_id, group_generation);
957     GNUNET_break (0);
958     return GNUNET_SYSERR;
959   }
960
961   if (GNUNET_OK != channel_key_store (plugin, channel_key))
962     return GNUNET_SYSERR;
963
964   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
965                                       sizeof (*channel_key), SQLITE_STATIC)
966       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
967       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
968                                          sizeof (msg->signature), SQLITE_STATIC)
969       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
970                                          sizeof (msg->purpose), SQLITE_STATIC)
971       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
972       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
973       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
974       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
975       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
976       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
977       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
978                                          ntohs (msg->header.size)
979                                          - sizeof (*msg), SQLITE_STATIC))
980   {
981     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
982                 "sqlite3_bind");
983   }
984   else if (SQLITE_DONE != sqlite3_step (stmt))
985   {
986     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
987                 "sqlite3_step");
988   }
989
990   if (SQLITE_OK != sqlite3_reset (stmt))
991   {
992     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
993                 "sqlite3_reset");
994     return GNUNET_SYSERR;
995   }
996
997   return GNUNET_OK;
998 }
999
1000 /**
1001  * Set additional flags for a given message.
1002  *
1003  * They are OR'd with any existing flags set.
1004  *
1005  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1006  */
1007 static int
1008 message_add_flags (void *cls,
1009                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1010                    uint64_t message_id,
1011                    uint64_t psycstore_flags)
1012 {
1013   struct Plugin *plugin = cls;
1014   sqlite3_stmt *stmt = plugin->update_message_flags;
1015   int ret = GNUNET_SYSERR;
1016
1017   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1018       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1019                                          sizeof (*channel_key), SQLITE_STATIC)
1020       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1021   {
1022     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1023                 "sqlite3_bind");
1024   }
1025   else
1026   {
1027     switch (sqlite3_step (stmt))
1028     {
1029     case SQLITE_DONE:
1030       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1031       break;
1032     default:
1033       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1034                   "sqlite3_step");
1035     }
1036   }
1037
1038   if (SQLITE_OK != sqlite3_reset (stmt))
1039   {
1040     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1041                 "sqlite3_reset");
1042     return GNUNET_SYSERR;
1043   }
1044
1045   return ret;
1046 }
1047
1048 static int
1049 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1050               void *cb_cls)
1051 {
1052   int data_size = sqlite3_column_bytes (stmt, 9);
1053   struct GNUNET_MULTICAST_MessageHeader *msg
1054     = GNUNET_malloc (sizeof (*msg) + data_size);
1055
1056   msg->header.size = htons (sizeof (*msg) + data_size);
1057   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1058   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1059   memcpy (&msg->signature,
1060           sqlite3_column_blob (stmt, 1),
1061           sqlite3_column_bytes (stmt, 1));
1062   memcpy (&msg->purpose,
1063           sqlite3_column_blob (stmt, 2),
1064           sqlite3_column_bytes (stmt, 2));
1065   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1066   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1067   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1068   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1069   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1070   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1071
1072   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1073 }
1074
1075
1076 static int
1077 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1078                  uint64_t *returned_fragments,
1079                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1080 {
1081   int ret = GNUNET_SYSERR;
1082   int sql_ret;
1083
1084   do
1085   {
1086     sql_ret = sqlite3_step (stmt);
1087     switch (sql_ret)
1088     {
1089     case SQLITE_DONE:
1090       if (ret != GNUNET_OK)
1091         ret = GNUNET_NO;
1092       break;
1093     case SQLITE_ROW:
1094       ret = fragment_row (stmt, cb, cb_cls);
1095       (*returned_fragments)++;
1096       if (ret != GNUNET_YES)
1097         sql_ret = SQLITE_DONE;
1098       break;
1099     default:
1100       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1101                   "sqlite3_step");
1102     }
1103   }
1104   while (sql_ret == SQLITE_ROW);
1105
1106   return ret;
1107 }
1108
1109 /**
1110  * Retrieve a message fragment range by fragment ID.
1111  *
1112  * @see GNUNET_PSYCSTORE_fragment_get()
1113  *
1114  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1115  */
1116 static int
1117 fragment_get (void *cls,
1118               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1119               uint64_t first_fragment_id,
1120               uint64_t last_fragment_id,
1121               uint64_t *returned_fragments,
1122               GNUNET_PSYCSTORE_FragmentCallback cb,
1123               void *cb_cls)
1124 {
1125   struct Plugin *plugin = cls;
1126   sqlite3_stmt *stmt = plugin->select_fragments;
1127   int ret = GNUNET_SYSERR;
1128   *returned_fragments = 0;
1129
1130   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1131                                       sizeof (*channel_key),
1132                                       SQLITE_STATIC)
1133       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1134       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1135   {
1136     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1137                 "sqlite3_bind");
1138   }
1139   else
1140   {
1141     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1142   }
1143
1144   if (SQLITE_OK != sqlite3_reset (stmt))
1145   {
1146     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1147                 "sqlite3_reset");
1148   }
1149
1150   return ret;
1151 }
1152
1153
1154 /**
1155  * Retrieve a message fragment range by fragment ID.
1156  *
1157  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1158  *
1159  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1160  */
1161 static int
1162 fragment_get_latest (void *cls,
1163                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1164                      uint64_t fragment_limit,
1165                      uint64_t *returned_fragments,
1166                      GNUNET_PSYCSTORE_FragmentCallback cb,
1167                      void *cb_cls)
1168 {
1169   struct Plugin *plugin = cls;
1170   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1171   int ret = GNUNET_SYSERR;
1172   *returned_fragments = 0;
1173
1174   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1175                                       sizeof (*channel_key),
1176                                       SQLITE_STATIC)
1177       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1178   {
1179     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1180                 "sqlite3_bind");
1181   }
1182   else
1183   {
1184     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1185   }
1186
1187   if (SQLITE_OK != sqlite3_reset (stmt))
1188   {
1189     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1190                 "sqlite3_reset");
1191   }
1192
1193   return ret;
1194 }
1195
1196
1197 /**
1198  * Retrieve all fragments of a message ID range.
1199  *
1200  * @see GNUNET_PSYCSTORE_message_get()
1201  *
1202  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1203  */
1204 static int
1205 message_get (void *cls,
1206              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1207              uint64_t first_message_id,
1208              uint64_t last_message_id,
1209              uint64_t *returned_fragments,
1210              GNUNET_PSYCSTORE_FragmentCallback cb,
1211              void *cb_cls)
1212 {
1213   struct Plugin *plugin = cls;
1214   sqlite3_stmt *stmt = plugin->select_messages;
1215   int ret = GNUNET_SYSERR;
1216   *returned_fragments = 0;
1217
1218   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1219                                       sizeof (*channel_key),
1220                                       SQLITE_STATIC)
1221       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1222       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1223   {
1224     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1225                 "sqlite3_bind");
1226   }
1227   else
1228   {
1229     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1230   }
1231
1232   if (SQLITE_OK != sqlite3_reset (stmt))
1233   {
1234     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1235                 "sqlite3_reset");
1236   }
1237
1238   return ret;
1239 }
1240
1241
1242 /**
1243  * Retrieve all fragments of the latest messages.
1244  *
1245  * @see GNUNET_PSYCSTORE_message_get_latest()
1246  *
1247  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1248  */
1249 static int
1250 message_get_latest (void *cls,
1251                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1252                     uint64_t message_limit,
1253                     uint64_t *returned_fragments,
1254                     GNUNET_PSYCSTORE_FragmentCallback cb,
1255                     void *cb_cls)
1256 {
1257   struct Plugin *plugin = cls;
1258   sqlite3_stmt *stmt = plugin->select_latest_messages;
1259   int ret = GNUNET_SYSERR;
1260   *returned_fragments = 0;
1261
1262   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1263                                       sizeof (*channel_key),
1264                                       SQLITE_STATIC)
1265       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1266                                          sizeof (*channel_key),
1267                                          SQLITE_STATIC)
1268       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1269   {
1270     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1271                 "sqlite3_bind");
1272   }
1273   else
1274   {
1275     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1276   }
1277
1278   if (SQLITE_OK != sqlite3_reset (stmt))
1279   {
1280     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1281                 "sqlite3_reset");
1282   }
1283
1284   return ret;
1285 }
1286
1287
1288 /**
1289  * Retrieve a fragment of message specified by its message ID and fragment
1290  * offset.
1291  *
1292  * @see GNUNET_PSYCSTORE_message_get_fragment()
1293  *
1294  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1295  */
1296 static int
1297 message_get_fragment (void *cls,
1298                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1299                       uint64_t message_id,
1300                       uint64_t fragment_offset,
1301                       GNUNET_PSYCSTORE_FragmentCallback cb,
1302                       void *cb_cls)
1303 {
1304   struct Plugin *plugin = cls;
1305   sqlite3_stmt *stmt = plugin->select_message_fragment;
1306   int ret = GNUNET_SYSERR;
1307
1308   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1309                                       sizeof (*channel_key),
1310                                       SQLITE_STATIC)
1311       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1312       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1313   {
1314     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1315                 "sqlite3_bind");
1316   }
1317   else
1318   {
1319     switch (sqlite3_step (stmt))
1320     {
1321     case SQLITE_DONE:
1322       ret = GNUNET_NO;
1323       break;
1324     case SQLITE_ROW:
1325       ret = fragment_row (stmt, cb, cb_cls);
1326       break;
1327     default:
1328       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1329                   "sqlite3_step");
1330     }
1331   }
1332
1333   if (SQLITE_OK != sqlite3_reset (stmt))
1334   {
1335     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1336                 "sqlite3_reset");
1337   }
1338
1339   return ret;
1340 }
1341
1342 /**
1343  * Retrieve the max. values of message counters for a channel.
1344  *
1345  * @see GNUNET_PSYCSTORE_counters_get()
1346  *
1347  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1348  */
1349 static int
1350 counters_message_get (void *cls,
1351                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1352                       uint64_t *max_fragment_id,
1353                       uint64_t *max_message_id,
1354                       uint64_t *max_group_generation)
1355 {
1356   struct Plugin *plugin = cls;
1357   sqlite3_stmt *stmt = plugin->select_counters_message;
1358   int ret = GNUNET_SYSERR;
1359
1360   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1361                                       sizeof (*channel_key),
1362                                       SQLITE_STATIC))
1363   {
1364     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1365                 "sqlite3_bind");
1366   }
1367   else
1368   {
1369     switch (sqlite3_step (stmt))
1370     {
1371     case SQLITE_DONE:
1372       ret = GNUNET_NO;
1373       break;
1374     case SQLITE_ROW:
1375       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1376       *max_message_id = sqlite3_column_int64 (stmt, 1);
1377       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1378       ret = GNUNET_OK;
1379       break;
1380     default:
1381       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1382                   "sqlite3_step");
1383     }
1384   }
1385
1386   if (SQLITE_OK != sqlite3_reset (stmt))
1387   {
1388     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1389                 "sqlite3_reset");
1390   }
1391
1392   return ret;
1393 }
1394
1395 /**
1396  * Retrieve the max. values of state counters for a channel.
1397  *
1398  * @see GNUNET_PSYCSTORE_counters_get()
1399  *
1400  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1401  */
1402 static int
1403 counters_state_get (void *cls,
1404                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1405                     uint64_t *max_state_message_id)
1406 {
1407   struct Plugin *plugin = cls;
1408   sqlite3_stmt *stmt = plugin->select_counters_state;
1409   int ret = GNUNET_SYSERR;
1410
1411   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1412                                       sizeof (*channel_key),
1413                                       SQLITE_STATIC))
1414   {
1415     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1416                 "sqlite3_bind");
1417   }
1418   else
1419   {
1420     switch (sqlite3_step (stmt))
1421     {
1422     case SQLITE_DONE:
1423       ret = GNUNET_NO;
1424       break;
1425     case SQLITE_ROW:
1426       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1427       ret = GNUNET_OK;
1428       break;
1429     default:
1430       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1431                   "sqlite3_step");
1432     }
1433   }
1434
1435   if (SQLITE_OK != sqlite3_reset (stmt))
1436   {
1437     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1438                 "sqlite3_reset");
1439   }
1440
1441   return ret;
1442 }
1443
1444
1445 /**
1446  * Set a state variable to the given value.
1447  *
1448  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1449  */
1450 static int
1451 state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1452            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1453            const char *name, const void *value, size_t value_size)
1454 {
1455   int ret = GNUNET_SYSERR;
1456
1457   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1458                                       sizeof (*channel_key), SQLITE_STATIC)
1459       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1460       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1461                                          SQLITE_STATIC))
1462   {
1463     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1464                 "sqlite3_bind");
1465   }
1466   else
1467   {
1468     switch (sqlite3_step (stmt))
1469     {
1470     case SQLITE_DONE:
1471       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1472       break;
1473     default:
1474       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1475                   "sqlite3_step");
1476     }
1477   }
1478
1479   if (SQLITE_OK != sqlite3_reset (stmt))
1480   {
1481     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1482                 "sqlite3_reset");
1483     return GNUNET_SYSERR;
1484   }
1485
1486   return ret;
1487 }
1488
1489
1490 static int
1491 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1492                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1493                    uint64_t message_id)
1494 {
1495   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1496       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1497                                          sizeof (*channel_key), SQLITE_STATIC))
1498   {
1499     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1500                 "sqlite3_bind");
1501   }
1502   else if (SQLITE_DONE != sqlite3_step (stmt))
1503   {
1504     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1505                 "sqlite3_step");
1506   }
1507   if (SQLITE_OK != sqlite3_reset (stmt))
1508   {
1509     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1510                 "sqlite3_reset");
1511     return GNUNET_SYSERR;
1512   }
1513   return GNUNET_OK;
1514 }
1515
1516
1517 /**
1518  * Begin modifying current state.
1519  */
1520 static int
1521 state_modify_begin (void *cls,
1522                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1523                     uint64_t message_id, uint64_t state_delta)
1524 {
1525   struct Plugin *plugin = cls;
1526   sqlite3_stmt *stmt = plugin->select_message_state_delta;
1527
1528   if (state_delta > 0)
1529   {
1530     int ret = GNUNET_SYSERR;
1531     if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1532                                         sizeof (*channel_key), SQLITE_STATIC)
1533         || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1534                                             message_id - state_delta)
1535         || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1536                                             message_id)
1537         || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1538                                             message_id - state_delta)
1539         || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1540                                             GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1541     {
1542       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1543                   "sqlite3_bind");
1544     }
1545     else
1546     {
1547       switch (sqlite3_step (stmt))
1548       {
1549       case SQLITE_DONE:
1550         ret = GNUNET_NO;
1551         break;
1552       case SQLITE_ROW:
1553         ret = GNUNET_OK;
1554         break;
1555       default:
1556         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1557                     "sqlite3_step");
1558       }
1559     }
1560     if (SQLITE_OK != sqlite3_reset (stmt))
1561     {
1562       ret = GNUNET_SYSERR;
1563       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1564                   "sqlite3_reset");
1565      }
1566     if (GNUNET_OK != ret)
1567       return ret;
1568   }
1569
1570   if (TRANSACTION_NONE != plugin->transaction)
1571       if (GNUNET_OK != transaction_rollback (plugin))
1572           return GNUNET_SYSERR;
1573
1574   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1575 }
1576
1577
1578 /**
1579  * Set the current value of state variable.
1580  *
1581  * @see GNUNET_PSYCSTORE_state_modify()
1582  *
1583  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1584  */
1585 static int
1586 state_modify_set (void *cls,
1587                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1588                   const char *name, const void *value, size_t value_size)
1589 {
1590   struct Plugin *plugin = cls;
1591   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1592
1593   return state_set (plugin, plugin->insert_state_current, channel_key,
1594                     name, value, value_size);
1595
1596 }
1597
1598
1599 /**
1600  * End modifying current state.
1601  */
1602 static int
1603 state_modify_end (void *cls,
1604                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1605                   uint64_t message_id)
1606 {
1607   struct Plugin *plugin = cls;
1608   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1609
1610   return
1611     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1612     && GNUNET_OK == update_message_id (plugin,
1613                                        plugin->update_max_state_message_id,
1614                                        channel_key, message_id)
1615     && GNUNET_OK == transaction_commit (plugin)
1616     ? GNUNET_OK : GNUNET_SYSERR;
1617 }
1618
1619
1620 /**
1621  * Begin state synchronization.
1622  */
1623 static int
1624 state_sync_begin (void *cls,
1625                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1626 {
1627   struct Plugin *plugin = cls;
1628   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1629 }
1630
1631
1632 /**
1633  * Set the current value of state variable.
1634  *
1635  * @see GNUNET_PSYCSTORE_state_modify()
1636  *
1637  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1638  */
1639 static int
1640 state_sync_set (void *cls,
1641                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1642                 const char *name, const void *value, size_t value_size)
1643 {
1644   struct Plugin *plugin = cls;
1645   return state_set (cls, plugin->insert_state_sync, channel_key,
1646                     name, value, value_size);
1647 }
1648
1649
1650 /**
1651  * End modifying current state.
1652  */
1653 static int
1654 state_sync_end (void *cls,
1655                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1656                 uint64_t message_id)
1657 {
1658   struct Plugin *plugin = cls;
1659   int ret = GNUNET_SYSERR;
1660
1661   GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1662     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1663     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1664                                   channel_key)
1665     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1666                                   channel_key)
1667     && GNUNET_OK == update_message_id (plugin,
1668                                        plugin->update_state_hash_message_id,
1669                                        channel_key, message_id)
1670     && GNUNET_OK == transaction_commit (plugin)
1671     ? ret = GNUNET_OK
1672     : transaction_rollback (plugin);
1673   return ret;
1674 }
1675
1676
1677 /**
1678  * Reset the state of a channel.
1679  *
1680  * @see GNUNET_PSYCSTORE_state_reset()
1681  *
1682  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1683  */
1684 static int
1685 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1686 {
1687   struct Plugin *plugin = cls;
1688   return exec_channel (plugin, plugin->delete_state, channel_key);
1689 }
1690
1691
1692 /**
1693  * Update signed values of state variables in the state store.
1694  *
1695  * @see GNUNET_PSYCSTORE_state_hash_update()
1696  *
1697  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1698  */
1699 static int
1700 state_update_signed (void *cls,
1701                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1702 {
1703   struct Plugin *plugin = cls;
1704   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1705 }
1706
1707
1708 /**
1709  * Retrieve a state variable by name.
1710  *
1711  * @see GNUNET_PSYCSTORE_state_get()
1712  *
1713  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1714  */
1715 static int
1716 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1717            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1718 {
1719   struct Plugin *plugin = cls;
1720   int ret = GNUNET_SYSERR;
1721
1722   sqlite3_stmt *stmt = plugin->select_state_one;
1723
1724   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1725                                       sizeof (*channel_key),
1726                                       SQLITE_STATIC)
1727       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1728   {
1729     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1730                 "sqlite3_bind");
1731   }
1732   else
1733   {
1734     switch (sqlite3_step (stmt))
1735     {
1736     case SQLITE_DONE:
1737       ret = GNUNET_NO;
1738       break;
1739     case SQLITE_ROW:
1740       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1741                 sqlite3_column_bytes (stmt, 0));
1742       break;
1743     default:
1744       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1745                   "sqlite3_step");
1746     }
1747   }
1748
1749   if (SQLITE_OK != sqlite3_reset (stmt))
1750   {
1751     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1752                 "sqlite3_reset");
1753   }
1754
1755   return ret;
1756 }
1757
1758
1759 /**
1760  * Retrieve all state variables for a channel with the given prefix.
1761  *
1762  * @see GNUNET_PSYCSTORE_state_get_prefix()
1763  *
1764  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1765  */
1766 static int
1767 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1768                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1769                   void *cb_cls)
1770 {
1771   struct Plugin *plugin = cls;
1772   int ret = GNUNET_SYSERR;
1773   sqlite3_stmt *stmt = plugin->select_state_prefix;
1774   size_t name_len = strlen (name);
1775   char *name_prefix;
1776
1777   GNUNET_asprintf (&name_prefix,
1778                    "%s_%%",
1779                    name);
1780   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1781                                       sizeof (*channel_key), SQLITE_STATIC)
1782       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1783       || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1784                                          SQLITE_STATIC))
1785   {
1786     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1787                 "sqlite3_bind");
1788   }
1789   else
1790   {
1791     int sql_ret;
1792     do
1793     {
1794       sql_ret = sqlite3_step (stmt);
1795       switch (sql_ret)
1796       {
1797       case SQLITE_DONE:
1798         if (ret != GNUNET_OK)
1799           ret = GNUNET_NO;
1800         break;
1801       case SQLITE_ROW:
1802         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1803                   sqlite3_column_blob (stmt, 1),
1804                   sqlite3_column_bytes (stmt, 1));
1805         if (ret != GNUNET_YES)
1806           sql_ret = SQLITE_DONE;
1807         break;
1808       default:
1809         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1810                     "sqlite3_step");
1811       }
1812     }
1813     while (sql_ret == SQLITE_ROW);
1814   }
1815   if (SQLITE_OK != sqlite3_reset (stmt))
1816   {
1817     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1818                 "sqlite3_reset");
1819   }
1820   GNUNET_free (name_prefix);
1821   return ret;
1822 }
1823
1824
1825 /**
1826  * Retrieve all signed state variables for a channel.
1827  *
1828  * @see GNUNET_PSYCSTORE_state_get_signed()
1829  *
1830  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1831  */
1832 static int
1833 state_get_signed (void *cls,
1834                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1835                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1836 {
1837   struct Plugin *plugin = cls;
1838   int ret = GNUNET_SYSERR;
1839
1840   sqlite3_stmt *stmt = plugin->select_state_signed;
1841
1842   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1843                                       sizeof (*channel_key), SQLITE_STATIC))
1844   {
1845     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1846                 "sqlite3_bind");
1847   }
1848   else
1849   {
1850     int sql_ret;
1851     do
1852     {
1853       sql_ret = sqlite3_step (stmt);
1854       switch (sql_ret)
1855       {
1856       case SQLITE_DONE:
1857         if (ret != GNUNET_OK)
1858           ret = GNUNET_NO;
1859         break;
1860       case SQLITE_ROW:
1861         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1862                   sqlite3_column_blob (stmt, 1),
1863                   sqlite3_column_bytes (stmt, 1));
1864         if (ret != GNUNET_YES)
1865           sql_ret = SQLITE_DONE;
1866         break;
1867       default:
1868         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1869                     "sqlite3_step");
1870       }
1871     }
1872     while (sql_ret == SQLITE_ROW);
1873   }
1874
1875   if (SQLITE_OK != sqlite3_reset (stmt))
1876   {
1877     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1878                 "sqlite3_reset");
1879   }
1880
1881   return ret;
1882 }
1883
1884
1885 /**
1886  * Entry point for the plugin.
1887  *
1888  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1889  * @return NULL on error, otherwise the plugin context
1890  */
1891 void *
1892 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1893 {
1894   static struct Plugin plugin;
1895   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1896   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1897
1898   if (NULL != plugin.cfg)
1899     return NULL;                /* can only initialize once! */
1900   memset (&plugin, 0, sizeof (struct Plugin));
1901   plugin.cfg = cfg;
1902   if (GNUNET_OK != database_setup (&plugin))
1903   {
1904     database_shutdown (&plugin);
1905     return NULL;
1906   }
1907   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1908   api->cls = &plugin;
1909   api->membership_store = &membership_store;
1910   api->membership_test = &membership_test;
1911   api->fragment_store = &fragment_store;
1912   api->message_add_flags = &message_add_flags;
1913   api->fragment_get = &fragment_get;
1914   api->fragment_get_latest = &fragment_get_latest;
1915   api->message_get = &message_get;
1916   api->message_get_latest = &message_get_latest;
1917   api->message_get_fragment = &message_get_fragment;
1918   api->counters_message_get = &counters_message_get;
1919   api->counters_state_get = &counters_state_get;
1920   api->state_modify_begin = &state_modify_begin;
1921   api->state_modify_set = &state_modify_set;
1922   api->state_modify_end = &state_modify_end;
1923   api->state_sync_begin = &state_sync_begin;
1924   api->state_sync_set = &state_sync_set;
1925   api->state_sync_end = &state_sync_end;
1926   api->state_reset = &state_reset;
1927   api->state_update_signed = &state_update_signed;
1928   api->state_get = &state_get;
1929   api->state_get_prefix = &state_get_prefix;
1930   api->state_get_signed = &state_get_signed;
1931
1932   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1933   return api;
1934 }
1935
1936
1937 /**
1938  * Exit point from the plugin.
1939  *
1940  * @param cls The plugin context (as returned by "init")
1941  * @return Always NULL
1942  */
1943 void *
1944 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1945 {
1946   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1947   struct Plugin *plugin = api->cls;
1948
1949   database_shutdown (plugin);
1950   plugin->cfg = NULL;
1951   GNUNET_free (api);
1952   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1953   return NULL;
1954 }
1955
1956 /* end of plugin_psycstore_sqlite.c */