-fix (C) notices
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_psyc_util_lib.h"
39 #include "psycstore.h"
40 #include <sqlite3.h>
41
42 /**
43  * After how many ms "busy" should a DB operation fail for good?  A
44  * low value makes sure that we are more responsive to requests
45  * (especially PUTs).  A high value guarantees a higher success rate
46  * (SELECTs in iterate can take several seconds despite LIMIT=1).
47  *
48  * The default value of 1s should ensure that users do not experience
49  * huge latencies while at the same time allowing operations to
50  * succeed with reasonable probability.
51  */
52 #define BUSY_TIMEOUT_MS 1000
53
54 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
55
56 /**
57  * Log an error message at log-level 'level' that indicates
58  * a failure of the command 'cmd' on file 'filename'
59  * with the message given by strerror(errno).
60  */
61 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
62
63 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
64
65 enum Transactions {
66   TRANSACTION_NONE = 0,
67   TRANSACTION_STATE_MODIFY,
68   TRANSACTION_STATE_SYNC,
69 };
70
71 /**
72  * Context for all functions in this plugin.
73  */
74 struct Plugin
75 {
76
77   const struct GNUNET_CONFIGURATION_Handle *cfg;
78
79   /**
80    * Database filename.
81    */
82   char *fn;
83
84   /**
85    * Native SQLite database handle.
86    */
87   sqlite3 *dbh;
88
89   /**
90    * Current transaction.
91    */
92   enum Transactions transaction;
93
94   sqlite3_stmt *transaction_begin;
95
96   sqlite3_stmt *transaction_commit;
97
98   sqlite3_stmt *transaction_rollback;
99
100   /**
101    * Precompiled SQL for channel_key_store()
102    */
103   sqlite3_stmt *insert_channel_key;
104
105   /**
106    * Precompiled SQL for slave_key_store()
107    */
108   sqlite3_stmt *insert_slave_key;
109
110
111   /**
112    * Precompiled SQL for membership_store()
113    */
114   sqlite3_stmt *insert_membership;
115
116   /**
117    * Precompiled SQL for membership_test()
118    */
119   sqlite3_stmt *select_membership;
120
121
122   /**
123    * Precompiled SQL for fragment_store()
124    */
125   sqlite3_stmt *insert_fragment;
126
127   /**
128    * Precompiled SQL for message_add_flags()
129    */
130   sqlite3_stmt *update_message_flags;
131
132   /**
133    * Precompiled SQL for fragment_get()
134    */
135   sqlite3_stmt *select_fragments;
136
137   /**
138    * Precompiled SQL for fragment_get()
139    */
140   sqlite3_stmt *select_latest_fragments;
141
142   /**
143    * Precompiled SQL for message_get()
144    */
145   sqlite3_stmt *select_messages;
146
147   /**
148    * Precompiled SQL for message_get()
149    */
150   sqlite3_stmt *select_latest_messages;
151
152   /**
153    * Precompiled SQL for message_get_fragment()
154    */
155   sqlite3_stmt *select_message_fragment;
156
157   /**
158    * Precompiled SQL for counters_get_message()
159    */
160   sqlite3_stmt *select_counters_message;
161
162   /**
163    * Precompiled SQL for counters_get_state()
164    */
165   sqlite3_stmt *select_counters_state;
166
167   /**
168    * Precompiled SQL for state_modify_end()
169    */
170   sqlite3_stmt *update_state_hash_message_id;
171
172   /**
173    * Precompiled SQL for state_sync_end()
174    */
175   sqlite3_stmt *update_max_state_message_id;
176
177   /**
178    * Precompiled SQL for state_modify_op()
179    */
180   sqlite3_stmt *insert_state_current;
181
182   /**
183    * Precompiled SQL for state_modify_end()
184    */
185   sqlite3_stmt *delete_state_empty;
186
187   /**
188    * Precompiled SQL for state_set_signed()
189    */
190   sqlite3_stmt *update_state_signed;
191
192   /**
193    * Precompiled SQL for state_sync()
194    */
195   sqlite3_stmt *insert_state_sync;
196
197   /**
198    * Precompiled SQL for state_sync()
199    */
200   sqlite3_stmt *delete_state;
201
202   /**
203    * Precompiled SQL for state_sync()
204    */
205   sqlite3_stmt *insert_state_from_sync;
206
207   /**
208    * Precompiled SQL for state_sync()
209    */
210   sqlite3_stmt *delete_state_sync;
211
212   /**
213    * Precompiled SQL for state_get_signed()
214    */
215   sqlite3_stmt *select_state_signed;
216
217   /**
218    * Precompiled SQL for state_get()
219    */
220   sqlite3_stmt *select_state_one;
221
222   /**
223    * Precompiled SQL for state_get_prefix()
224    */
225   sqlite3_stmt *select_state_prefix;
226
227 };
228
229 #if DEBUG_PSYCSTORE
230
231 static void
232 sql_trace (void *cls, const char *sql)
233 {
234   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
235 }
236
237 #endif
238
239 /**
240  * @brief Prepare a SQL statement
241  *
242  * @param dbh handle to the database
243  * @param sql SQL statement, UTF-8 encoded
244  * @param stmt set to the prepared statement
245  * @return 0 on success
246  */
247 static int
248 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
249 {
250   char *tail;
251   int result;
252
253   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
254                                (const char **) &tail);
255   LOG (GNUNET_ERROR_TYPE_DEBUG,
256        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
257   if (result != SQLITE_OK)
258     LOG (GNUNET_ERROR_TYPE_ERROR,
259          _("Error preparing SQL query: %s\n  %s\n"),
260          sqlite3_errmsg (dbh), sql);
261   return result;
262 }
263
264
265 /**
266  * @brief Prepare a SQL statement
267  *
268  * @param dbh handle to the database
269  * @param sql SQL statement, UTF-8 encoded
270  * @return 0 on success
271  */
272 static int
273 sql_exec (sqlite3 *dbh, const char *sql)
274 {
275   int result;
276
277   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
278   LOG (GNUNET_ERROR_TYPE_DEBUG,
279        "Executed `%s' / %d\n", sql, result);
280   if (result != SQLITE_OK)
281     LOG (GNUNET_ERROR_TYPE_ERROR,
282          _("Error executing SQL query: %s\n  %s\n"),
283          sqlite3_errmsg (dbh), sql);
284   return result;
285 }
286
287
288 /**
289  * Initialize the database connections and associated
290  * data structures (create tables and indices
291  * as needed as well).
292  *
293  * @param plugin the plugin context (state for this module)
294  * @return GNUNET_OK on success
295  */
296 static int
297 database_setup (struct Plugin *plugin)
298 {
299   char *filename;
300
301   if (GNUNET_OK !=
302       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
303                                                "FILENAME", &filename))
304   {
305     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
306                                "psycstore-sqlite", "FILENAME");
307     return GNUNET_SYSERR;
308   }
309   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
310   {
311     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
312     {
313       GNUNET_break (0);
314       GNUNET_free (filename);
315       return GNUNET_SYSERR;
316     }
317   }
318   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
319   plugin->fn = filename;
320
321   /* Open database and precompile statements */
322   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
323   {
324     LOG (GNUNET_ERROR_TYPE_ERROR,
325          _("Unable to initialize SQLite: %s.\n"),
326          sqlite3_errmsg (plugin->dbh));
327     return GNUNET_SYSERR;
328   }
329
330 #if DEBUG_PSYCSTORE
331   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
332 #endif
333
334   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
335   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
336   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
337   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
338   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
339 #if ! DEBUG_PSYCSTORE
340   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
341 #endif
342   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
343
344   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
345
346   /* Create tables */
347
348   sql_exec (plugin->dbh,
349             "CREATE TABLE IF NOT EXISTS channels (\n"
350             "  id INTEGER PRIMARY KEY,\n"
351             "  pub_key BLOB UNIQUE,\n"
352             "  max_state_message_id INTEGER,\n" // last applied state message ID
353             "  state_hash_message_id INTEGER\n" // last message ID with a state hash
354             ");");
355
356   sql_exec (plugin->dbh,
357             "CREATE TABLE IF NOT EXISTS slaves (\n"
358             "  id INTEGER PRIMARY KEY,\n"
359             "  pub_key BLOB UNIQUE\n"
360             ");");
361
362   sql_exec (plugin->dbh,
363             "CREATE TABLE IF NOT EXISTS membership (\n"
364             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
365             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
366             "  did_join INTEGER NOT NULL,\n"
367             "  announced_at INTEGER NOT NULL,\n"
368             "  effective_since INTEGER NOT NULL,\n"
369             "  group_generation INTEGER NOT NULL\n"
370             ");");
371   sql_exec (plugin->dbh,
372             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
373             "ON membership (channel_id, slave_id);");
374
375   /** @todo messages table: add method_name column */
376   sql_exec (plugin->dbh,
377             "CREATE TABLE IF NOT EXISTS messages (\n"
378             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
379             "  hop_counter INTEGER NOT NULL,\n"
380             "  signature BLOB,\n"
381             "  purpose BLOB,\n"
382             "  fragment_id INTEGER NOT NULL,\n"
383             "  fragment_offset INTEGER NOT NULL,\n"
384             "  message_id INTEGER NOT NULL,\n"
385             "  group_generation INTEGER NOT NULL,\n"
386             "  multicast_flags INTEGER NOT NULL,\n"
387             "  psycstore_flags INTEGER NOT NULL,\n"
388             "  data BLOB,\n"
389             "  PRIMARY KEY (channel_id, fragment_id),\n"
390             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
391             ");");
392
393   sql_exec (plugin->dbh,
394             "CREATE TABLE IF NOT EXISTS state (\n"
395             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
396             "  name TEXT NOT NULL,\n"
397             "  value_current BLOB,\n"
398             "  value_signed BLOB,\n"
399             "  PRIMARY KEY (channel_id, name)\n"
400             ");");
401
402   sql_exec (plugin->dbh,
403             "CREATE TABLE IF NOT EXISTS state_sync (\n"
404             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
405             "  name TEXT NOT NULL,\n"
406             "  value BLOB,\n"
407             "  PRIMARY KEY (channel_id, name)\n"
408             ");");
409
410   /* Prepare statements */
411
412   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
413
414   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
415
416   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
417
418   sql_prepare (plugin->dbh,
419                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
420                &plugin->insert_channel_key);
421
422   sql_prepare (plugin->dbh,
423                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
424                &plugin->insert_slave_key);
425
426   sql_prepare (plugin->dbh,
427                "INSERT INTO membership\n"
428                " (channel_id, slave_id, did_join, announced_at,\n"
429                "  effective_since, group_generation)\n"
430                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
431                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
432                "        ?, ?, ?, ?);",
433                &plugin->insert_membership);
434
435   sql_prepare (plugin->dbh,
436                "SELECT did_join FROM membership\n"
437                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
438                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
439                "      AND effective_since <= ? AND did_join = 1\n"
440                "ORDER BY announced_at DESC LIMIT 1;",
441                &plugin->select_membership);
442
443   sql_prepare (plugin->dbh,
444                "INSERT OR IGNORE INTO messages\n"
445                " (channel_id, hop_counter, signature, purpose,\n"
446                "  fragment_id, fragment_offset, message_id,\n"
447                "  group_generation, multicast_flags, psycstore_flags, data)\n"
448                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
449                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
450                &plugin->insert_fragment);
451
452   sql_prepare (plugin->dbh,
453                "UPDATE messages\n"
454                "SET psycstore_flags = psycstore_flags | ?\n"
455                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
456                "      AND message_id = ? AND fragment_offset = 0;",
457                &plugin->update_message_flags);
458
459   sql_prepare (plugin->dbh,
460                "SELECT hop_counter, signature, purpose, fragment_id,\n"
461                "       fragment_offset, message_id, group_generation,\n"
462                "       multicast_flags, psycstore_flags, data\n"
463                "FROM messages\n"
464                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
465                "      AND ? <= fragment_id AND fragment_id <= ?;",
466                &plugin->select_fragments);
467
468   /** @todo select_messages: add method_prefix filter */
469   sql_prepare (plugin->dbh,
470                "SELECT hop_counter, signature, purpose, fragment_id,\n"
471                "       fragment_offset, message_id, group_generation,\n"
472                "       multicast_flags, psycstore_flags, data\n"
473                "FROM messages\n"
474                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
475                "      AND ? <= message_id AND message_id <= ?"
476                "LIMIT ?;",
477                &plugin->select_messages);
478
479   sql_prepare (plugin->dbh,
480                "SELECT * FROM\n"
481                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
482                "        fragment_offset, message_id, group_generation,\n"
483                "        multicast_flags, psycstore_flags, data\n"
484                " FROM messages\n"
485                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
486                " ORDER BY fragment_id DESC\n"
487                " LIMIT ?)\n"
488                "ORDER BY fragment_id;",
489                &plugin->select_latest_fragments);
490
491   /** @todo select_latest_messages: add method_prefix filter */
492   sql_prepare (plugin->dbh,
493                "SELECT hop_counter, signature, purpose, fragment_id,\n"
494                "       fragment_offset, message_id, group_generation,\n"
495                "        multicast_flags, psycstore_flags, data\n"
496                "FROM messages\n"
497                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
498                "      AND message_id IN\n"
499                "      (SELECT message_id\n"
500                "       FROM messages\n"
501                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
502                "       GROUP BY message_id\n"
503                "       ORDER BY message_id\n"
504                "       DESC LIMIT ?)\n"
505                "ORDER BY fragment_id;",
506                &plugin->select_latest_messages);
507
508   sql_prepare (plugin->dbh,
509                "SELECT hop_counter, signature, purpose, fragment_id,\n"
510                "       fragment_offset, message_id, group_generation,\n"
511                "       multicast_flags, psycstore_flags, data\n"
512                "FROM messages\n"
513                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
514                "      AND message_id = ? AND fragment_offset = ?;",
515                &plugin->select_message_fragment);
516
517   sql_prepare (plugin->dbh,
518                "SELECT fragment_id, message_id, group_generation\n"
519                "FROM messages\n"
520                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
521                "ORDER BY fragment_id DESC LIMIT 1;",
522                &plugin->select_counters_message);
523
524   sql_prepare (plugin->dbh,
525                "SELECT max_state_message_id\n"
526                "FROM channels\n"
527                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
528                &plugin->select_counters_state);
529
530   sql_prepare (plugin->dbh,
531                "UPDATE channels\n"
532                "SET max_state_message_id = ?\n"
533                "WHERE pub_key = ?;",
534                &plugin->update_max_state_message_id);
535
536   sql_prepare (plugin->dbh,
537                "UPDATE channels\n"
538                "SET state_hash_message_id = ?\n"
539                "WHERE pub_key = ?;",
540                &plugin->update_state_hash_message_id);
541
542   sql_prepare (plugin->dbh,
543                "INSERT OR REPLACE INTO state\n"
544                "  (channel_id, name, value_current, value_signed)\n"
545                "SELECT new.channel_id, new.name,\n"
546                "       new.value_current, old.value_signed\n"
547                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
548                "             AS channel_id,\n"
549                "             ? AS name, ? AS value_current) AS new\n"
550                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
551                "           FROM state) AS old\n"
552                "ON new.channel_id = old.channel_id AND new.name = old.name;",
553                &plugin->insert_state_current);
554
555   sql_prepare (plugin->dbh,
556                "DELETE FROM state\n"
557                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
558                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
559                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
560                &plugin->delete_state_empty);
561
562   sql_prepare (plugin->dbh,
563                "UPDATE state\n"
564                "SET value_signed = value_current\n"
565                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
566                &plugin->update_state_signed);
567
568   sql_prepare (plugin->dbh,
569                "DELETE FROM state\n"
570                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
571                &plugin->delete_state);
572
573   sql_prepare (plugin->dbh,
574                "INSERT INTO state_sync (channel_id, name, value)\n"
575                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
576                &plugin->insert_state_sync);
577
578   sql_prepare (plugin->dbh,
579                "INSERT INTO state\n"
580                " (channel_id, name, value_current, value_signed)\n"
581                "SELECT channel_id, name, value, value\n"
582                "FROM state_sync\n"
583                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
584                &plugin->insert_state_from_sync);
585
586   sql_prepare (plugin->dbh,
587                "DELETE FROM state_sync\n"
588                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
589                &plugin->delete_state_sync);
590
591   sql_prepare (plugin->dbh,
592                "SELECT value_current\n"
593                "FROM state\n"
594                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
595                "      AND name = ?;",
596                &plugin->select_state_one);
597
598   sql_prepare (plugin->dbh,
599                "SELECT name, value_current\n"
600                "FROM state\n"
601                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
602                "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
603                &plugin->select_state_prefix);
604
605   sql_prepare (plugin->dbh,
606                "SELECT name, value_signed\n"
607                "FROM state\n"
608                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
609                "      AND value_signed IS NOT NULL;",
610                &plugin->select_state_signed);
611
612   return GNUNET_OK;
613 }
614
615
616 /**
617  * Shutdown database connection and associate data
618  * structures.
619  * @param plugin the plugin context (state for this module)
620  */
621 static void
622 database_shutdown (struct Plugin *plugin)
623 {
624   int result;
625   sqlite3_stmt *stmt;
626   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
627   {
628     result = sqlite3_finalize (stmt);
629     if (SQLITE_OK != result)
630       LOG (GNUNET_ERROR_TYPE_WARNING,
631            "Failed to close statement %p: %d\n", stmt, result);
632   }
633   if (SQLITE_OK != sqlite3_close (plugin->dbh))
634     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
635
636   GNUNET_free_non_null (plugin->fn);
637 }
638
639 /**
640  * Execute a prepared statement with a @a channel_key argument.
641  *
642  * @param plugin Plugin handle.
643  * @param stmt Statement to execute.
644  * @param channel_key Public key of the channel.
645  *
646  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
647  */
648 static int
649 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
650               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
651 {
652   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
653                                       sizeof (*channel_key), SQLITE_STATIC))
654   {
655     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
656                 "sqlite3_bind");
657   }
658   else if (SQLITE_DONE != sqlite3_step (stmt))
659   {
660     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
661                 "sqlite3_step");
662   }
663
664   if (SQLITE_OK != sqlite3_reset (stmt))
665   {
666     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
667                 "sqlite3_reset");
668     return GNUNET_SYSERR;
669   }
670
671   return GNUNET_OK;
672 }
673
674 /**
675  * Begin a transaction.
676  */
677 static int
678 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
679 {
680   sqlite3_stmt *stmt = plugin->transaction_begin;
681
682   if (SQLITE_DONE != sqlite3_step (stmt))
683   {
684     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
685                 "sqlite3_step");
686   }
687   if (SQLITE_OK != sqlite3_reset (stmt))
688   {
689     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
690                 "sqlite3_reset");
691     return GNUNET_SYSERR;
692   }
693
694   plugin->transaction = transaction;
695   return GNUNET_OK;
696 }
697
698
699 /**
700  * Commit current transaction.
701  */
702 static int
703 transaction_commit (struct Plugin *plugin)
704 {
705   sqlite3_stmt *stmt = plugin->transaction_commit;
706
707   if (SQLITE_DONE != sqlite3_step (stmt))
708   {
709     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
710                 "sqlite3_step");
711   }
712   if (SQLITE_OK != sqlite3_reset (stmt))
713   {
714     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
715                 "sqlite3_reset");
716     return GNUNET_SYSERR;
717   }
718
719   plugin->transaction = TRANSACTION_NONE;
720   return GNUNET_OK;
721 }
722
723
724 /**
725  * Roll back current transaction.
726  */
727 static int
728 transaction_rollback (struct Plugin *plugin)
729 {
730   sqlite3_stmt *stmt = plugin->transaction_rollback;
731
732   if (SQLITE_DONE != sqlite3_step (stmt))
733   {
734     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
735                 "sqlite3_step");
736   }
737   if (SQLITE_OK != sqlite3_reset (stmt))
738   {
739     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
740                 "sqlite3_reset");
741     return GNUNET_SYSERR;
742   }
743   plugin->transaction = TRANSACTION_NONE;
744   return GNUNET_OK;
745 }
746
747
748 static int
749 channel_key_store (struct Plugin *plugin,
750                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
751 {
752   sqlite3_stmt *stmt = plugin->insert_channel_key;
753
754   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
755                                       sizeof (*channel_key), SQLITE_STATIC))
756   {
757     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
758                 "sqlite3_bind");
759   }
760   else if (SQLITE_DONE != sqlite3_step (stmt))
761   {
762     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
763                 "sqlite3_step");
764   }
765
766   if (SQLITE_OK != sqlite3_reset (stmt))
767   {
768     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
769                 "sqlite3_reset");
770     return GNUNET_SYSERR;
771   }
772
773   return GNUNET_OK;
774 }
775
776
777 static int
778 slave_key_store (struct Plugin *plugin,
779                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
780 {
781   sqlite3_stmt *stmt = plugin->insert_slave_key;
782
783   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
784                                       sizeof (*slave_key), SQLITE_STATIC))
785   {
786     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
787                 "sqlite3_bind");
788   }
789   else if (SQLITE_DONE != sqlite3_step (stmt))
790   {
791     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
792                 "sqlite3_step");
793   }
794
795
796   if (SQLITE_OK != sqlite3_reset (stmt))
797   {
798     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
799                 "sqlite3_reset");
800     return GNUNET_SYSERR;
801   }
802
803   return GNUNET_OK;
804 }
805
806
807 /**
808  * Store join/leave events for a PSYC channel in order to be able to answer
809  * membership test queries later.
810  *
811  * @see GNUNET_PSYCSTORE_membership_store()
812  *
813  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
814  */
815 static int
816 membership_store (void *cls,
817                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
818                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
819                   int did_join,
820                   uint64_t announced_at,
821                   uint64_t effective_since,
822                   uint64_t group_generation)
823 {
824   struct Plugin *plugin = cls;
825   sqlite3_stmt *stmt = plugin->insert_membership;
826
827   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
828
829   if (announced_at > INT64_MAX ||
830       effective_since > INT64_MAX ||
831       group_generation > INT64_MAX)
832   {
833     GNUNET_break (0);
834     return GNUNET_SYSERR;
835   }
836
837   if (GNUNET_OK != channel_key_store (plugin, channel_key)
838       || GNUNET_OK != slave_key_store (plugin, slave_key))
839     return GNUNET_SYSERR;
840
841   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
842                                       sizeof (*channel_key), SQLITE_STATIC)
843       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
844                                          sizeof (*slave_key), SQLITE_STATIC)
845       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
846       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
847       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
848       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
849   {
850     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
851                 "sqlite3_bind");
852   }
853   else if (SQLITE_DONE != sqlite3_step (stmt))
854   {
855     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
856                 "sqlite3_step");
857   }
858
859   if (SQLITE_OK != sqlite3_reset (stmt))
860   {
861     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
862                 "sqlite3_reset");
863     return GNUNET_SYSERR;
864   }
865
866   return GNUNET_OK;
867 }
868
869 /**
870  * Test if a member was admitted to the channel at the given message ID.
871  *
872  * @see GNUNET_PSYCSTORE_membership_test()
873  *
874  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
875  *         #GNUNET_SYSERR if there was en error.
876  */
877 static int
878 membership_test (void *cls,
879                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
880                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
881                  uint64_t message_id)
882 {
883   struct Plugin *plugin = cls;
884   sqlite3_stmt *stmt = plugin->select_membership;
885   int ret = GNUNET_SYSERR;
886
887   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
888                                       sizeof (*channel_key), SQLITE_STATIC)
889       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
890                                          sizeof (*slave_key), SQLITE_STATIC)
891       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
892   {
893     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
894                 "sqlite3_bind");
895   }
896   else
897   {
898     switch (sqlite3_step (stmt))
899     {
900     case SQLITE_DONE:
901       ret = GNUNET_NO;
902       break;
903     case SQLITE_ROW:
904       ret = GNUNET_YES;
905     }
906   }
907
908   if (SQLITE_OK != sqlite3_reset (stmt))
909   {
910     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
911                 "sqlite3_reset");
912   }
913
914   return ret;
915 }
916
917 /**
918  * Store a message fragment sent to a channel.
919  *
920  * @see GNUNET_PSYCSTORE_fragment_store()
921  *
922  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
923  */
924 static int
925 fragment_store (void *cls,
926                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
927                 const struct GNUNET_MULTICAST_MessageHeader *msg,
928                 uint32_t psycstore_flags)
929 {
930   struct Plugin *plugin = cls;
931   sqlite3_stmt *stmt = plugin->insert_fragment;
932
933   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
934
935   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
936   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
937   uint64_t message_id = GNUNET_ntohll (msg->message_id);
938   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
939
940   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
941       message_id > INT64_MAX || group_generation > INT64_MAX)
942   {
943     LOG (GNUNET_ERROR_TYPE_ERROR,
944          "Tried to store fragment with a field > INT64_MAX: "
945          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
946          message_id, group_generation);
947     GNUNET_break (0);
948     return GNUNET_SYSERR;
949   }
950
951   if (GNUNET_OK != channel_key_store (plugin, channel_key))
952     return GNUNET_SYSERR;
953
954   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
955                                       sizeof (*channel_key), SQLITE_STATIC)
956       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
957       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
958                                          sizeof (msg->signature), SQLITE_STATIC)
959       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
960                                          sizeof (msg->purpose), SQLITE_STATIC)
961       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
962       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
963       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
964       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
965       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
966       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
967       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
968                                          ntohs (msg->header.size)
969                                          - sizeof (*msg), SQLITE_STATIC))
970   {
971     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
972                 "sqlite3_bind");
973   }
974   else if (SQLITE_DONE != sqlite3_step (stmt))
975   {
976     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
977                 "sqlite3_step");
978   }
979
980   if (SQLITE_OK != sqlite3_reset (stmt))
981   {
982     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
983                 "sqlite3_reset");
984     return GNUNET_SYSERR;
985   }
986
987   return GNUNET_OK;
988 }
989
990 /**
991  * Set additional flags for a given message.
992  *
993  * They are OR'd with any existing flags set.
994  *
995  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
996  */
997 static int
998 message_add_flags (void *cls,
999                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1000                    uint64_t message_id,
1001                    uint64_t psycstore_flags)
1002 {
1003   struct Plugin *plugin = cls;
1004   sqlite3_stmt *stmt = plugin->update_message_flags;
1005   int ret = GNUNET_SYSERR;
1006
1007   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1008       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1009                                          sizeof (*channel_key), SQLITE_STATIC)
1010       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1011   {
1012     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1013                 "sqlite3_bind");
1014   }
1015   else
1016   {
1017     switch (sqlite3_step (stmt))
1018     {
1019     case SQLITE_DONE:
1020       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1021       break;
1022     default:
1023       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1024                   "sqlite3_step");
1025     }
1026   }
1027
1028   if (SQLITE_OK != sqlite3_reset (stmt))
1029   {
1030     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1031                 "sqlite3_reset");
1032     return GNUNET_SYSERR;
1033   }
1034
1035   return ret;
1036 }
1037
1038 static int
1039 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1040               void *cb_cls)
1041 {
1042   int data_size = sqlite3_column_bytes (stmt, 9);
1043   struct GNUNET_MULTICAST_MessageHeader *msg
1044     = GNUNET_malloc (sizeof (*msg) + data_size);
1045
1046   msg->header.size = htons (sizeof (*msg) + data_size);
1047   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1048   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1049   memcpy (&msg->signature,
1050           sqlite3_column_blob (stmt, 1),
1051           sqlite3_column_bytes (stmt, 1));
1052   memcpy (&msg->purpose,
1053           sqlite3_column_blob (stmt, 2),
1054           sqlite3_column_bytes (stmt, 2));
1055   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1056   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1057   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1058   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1059   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1060   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1061
1062   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1063 }
1064
1065
1066 static int
1067 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1068                  uint64_t *returned_fragments,
1069                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1070 {
1071   int ret = GNUNET_SYSERR;
1072   int sql_ret;
1073
1074   do
1075   {
1076     sql_ret = sqlite3_step (stmt);
1077     switch (sql_ret)
1078     {
1079     case SQLITE_DONE:
1080       if (ret != GNUNET_OK)
1081         ret = GNUNET_NO;
1082       break;
1083     case SQLITE_ROW:
1084       ret = fragment_row (stmt, cb, cb_cls);
1085       (*returned_fragments)++;
1086       if (ret != GNUNET_YES)
1087         sql_ret = SQLITE_DONE;
1088       break;
1089     default:
1090       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1091                   "sqlite3_step");
1092     }
1093   }
1094   while (sql_ret == SQLITE_ROW);
1095
1096   return ret;
1097 }
1098
1099 /**
1100  * Retrieve a message fragment range by fragment ID.
1101  *
1102  * @see GNUNET_PSYCSTORE_fragment_get()
1103  *
1104  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1105  */
1106 static int
1107 fragment_get (void *cls,
1108               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1109               uint64_t first_fragment_id,
1110               uint64_t last_fragment_id,
1111               uint64_t *returned_fragments,
1112               GNUNET_PSYCSTORE_FragmentCallback cb,
1113               void *cb_cls)
1114 {
1115   struct Plugin *plugin = cls;
1116   sqlite3_stmt *stmt = plugin->select_fragments;
1117   int ret = GNUNET_SYSERR;
1118   *returned_fragments = 0;
1119
1120   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1121                                       sizeof (*channel_key),
1122                                       SQLITE_STATIC)
1123       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1124       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1125   {
1126     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1127                 "sqlite3_bind");
1128   }
1129   else
1130   {
1131     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1132   }
1133
1134   if (SQLITE_OK != sqlite3_reset (stmt))
1135   {
1136     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1137                 "sqlite3_reset");
1138   }
1139
1140   return ret;
1141 }
1142
1143
1144 /**
1145  * Retrieve a message fragment range by fragment ID.
1146  *
1147  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1148  *
1149  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1150  */
1151 static int
1152 fragment_get_latest (void *cls,
1153                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1154                      uint64_t fragment_limit,
1155                      uint64_t *returned_fragments,
1156                      GNUNET_PSYCSTORE_FragmentCallback cb,
1157                      void *cb_cls)
1158 {
1159   struct Plugin *plugin = cls;
1160   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1161   int ret = GNUNET_SYSERR;
1162   *returned_fragments = 0;
1163
1164   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1165                                       sizeof (*channel_key),
1166                                       SQLITE_STATIC)
1167       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1168   {
1169     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1170                 "sqlite3_bind");
1171   }
1172   else
1173   {
1174     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1175   }
1176
1177   if (SQLITE_OK != sqlite3_reset (stmt))
1178   {
1179     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1180                 "sqlite3_reset");
1181   }
1182
1183   return ret;
1184 }
1185
1186
1187 /**
1188  * Retrieve all fragments of a message ID range.
1189  *
1190  * @see GNUNET_PSYCSTORE_message_get()
1191  *
1192  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1193  */
1194 static int
1195 message_get (void *cls,
1196              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1197              uint64_t first_message_id,
1198              uint64_t last_message_id,
1199              uint64_t fragment_limit,
1200              uint64_t *returned_fragments,
1201              GNUNET_PSYCSTORE_FragmentCallback cb,
1202              void *cb_cls)
1203 {
1204   struct Plugin *plugin = cls;
1205   sqlite3_stmt *stmt = plugin->select_messages;
1206   int ret = GNUNET_SYSERR;
1207   *returned_fragments = 0;
1208
1209   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1210                                       sizeof (*channel_key),
1211                                       SQLITE_STATIC)
1212       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1213       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id)
1214       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1215                                           (0 != fragment_limit)
1216                                           ? fragment_limit
1217                                           : -1))
1218   {
1219     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1220                 "sqlite3_bind");
1221   }
1222   else
1223   {
1224     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1225   }
1226
1227   if (SQLITE_OK != sqlite3_reset (stmt))
1228   {
1229     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1230                 "sqlite3_reset");
1231   }
1232
1233   return ret;
1234 }
1235
1236
1237 /**
1238  * Retrieve all fragments of the latest messages.
1239  *
1240  * @see GNUNET_PSYCSTORE_message_get_latest()
1241  *
1242  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1243  */
1244 static int
1245 message_get_latest (void *cls,
1246                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1247                     uint64_t message_limit,
1248                     uint64_t *returned_fragments,
1249                     GNUNET_PSYCSTORE_FragmentCallback cb,
1250                     void *cb_cls)
1251 {
1252   struct Plugin *plugin = cls;
1253   sqlite3_stmt *stmt = plugin->select_latest_messages;
1254   int ret = GNUNET_SYSERR;
1255   *returned_fragments = 0;
1256
1257   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1258                                       sizeof (*channel_key),
1259                                       SQLITE_STATIC)
1260       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1261                                          sizeof (*channel_key),
1262                                          SQLITE_STATIC)
1263       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1264   {
1265     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1266                 "sqlite3_bind");
1267   }
1268   else
1269   {
1270     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1271   }
1272
1273   if (SQLITE_OK != sqlite3_reset (stmt))
1274   {
1275     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1276                 "sqlite3_reset");
1277   }
1278
1279   return ret;
1280 }
1281
1282
1283 /**
1284  * Retrieve a fragment of message specified by its message ID and fragment
1285  * offset.
1286  *
1287  * @see GNUNET_PSYCSTORE_message_get_fragment()
1288  *
1289  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1290  */
1291 static int
1292 message_get_fragment (void *cls,
1293                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1294                       uint64_t message_id,
1295                       uint64_t fragment_offset,
1296                       GNUNET_PSYCSTORE_FragmentCallback cb,
1297                       void *cb_cls)
1298 {
1299   struct Plugin *plugin = cls;
1300   sqlite3_stmt *stmt = plugin->select_message_fragment;
1301   int ret = GNUNET_SYSERR;
1302
1303   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1304                                       sizeof (*channel_key),
1305                                       SQLITE_STATIC)
1306       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1307       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1308   {
1309     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1310                 "sqlite3_bind");
1311   }
1312   else
1313   {
1314     switch (sqlite3_step (stmt))
1315     {
1316     case SQLITE_DONE:
1317       ret = GNUNET_NO;
1318       break;
1319     case SQLITE_ROW:
1320       ret = fragment_row (stmt, cb, cb_cls);
1321       break;
1322     default:
1323       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1324                   "sqlite3_step");
1325     }
1326   }
1327
1328   if (SQLITE_OK != sqlite3_reset (stmt))
1329   {
1330     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1331                 "sqlite3_reset");
1332   }
1333
1334   return ret;
1335 }
1336
1337 /**
1338  * Retrieve the max. values of message counters for a channel.
1339  *
1340  * @see GNUNET_PSYCSTORE_counters_get()
1341  *
1342  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1343  */
1344 static int
1345 counters_message_get (void *cls,
1346                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1347                       uint64_t *max_fragment_id,
1348                       uint64_t *max_message_id,
1349                       uint64_t *max_group_generation)
1350 {
1351   struct Plugin *plugin = cls;
1352   sqlite3_stmt *stmt = plugin->select_counters_message;
1353   int ret = GNUNET_SYSERR;
1354
1355   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1356                                       sizeof (*channel_key),
1357                                       SQLITE_STATIC))
1358   {
1359     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1360                 "sqlite3_bind");
1361   }
1362   else
1363   {
1364     switch (sqlite3_step (stmt))
1365     {
1366     case SQLITE_DONE:
1367       ret = GNUNET_NO;
1368       break;
1369     case SQLITE_ROW:
1370       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1371       *max_message_id = sqlite3_column_int64 (stmt, 1);
1372       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1373       ret = GNUNET_OK;
1374       break;
1375     default:
1376       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1377                   "sqlite3_step");
1378     }
1379   }
1380
1381   if (SQLITE_OK != sqlite3_reset (stmt))
1382   {
1383     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1384                 "sqlite3_reset");
1385   }
1386
1387   return ret;
1388 }
1389
1390 /**
1391  * Retrieve the max. values of state counters for a channel.
1392  *
1393  * @see GNUNET_PSYCSTORE_counters_get()
1394  *
1395  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1396  */
1397 static int
1398 counters_state_get (void *cls,
1399                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1400                     uint64_t *max_state_message_id)
1401 {
1402   struct Plugin *plugin = cls;
1403   sqlite3_stmt *stmt = plugin->select_counters_state;
1404   int ret = GNUNET_SYSERR;
1405
1406   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1407                                       sizeof (*channel_key),
1408                                       SQLITE_STATIC))
1409   {
1410     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1411                 "sqlite3_bind");
1412   }
1413   else
1414   {
1415     switch (sqlite3_step (stmt))
1416     {
1417     case SQLITE_DONE:
1418       ret = GNUNET_NO;
1419       break;
1420     case SQLITE_ROW:
1421       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1422       ret = GNUNET_OK;
1423       break;
1424     default:
1425       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1426                   "sqlite3_step");
1427     }
1428   }
1429
1430   if (SQLITE_OK != sqlite3_reset (stmt))
1431   {
1432     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1433                 "sqlite3_reset");
1434   }
1435
1436   return ret;
1437 }
1438
1439
1440 /**
1441  * Assign a value to a state variable.
1442  *
1443  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1444  */
1445 static int
1446 state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
1447               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1448               const char *name, const void *value, size_t value_size)
1449 {
1450   int ret = GNUNET_SYSERR;
1451
1452   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1453                                       sizeof (*channel_key), SQLITE_STATIC)
1454       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1455       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1456                                          SQLITE_STATIC))
1457   {
1458     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1459                 "sqlite3_bind");
1460   }
1461   else
1462   {
1463     switch (sqlite3_step (stmt))
1464     {
1465     case SQLITE_DONE:
1466       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1467       break;
1468     default:
1469       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1470                   "sqlite3_step");
1471     }
1472   }
1473
1474   if (SQLITE_OK != sqlite3_reset (stmt))
1475   {
1476     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1477                 "sqlite3_reset");
1478     return GNUNET_SYSERR;
1479   }
1480
1481   return ret;
1482 }
1483
1484
1485 static int
1486 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1487                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1488                    uint64_t message_id)
1489 {
1490   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1491       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1492                                          sizeof (*channel_key), SQLITE_STATIC))
1493   {
1494     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1495                 "sqlite3_bind");
1496   }
1497   else if (SQLITE_DONE != sqlite3_step (stmt))
1498   {
1499     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1500                 "sqlite3_step");
1501   }
1502   if (SQLITE_OK != sqlite3_reset (stmt))
1503   {
1504     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1505                 "sqlite3_reset");
1506     return GNUNET_SYSERR;
1507   }
1508   return GNUNET_OK;
1509 }
1510
1511
1512 /**
1513  * Begin modifying current state.
1514  */
1515 static int
1516 state_modify_begin (void *cls,
1517                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1518                     uint64_t message_id, uint64_t state_delta)
1519 {
1520   struct Plugin *plugin = cls;
1521
1522   if (state_delta > 0)
1523   {
1524     /**
1525      * We can only apply state modifiers in the current message if modifiers in
1526      * the previous stateful message (message_id - state_delta) were already
1527      * applied.
1528      */
1529
1530     uint64_t max_state_message_id = 0;
1531     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1532     switch (ret)
1533     {
1534     case GNUNET_OK:
1535     case GNUNET_NO: // no state yet
1536       ret = GNUNET_OK;
1537       break;
1538     default:
1539       return ret;
1540     }
1541
1542     if (max_state_message_id < message_id - state_delta)
1543       return GNUNET_NO; /* some stateful messages not yet applied */
1544     else if (message_id - state_delta < max_state_message_id)
1545       return GNUNET_NO; /* changes already applied */
1546   }
1547
1548   if (TRANSACTION_NONE != plugin->transaction)
1549   {
1550     /** @todo FIXME: wait for other transaction to finish  */
1551     return GNUNET_SYSERR;
1552   }
1553   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1554 }
1555
1556
1557 /**
1558  * Set the current value of state variable.
1559  *
1560  * @see GNUNET_PSYCSTORE_state_modify()
1561  *
1562  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1563  */
1564 static int
1565 state_modify_op (void *cls,
1566                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1567                  enum GNUNET_PSYC_Operator op,
1568                  const char *name, const void *value, size_t value_size)
1569 {
1570   struct Plugin *plugin = cls;
1571   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1572
1573   switch (op)
1574   {
1575   case GNUNET_PSYC_OP_ASSIGN:
1576     return state_assign (plugin, plugin->insert_state_current, channel_key,
1577                          name, value, value_size);
1578
1579   default: /** @todo implement more state operations */
1580     GNUNET_break (0);
1581     return GNUNET_SYSERR;
1582   }
1583 }
1584
1585
1586 /**
1587  * End modifying current state.
1588  */
1589 static int
1590 state_modify_end (void *cls,
1591                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1592                   uint64_t message_id)
1593 {
1594   struct Plugin *plugin = cls;
1595   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1596
1597   return
1598     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1599     && GNUNET_OK == update_message_id (plugin,
1600                                        plugin->update_max_state_message_id,
1601                                        channel_key, message_id)
1602     && GNUNET_OK == transaction_commit (plugin)
1603     ? GNUNET_OK : GNUNET_SYSERR;
1604 }
1605
1606
1607 /**
1608  * Begin state synchronization.
1609  */
1610 static int
1611 state_sync_begin (void *cls,
1612                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1613 {
1614   struct Plugin *plugin = cls;
1615   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1616 }
1617
1618
1619 /**
1620  * Assign current value of a state variable.
1621  *
1622  * @see GNUNET_PSYCSTORE_state_modify()
1623  *
1624  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1625  */
1626 static int
1627 state_sync_assign (void *cls,
1628                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1629                 const char *name, const void *value, size_t value_size)
1630 {
1631   struct Plugin *plugin = cls;
1632   return state_assign (cls, plugin->insert_state_sync, channel_key,
1633                        name, value, value_size);
1634 }
1635
1636
1637 /**
1638  * End modifying current state.
1639  */
1640 static int
1641 state_sync_end (void *cls,
1642                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1643                 uint64_t max_state_message_id,
1644                 uint64_t state_hash_message_id)
1645 {
1646   struct Plugin *plugin = cls;
1647   int ret = GNUNET_SYSERR;
1648
1649   if (TRANSACTION_NONE != plugin->transaction)
1650   {
1651     /** @todo FIXME: wait for other transaction to finish  */
1652     return GNUNET_SYSERR;
1653   }
1654
1655   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1656     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1657     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1658                                   channel_key)
1659     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1660                                   channel_key)
1661     && GNUNET_OK == update_message_id (plugin,
1662                                        plugin->update_state_hash_message_id,
1663                                        channel_key, state_hash_message_id)
1664     && GNUNET_OK == update_message_id (plugin,
1665                                        plugin->update_max_state_message_id,
1666                                        channel_key, max_state_message_id)
1667     && GNUNET_OK == transaction_commit (plugin)
1668     ? ret = GNUNET_OK
1669     : transaction_rollback (plugin);
1670   return ret;
1671 }
1672
1673
1674 /**
1675  * Delete the whole state.
1676  *
1677  * @see GNUNET_PSYCSTORE_state_reset()
1678  *
1679  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1680  */
1681 static int
1682 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1683 {
1684   struct Plugin *plugin = cls;
1685   return exec_channel (plugin, plugin->delete_state, channel_key);
1686 }
1687
1688
1689 /**
1690  * Update signed values of state variables in the state store.
1691  *
1692  * @see GNUNET_PSYCSTORE_state_hash_update()
1693  *
1694  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1695  */
1696 static int
1697 state_update_signed (void *cls,
1698                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1699 {
1700   struct Plugin *plugin = cls;
1701   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1702 }
1703
1704
1705 /**
1706  * Retrieve a state variable by name.
1707  *
1708  * @see GNUNET_PSYCSTORE_state_get()
1709  *
1710  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1711  */
1712 static int
1713 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1714            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1715 {
1716   struct Plugin *plugin = cls;
1717   int ret = GNUNET_SYSERR;
1718
1719   sqlite3_stmt *stmt = plugin->select_state_one;
1720
1721   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1722                                       sizeof (*channel_key),
1723                                       SQLITE_STATIC)
1724       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1725   {
1726     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1727                 "sqlite3_bind");
1728   }
1729   else
1730   {
1731     switch (sqlite3_step (stmt))
1732     {
1733     case SQLITE_DONE:
1734       ret = GNUNET_NO;
1735       break;
1736     case SQLITE_ROW:
1737       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1738                 sqlite3_column_bytes (stmt, 0));
1739       break;
1740     default:
1741       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1742                   "sqlite3_step");
1743     }
1744   }
1745
1746   if (SQLITE_OK != sqlite3_reset (stmt))
1747   {
1748     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1749                 "sqlite3_reset");
1750   }
1751
1752   return ret;
1753 }
1754
1755
1756 /**
1757  * Retrieve all state variables for a channel with the given prefix.
1758  *
1759  * @see GNUNET_PSYCSTORE_state_get_prefix()
1760  *
1761  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1762  */
1763 static int
1764 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1765                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1766                   void *cb_cls)
1767 {
1768   struct Plugin *plugin = cls;
1769   int ret = GNUNET_SYSERR;
1770   sqlite3_stmt *stmt = plugin->select_state_prefix;
1771   size_t name_len = strlen (name);
1772
1773   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1774                                       sizeof (*channel_key), SQLITE_STATIC)
1775       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1776       || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len + 1)
1777       || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC))
1778   {
1779     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1780                 "sqlite3_bind");
1781   }
1782   else
1783   {
1784     int sql_ret;
1785     do
1786     {
1787       sql_ret = sqlite3_step (stmt);
1788       switch (sql_ret)
1789       {
1790       case SQLITE_DONE:
1791         if (ret != GNUNET_OK)
1792           ret = GNUNET_NO;
1793         break;
1794       case SQLITE_ROW:
1795         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1796                   sqlite3_column_blob (stmt, 1),
1797                   sqlite3_column_bytes (stmt, 1));
1798         if (ret != GNUNET_YES)
1799           sql_ret = SQLITE_DONE;
1800         break;
1801       default:
1802         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1803                     "sqlite3_step");
1804       }
1805     }
1806     while (sql_ret == SQLITE_ROW);
1807   }
1808   if (SQLITE_OK != sqlite3_reset (stmt))
1809   {
1810     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1811                 "sqlite3_reset");
1812   }
1813   return ret;
1814 }
1815
1816
1817 /**
1818  * Retrieve all signed state variables for a channel.
1819  *
1820  * @see GNUNET_PSYCSTORE_state_get_signed()
1821  *
1822  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1823  */
1824 static int
1825 state_get_signed (void *cls,
1826                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1827                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1828 {
1829   struct Plugin *plugin = cls;
1830   int ret = GNUNET_SYSERR;
1831
1832   sqlite3_stmt *stmt = plugin->select_state_signed;
1833
1834   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1835                                       sizeof (*channel_key), SQLITE_STATIC))
1836   {
1837     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1838                 "sqlite3_bind");
1839   }
1840   else
1841   {
1842     int sql_ret;
1843     do
1844     {
1845       sql_ret = sqlite3_step (stmt);
1846       switch (sql_ret)
1847       {
1848       case SQLITE_DONE:
1849         if (ret != GNUNET_OK)
1850           ret = GNUNET_NO;
1851         break;
1852       case SQLITE_ROW:
1853         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1854                   sqlite3_column_blob (stmt, 1),
1855                   sqlite3_column_bytes (stmt, 1));
1856         if (ret != GNUNET_YES)
1857           sql_ret = SQLITE_DONE;
1858         break;
1859       default:
1860         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1861                     "sqlite3_step");
1862       }
1863     }
1864     while (sql_ret == SQLITE_ROW);
1865   }
1866
1867   if (SQLITE_OK != sqlite3_reset (stmt))
1868   {
1869     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1870                 "sqlite3_reset");
1871   }
1872
1873   return ret;
1874 }
1875
1876
1877 /**
1878  * Entry point for the plugin.
1879  *
1880  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1881  * @return NULL on error, otherwise the plugin context
1882  */
1883 void *
1884 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1885 {
1886   static struct Plugin plugin;
1887   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1888   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1889
1890   if (NULL != plugin.cfg)
1891     return NULL;                /* can only initialize once! */
1892   memset (&plugin, 0, sizeof (struct Plugin));
1893   plugin.cfg = cfg;
1894   if (GNUNET_OK != database_setup (&plugin))
1895   {
1896     database_shutdown (&plugin);
1897     return NULL;
1898   }
1899   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1900   api->cls = &plugin;
1901   api->membership_store = &membership_store;
1902   api->membership_test = &membership_test;
1903   api->fragment_store = &fragment_store;
1904   api->message_add_flags = &message_add_flags;
1905   api->fragment_get = &fragment_get;
1906   api->fragment_get_latest = &fragment_get_latest;
1907   api->message_get = &message_get;
1908   api->message_get_latest = &message_get_latest;
1909   api->message_get_fragment = &message_get_fragment;
1910   api->counters_message_get = &counters_message_get;
1911   api->counters_state_get = &counters_state_get;
1912   api->state_modify_begin = &state_modify_begin;
1913   api->state_modify_op = &state_modify_op;
1914   api->state_modify_end = &state_modify_end;
1915   api->state_sync_begin = &state_sync_begin;
1916   api->state_sync_assign = &state_sync_assign;
1917   api->state_sync_end = &state_sync_end;
1918   api->state_reset = &state_reset;
1919   api->state_update_signed = &state_update_signed;
1920   api->state_get = &state_get;
1921   api->state_get_prefix = &state_get_prefix;
1922   api->state_get_signed = &state_get_signed;
1923
1924   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1925   return api;
1926 }
1927
1928
1929 /**
1930  * Exit point from the plugin.
1931  *
1932  * @param cls The plugin context (as returned by "init")
1933  * @return Always NULL
1934  */
1935 void *
1936 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1937 {
1938   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1939   struct Plugin *plugin = api->cls;
1940
1941   database_shutdown (plugin);
1942   plugin->cfg = NULL;
1943   GNUNET_free (api);
1944   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1945   return NULL;
1946 }
1947
1948 /* end of plugin_psycstore_sqlite.c */